1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
5 from random import randrange
7 import sqlite ## find this at http://pysqlite.sourceforge.net/
9 from twisted.internet.defer import Deferred
10 from twisted.internet import protocol
11 from twisted.internet import reactor
13 from ktable import KTable
14 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
15 from khash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
class KhashmirDBExcept(Exception):
    """Raised by _loadDB when the khashmir database cannot be opened."""
22 # this is the base class, has base functionality and find node, no key-value mappings
23 class KhashmirBase(protocol.Factory):
def __init__(self, config, cache_dir='/tmp'):
    """Create the node; all start-up work is delegated to setup().

    config    -- settings mapping, passed through to setup()
    cache_dir -- directory passed through to setup() (default '/tmp')
    """
    self.setup(config, cache_dir)
def setup(self, config, cache_dir):
    """Open the database, bind the UDP port and start periodic maintenance.

    config    -- settings mapping; 'PORT' is read here and the whole mapping
                 is handed on to KTable and KeyExpirer
    cache_dir -- directory holding the 'khashmir.db' database file
    """
    # checkpoint, findNode, insertNode, refreshTable, ... all read
    # self.config, so keep a reference to the settings.
    self.config = config
    self._findDB(os.path.join(cache_dir, 'khashmir.db'))
    self.port = config['PORT']
    self.node = self._loadSelfNode('', self.port)
    self.table = KTable(self.node, config)
    self.udp = krpc.hostbroker(self)
    self.udp.protocol = krpc.KRPC
    # Listen on the configured port (a bare `port` is not defined here).
    self.listenport = reactor.listenUDP(self.port, self.udp)
    self._loadRoutingTable()
    KeyExpirer(self.store, config)
    self.refreshTable(force=1)
    # First automatic checkpoint; checkpoint() receives a truthy `auto`.
    reactor.callLater(60, self.checkpoint, (1,))
51 self.listenport.stopListening()
def _loadSelfNode(self, host, port):
    """Build this node's own Node object, reusing the stored ID if any.

    Reads the ID saved as row num = 0 of the `self` table; when no such
    row exists a fresh ID is generated with newID().

    host, port -- address handed to the node's init()
    Returns whatever self._Node().init(id, host, port) returns.
    """
    c = self.store.cursor()
    c.execute('select id from self where num = 0;')
    row = c.fetchone()
    if row:
        node_id = row[0]
    else:
        # nothing saved yet (e.g. freshly created database)
        node_id = newID()
    return self._Node().init(node_id, host, port)
def _saveSelfNode(self):
    """Persist this node's ID as row 0 of the `self` table."""
    c = self.store.cursor()
    c.execute('delete from self where num = 0;')
    # parameters are passed as a tuple, like the other execute() calls
    c.execute("insert into self values (0, %s);", (sqlite.encode(self.node.id),))
    # DB-API connections do not autocommit; without this the row is lost
    self.store.commit()
def checkpoint(self, auto=0):
    """Dump the routing table and, for automatic runs, schedule the next one.

    auto -- true when invoked from the timer chain started in setup();
            only then is another checkpoint scheduled, so a manual
            call does not start a second, never-ending timer chain.

    The next run is scheduled after a random delay between 90% and 110%
    of config['CHECKPOINT_INTERVAL'].
    """
    self._dumpRoutingTable()
    if auto:
        interval = self.config['CHECKPOINT_INTERVAL']
        delay = randrange(int(interval * .9), int(interval * 1.1))
        reactor.callLater(delay, self.checkpoint, (1,))
77 def _findDB(self, db):
def _loadDB(self, db):
    """Open the existing database file `db` and keep it as self.store.

    Raises KhashmirDBExcept, carrying the message and the formatted
    traceback of the underlying error, when the connection fails.
    """
    try:
        self.store = sqlite.connect(db=db)
    except Exception:
        import traceback
        # The third expression of a three-part `raise` must be a traceback
        # object, not a string, so pass the text as an exception argument.
        raise KhashmirDBExcept("Couldn't open DB", traceback.format_exc())
def _createNewDB(self, db):
    """Create database `db` with the kv, nodes and self tables.

    The new connection is kept as self.store.
    """
    self.store = sqlite.connect(db=db)
    schema = (
        "create table kv (key binary, value binary, time timestamp, primary key (key, value));",
        "create index kv_key on kv(key);",
        "create index kv_timestamp on kv(time);",
        "create table nodes (id binary primary key, host text, port number);",
        "create table self (num number primary key, id binary);",
    )
    c = self.store.cursor()
    # one statement per execute() call works with any DB-API driver
    for statement in schema:
        c.execute(statement)
    self.store.commit()
def _dumpRoutingTable(self):
    """
        save routing table nodes to the database

        Existing rows of the `nodes` table are deleted and one row
        (id, host, port) is written per node of every bucket.
    """
    c = self.store.cursor()
    c.execute("delete from nodes where id not NULL;")
    for bucket in self.table.buckets:
        for node in bucket.l:
            c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
    # DB-API connections do not autocommit; make the dump durable
    self.store.commit()
