1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from const import reactor
11 from ktable import KTable, K
14 from khash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.application import service, internet
22 from twisted.web import server
25 from random import randrange
27 import sqlite ## find this at http://pysqlite.sourceforge.net/
# Raised by _loadDB when the existing database cannot be opened.
29 class KhashmirDBExcept(Exception):
    """Error opening the khashmir sqlite database."""
32 # Base class: routing table, ping and find-node support; no key-value storage (see KhashmirRead / KhashmirWrite).
33 class KhashmirBase(protocol.Factory):
# Construct the node; all initialisation is delegated to setup() so that it
# can also be invoked separately with the same arguments.
35 def __init__(self, host, port, db='khashmir.db'):
36 self.setup(host, port, db)
# Initialise the node:
#   - load our own node (_loadSelfNode) and build a KTable around it;
#   - create the KRPC host broker and listen for it over UDP on `port`;
#   - record the start time in self.last;
#   - reload saved contacts (_loadRoutingTable);
#   - create a KeyExpirer over self.store;
#   - force a full table refresh;
#   - schedule the first checkpoint 60 seconds from now.
# NOTE(review): self.store is used by _loadSelfNode, but the lines that
# open/create the database (original lines 39-40) are not visible in this
# excerpt; presumably _findDB(db) is called there -- confirm.
38 def setup(self, host, port, db='khashmir.db'):
41 self.node = self._loadSelfNode(host, port)
42 self.table = KTable(self.node)
43 #self.app = service.Application("krpc")
44 self.udp = krpc.hostbroker(self)
45 self.udp.protocol = krpc.KRPC
46 self.listenport = reactor.listenUDP(port, self.udp)
47 self.last = time.time()
48 self._loadRoutingTable()
49 KeyExpirer(store=self.store)
50 self.refreshTable(force=1)
51 reactor.callLater(60, self.checkpoint, (1,))  # passes auto=(1,); a non-empty tuple is truthy
# NOTE(review): the def line enclosing this statement is not visible in this
# excerpt. It stops the UDP listener that setup() stored in self.listenport.
59 self.listenport.stopListening()
# Return our own node object for (host, port). The id is read from row
# num = 0 of the `self` table. The lines that handle the query result
# (presumably falling back to a fresh id when none is stored) are not
# visible here.
# Uses self._Node() rather than self.Node(): setup() calls this before
# self.table is created.
61 def _loadSelfNode(self, host, port):
62 c = self.store.cursor()
63 c.execute('select id from self where num = 0;')
68 return self._Node().init(id, host, port)
# Persist our node id as the single row (num = 0) of the `self` table,
# replacing any previous row. Any commit happens on lines not visible here.
70 def _saveSelfNode(self):
71 c = self.store.cursor()
72 c.execute('delete from self where num = 0;')
73 c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
# Periodic maintenance: dump the routing table to the database, then
# reschedule itself after a random delay between 90% and 110% of
# const.CHECKPOINT_INTERVAL.
# Some lines of this method are not visible in this excerpt, including
# presumably the check on `auto`.
# NOTE(review): the reschedule passes the tuple (1,) as `auto`. It is truthy,
# so it acts like auto=1, but a plain 1 would be clearer.
76 def checkpoint(self, auto=0):
78 self._dumpRoutingTable()
81 reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
# Prepare the database file `db`. The body is not visible in this excerpt.
# NOTE(review): presumably it opens an existing file via _loadDB and
# otherwise builds one via _createNewDB -- confirm.
83 def _findDB(self, db):
# Open the existing sqlite database `db` into self.store and raise
# KhashmirDBExcept if that fails (the guarding `try:` is not visible here).
# NOTE(review): this is Python 2's three-expression raise, whose third
# expression must be a traceback object or None. Passing the string from
# traceback.format_exc() makes the raise itself fail with TypeError instead
# of raising KhashmirDBExcept. Suggested fix:
#   raise KhashmirDBExcept("Couldn't open DB: " + traceback.format_exc())
92 def _loadDB(self, db):
94 self.store = sqlite.connect(db=db)
95 #self.store.autocommit = 0
98 raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
# Create a fresh database `db` in self.store with three tables:
#   kv    -- stored (key, value, time) rows, unique per (key, value),
#            indexed on key and on time;
#   nodes -- saved routing-table contacts (id, host, port);
#   self  -- our own node id, keyed by num.
# The lines that delimit the schema string, execute it and commit are not
# visible in this excerpt.
100 def _createNewDB(self, db):
101 self.store = sqlite.connect(db=db)
103 create table kv (key binary, value binary, time timestamp, primary key (key, value));
104 create index kv_key on kv(key);
105 create index kv_timestamp on kv(time);
107 create table nodes (id binary primary key, host text, port number);
109 create table self (num number primary key, id binary);
111 c = self.store.cursor()
# Replace the contents of the `nodes` table with every node currently held
# in the routing table's buckets. Any commit is on lines not visible here.
115 def _dumpRoutingTable(self):
117 save routing table nodes to the database
119 c = self.store.cursor()
120 c.execute("delete from nodes where id not NULL;")  # drop the previously saved contacts
121 for bucket in self.table.buckets:
122 for node in bucket.l:
123 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
def _loadRoutingTable(self):
    """
    Populate the routing table from the `nodes` table of the database.

    Each stored (id, host, port) row becomes a Node bound to a UDP
    connection for its address, inserted with contacted=0.
    It's usually a good idea to call refreshTable(force=1) after loading
    the table.
    """
    cursor = self.store.cursor()
    cursor.execute("select * from nodes;")
    rows = cursor.fetchall()
    for row in rows:
        node = self.Node()
        node = node.initWithDict({'id': row[0], 'host': row[1], 'port': int(row[2])})
        address = (node.host, node.port)
        node.conn = self.udp.connectionForAddr(address)
        self.table.insertNode(node, contacted=0)
