1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
5 from random import randrange
6 import sqlite ## find this at http://pysqlite.sourceforge.net/
8 from twisted.internet.defer import Deferred
9 from twisted.internet import protocol
10 from twisted.internet import reactor
13 from ktable import KTable
14 from knode import KNodeBase, KNodeRead, KNodeWrite
15 from khash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
19 class KhashmirDBExcept(Exception):
22 # this is the base class, has base functionality and find node, no key-value mappings
23 class KhashmirBase(protocol.Factory):
def __init__(self, host, port, db='khashmir.db'):
    """Initialise the instance by delegating everything to setup().

    host -- forwarded unchanged to setup()
    port -- forwarded unchanged to setup()
    db   -- forwarded unchanged to setup(); defaults to 'khashmir.db'
    """
    self.setup(host, port, db)
28 def setup(self, host, port, db='khashmir.db'):
31 self.node = self._loadSelfNode(host, port)
32 self.table = KTable(self.node)
33 #self.app = service.Application("krpc")
34 self.udp = krpc.hostbroker(self)
35 self.udp.protocol = krpc.KRPC
36 self.listenport = reactor.listenUDP(port, self.udp)
38 self._loadRoutingTable()
39 KeyExpirer(store=self.store)
40 self.refreshTable(force=1)
41 reactor.callLater(60, self.checkpoint, (1,))
49 self.listenport.stopListening()
51 def _loadSelfNode(self, host, port):
52 c = self.store.cursor()
53 c.execute('select id from self where num = 0;')
58 return self._Node().init(id, host, port)
60 def _saveSelfNode(self):
61 c = self.store.cursor()
62 c.execute('delete from self where num = 0;')
63 c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
66 def checkpoint(self, auto=0):
68 self._dumpRoutingTable()
71 reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
73 def _findDB(self, db):
82 def _loadDB(self, db):
84 self.store = sqlite.connect(db=db)
85 #self.store.autocommit = 0
88 raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
90 def _createNewDB(self, db):
91 self.store = sqlite.connect(db=db)
93 create table kv (key binary, value binary, time timestamp, primary key (key, value));
94 create index kv_key on kv(key);
95 create index kv_timestamp on kv(time);
97 create table nodes (id binary primary key, host text, port number);
99 create table self (num number primary key, id binary);
101 c = self.store.cursor()
105 def _dumpRoutingTable(self):
107 save routing table nodes to the database
109 c = self.store.cursor()
110 c.execute("delete from nodes where id not NULL;")
111 for bucket in self.table.buckets:
112 for node in bucket.l:
113 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
def _loadRoutingTable(self):
    """
    Load routing table nodes from the database.

    Every row of the `nodes` table is read positionally as
    (id, host, port), turned into a node via self.Node().initWithDict(),
    given a connection from self.udp.connectionForAddr(), and inserted
    into self.table with contacted=0.

    It's usually a good idea to call refreshTable(force=1) after loading
    the table.
    """
    cursor = self.store.cursor()
    cursor.execute("select * from nodes;")
    for row in cursor.fetchall():
        # the port column is coerced to int before the node is built
        info = {'id': row[0], 'host': row[1], 'port': int(row[2])}
        node = self.Node().initWithDict(info)
        node.conn = self.udp.connectionForAddr((node.host, node.port))
        self.table.insertNode(node, contacted=0)
130 ####### LOCAL INTERFACE - use these methods!
def addContact(self, host, port, callback=None):
    """
    Ping this node and add the contact info to the table on pong!

    host, port -- address of the remote node to contact
    callback   -- passed through to sendPing() as its `callback` argument

    The node object is created with const.NULL_ID as its id, is given a
    connection from self.udp.connectionForAddr(), and is then handed to
    sendPing().
    """
    contact = self.Node().init(const.NULL_ID, host, port)
    contact.conn = self.udp.connectionForAddr((contact.host, contact.port))
    self.sendPing(contact, callback=callback)
139 ## this call is async!
140 def findNode(self, id, callback, errback=None):
141 """ returns the contact info for node, or the k closest nodes, from the global table """
142 # get K nodes out of local table/cache, or the node we want
143 nodes = self.table.findNodes(id)
146 d.addCallbacks(callback, errback)
148 d.addCallback(callback)
149 if len(nodes) == 1 and nodes[0].id == id :
152 # create our search state
153 state = FindNode(self, id, d.callback)
154 reactor.callLater(0, state.goWithNodes, nodes)
156 def insertNode(self, n, contacted=1):
158 insert a node in our local table, pinging oldest contact in bucket, if necessary
160 If all you have is a host/port, then use addContact, which calls this method after
161 receiving the PONG from the remote node. The reason for the separation is we can't insert
162 a node into the table without its peer-ID. That means of course the node passed into this
163 method needs to be a properly formed Node object with a valid ID.
165 old = self.table.insertNode(n, contacted=contacted)
166 if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
167 # the bucket is full, check to see if old node is still around and if so, replace it
169 ## these are the callbacks used when we ping the oldest node in a bucket
170 def _staleNodeHandler(oldnode=old, newnode = n):
171 """ called if the pinged node never responds """
172 self.table.replaceStaleNode(old, newnode)
174 def _notStaleNodeHandler(dict, old=old):
175 """ called when we get a pong from the old node """
177 if dict['id'] == old.id:
178 self.table.justSeenNode(old.id)
180 df = old.ping(self.node.id)
181 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
183 def sendPing(self, node, callback=None):
187 df = node.ping(self.node.id)
188 ## these are the callbacks we use when we issue a PING
189 def _pongHandler(dict, node=node, table=self.table, callback=callback):
190 _krpc_sender = dict['_krpc_sender']
192 sender = {'id' : dict['id']}
193 sender['host'] = _krpc_sender[0]
194 sender['port'] = _krpc_sender[1]
195 n = self.Node().initWithDict(sender)
196 n.conn = self.udp.connectionForAddr((n.host, n.port))
200 def _defaultPong(err, node=node, table=self.table, callback=callback):
201 table.nodeFailed(node)
205 df.addCallbacks(_pongHandler,_defaultPong)
def findCloseNodes(self, callback=lambda a: None):
    """
    This does a findNode on the ID one away from our own.
    This will allow us to populate our table with nodes on our network
    closest to our own.
    This is called as soon as we start up with an empty table.

    callback -- passed straight to findNode(); the default ignores its
                single argument and returns None
    """
    own_id = self.node.id
    # The target differs from our own id only in its last character,
    # which is incremented by one and wraps around modulo 256.
    target = own_id[:-1] + chr((ord(own_id[-1]) + 1) % 256)
    self.findNode(target, callback)