def _loadRoutingTable(self):
    """
        Re-create routing table entries from the rows of the `nodes` table.

        It's usually a good idea to call refreshTable(force=1) after
        loading the table.
    """
    cursor = self.store.cursor()
    cursor.execute("select * from nodes;")
    for row in cursor.fetchall():
        blank = self.Node()
        # columns are read by position: id, host, port
        peer = blank.initWithDict({'id': row[0], 'host': row[1], 'port': int(row[2])})
        address = (peer.host, peer.port)
        peer.conn = self.udp.connectionForAddr(address)
        self.table.insertNode(peer, contacted=0)
133 ####### LOCAL INTERFACE - use these methods!
def addContact(self, host, port, callback=None):
    """
        Ping the node at host/port; its contact info is added to the
        table on pong.

        callback is passed through unchanged to sendPing().
    """
    # the peer's real ID is not known yet, so start from NULL_ID
    contact = self.Node().init(NULL_ID, host, port)
    address = (contact.host, contact.port)
    contact.conn = self.udp.connectionForAddr(address)
    self.sendPing(contact, callback=callback)
## this call is async!
def findNode(self, id, callback, errback=None):
    """ returns the contact info for node, or the k closest nodes, from the global table

    id       -- ID of the node being looked for
    callback -- called with the resulting list of nodes
    errback  -- optional; registered on the Deferred together with callback
    """
    # get K nodes out of local table/cache, or the node we want
    nodes = self.table.findNodes(id)
    d = Deferred()
    if errback:
        d.addCallbacks(callback, errback)
    else:
        d.addCallback(callback)
    if len(nodes) == 1 and nodes[0].id == id:
        # the exact node is already known locally: answer right away
        d.callback(nodes)
    else:
        # create our search state
        state = FindNode(self, id, d.callback, self.config)
        reactor.callLater(0, state.goWithNodes, nodes)
def insertNode(self, n, contacted=1):
    """
    Insert a node in our local table, pinging the oldest contact in the
    bucket if necessary.

    If all you have is a host/port, then use addContact, which calls this
    method after receiving the PONG from the remote node.  The reason for
    the separation is that we can't insert a node into the table without
    its peer-ID.  That means of course the node passed into this method
    needs to be a properly formed Node object with a valid ID.
    """
    old = self.table.insertNode(n, contacted=contacted)
    if not old:
        return
    if not ((time() - old.lastSeen) > self.config['MIN_PING_INTERVAL']):
        return
    if not (old.id != self.node.id):
        return

    # The table handed back an existing contact: ping it to decide
    # whether it keeps its slot or is replaced by the new node.
    def _onNoReply(oldnode=old, newnode=n):
        """ called if the pinged node never responds """
        self.table.replaceStaleNode(old, newnode)

    def _onPong(reply, old=old):
        """ called when we get a pong from the old node """
        if reply['id'] == old.id:
            self.table.justSeenNode(old.id)

    pending = old.ping(self.node.id)
    pending.addCallbacks(_onPong, _onNoReply)
186 def sendPing(self, node, callback=None):
190 df = node.ping(self.node.id)
191 ## these are the callbacks we use when we issue a PING
192 def _pongHandler(dict, node=node, table=self.table, callback=callback):
193 _krpc_sender = dict['_krpc_sender']
195 sender = {'id' : dict['id']}
196 sender['host'] = _krpc_sender[0]
197 sender['port'] = _krpc_sender[1]
198 n = self.Node().initWithDict(sender)
199 n.conn = self.udp.connectionForAddr((n.host, n.port))
203 def _defaultPong(err, node=node, table=self.table, callback=callback):
204 table.nodeFailed(node)
208 df.addCallbacks(_pongHandler,_defaultPong)
def findCloseNodes(self, callback=lambda a: None):
    """
        Run findNode on the ID one away from our own (last character
        incremented, wrapping modulo 256).
        This will allow us to populate our table with nodes on our network
        closest to our own.
        This is called as soon as we start up with an empty table.
    """
    own = self.node.id
    prefix = own[:-1]
    bumped = chr((ord(own[-1]) + 1) % 256)
    self.findNode(prefix + bumped, callback)
def refreshTable(self, force=0):
    """
        Run findNode on a random ID inside every bucket whose last access
        is at least config['BUCKET_STALENESS'] old.

        force=1 will refresh table regardless of last bucket access time
    """
    def callback(nodes):
        # the result list handed to this callback is not used here
        pass

    for bucket in self.table.buckets:
        if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
            id = newIDInRange(bucket.min, bucket.max)
            self.findNode(id, callback)