####### LOCAL INTERFACE - use these methods!
def addContact(self, host, port, callback=None):
    """
    Ping the node at (host, port) and add its contact info to the table on pong.

    The peer's real id is not known yet, so the node is created with
    const.NULL_ID and bound to a UDP connection for its address. `callback`
    is passed through unchanged to sendPing.
    """
    contact = self.Node().init(const.NULL_ID, host, port)
    contact.conn = self.udp.connectionForAddr((contact.host, contact.port))
    self.sendPing(contact, callback=callback)
149 ## this call is async!
# Look up `id` in the DHT. `callback` (plus `errback`, when given) is
# attached to a Deferred `d`; the line creating `d` is not visible here.
# If the local table returns exactly the requested node, the lookup is
# answered locally (on lines not visible here). Otherwise an iterative
# FindNode search, seeded with the closest local nodes, is started on the
# next reactor turn via callLater(0, ...) and fires d.callback when done.
150 def findNode(self, id, callback, errback=None):
151 """ returns the contact info for node, or the k closest nodes, from the global table """
152 # get K nodes out of local table/cache, or the node we want
153 nodes = self.table.findNodes(id)
156 d.addCallbacks(callback, errback)
158 d.addCallback(callback)
159 if len(nodes) == 1 and nodes[0].id == id :
162 # create our search state
163 state = FindNode(self, id, d.callback)
164 reactor.callLater(0, state.goWithNodes, nodes)
166 def insertNode(self, n, contacted=1):
168 insert a node in our local table, pinging oldest contact in bucket, if necessary
170 If all you have is a host/port, then use addContact, which calls this method after
171 receiving the PONG from the remote node. The reason for the separation is we can't insert
172 a node into the table without its peer ID. That means the node passed into this
173 method needs to be a properly formed Node object with a valid ID.
175 old = self.table.insertNode(n, contacted=contacted)
176 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:  # old node not seen recently and not ourselves
177 # the bucket is full: ping the old node and replace it only if it does not respond
179 ## these are the callbacks used when we ping the oldest node in a bucket
# NOTE: _staleNodeHandler is installed as the errback, so Twisted passes the
# Failure as its first positional argument, which binds to `oldnode`. The
# body uses the closed-over `old` instead, so the right node is still replaced.
180 def _staleNodeHandler(oldnode=old, newnode = n):
181 """ called if the pinged node never responds """
182 self.table.replaceStaleNode(old, newnode)
184 def _notStaleNodeHandler(dict, old=old):
185 """ called when we get a pong from the old node """
187 if dict['id'] == old.id:
188 self.table.justSeenNode(old.id)
190 df = old.ping(self.node.id)
191 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
# Ping `node` with our id.
# On a pong, _pongHandler rebuilds the responder as a Node from the
# reply's 'id' and the transport address in its '_krpc_sender' entry; what
# it then does with that node is on lines not visible here.
# On failure, _defaultPong reports `node` to the routing table via
# nodeFailed.
# `callback` is bound into both handlers; how it is used is not visible here.
193 def sendPing(self, node, callback=None):
197 df = node.ping(self.node.id)
198 ## these are the callbacks we use when we issue a PING
199 def _pongHandler(dict, node=node, table=self.table, callback=callback):
200 _krpc_sender = dict['_krpc_sender']
202 sender = {'id' : dict['id']}
203 sender['host'] = _krpc_sender[0]
204 sender['port'] = _krpc_sender[1]
205 n = self.Node().initWithDict(sender)
206 n.conn = self.udp.connectionForAddr((n.host, n.port))
210 def _defaultPong(err, node=node, table=self.table, callback=callback):
211 table.nodeFailed(node)
215 df.addCallbacks(_pongHandler,_defaultPong)
def findCloseNodes(self, callback=lambda a: None):
    """
    Run findNode on the ID "one away" from our own.

    The target is our id with its last byte incremented, wrapping 0xff to
    0x00. Searching for it populates our table with the nodes on the network
    closest to us. It is intended to be called as soon as we start up with
    an empty table.
    """
    own_id = self.node.id
    bumped_last = chr((ord(own_id[-1]) + 1) % 256)
    self.findNode(own_id[:-1] + bumped_last, callback)