216 def refreshTable(self, force=0):
218 force=1 will refresh table regardless of last bucket access time
223 for bucket in self.table.buckets:
224 if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
225 id = newIDInRange(bucket.min, bucket.max)
226 self.findNode(id, callback)
230 Returns (num_contacts, num_nodes)
231 num_contacts: number contacts in our routing table
232 num_nodes: number of nodes estimated in the entire dht
234 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
235 num_nodes = const.K * (2**(len(self.table.buckets) - 1))
236 return (num_contacts, num_nodes)
238 def krpc_ping(self, id, _krpc_sender):
240 sender['host'] = _krpc_sender[0]
241 sender['port'] = _krpc_sender[1]
242 n = self.Node().initWithDict(sender)
243 n.conn = self.udp.connectionForAddr((n.host, n.port))
244 self.insertNode(n, contacted=0)
245 return {"id" : self.node.id}
247 def krpc_find_node(self, target, id, _krpc_sender):
248 nodes = self.table.findNodes(target)
249 nodes = map(lambda node: node.senderDict(), nodes)
251 sender['host'] = _krpc_sender[0]
252 sender['port'] = _krpc_sender[1]
253 n = self.Node().initWithDict(sender)
254 n.conn = self.udp.connectionForAddr((n.host, n.port))
255 self.insertNode(n, contacted=0)
256 return {"nodes" : nodes, "id" : self.node.id}
259 ## This class provides read-only access to the DHT, valueForKey
260 ## you probably want to use this mixin and provide your own write methods
261 class KhashmirRead(KhashmirBase):
263 def retrieveValues(self, key):
264 c = self.store.cursor()
265 c.execute("select value from kv where key = %s;", sqlite.encode(key))
273 def valueForKey(self, key, callback, searchlocal = 1):
274 """ returns the values found for key in global table
275 callback will be called with a list of values for each peer that returns unique values
276 final callback will be an empty list - probably should change to 'more coming' arg
278 nodes = self.table.findNodes(key)
282 l = self.retrieveValues(key)
284 reactor.callLater(0, callback, (l))
288 # create our search state
289 state = GetValue(self, key, callback)
290 reactor.callLater(0, state.goWithNodes, nodes, l)
292 def krpc_find_value(self, key, id, _krpc_sender):
294 sender['host'] = _krpc_sender[0]
295 sender['port'] = _krpc_sender[1]
296 n = self.Node().initWithDict(sender)
297 n.conn = self.udp.connectionForAddr((n.host, n.port))
298 self.insertNode(n, contacted=0)
300 l = self.retrieveValues(key)
302 return {'values' : l, "id": self.node.id}
304 nodes = self.table.findNodes(key)
305 nodes = map(lambda node: node.senderDict(), nodes)
306 return {'nodes' : nodes, "id": self.node.id}
308 ### provides a generic write method, you probably don't want to deploy something that allows
309 ### arbitrary value storage
310 class KhashmirWrite(KhashmirRead):
312 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
313 def storeValueForKey(self, key, value, callback=None):
314 """ stores the value for key in the global table, returns immediately, no status
315 in this implementation, peers respond but don't indicate status to storing values
316 a key can have many values
318 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
321 def _storedValueHandler(sender):
323 response=_storedValueHandler
324 action = StoreValue(self.table, key, value, response)
325 reactor.callLater(0, action.goWithNodes, nodes)
327 # this call is async
328 self.findNode(key, _storeValueForKey)
330 def krpc_store_value(self, key, value, id, _krpc_sender):
332 c = self.store.cursor()
334 c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
335 except sqlite.IntegrityError, reason:
336 # update last insert time
337 c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
340 sender['host'] = _krpc_sender[0]
341 sender['port'] = _krpc_sender[1]
342 n = self.Node().initWithDict(sender)
343 n.conn = self.udp.connectionForAddr((n.host, n.port))
344 self.insertNode(n, contacted=0)
345 return {"id" : self.node.id}
347 # the whole shebang, for testing
348 class Khashmir(KhashmirWrite):