1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from const import reactor
11 from ktable import KTable, K
14 from khash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.python import threadable
22 from twisted.application import service, internet
23 from twisted.web import server
27 from random import randrange
29 import sqlite ## find this at http://pysqlite.sourceforge.net/
31 class KhashmirDBExcept(Exception):
34 # this is the base class, has base functionality and find node, no key-value mappings
35 class KhashmirBase(protocol.Factory):
36 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
38 def __init__(self, host, port, db='khashmir.db'):
39 self.setup(host, port, db)
41 def setup(self, host, port, db='khashmir.db'):
44 self.node = self._loadSelfNode(host, port)
45 self.table = KTable(self.node)
46 #self.app = service.Application("krpc")
47 self.udp = krpc.hostbroker(self)
48 self.udp.protocol = krpc.KRPC
49 self.listenport = reactor.listenUDP(port, self.udp)
50 self.last = time.time()
51 self._loadRoutingTable()
52 KeyExpirer(store=self.store)
53 self.refreshTable(force=1)
54 reactor.callLater(60, self.checkpoint, (1,))
62 self.listenport.stopListening()
64 def _loadSelfNode(self, host, port):
65 c = self.store.cursor()
66 c.execute('select id from self where num = 0;')
71 return self._Node().init(id, host, port)
73 def _saveSelfNode(self):
74 c = self.store.cursor()
75 c.execute('delete from self where num = 0;')
76 c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
79 def checkpoint(self, auto=0):
81 self._dumpRoutingTable()
84 reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
86 def _findDB(self, db):
95 def _loadDB(self, db):
97 self.store = sqlite.connect(db=db)
98 #self.store.autocommit = 0
101 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
103 def _createNewDB(self, db):
104 self.store = sqlite.connect(db=db)
106 create table kv (key binary, value binary, time timestamp, primary key (key, value));
107 create index kv_key on kv(key);
108 create index kv_timestamp on kv(time);
110 create table nodes (id binary primary key, host text, port number);
112 create table self (num number primary key, id binary);
114 c = self.store.cursor()
118 def _dumpRoutingTable(self):
120 save routing table nodes to the database
122 c = self.store.cursor()
123 c.execute("delete from nodes where id not NULL;")
124 for bucket in self.table.buckets:
125 for node in bucket.l:
126 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
129 def _loadRoutingTable(self):
131 load routing table nodes from database
132 it's usually a good idea to call refreshTable(force=1) after loading the table
134 c = self.store.cursor()
135 c.execute("select * from nodes;")
136 for rec in c.fetchall():
137 n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
138 n.conn = self.udp.connectionForAddr((n.host, n.port))
139 self.table.insertNode(n, contacted=0)
143 ####### LOCAL INTERFACE - use these methods!
144 def addContact(self, host, port, callback=None):
146 ping this node and add the contact info to the table on pong!
148 n =self.Node().init(const.NULL_ID, host, port)
149 n.conn = self.udp.connectionForAddr((n.host, n.port))
150 self.sendPing(n, callback=callback)
152 ## this call is async!
153 def findNode(self, id, callback, errback=None):
154 """ returns the contact info for node, or the k closest nodes, from the global table """
155 # get K nodes out of local table/cache, or the node we want
156 nodes = self.table.findNodes(id)
159 d.addCallbacks(callback, errback)
161 d.addCallback(callback)
162 if len(nodes) == 1 and nodes[0].id == id :
165 # create our search state
166 state = FindNode(self, id, d.callback)
167 reactor.callFromThread(state.goWithNodes, nodes)
169 def insertNode(self, n, contacted=1):
171 insert a node in our local table, pinging oldest contact in bucket, if necessary
173 If all you have is a host/port, then use addContact, which calls this method after
174 receiving the PONG from the remote node. The reason for the seperation is we can't insert
175 a node into the table without it's peer-ID. That means of course the node passed into this
176 method needs to be a properly formed Node object with a valid ID.
178 old = self.table.insertNode(n, contacted=contacted)
179 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
180 # the bucket is full, check to see if old node is still around and if so, replace it
182 ## these are the callbacks used when we ping the oldest node in a bucket
183 def _staleNodeHandler(oldnode=old, newnode = n):
184 """ called if the pinged node never responds """
185 self.table.replaceStaleNode(old, newnode)
187 def _notStaleNodeHandler(dict, old=old):
188 """ called when we get a pong from the old node """
190 if dict['id'] == old.id:
191 self.table.justSeenNode(old.id)
193 df = old.ping(self.node.id)
194 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
196 def sendPing(self, node, callback=None):
200 df = node.ping(self.node.id)
201 ## these are the callbacks we use when we issue a PING
202 def _pongHandler(dict, node=node, table=self.table, callback=callback):
203 _krpc_sender = dict['_krpc_sender']
205 sender = {'id' : dict['id']}
206 sender['host'] = _krpc_sender[0]
207 sender['port'] = _krpc_sender[1]
208 n = self.Node().initWithDict(sender)
209 n.conn = self.udp.connectionForAddr((n.host, n.port))
213 def _defaultPong(err, node=node, table=self.table, callback=callback):
214 table.nodeFailed(node)
218 df.addCallbacks(_pongHandler,_defaultPong)
220 def findCloseNodes(self, callback=lambda a: None):
222 This does a findNode on the ID one away from our own.
223 This will allow us to populate our table with nodes on our network closest to our own.
224 This is called as soon as we start up with an empty table
226 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
227 self.findNode(id, callback)
229 def refreshTable(self, force=0):
231 force=1 will refresh table regardless of last bucket access time
236 for bucket in self.table.buckets:
237 if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
238 id = newIDInRange(bucket.min, bucket.max)
239 self.findNode(id, callback)
243 Returns (num_contacts, num_nodes)
244 num_contacts: number contacts in our routing table
245 num_nodes: number of nodes estimated in the entire dht
247 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
248 num_nodes = const.K * (2**(len(self.table.buckets) - 1))
249 return (num_contacts, num_nodes)
251 def krpc_ping(self, id, _krpc_sender):
253 sender['host'] = _krpc_sender[0]
254 sender['port'] = _krpc_sender[1]
255 n = self.Node().initWithDict(sender)
256 n.conn = self.udp.connectionForAddr((n.host, n.port))
257 self.insertNode(n, contacted=0)
258 return {"id" : self.node.id}
260 def krpc_find_node(self, target, id, _krpc_sender):
261 nodes = self.table.findNodes(target)
262 nodes = map(lambda node: node.senderDict(), nodes)
264 sender['host'] = _krpc_sender[0]
265 sender['port'] = _krpc_sender[1]
266 n = self.Node().initWithDict(sender)
267 n.conn = self.udp.connectionForAddr((n.host, n.port))
268 self.insertNode(n, contacted=0)
269 return {"nodes" : nodes, "id" : self.node.id}
272 ## This class provides read-only access to the DHT, valueForKey
273 ## you probably want to use this mixin and provide your own write methods
274 class KhashmirRead(KhashmirBase):
276 def retrieveValues(self, key):
277 c = self.store.cursor()
278 c.execute("select value from kv where key = %s;", sqlite.encode(key))
286 def valueForKey(self, key, callback, searchlocal = 1):
287 """ returns the values found for key in global table
288 callback will be called with a list of values for each peer that returns unique values
289 final callback will be an empty list - probably should change to 'more coming' arg
291 nodes = self.table.findNodes(key)
295 l = self.retrieveValues(key)
297 reactor.callLater(0, callback, (l))
301 # create our search state
302 state = GetValue(self, key, callback)
303 reactor.callFromThread(state.goWithNodes, nodes, l)
305 def krpc_find_value(self, key, id, _krpc_sender):
307 sender['host'] = _krpc_sender[0]
308 sender['port'] = _krpc_sender[1]
309 n = self.Node().initWithDict(sender)
310 n.conn = self.udp.connectionForAddr((n.host, n.port))
311 self.insertNode(n, contacted=0)
313 l = self.retrieveValues(key)
315 return {'values' : l, "id": self.node.id}
317 nodes = self.table.findNodes(key)
318 nodes = map(lambda node: node.senderDict(), nodes)
319 return {'nodes' : nodes, "id": self.node.id}
321 ### provides a generic write method, you probably don't want to deploy something that allows
322 ### arbitrary value storage
323 class KhashmirWrite(KhashmirRead):
325 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
326 def storeValueForKey(self, key, value, callback=None):
327 """ stores the value for key in the global table, returns immediately, no status
328 in this implementation, peers respond but don't indicate status to storing values
329 a key can have many values
331 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
334 def _storedValueHandler(sender):
336 response=_storedValueHandler
337 action = StoreValue(self.table, key, value, response)
338 reactor.callFromThread(action.goWithNodes, nodes)
340 # this call is asynch
341 self.findNode(key, _storeValueForKey)
343 def krpc_store_value(self, key, value, id, _krpc_sender):
344 t = "%0.6f" % time.time()
345 c = self.store.cursor()
347 c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
348 except sqlite.IntegrityError, reason:
349 # update last insert time
350 c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
353 sender['host'] = _krpc_sender[0]
354 sender['port'] = _krpc_sender[1]
355 n = self.Node().initWithDict(sender)
356 n.conn = self.udp.connectionForAddr((n.host, n.port))
357 self.insertNode(n, contacted=0)
358 return {"id" : self.node.id}
360 # the whole shebang, for testing
361 class Khashmir(KhashmirWrite):