# Refresh stale buckets. A bucket is stale when it has not been accessed
# within const.BUCKET_STALENESS seconds; with force true, every bucket is
# refreshed. For each one, findNode is run on an id from
# newIDInRange(bucket.min, bucket.max), i.e. inside that bucket's range.
# NOTE(review): `callback` is defined on lines not visible in this excerpt.
226 def refreshTable(self, force=0):
228 force=1 will refresh table regardless of last bucket access time
233 for bucket in self.table.buckets:
234 if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
235 id = newIDInRange(bucket.min, bucket.max)
236 self.findNode(id, callback)
240 Returns (num_contacts, num_nodes)
241 num_contacts: number of contacts in our routing table
242 num_nodes: number of nodes estimated in the entire dht
244 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)  # NOTE(review): def line of this method not visible; this sums the bucket sizes
245 num_nodes = const.K * (2**(len(self.table.buckets) - 1))  # heuristic estimate from the number of buckets
246 return (num_contacts, num_nodes)
# KRPC 'ping' handler. Records the caller in the routing table, using the
# host/port from the transport address _krpc_sender and contacted=0, then
# replies with our id.
# NOTE(review): `sender` is initialised on a line not visible here
# (presumably from `id`).
248 def krpc_ping(self, id, _krpc_sender):
250 sender['host'] = _krpc_sender[0]
251 sender['port'] = _krpc_sender[1]
252 n = self.Node().initWithDict(sender)
253 n.conn = self.udp.connectionForAddr((n.host, n.port))
254 self.insertNode(n, contacted=0)
255 return {"id" : self.node.id}
# KRPC 'find_node' handler. Replies with the sender dicts of the nodes our
# table returns for `target`, plus our id. The caller is also recorded in
# the routing table (host/port from _krpc_sender, contacted=0).
# NOTE(review): `sender` is initialised on a line not visible here
# (presumably from `id`).
257 def krpc_find_node(self, target, id, _krpc_sender):
258 nodes = self.table.findNodes(target)
259 nodes = map(lambda node: node.senderDict(), nodes)
261 sender['host'] = _krpc_sender[0]
262 sender['port'] = _krpc_sender[1]
263 n = self.Node().initWithDict(sender)
264 n.conn = self.udp.connectionForAddr((n.host, n.port))
265 self.insertNode(n, contacted=0)
266 return {"nodes" : nodes, "id" : self.node.id}
269 ## This class provides read-only access to the DHT, valueForKey
270 ## you probably want to use this mixin and provide your own write methods
271 class KhashmirRead(KhashmirBase):
# Return the values stored locally under `key` in the kv table. The lines
# that fetch and post-process the query result are not visible here.
273 def retrieveValues(self, key):
274 c = self.store.cursor()
275 c.execute("select value from kv where key = %s;", sqlite.encode(key))
# Search the DHT for values stored under `key`, reporting them through
# `callback`. Values stored locally (retrieveValues) are collected first. A
# GetValue search seeded with the closest local nodes and those local values
# is then started on the next reactor turn. The conditions involving
# `searchlocal` are on lines not visible here.
283 def valueForKey(self, key, callback, searchlocal = 1):
284 """ returns the values found for key in global table
285 callback will be called with a list of values for each peer that returns unique values
286 final callback will be an empty list - probably should change to 'more coming' arg
288 nodes = self.table.findNodes(key)
292 l = self.retrieveValues(key)
294 reactor.callLater(0, callback, (l))  # (l) is just l, not a 1-tuple: callback receives the list itself
298 # create our search state
299 state = GetValue(self, key, callback)
300 reactor.callLater(0, state.goWithNodes, nodes, l)
# KRPC 'find_value' handler. First records the caller in the routing table
# (host/port from _krpc_sender, contacted=0). It then replies with either
# our locally stored values for `key`, or the sender dicts of the closest
# nodes we know. The condition choosing between the two replies (presumably
# "any local values?") and the initialisation of `sender` are on lines not
# visible here.
302 def krpc_find_value(self, key, id, _krpc_sender):
304 sender['host'] = _krpc_sender[0]
305 sender['port'] = _krpc_sender[1]
306 n = self.Node().initWithDict(sender)
307 n.conn = self.udp.connectionForAddr((n.host, n.port))
308 self.insertNode(n, contacted=0)
310 l = self.retrieveValues(key)
312 return {'values' : l, "id": self.node.id}
314 nodes = self.table.findNodes(key)
315 nodes = map(lambda node: node.senderDict(), nodes)
316 return {'nodes' : nodes, "id": self.node.id}
### provides a generic write method, you probably don't want to deploy something that allows
### arbitrary value storage
class KhashmirWrite(KhashmirRead):

    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
    def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status
            in this implementation, peers respond but don't indicate status to storing values
            a key can have many values

            The store is asynchronous: findNode(key) first locates the nodes
            closest to `key`, then a StoreValue action is started against them
            on the next reactor turn. `callback`, if given, is handed to
            StoreValue as its response handler; otherwise a no-op handler is used.
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table):
            if not response:
                # default callback: ignore per-node responses
                def _storedValueHandler(sender):
                    pass
                response = _storedValueHandler
            action = StoreValue(self.table, key, value, response)
            # callLater(delay, callable, *args): the 0 delay is required.
            # Without it, goWithNodes would be taken as the delay and `nodes`
            # as the callable, so the store would never run.
            reactor.callLater(0, action.goWithNodes, nodes)

        # this call is asynch
        self.findNode(key, _storeValueForKey)
# KRPC 'store_value' handler.
# Inserts (key, value) into the kv table, stamped with the current time as a
# "%0.6f" string. If the pair already exists, the IntegrityError from the
# (key, value) primary key is caught and only the timestamp is refreshed.
# It then records the caller in the routing table (host/port from
# _krpc_sender, contacted=0) and replies with our id.
# NOTE(review): the `try:`, any commit, and the initialisation of `sender`
# are on lines not visible here.
340 def krpc_store_value(self, key, value, id, _krpc_sender):
341 t = "%0.6f" % time.time()
342 c = self.store.cursor()
344 c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
345 except sqlite.IntegrityError, reason:
346 # update last insert time
347 c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
350 sender['host'] = _krpc_sender[0]
351 sender['port'] = _krpc_sender[1]
352 n = self.Node().initWithDict(sender)
353 n.conn = self.udp.connectionForAddr((n.host, n.port))
354 self.insertNode(n, contacted=0)
355 return {"id" : self.node.id}
357 # the whole shebang, for testing
358 class Khashmir(KhashmirWrite):