3ca5d7250d0cfb82c0529114606e7e62ed04ebd8
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID, newIDInRange
14
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
21 threadable.init()
22
23 import sqlite  ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
25
class KhashmirDBExcept(Exception):
        """
                Raised when the khashmir database cannot be opened.

                This used to be a string exception; a real exception class keeps
                'except KhashmirDBExcept:' and 'raise KhashmirDBExcept, msg, tb' working
                while remaining raisable on interpreters that reject string exceptions.
        """
27
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30         __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31         def __init__(self, host, port, db='khashmir.db'):
32                 self.setup(host, port, db)
33                 
34         def setup(self, host, port, db='khashmir.db'):
35                 self._findDB(db)
36                 self.node = self._loadSelfNode(host, port)
37                 self.table = KTable(self.node)
38                 self._loadRoutingTable()
39                 self.app = Application("xmlrpc")
40                 self.app.listenTCP(port, server.Site(self))
41                 self.last = time.time()
42                 KeyExpirer(store=self.store)
43                 #self.refreshTable(force=1)
44                 reactor.callLater(60, self.checkpoint, (1,))
45                 
46         def _loadSelfNode(self, host, port):
47                 c = self.store.cursor()
48                 c.execute('select id from self where num = 0;')
49                 if c.rowcount > 0:
50                         id = c.fetchone()[0].decode('base64')
51                 else:
52                         id = newID()
53                 return Node().init(id, host, port)
54                 
55         def _saveSelfNode(self):
56                 self.store.autocommit = 0
57                 c = self.store.cursor()
58                 c.execute('delete from self where num = 0;')
59                 c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
60                 self.store.commit()
61                 self.store.autocommit = 1
62                 
63         def checkpoint(self, auto=0):
64                 self._saveSelfNode()
65                 self._dumpRoutingTable()
66                 if auto:
67                         reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
68                 
69         def _findDB(self, db):
70                 import os
71                 try:
72                         os.stat(db)
73                 except OSError:
74                         self._createNewDB(db)
75                 else:
76                         self._loadDB(db)
77             
78         def _loadDB(self, db):
79                 try:
80                         self.store = sqlite.connect(db=db)
81                         self.store.autocommit = 1
82                 except:
83                         import traceback
84                         raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
85             
86         def _createNewDB(self, db):
87                 self.store = sqlite.connect(db=db)
88                 self.store.autocommit = 1
89                 s = """
90                         create table kv (key text, value text, time timestamp, primary key (key, value));
91                         create index kv_key on kv(key);
92                         create index kv_timestamp on kv(time);
93                         
94                         create table nodes (id text primary key, host text, port number);
95                         
96                         create table self (num number primary key, id text);
97                         """
98                 c = self.store.cursor()
99                 c.execute(s)
100
101         def _dumpRoutingTable(self):
102                 """
103                         save routing table nodes to the database
104                 """
105                 self.store.autocommit = 0;
106                 c = self.store.cursor()
107                 c.execute("delete from nodes where id not NULL;")
108                 for bucket in self.table.buckets:
109                         for node in bucket.l:
110                                 d = node.senderDict()
111                                 c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
112                 self.store.commit()
113                 self.store.autocommit = 1;
114                 
115         def _loadRoutingTable(self):
116                 """
117                         load routing table nodes from database
118                         it's usually a good idea to call refreshTable(force=1) after loading the table
119                 """
120                 c = self.store.cursor()
121                 c.execute("select * from nodes;")
122                 for rec in c.fetchall():
123                         n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
124                         self.table.insertNode(n, contacted=0)
125                         
126         def render(self, request):
127                 """
128                         Override the built in render so we can have access to the request object!
129                         note, crequest is probably only valid on the initial call (not after deferred!)
130                 """
131                 self.crequest = request
132                 return xmlrpc.XMLRPC.render(self, request)
133
134
135         #######
136         #######  LOCAL INTERFACE    - use these methods!
137         def addContact(self, host, port):
138                 """
139                         ping this node and add the contact info to the table on pong!
140                 """
141                 n =Node().init(const.NULL_ID, host, port)  # note, we 
142                 self.sendPing(n)
143
144         ## this call is async!
145         def findNode(self, id, callback, errback=None):
146                 """ returns the contact info for node, or the k closest nodes, from the global table """
147                 # get K nodes out of local table/cache, or the node we want
148                 nodes = self.table.findNodes(id)
149                 d = Deferred()
150                 if errback:
151                         d.addCallbacks(callback, errback)
152                 else:
153                         d.addCallback(callback)
154                 if len(nodes) == 1 and nodes[0].id == id :
155                         d.callback(nodes)
156                 else:
157                         # create our search state
158                         state = FindNode(self, id, d.callback)
159                         reactor.callFromThread(state.goWithNodes, nodes)
160         
161         
162         ## also async
163         def valueForKey(self, key, callback):
164                 """ returns the values found for key in global table
165                         callback will be called with a list of values for each peer that returns unique values
166                         final callback will be an empty list - probably should change to 'more coming' arg
167                 """
168                 nodes = self.table.findNodes(key)
169                 
170                 # get locals
171                 l = self.retrieveValues(key)
172                 if len(l) > 0:
173                         reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
174                 
175                 # create our search state
176                 state = GetValue(self, key, callback)
177                 reactor.callFromThread(state.goWithNodes, nodes, l)
178
179         ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
180         def storeValueForKey(self, key, value, callback=None):
181                 """ stores the value for key in the global table, returns immediately, no status 
182                         in this implementation, peers respond but don't indicate status to storing values
183                         a key can have many values
184                 """
185                 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
186                         if not response:
187                                 # default callback
188                                 def _storedValueHandler(sender):
189                                         pass
190                                 response=_storedValueHandler
191                 
192                         for node in nodes[:const.STORE_REDUNDANCY]:
193                                 def cb(t, table = table, node=node, resp=response):
194                                         self.table.insertNode(node)
195                                         response(t)
196                                 if node.id != self.node.id:
197                                         def default(err, node=node, table=table):
198                                                 table.nodeFailed(node)
199                                         df = node.storeValue(key, value, self.node.senderDict())
200                                         df.addCallbacks(cb, default)
201                 # this call is asynch
202                 self.findNode(key, _storeValueForKey)
203         
204         
205         def insertNode(self, n, contacted=1):
206                 """
207                 insert a node in our local table, pinging oldest contact in bucket, if necessary
208                 
209                 If all you have is a host/port, then use addContact, which calls this method after
210                 receiving the PONG from the remote node.  The reason for the seperation is we can't insert
211                 a node into the table without it's peer-ID.  That means of course the node passed into this
212                 method needs to be a properly formed Node object with a valid ID.
213                 """
214                 old = self.table.insertNode(n, contacted=contacted)
215                 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
216                         # the bucket is full, check to see if old node is still around and if so, replace it
217                         
218                         ## these are the callbacks used when we ping the oldest node in a bucket
219                         def _staleNodeHandler(oldnode=old, newnode = n):
220                                 """ called if the pinged node never responds """
221                                 self.table.replaceStaleNode(old, newnode)
222                         
223                         def _notStaleNodeHandler(sender, old=old):
224                                 """ called when we get a pong from the old node """
225                                 args, sender = sender
226                                 sender = Node().initWithDict(sender)
227                                 if sender.id == old.id:
228                                         self.table.justSeenNode(old)
229                         
230                         df = old.ping(self.node.senderDict())
231                         df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
232
233         def sendPing(self, node):
234                 """
235                         ping a node
236                 """
237                 df = node.ping(self.node.senderDict())
238                 ## these are the callbacks we use when we issue a PING
239                 def _pongHandler(args, node=node, table=self.table):
240                         l, sender = args
241                         if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
242                                 # whoah, got response from different peer than we were expecting
243                                 self.table.invalidateNode(node)
244                         else:
245                                 sender['host'] = node.host
246                                 sender['port'] = node.port
247                                 n = Node().initWithDict(sender)
248                                 table.insertNode(n)
249                                 return
250                 def _defaultPong(err, node=node, table=self.table):
251                         table.nodeFailed(node)
252                 
253                 df.addCallbacks(_pongHandler,_defaultPong)
254
255         def findCloseNodes(self, callback=lambda a: None):
256                 """
257                         This does a findNode on the ID one away from our own.  
258                         This will allow us to populate our table with nodes on our network closest to our own.
259                         This is called as soon as we start up with an empty table
260                 """
261                 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
262                 self.findNode(id, callback)
263
264         def refreshTable(self, force=0):
265                 """
266                         force=1 will refresh table regardless of last bucket access time
267                 """
268                 def callback(nodes):
269                         pass
270         
271                 for bucket in self.table.buckets:
272                         if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
273                                 id = newIDInRange(bucket.min, bucket.max)
274                                 self.findNode(id, callback)
275
276
277         def retrieveValues(self, key):
278                 s = "select value from kv where key = '%s';" % key.encode('base64')
279                 c = self.store.cursor()
280                 c.execute(s)
281                 t = c.fetchone()
282                 l = []
283                 while t:
284                         l.append(t['value'])
285                         t = c.fetchone()
286                 return l
287         
288         #####
289         ##### INCOMING MESSAGE HANDLERS
290         
291         def xmlrpc_ping(self, sender):
292                 """
293                         takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
294                         returns sender dict
295                 """
296                 ip = self.crequest.getClientIP()
297                 sender['host'] = ip
298                 n = Node().initWithDict(sender)
299                 self.insertNode(n, contacted=0)
300                 return (), self.node.senderDict()
301                 
302         def xmlrpc_find_node(self, target, sender):
303                 nodes = self.table.findNodes(target.decode('base64'))
304                 nodes = map(lambda node: node.senderDict(), nodes)
305                 ip = self.crequest.getClientIP()
306                 sender['host'] = ip
307                 n = Node().initWithDict(sender)
308                 self.insertNode(n, contacted=0)
309                 return nodes, self.node.senderDict()
310             
311         def xmlrpc_store_value(self, key, value, sender):
312                 t = "%0.6f" % time.time()
313                 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
314                 c = self.store.cursor()
315                 try:
316                         c.execute(s)
317                 except pysqlite_exceptions.IntegrityError, reason:
318                         # update last insert time
319                         s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
320                         c.execute(s)
321                 ip = self.crequest.getClientIP()
322                 sender['host'] = ip
323                 n = Node().initWithDict(sender)
324                 self.insertNode(n, contacted=0)
325                 return (), self.node.senderDict()
326         
327         def xmlrpc_find_value(self, key, sender):
328                 ip = self.crequest.getClientIP()
329                 key = key.decode('base64')
330                 sender['host'] = ip
331                 n = Node().initWithDict(sender)
332                 self.insertNode(n, contacted=0)
333         
334                 l = self.retrieveValues(key)
335                 if len(l) > 0:
336                         return {'values' : l}, self.node.senderDict()
337                 else:
338                         nodes = self.table.findNodes(key)
339                         nodes = map(lambda node: node.senderDict(), nodes)
340                         return {'nodes' : nodes}, self.node.senderDict()
341
342 ### TESTING ###
343 from random import randrange
344 import threading, thread, sys, time
345 from sha import sha
346 from hash import newID
347
348
def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
        """
                Create peers Khashmir nodes on localhost, on consecutive ports from
                startport, each with its own database dbprefix + index.
                The first node's app is run in a new thread, the others are run in turn.
                Returns the list of nodes.
        """
        import thread
        l = [Khashmir('localhost', startport + i, db=dbprefix + repr(i)) for i in xrange(peers)]
        thread.start_new_thread(l[0].app.run, ())
        for peer in l[1:]:
                peer.app.run()
        return l
