1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from const import reactor
11 from ktable import KTable, K
14 from khash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.application import service, internet
22 from twisted.web import server
25 from random import randrange
27 import sqlite ## find this at http://pysqlite.sourceforge.net/
# Exception raised by KhashmirBase._loadDB (message "Couldn't open DB") when the
# sqlite database cannot be opened. Its body is not visible in this excerpt.
29 class KhashmirDBExcept(Exception):
32 # this is the base class, has base functionality and find node, no key-value mappings
# Base DHT node. It owns our own node record (self.node), the KTable routing table,
# the UDP KRPC listener and the sqlite store. It answers "ping" and "find_node"
# requests. Key/value lookup and storage are not handled here; see the subclasses.
# NOTE(review): this excerpt is sparse. The embedded line numbering has gaps, so
# several method bodies are only partly visible. The comments below describe only
# the visible lines; anything beyond that is marked as an assumption. self._Node
# and self.Node (callables returning node objects) are defined on lines not
# visible here.
33 class KhashmirBase(protocol.Factory):
# NOTE(review): setup() assigns self.udp and self.listenport, which are not listed in
# __slots__. That only works if a base class provides an instance __dict__ (confirm).
34 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
# Construction is delegated entirely to setup(). host/port are this node's contact
# address; db names the sqlite database (default 'khashmir.db').
36 def __init__(self, host, port, db='khashmir.db'):
37 self.setup(host, port, db)
# Initialise node state and start the network and maintenance machinery:
# load our node, build the routing table, open the UDP listener, restore saved
# contacts, start key expiry, refresh all buckets and schedule the first checkpoint.
# NOTE(review): the lines that open self.store (used by _loadSelfNode) are not
# visible. Presumably _findDB(db) runs first; confirm.
39 def setup(self, host, port, db='khashmir.db'):
42 self.node = self._loadSelfNode(host, port)
43 self.table = KTable(self.node)
44 #self.app = service.Application("krpc")
# krpc.hostbroker is the protocol object handed to reactor.listenUDP. Its
# `protocol` attribute is set to krpc.KRPC. Presumably connectionForAddr() uses it
# to build per-peer connections; confirm in krpc.py.
45 self.udp = krpc.hostbroker(self)
46 self.udp.protocol = krpc.KRPC
47 self.listenport = reactor.listenUDP(port, self.udp)
48 self.last = time.time()
49 self._loadRoutingTable()
# The KeyExpirer instance is not kept. Presumably it schedules its own expiry of
# the kv table; confirm in actions.py.
50 KeyExpirer(store=self.store)
# force=1 makes refreshTable search every bucket. The first checkpoint runs after
# 60 s; the extra callLater argument becomes checkpoint's `auto`, and (1,) is
# simply a truthy value there.
51 self.refreshTable(force=1)
52 reactor.callLater(60, self.checkpoint, (1,))
# (Fragment: the enclosing def is not visible in this excerpt.) Stops the UDP
# listener created by reactor.listenUDP in setup().
60 self.listenport.stopListening()
# Read our persisted node ID (row num = 0 of the `self` table) and return our own
# node object for host/port; setup() stores it as self.node.
# NOTE(review): the lines that fetch the row and bind `id` are not visible.
# Presumably they also handle a missing row.
62 def _loadSelfNode(self, host, port):
63 c = self.store.cursor()
64 c.execute('select id from self where num = 0;')
69 return self._Node().init(id, host, port)
# Persist our node ID as the single row num = 0 of the `self` table: delete the
# old row, then insert the sqlite-encoded ID.
# NOTE(review): any commit is on a line not visible here.
71 def _saveSelfNode(self):
72 c = self.store.cursor()
73 c.execute('delete from self where num = 0;')
74 c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
# Periodic maintenance. It dumps the routing table to the database, then
# reschedules itself. The delay is a random integer in
# [int(0.9 * CHECKPOINT_INTERVAL), int(1.1 * CHECKPOINT_INTERVAL)).
# NOTE(review): some lines of this method are not visible. The reschedule is
# presumably conditional on `auto`; confirm.
77 def checkpoint(self, auto=0):
79 self._dumpRoutingTable()
82 reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
# Body not visible in this excerpt. By its name, it presumably selects between
# _loadDB and _createNewDB for `db`; confirm.
84 def _findDB(self, db):
# Open an existing sqlite database. The visible raise re-reports a failure as
# KhashmirDBExcept. The surrounding try/except lines are not visible.
# FIXME(review): the stdlib `traceback` module has no `exc_traceback` attribute.
# The raise below would therefore fail with AttributeError (or NameError if
# `traceback` is not imported) instead of raising KhashmirDBExcept. Use
# sys.exc_info()[2] as the third raise argument.
93 def _loadDB(self, db):
95 self.store = sqlite.connect(db=db)
96 #self.store.autocommit = 0
99 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
# Create a fresh database and its schema:
#   kv    - stored (key, value) pairs plus their last store time; (key, value) is
#           the primary key
#   nodes - saved routing-table contacts (id, host, port)
#   self  - our own node ID, kept as row num = 0
# NOTE(review): the SQL string delimiters and the execute/commit lines are not
# visible. The statements below are presumably one SQL script string.
101 def _createNewDB(self, db):
102 self.store = sqlite.connect(db=db)
104 create table kv (key binary, value binary, time timestamp, primary key (key, value));
105 create index kv_key on kv(key);
106 create index kv_timestamp on kv(time);
108 create table nodes (id binary primary key, host text, port number);
110 create table self (num number primary key, id binary);
112 c = self.store.cursor()
# Replace the saved contact list with the current routing table. It deletes every
# stored row with a non-NULL id, then inserts each node of each bucket (the id is
# sqlite-encoded).
# NOTE(review): any commit is on a line not visible here.
116 def _dumpRoutingTable(self):
118 save routing table nodes to the database
120 c = self.store.cursor()
121 c.execute("delete from nodes where id not NULL;")
122 for bucket in self.table.buckets:
123 for node in bucket.l:
124 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
# Rebuild routing-table entries from the `nodes` table. The port column is
# converted with int(), and each node gets a connection from the UDP broker.
# Nodes are inserted with contacted=0 (presumably "not yet verified alive"; confirm
# in KTable.insertNode).
127 def _loadRoutingTable(self):
129 load routing table nodes from database
130 it's usually a good idea to call refreshTable(force=1) after loading the table
132 c = self.store.cursor()
133 c.execute("select * from nodes;")
134 for rec in c.fetchall():
135 n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
136 n.conn = self.udp.connectionForAddr((n.host, n.port))
137 self.table.insertNode(n, contacted=0)
141 ####### LOCAL INTERFACE - use these methods!
# Ping host:port. The contact is not inserted here; its real node ID is unknown
# until it answers, so const.NULL_ID is used as a placeholder ID. `callback` is
# forwarded to sendPing.
142 def addContact(self, host, port, callback=None):
144 ping this node and add the contact info to the table on pong!
146 n =self.Node().init(const.NULL_ID, host, port)
147 n.conn = self.udp.connectionForAddr((n.host, n.port))
148 self.sendPing(n, callback=callback)
# Asynchronous lookup of `id`. `callback` (and `errback`, when given) are attached
# to a Deferred `d`. The visible `if` checks whether the local table already
# returned exactly the requested node. Otherwise, presumably, a FindNode search is
# started on the next reactor turn and fires d.callback.
# NOTE(review): the creation of `d` and the if/else lines are not visible.
150 ## this call is async!
151 def findNode(self, id, callback, errback=None):
152 """ returns the contact info for node, or the k closest nodes, from the global table """
153 # get K nodes out of local table/cache, or the node we want
154 nodes = self.table.findNodes(id)
157 d.addCallbacks(callback, errback)
159 d.addCallback(callback)
160 if len(nodes) == 1 and nodes[0].id == id :
163 # create our search state
164 state = FindNode(self, id, d.callback)
165 reactor.callLater(0, state.goWithNodes, nodes)
# Insert `n` into the routing table. table.insertNode may return an existing node
# `old` (the bucket-full case, per the comment below). If `old` has not been seen
# for more than MIN_PING_INTERVAL and is not ourselves, it is pinged. A reply
# carrying the same ID marks it just seen; a failed ping replaces it with `n`.
167 def insertNode(self, n, contacted=1):
169 insert a node in our local table, pinging oldest contact in bucket, if necessary
171 If all you have is a host/port, then use addContact, which calls this method after
172 receiving the PONG from the remote node. The reason for the separation is we can't insert
173 a node into the table without its peer-ID. That means of course the node passed into this
174 method needs to be a properly formed Node object with a valid ID.
176 old = self.table.insertNode(n, contacted=contacted)
177 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
178 # the bucket is full, check to see if old node is still around and if so, replace it
180 ## these are the callbacks used when we ping the oldest node in a bucket
# NOTE: as an errback, this handler is called with the Twisted Failure as its first
# positional argument, which binds to `oldnode`. The body uses the closed-over `old`
# instead, which is why it works. Do not switch the body to `oldnode`.
181 def _staleNodeHandler(oldnode=old, newnode = n):
182 """ called if the pinged node never responds """
183 self.table.replaceStaleNode(old, newnode)
185 def _notStaleNodeHandler(dict, old=old):
186 """ called when we get a pong from the old node """
# NOTE(review): a line before this check is not visible (possibly it unwraps the
# reply); confirm.
188 if dict['id'] == old.id:
189 self.table.justSeenNode(old.id)
191 df = old.ping(self.node.id)
192 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
# Ping `node`. On a reply, build a fresh node from the reply's 'id' and the
# reply's '_krpc_sender' address, a (host, port) pair, and give it a connection.
# On error, report the pinged node to the table via nodeFailed().
# NOTE(review): several lines are not visible, including the ends of both handlers.
# Presumably they insert the new node and invoke `callback`.
194 def sendPing(self, node, callback=None):
198 df = node.ping(self.node.id)
199 ## these are the callbacks we use when we issue a PING
200 def _pongHandler(dict, node=node, table=self.table, callback=callback):
201 _krpc_sender = dict['_krpc_sender']
203 sender = {'id' : dict['id']}
204 sender['host'] = _krpc_sender[0]
205 sender['port'] = _krpc_sender[1]
206 n = self.Node().initWithDict(sender)
207 n.conn = self.udp.connectionForAddr((n.host, n.port))
211 def _defaultPong(err, node=node, table=self.table, callback=callback):
212 table.nodeFailed(node)
216 df.addCallbacks(_pongHandler,_defaultPong)
# The target ID equals our own ID with only the last byte incremented modulo 256.
# 0xff wraps to 0x00, with no carry into earlier bytes.
218 def findCloseNodes(self, callback=lambda a: None):
220 This does a findNode on the ID one away from our own.
221 This will allow us to populate our table with nodes on our network closest to our own.
222 This is called as soon as we start up with an empty table
224 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
225 self.findNode(id, callback)
# For every bucket that is stale (not accessed for at least BUCKET_STALENESS), or
# for every bucket when `force` is true, search for a random ID within that
# bucket's [min, max] range.
# NOTE(review): the `callback` passed to findNode is defined on lines not visible here.
227 def refreshTable(self, force=0):
229 force=1 will refresh table regardless of last bucket access time
234 for bucket in self.table.buckets:
235 if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
236 id = newIDInRange(bucket.min, bucket.max)
237 self.findNode(id, callback)
# (Fragment: this method's def line is not visible.) num_contacts sums the
# bucket sizes. num_nodes extrapolates the network size as
# const.K * 2**(number_of_buckets - 1).
# NOTE(review): this uses const.K, although K is also imported from ktable; confirm
# that both agree.
241 Returns (num_contacts, num_nodes)
242 num_contacts: number contacts in our routing table
243 num_nodes: number of nodes estimated in the entire dht
245 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
246 num_nodes = const.K * (2**(len(self.table.buckets) - 1))
247 return (num_contacts, num_nodes)
# KRPC "ping" handler. It registers the requester with contacted=0, using the
# host/port reported in _krpc_sender, and replies with our ID.
# NOTE(review): `sender` is initialised on a line not visible here, presumably
# from `id`; confirm.
249 def krpc_ping(self, id, _krpc_sender):
251 sender['host'] = _krpc_sender[0]
252 sender['port'] = _krpc_sender[1]
253 n = self.Node().initWithDict(sender)
254 n.conn = self.udp.connectionForAddr((n.host, n.port))
255 self.insertNode(n, contacted=0)
256 return {"id" : self.node.id}
# KRPC "find_node" handler. It replies with senderDict() of each node our table
# returns for `target`, plus our ID. The requester is registered the same way as
# in krpc_ping.
# NOTE(review): `sender` is initialised on a line not visible here. The
# requester-registration sequence is repeated in every krpc_* handler and is a
# candidate for a shared helper.
258 def krpc_find_node(self, target, id, _krpc_sender):
259 nodes = self.table.findNodes(target)
260 nodes = map(lambda node: node.senderDict(), nodes)
262 sender['host'] = _krpc_sender[0]
263 sender['port'] = _krpc_sender[1]
264 n = self.Node().initWithDict(sender)
265 n.conn = self.udp.connectionForAddr((n.host, n.port))
266 self.insertNode(n, contacted=0)
267 return {"nodes" : nodes, "id" : self.node.id}
270 ## This class provides read-only access to the DHT, valueForKey
271 ## you probably want to use this mixin and provide your own write methods
# Read-only DHT operations. It looks up values stored locally (kv table) or across
# the network (GetValue search), and answers "find_value" requests.
# NOTE(review): several lines of each method are not visible in this excerpt. The
# comments describe only the visible code.
272 class KhashmirRead(KhashmirBase):
# Query locally stored values for `key`; the key is sqlite-encoded before lookup.
# NOTE(review): the fetch/decode/return lines are not visible. Callers treat the
# result as a sequence of values.
274 def retrieveValues(self, key):
275 c = self.store.cursor()
276 c.execute("select value from kv where key = %s;", sqlite.encode(key))
# Network lookup of `key`. Values already held locally (from retrieveValues) are
# delivered to `callback` on the next reactor turn. A GetValue search is then
# started from the closest known nodes, seeded with those local values.
# NOTE(review): the lines gating the local lookup (presumably on `searchlocal`) and
# the branching around it are not visible.
284 def valueForKey(self, key, callback, searchlocal = 1):
285 """ returns the values found for key in global table
286 callback will be called with a list of values for each peer that returns unique values
287 final callback will be an empty list - probably should change to 'more coming' arg
289 nodes = self.table.findNodes(key)
293 l = self.retrieveValues(key)
# Note: `(l)` is just `l`, not a one-element tuple.
295 reactor.callLater(0, callback, (l))
299 # create our search state
300 state = GetValue(self, key, callback)
301 reactor.callLater(0, state.goWithNodes, nodes, l)
# KRPC "find_value" handler. It registers the requester with contacted=0. It then
# replies either with our stored values for `key` or with the closest known nodes,
# plus our ID in both cases.
# NOTE(review): the `sender` initialisation and the condition selecting between the
# two returns are on lines not visible here. Presumably values are returned when
# any exist.
303 def krpc_find_value(self, key, id, _krpc_sender):
305 sender['host'] = _krpc_sender[0]
306 sender['port'] = _krpc_sender[1]
307 n = self.Node().initWithDict(sender)
308 n.conn = self.udp.connectionForAddr((n.host, n.port))
309 self.insertNode(n, contacted=0)
311 l = self.retrieveValues(key)
313 return {'values' : l, "id": self.node.id}
315 nodes = self.table.findNodes(key)
316 nodes = map(lambda node: node.senderDict(), nodes)
317 return {'nodes' : nodes, "id": self.node.id}
319 ### provides a generic write method, you probably don't want to deploy something that allows
320 ### arbitrary value storage
# Adds generic value storage on top of KhashmirRead. storeValueForKey publishes a
# value to the nodes found for the key. krpc_store_value stores incoming values in
# the local kv table.
# NOTE(review): some lines of both methods are not visible in this excerpt.
321 class KhashmirWrite(KhashmirRead):
# storeValueForKey first runs findNode(key, ...). _storeValueForKey is its
# callback, so it receives the nodes found and starts a StoreValue action on them.
# When no `callback` is given, a local _storedValueHandler is used as `response`.
323 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
324 def storeValueForKey(self, key, value, callback=None):
325 """ stores the value for key in the global table, returns immediately, no status
326 in this implementation, peers respond but don't indicate status to storing values
327 a key can have many values
329 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
332 def _storedValueHandler(sender):
334 response=_storedValueHandler
# NOTE(review): FindNode and GetValue are constructed with `self`, but this passes
# `self.table`; confirm which one StoreValue expects. The `table` default
# parameter is not used in the visible lines.
335 action = StoreValue(self.table, key, value, response)
# FIXME(review): Twisted's signature is reactor.callLater(delay, callable, *args).
# This call passes the bound method as the delay and `nodes` as the callable, so it
# cannot schedule the store as intended. It should read
#   reactor.callLater(0, action.goWithNodes, nodes)
# matching the FindNode/GetValue call sites.
336 reactor.callLater(action.goWithNodes, nodes)
338 # this call is asynch
339 self.findNode(key, _storeValueForKey)
# KRPC "store_value" handler. It inserts (key, value), both sqlite-encoded, stamped
# with the current time as a "%0.6f" string. When the pair already exists (sqlite
# IntegrityError, given the (key, value) primary key), it refreshes the stored
# time instead. It then registers the requester and replies with our ID.
# NOTE(review): the `try:` line, any commit and the `sender` initialisation are on
# lines not visible here. `reason` is unused.
341 def krpc_store_value(self, key, value, id, _krpc_sender):
342 t = "%0.6f" % time.time()
343 c = self.store.cursor()
345 c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
346 except sqlite.IntegrityError, reason:
347 # update last insert time
348 c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
351 sender['host'] = _krpc_sender[0]
352 sender['port'] = _krpc_sender[1]
353 n = self.Node().initWithDict(sender)
354 n.conn = self.udp.connectionForAddr((n.host, n.port))
355 self.insertNode(n, contacted=0)
356 return {"id" : self.node.id}
358 # the whole shebang, for testing
# Complete node combining read and write operations (it inherits from
# KhashmirWrite). Its body is not visible in this excerpt.
359 class Khashmir(KhashmirWrite):