1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from const import reactor
11 from ktable import KTable, K
12 from knode import KNode as Node
14 from khash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.python import threadable
22 from twisted.application import service, internet
23 from twisted.web import server
27 import sqlite ## find this at http://pysqlite.sourceforge.net/
29 class KhashmirDBExcept(Exception):
32 # this is the main class!
33 class Khashmir(protocol.Factory):
34 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
35 def __init__(self, host, port, db='khashmir.db'):
36 self.setup(host, port, db)
38 def setup(self, host, port, db='khashmir.db'):
41 self.node = self._loadSelfNode(host, port)
42 self.table = KTable(self.node)
43 self.app = service.Application("krpc")
44 self.udp = krpc.hostbroker(self)
45 self.udp.protocol = krpc.KRPC
46 self.listenport = reactor.listenUDP(port, self.udp)
47 self.last = time.time()
48 self._loadRoutingTable()
49 KeyExpirer(store=self.store)
50 #self.refreshTable(force=1)
51 reactor.callLater(60, self.checkpoint, (1,))
54 self.listenport.stopListening()
56 def _loadSelfNode(self, host, port):
57 c = self.store.cursor()
58 c.execute('select id from self where num = 0;')
63 return Node().init(id, host, port)
65 def _saveSelfNode(self):
66 self.store.autocommit = 0
67 c = self.store.cursor()
68 c.execute('delete from self where num = 0;')
69 c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
71 self.store.autocommit = 1
73 def checkpoint(self, auto=0):
75 self._dumpRoutingTable()
77 reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
79 def _findDB(self, db):
88 def _loadDB(self, db):
90 self.store = sqlite.connect(db=db)
91 self.store.autocommit = 1
94 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
96 def _createNewDB(self, db):
97 self.store = sqlite.connect(db=db)
98 self.store.autocommit = 1
100 create table kv (key binary, value binary, time timestamp, primary key (key, value));
101 create index kv_key on kv(key);
102 create index kv_timestamp on kv(time);
104 create table nodes (id binary primary key, host text, port number);
106 create table self (num number primary key, id binary);
108 c = self.store.cursor()
111 def _dumpRoutingTable(self):
113 save routing table nodes to the database
115 self.store.autocommit = 0;
116 c = self.store.cursor()
117 c.execute("delete from nodes where id not NULL;")
118 for bucket in self.table.buckets:
119 for node in bucket.l:
120 d = node.senderDict()
121 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port']))
123 self.store.autocommit = 1;
125 def _loadRoutingTable(self):
127 load routing table nodes from database
128 it's usually a good idea to call refreshTable(force=1) after loading the table
130 c = self.store.cursor()
131 c.execute("select * from nodes;")
132 for rec in c.fetchall():
133 n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
134 n.conn = self.udp.connectionForAddr((n.host, n.port))
135 self.table.insertNode(n, contacted=0)
139 ####### LOCAL INTERFACE - use these methods!
140 def addContact(self, host, port, callback=None):
142 ping this node and add the contact info to the table on pong!
144 n =Node().init(const.NULL_ID, host, port)
145 n.conn = self.udp.connectionForAddr((n.host, n.port))
146 self.sendPing(n, callback=callback)
148 ## this call is async!
149 def findNode(self, id, callback, errback=None):
150 """ returns the contact info for node, or the k closest nodes, from the global table """
151 # get K nodes out of local table/cache, or the node we want
152 nodes = self.table.findNodes(id)
155 d.addCallbacks(callback, errback)
157 d.addCallback(callback)
158 if len(nodes) == 1 and nodes[0].id == id :
161 # create our search state
162 state = FindNode(self, id, d.callback)
163 reactor.callFromThread(state.goWithNodes, nodes)
167 def valueForKey(self, key, callback, searchlocal = 1):
168 """ returns the values found for key in global table
169 callback will be called with a list of values for each peer that returns unique values
170 final callback will be an empty list - probably should change to 'more coming' arg
172 nodes = self.table.findNodes(key)
176 l = self.retrieveValues(key)
178 reactor.callLater(0, callback, (l))
182 # create our search state
183 state = GetValue(self, key, callback)
184 reactor.callFromThread(state.goWithNodes, nodes, l)
186 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
187 def storeValueForKey(self, key, value, callback=None):
188 """ stores the value for key in the global table, returns immediately, no status
189 in this implementation, peers respond but don't indicate status to storing values
190 a key can have many values
192 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
195 def _storedValueHandler(sender):
197 response=_storedValueHandler
198 action = StoreValue(self.table, key, value, response)
199 reactor.callFromThread(action.goWithNodes, nodes)
201 # this call is asynch
202 self.findNode(key, _storeValueForKey)
205 def insertNode(self, n, contacted=1):
207 insert a node in our local table, pinging oldest contact in bucket, if necessary
209 If all you have is a host/port, then use addContact, which calls this method after
210 receiving the PONG from the remote node. The reason for the seperation is we can't insert
211 a node into the table without it's peer-ID. That means of course the node passed into this
212 method needs to be a properly formed Node object with a valid ID.
214 old = self.table.insertNode(n, contacted=contacted)
215 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
216 # the bucket is full, check to see if old node is still around and if so, replace it
218 ## these are the callbacks used when we ping the oldest node in a bucket
219 def _staleNodeHandler(oldnode=old, newnode = n):
220 """ called if the pinged node never responds """
221 self.table.replaceStaleNode(old, newnode)
223 def _notStaleNodeHandler(dict, old=old):
224 """ called when we get a pong from the old node """
225 _krpc_sender = dict['_krpc_sender']
227 sender = dict['sender']
228 if sender['id'] == old.id:
229 self.table.justSeenNode(old.id)
231 df = old.ping(self.node.senderDict())
232 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
234 def sendPing(self, node, callback=None):
238 df = node.ping(self.node.senderDict())
239 ## these are the callbacks we use when we issue a PING
240 def _pongHandler(dict, node=node, table=self.table, callback=callback):
241 _krpc_sender = dict['_krpc_sender']
243 sender = dict['sender']
244 if node.id != const.NULL_ID and node.id != sender['id']:
245 # whoah, got response from different peer than we were expecting
246 self.table.invalidateNode(node)
248 sender['host'] = node.host
249 sender['port'] = node.port
250 n = Node().initWithDict(sender)
251 n.conn = self.udp.connectionForAddr((n.host, n.port))
255 def _defaultPong(err, node=node, table=self.table, callback=callback):
256 table.nodeFailed(node)
260 df.addCallbacks(_pongHandler,_defaultPong)
262 def findCloseNodes(self, callback=lambda a: None):
264 This does a findNode on the ID one away from our own.
265 This will allow us to populate our table with nodes on our network closest to our own.
266 This is called as soon as we start up with an empty table
268 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
269 self.findNode(id, callback)
271 def refreshTable(self, force=0):
273 force=1 will refresh table regardless of last bucket access time
278 for bucket in self.table.buckets:
279 if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
280 id = newIDInRange(bucket.min, bucket.max)
281 self.findNode(id, callback)
284 def retrieveValues(self, key):
285 c = self.store.cursor()
286 c.execute("select value from kv where key = %s;", sqlite.encode(key))
295 ##### INCOMING MESSAGE HANDLERS
297 def krpc_ping(self, sender, _krpc_sender):
299 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
302 sender['host'] = _krpc_sender[0]
303 sender['port'] = _krpc_sender[1]
304 n = Node().initWithDict(sender)
305 n.conn = self.udp.connectionForAddr((n.host, n.port))
306 self.insertNode(n, contacted=0)
307 return {"sender" : self.node.senderDict()}
309 def krpc_find_node(self, target, sender, _krpc_sender):
310 nodes = self.table.findNodes(target)
311 nodes = map(lambda node: node.senderDict(), nodes)
312 sender['host'] = _krpc_sender[0]
313 sender['port'] = _krpc_sender[1]
314 n = Node().initWithDict(sender)
315 n.conn = self.udp.connectionForAddr((n.host, n.port))
316 self.insertNode(n, contacted=0)
317 return {"nodes" : nodes, "sender" : self.node.senderDict()}
319 def krpc_store_value(self, key, value, sender, _krpc_sender):
320 t = "%0.6f" % time.time()
321 c = self.store.cursor()
323 c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
324 except sqlite.IntegrityError, reason:
325 # update last insert time
326 c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
327 sender['host'] = _krpc_sender[0]
328 sender['port'] = _krpc_sender[1]
329 n = Node().initWithDict(sender)
330 n.conn = self.udp.connectionForAddr((n.host, n.port))
331 self.insertNode(n, contacted=0)
332 return {"sender" : self.node.senderDict()}
334 def krpc_find_value(self, key, sender, _krpc_sender):
335 sender['host'] = _krpc_sender[0]
336 sender['port'] = _krpc_sender[1]
337 n = Node().initWithDict(sender)
338 n.conn = self.udp.connectionForAddr((n.host, n.port))
339 self.insertNode(n, contacted=0)
341 l = self.retrieveValues(key)
343 return {'values' : l, "sender": self.node.senderDict()}
345 nodes = self.table.findNodes(key)
346 nodes = map(lambda node: node.senderDict(), nodes)
347 return {'nodes' : nodes, "sender": self.node.senderDict()}