233 Returns (num_contacts, num_nodes)
234 num_contacts: number contacts in our routing table
235 num_nodes: number of nodes estimated in the entire dht
237 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
238 num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
239 return (num_contacts, num_nodes)
def krpc_ping(self, id, _krpc_sender):
    """Handle an incoming ping: record the sender and reply with our ID.

    id           -- ID of the pinging node
    _krpc_sender -- its address; item 0 is used as host, item 1 as port
    Returns {"id": <our node ID>}.
    """
    sender = {'id': id, 'host': _krpc_sender[0], 'port': _krpc_sender[1]}
    n = self.Node().initWithDict(sender)
    n.conn = self.udp.connectionForAddr((n.host, n.port))
    self.insertNode(n, contacted=0)
    return {"id": self.node.id}
def krpc_find_node(self, target, id, _krpc_sender):
    """Handle an incoming find_node request.

    target       -- ID being looked for
    id           -- ID of the requesting node
    _krpc_sender -- its address; item 0 is used as host, item 1 as port
    Returns {"nodes": [senderDict() of each node the local table finds
    for target], "id": <our node ID>}; the requester is also recorded
    through insertNode().
    """
    nodes = self.table.findNodes(target)
    nodes = [node.senderDict() for node in nodes]
    sender = {'id': id, 'host': _krpc_sender[0], 'port': _krpc_sender[1]}
    n = self.Node().initWithDict(sender)
    n.conn = self.udp.connectionForAddr((n.host, n.port))
    self.insertNode(n, contacted=0)
    return {"nodes": nodes, "id": self.node.id}
262 ## This class provides read-only access to the DHT, valueForKey
263 ## you probably want to use this mixin and provide your own write methods
264 class KhashmirRead(KhashmirBase):
def retrieveValues(self, key):
    """Return the list of values stored in the local kv table for key.

    The list is empty when nothing is stored under key.
    """
    c = self.store.cursor()
    c.execute("select value from kv where key = %s;", (sqlite.encode(key),))
    # the query selects a single column, so each row's first item is the value
    return [row[0] for row in c.fetchall()]
def valueForKey(self, key, callback, searchlocal = 1):
    """ returns the values found for key in global table
        callback will be called with a list of values for each peer that returns unique values
        final callback will be an empty list - probably should change to 'more coming' arg

        searchlocal -- when true, values held in the local store are
                       reported first and passed on to the search
    """
    nodes = self.table.findNodes(key)

    if searchlocal:
        l = self.retrieveValues(key)
        # An empty list is the "final callback" marker, so only report
        # local values when there actually are some.
        if len(l) > 0:
            reactor.callLater(0, callback, l)
    else:
        l = []

    # create our search state
    state = GetValue(self, key, callback, self.config)
    reactor.callLater(0, state.goWithNodes, nodes, l)
def krpc_find_value(self, key, id, _krpc_sender):
    """Handle an incoming find_value request.

    key          -- key being looked up
    id           -- ID of the requesting node
    _krpc_sender -- its address; item 0 is used as host, item 1 as port
    Returns {'values': [...], 'id': <our ID>} when values are stored
    locally for key, otherwise {'nodes': [...], 'id': <our ID>} with the
    senderDict() of each node the local table finds for key.
    """
    sender = {'id': id, 'host': _krpc_sender[0], 'port': _krpc_sender[1]}
    n = self.Node().initWithDict(sender)
    n.conn = self.udp.connectionForAddr((n.host, n.port))
    self.insertNode(n, contacted=0)

    l = self.retrieveValues(key)
    if len(l) > 0:
        return {'values': l, "id": self.node.id}

    # nothing stored here: point the requester at other nodes instead
    nodes = self.table.findNodes(key)
    nodes = [node.senderDict() for node in nodes]
    return {'nodes': nodes, "id": self.node.id}
311 ### provides a generic write method, you probably don't want to deploy something that allows
312 ### arbitrary value storage
313 class KhashmirWrite(KhashmirRead):
## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
def storeValueForKey(self, key, value, callback=None):
    """ stores the value for key in the global table, returns immediately, no status
        in this implementation, peers respond but don't indicate status to storing values
        a key can have many values

        callback -- optional; handed to the StoreValue action as its
                    response handler (a no-op is used when omitted)
    """
    def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
        if not response:
            # default callback: only used when the caller supplied none,
            # so a caller-provided callback is no longer discarded
            def _storedValueHandler(sender):
                pass
            response = _storedValueHandler
        action = StoreValue(self.table, key, value, response, self.config)
        reactor.callLater(0, action.goWithNodes, nodes)

    # this call is async
    self.findNode(key, _storeValueForKey)
333 def krpc_store_value(self, key, value, id, _krpc_sender):
335 c = self.store.cursor()
337 c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
338 except sqlite.IntegrityError, reason:
339 # update last insert time
340 c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
343 sender['host'] = _krpc_sender[0]
344 sender['port'] = _krpc_sender[1]
345 n = self.Node().initWithDict(sender)
346 n.conn = self.udp.connectionForAddr((n.host, n.port))
347 self.insertNode(n, contacted=0)
348 return {"id" : self.node.id}
350 # the whole shebang, for testing
351 class Khashmir(KhashmirWrite):