359         
def test_build_net(quiet=0, peers=24, host='localhost',  pause=0, startport=2001, dbprefix='/tmp/test'):
        """
                Build a test network of peers Khashmir nodes on host.

                Nodes listen on consecutive ports from startport and use the database
                dbprefix + index.  Each node is given three randomly chosen contacts,
                then each runs findCloseNodes and waits for it to finish.

                quiet: when true, suppress all progress messages.
                pause: when true, sleep briefly after adding each node's contacts.
                Returns the list of nodes.
        """
        # whrandom is deprecated and missing from later interpreters; random has
        # the same randrange
        from random import randrange
        import threading
        import thread
        l = []
        if not quiet:
                print("Building %s peer table." % peers)

        for i in xrange(peers):
                l.append(Khashmir(host, startport + i, db=dbprefix + repr(i)))

        thread.start_new_thread(l[0].app.run, ())
        time.sleep(1)
        for peer in l[1:]:
                peer.app.run()
        time.sleep(3)

        if not quiet:
                print("adding contacts....")

        for peer in l:
                # three random contacts per peer (a peer may pick itself or repeat)
                for attempt in xrange(3):
                        n = l[randrange(0, len(l))].node
                        peer.addContact(host, n.port)
                if pause:
                        time.sleep(.33)

        time.sleep(10)
        if not quiet:
                print("finding close nodes....")

        for peer in l:
                flag = threading.Event()
                def cb(nodes, f=flag):
                        f.set()
                peer.findCloseNodes(cb)
                flag.wait()
        #    for peer in l:
        #       peer.refreshTable()
        return l
405         
def test_find_nodes(l, quiet=0):
        """
                Pick two random nodes from l, have the first look up the second's id
                and print PASSED when the first node returned has that id, FAILED
                otherwise.  Blocks until the lookup calls back.
        """
        done = threading.Event()

        count = len(l)
        searcher = l[randrange(0, count)]
        target = l[randrange(0, count)]

        def callback(nodes, flag=done, id=target.node.id):
                found = (len(nodes) > 0) and (nodes[0].id == id)
                if found:
                        print("test_find_nodes  PASSED")
                else:
                        print("test_find_nodes  FAILED")
                flag.set()

        searcher.findNode(target.node.id, callback)
        done.wait()
422     
def test_find_value(l, quiet=0):
        """
                Store a new key/value pair through one random node of l, then look
                the key up from three other randomly chosen nodes in turn, waiting for
                each lookup to call back before starting the next.
        """
        count = len(l)
        a = l[randrange(0, count)]
        b = l[randrange(0, count)]
        c = l[randrange(0, count)]
        d = l[randrange(0, count)]

        key = newID()
        value = newID()
        if not quiet:
                print("inserting value...")
        a.storeValueForKey(key, value)
        time.sleep(3)
        if not quiet:
                print("finding...")

        class cb:
                """ tracks whether value has shown up and releases flag on every call """
                def __init__(self, flag, value=value):
                        self.flag = flag
                        self.val = value
                        self.found = 0
                def callback(self, values):
                        try:
                                if len(values) != 0:
                                        if self.val in values:
                                                self.found = 1
                                elif self.found:
                                        print("find                FOUND")
                                else:
                                        print("find                NOT FOUND")
                        finally:
                                self.flag.set()

        for peer in (b, c, d):
                flag = threading.Event()
                peer.valueForKey(key, cb(flag).callback)
                flag.wait()
467     
def test_one(host, port, db='/tmp/test'):
        """
                Create a single Khashmir node, run its app in a new thread and
                return the node.
        """
        import thread
        node = Khashmir(host, port, db)
        thread.start_new_thread(node.app.run, ())
        return node
473     
if __name__ == "__main__":
    import sys
    # number of peers: first command line argument, 8 when none is given
    if len(sys.argv) > 1:
        n = int(sys.argv[1])
    else:
        n = 8
    l = test_build_net(peers=n)
    time.sleep(3)
    print("finding nodes...")
    for i in range(10):
        test_find_nodes(l)
    print("inserting and fetching values...")
    for i in range(10):
        test_find_value(l)