]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
0c0ed52467896e88ba2e77d6dcc26793ea828ad3
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from const import reactor
5 import const
6
7 import time
8
9 from sha import sha
10
11 from ktable import KTable, K
12 from knode import KNode as Node
13
14 from hash import newID, newIDInRange
15
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
17 import krpc
18 import airhook
19
20 from twisted.internet.defer import Deferred
21 from twisted.internet import protocol
22 from twisted.python import threadable
23 from twisted.internet.app import Application
24 from twisted.web import server
25 threadable.init()
26 import sys
27
28 import sqlite  ## find this at http://pysqlite.sourceforge.net/
29 import pysqlite_exceptions
30
31 class KhashmirDBExcept(Exception):
32     pass
33
34 # this is the main class!
35 class Khashmir(protocol.Factory):
36     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
37     protocol = krpc.KRPC
38     def __init__(self, host, port, db='khashmir.db'):
39         self.setup(host, port, db)
40         
41     def setup(self, host, port, db='khashmir.db'):
42         self._findDB(db)
43         self.port = port
44         self.node = self._loadSelfNode(host, port)
45         self.table = KTable(self.node)
46         self.app = Application("krpc")
47         self.airhook = airhook.listenAirhookStream(port, self)
48         self.last = time.time()
49         self._loadRoutingTable()
50         KeyExpirer(store=self.store)
51         #self.refreshTable(force=1)
52         reactor.callLater(60, self.checkpoint, (1,))
53         
54     def _loadSelfNode(self, host, port):
55         c = self.store.cursor()
56         c.execute('select id from self where num = 0;')
57         if c.rowcount > 0:
58             id = c.fetchone()[0].decode('hex')
59         else:
60             id = newID()
61         return Node().init(id, host, port)
62         
63     def _saveSelfNode(self):
64         self.store.autocommit = 0
65         c = self.store.cursor()
66         c.execute('delete from self where num = 0;')
67         c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
68         self.store.commit()
69         self.store.autocommit = 1
70         
71     def checkpoint(self, auto=0):
72         self._saveSelfNode()
73         self._dumpRoutingTable()
74         if auto:
75             reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
76         
77     def _findDB(self, db):
78         import os
79         try:
80             os.stat(db)
81         except OSError:
82             self._createNewDB(db)
83         else:
84             self._loadDB(db)
85         
86     def _loadDB(self, db):
87         try:
88             self.store = sqlite.connect(db=db)
89             self.store.autocommit = 1
90         except:
91             import traceback
92             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
93         
94     def _createNewDB(self, db):
95         self.store = sqlite.connect(db=db)
96         self.store.autocommit = 1
97         s = """
98             create table kv (key text, value text, time timestamp, primary key (key, value));
99             create index kv_key on kv(key);
100             create index kv_timestamp on kv(time);
101             
102             create table nodes (id text primary key, host text, port number);
103             
104             create table self (num number primary key, id text);
105             """
106         c = self.store.cursor()
107         c.execute(s)
108
109     def _dumpRoutingTable(self):
110         """
111             save routing table nodes to the database
112         """
113         self.store.autocommit = 0;
114         c = self.store.cursor()
115         c.execute("delete from nodes where id not NULL;")
116         for bucket in self.table.buckets:
117             for node in bucket.l:
118                 d = node.senderDict()
119                 c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
120         self.store.commit()
121         self.store.autocommit = 1;
122         
123     def _loadRoutingTable(self):
124         """
125             load routing table nodes from database
126             it's usually a good idea to call refreshTable(force=1) after loading the table
127         """
128         c = self.store.cursor()
129         c.execute("select * from nodes;")
130         for rec in c.fetchall():
131             n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
132             n.conn = self.airhook.connectionForAddr((n.host, n.port))
133             self.table.insertNode(n, contacted=0)
134             
135
136     #######
137     #######  LOCAL INTERFACE    - use these methods!
138     def addContact(self, host, port, callback=None):
139         """
140             ping this node and add the contact info to the table on pong!
141         """
142         n =Node().init(const.NULL_ID, host, port) 
143         n.conn = self.airhook.connectionForAddr((n.host, n.port))
144         self.sendPing(n, callback=callback)
145
146     ## this call is async!
147     def findNode(self, id, callback, errback=None):
148         """ returns the contact info for node, or the k closest nodes, from the global table """
149         # get K nodes out of local table/cache, or the node we want
150         nodes = self.table.findNodes(id)
151         d = Deferred()
152         if errback:
153             d.addCallbacks(callback, errback)
154         else:
155             d.addCallback(callback)
156         if len(nodes) == 1 and nodes[0].id == id :
157             d.callback(nodes)
158         else:
159             # create our search state
160             state = FindNode(self, id, d.callback)
161             reactor.callFromThread(state.goWithNodes, nodes)
162     
163     
164     ## also async
165     def valueForKey(self, key, callback, searchlocal = 1):
166         """ returns the values found for key in global table
167             callback will be called with a list of values for each peer that returns unique values
168             final callback will be an empty list - probably should change to 'more coming' arg
169         """
170         nodes = self.table.findNodes(key)
171         
172         # get locals
173         if searchlocal:
174             l = self.retrieveValues(key)
175             if len(l) > 0:
176                 reactor.callLater(0, callback, (l))
177         else:
178             l = []
179         
180         # create our search state
181         state = GetValue(self, key, callback)
182         reactor.callFromThread(state.goWithNodes, nodes, l)
183
184     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
185     def storeValueForKey(self, key, value, callback=None):
186         """ stores the value for key in the global table, returns immediately, no status 
187             in this implementation, peers respond but don't indicate status to storing values
188             a key can have many values
189         """
190         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
191             if not response:
192                 # default callback
193                 def _storedValueHandler(sender):
194                     pass
195                 response=_storedValueHandler
196             action = StoreValue(self.table, key, value, response)
197             reactor.callFromThread(action.goWithNodes, nodes)
198             
199         # this call is asynch
200         self.findNode(key, _storeValueForKey)
201         
202     
203     def insertNode(self, n, contacted=1):
204         """
205         insert a node in our local table, pinging oldest contact in bucket, if necessary
206         
207         If all you have is a host/port, then use addContact, which calls this method after
208         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
209         a node into the table without it's peer-ID.  That means of course the node passed into this
210         method needs to be a properly formed Node object with a valid ID.
211         """
212         old = self.table.insertNode(n, contacted=contacted)
213         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
214             # the bucket is full, check to see if old node is still around and if so, replace it
215             
216             ## these are the callbacks used when we ping the oldest node in a bucket
217             def _staleNodeHandler(oldnode=old, newnode = n):
218                 """ called if the pinged node never responds """
219                 self.table.replaceStaleNode(old, newnode)
220             
221             def _notStaleNodeHandler(dict, old=old):
222                 """ called when we get a pong from the old node """
223                 _krpc_sender = dict['_krpc_sender']
224                 dict = dict['rsp']
225                 sender = dict['sender']
226                 if sender['id'] == old.id:
227                     self.table.justSeenNode(old.id)
228             
229             df = old.ping(self.node.senderDict())
230             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
231
232     def sendPing(self, node, callback=None):
233         """
234             ping a node
235         """
236         df = node.ping(self.node.senderDict())
237         ## these are the callbacks we use when we issue a PING
238         def _pongHandler(dict, node=node, table=self.table, callback=callback):
239             _krpc_sender = dict['_krpc_sender']
240             dict = dict['rsp']
241             sender = dict['sender']
242             if node.id != const.NULL_ID and node.id != sender['id']:
243                 # whoah, got response from different peer than we were expecting
244                 self.table.invalidateNode(node)
245             else:
246                 sender['host'] = node.host
247                 sender['port'] = node.port
248                 n = Node().initWithDict(sender)
249                 n.conn = self.airhook.connectionForAddr((n.host, n.port))
250                 table.insertNode(n)
251                 if callback:
252                     callback()
253         def _defaultPong(err, node=node, table=self.table, callback=callback):
254             table.nodeFailed(node)
255             if callback:
256                 callback()
257         
258         df.addCallbacks(_pongHandler,_defaultPong)
259
260     def findCloseNodes(self, callback=lambda a: None):
261         """
262             This does a findNode on the ID one away from our own.  
263             This will allow us to populate our table with nodes on our network closest to our own.
264             This is called as soon as we start up with an empty table
265         """
266         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
267         self.findNode(id, callback)
268
269     def refreshTable(self, force=0):
270         """
271             force=1 will refresh table regardless of last bucket access time
272         """
273         def callback(nodes):
274             pass
275     
276         for bucket in self.table.buckets:
277             if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
278                 id = newIDInRange(bucket.min, bucket.max)
279                 self.findNode(id, callback)
280
281
282     def retrieveValues(self, key):
283         s = "select value from kv where key = '%s';" % key.encode('hex')
284         c = self.store.cursor()
285         c.execute(s)
286         t = c.fetchone()
287         l = []
288         while t:
289             l.append(t['value'].decode('base64'))
290             t = c.fetchone()
291         return l
292     
293     #####
294     ##### INCOMING MESSAGE HANDLERS
295     
296     def krpc_ping(self, sender, _krpc_sender):
297         """
298             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
299             returns sender dict
300         """
301         sender['host'] = _krpc_sender[0]
302         sender['port'] = _krpc_sender[1]        
303         n = Node().initWithDict(sender)
304         n.conn = self.airhook.connectionForAddr((n.host, n.port))
305         self.insertNode(n, contacted=0)
306         return {"sender" : self.node.senderDict()}
307         
308     def krpc_find_node(self, target, sender, _krpc_sender):
309         nodes = self.table.findNodes(target)
310         nodes = map(lambda node: node.senderDict(), nodes)
311         sender['host'] = _krpc_sender[0]
312         sender['port'] = _krpc_sender[1]        
313         n = Node().initWithDict(sender)
314         n.conn = self.airhook.connectionForAddr((n.host, n.port))
315         self.insertNode(n, contacted=0)
316         return {"nodes" : nodes, "sender" : self.node.senderDict()}
317             
318     def krpc_store_value(self, key, value, sender, _krpc_sender):
319         t = "%0.6f" % time.time()
320         s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
321         c = self.store.cursor()
322         try:
323             c.execute(s)
324         except pysqlite_exceptions.IntegrityError, reason:
325             # update last insert time
326             s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
327             c.execute(s)
328         sender['host'] = _krpc_sender[0]
329         sender['port'] = _krpc_sender[1]        
330         n = Node().initWithDict(sender)
331         n.conn = self.airhook.connectionForAddr((n.host, n.port))
332         self.insertNode(n, contacted=0)
333         return {"sender" : self.node.senderDict()}
334     
335     def krpc_find_value(self, key, sender, _krpc_sender):
336         sender['host'] = _krpc_sender[0]
337         sender['port'] = _krpc_sender[1]        
338         n = Node().initWithDict(sender)
339         n.conn = self.airhook.connectionForAddr((n.host, n.port))
340         self.insertNode(n, contacted=0)
341     
342         l = self.retrieveValues(key)
343         if len(l) > 0:
344             return {'values' : l, "sender": self.node.senderDict()}
345         else:
346             nodes = self.table.findNodes(key)
347             nodes = map(lambda node: node.senderDict(), nodes)
348             return {'nodes' : nodes, "sender": self.node.senderDict()}
349
350 ### TESTING ###
351 from random import randrange
352 import threading, thread, sys, time
353 from sha import sha
354 from hash import newID
355
356
357 def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'):
358     import thread
359     l = []
360     for i in xrange(peers):
361         a = Khashmir(host, startport + i, db = dbprefix+`i`)
362         l.append(a)
363     thread.start_new_thread(l[0].app.run, ())
364     for peer in l[1:]:
365         peer.app.run(installSignalHandlers=0)   
366     return l
367     
368 def test_build_net(quiet=0, peers=24, host='127.0.0.1',  pause=0, startport=2001, dbprefix='/tmp/test'):
369     from whrandom import randrange
370     import threading
371     import thread
372     import sys
373     port = startport
374     l = []
375     if not quiet:
376         print "Building %s peer table." % peers
377     
378     for i in xrange(peers):
379         a = Khashmir(host, port + i, db = dbprefix +`i`)
380         l.append(a)
381     
382     
383     thread.start_new_thread(l[0].app.run, ())
384     time.sleep(1)
385     for peer in l[1:]:
386         peer.app.run(installSignalHandlers=0)
387     #time.sleep(3)
388     
389     def spewer(frame, s, ignored):
390         from twisted.python import reflect
391         if frame.f_locals.has_key('self'):
392             se = frame.f_locals['self']
393             print 'method %s of %s at %s' % (
394                 frame.f_code.co_name, reflect.qual(se.__class__), id(se)
395                 )
396     #sys.settrace(spewer)
397
398     print "adding contacts...."
399     def makecb(flag):
400         def cb(f=flag):
401             f.set()
402         return cb
403
404     for peer in l:
405         p = l[randrange(0, len(l))]
406         if p != peer:
407             n = p.node
408             flag = threading.Event()
409             peer.addContact(host, n.port, makecb(flag))
410             flag.wait()
411         p = l[randrange(0, len(l))]
412         if p != peer:
413             n = p.node
414             flag = threading.Event()
415             peer.addContact(host, n.port, makecb(flag))
416             flag.wait()
417         p = l[randrange(0, len(l))]
418         if p != peer:
419             n = p.node
420             flag = threading.Event()
421             peer.addContact(host, n.port, makecb(flag))
422             flag.wait()
423     
424     print "finding close nodes...."
425     
426     for peer in l:
427         flag = threading.Event()
428         def cb(nodes, f=flag):
429             f.set()
430         peer.findCloseNodes(cb)
431         flag.wait()
432     #    for peer in l:
433     #   peer.refreshTable()
434     return l
435         
436 def test_find_nodes(l, quiet=0):
437     flag = threading.Event()
438     
439     n = len(l)
440     
441     a = l[randrange(0,n)]
442     b = l[randrange(0,n)]
443     
444     def callback(nodes, flag=flag, id = b.node.id):
445         if (len(nodes) >0) and (nodes[0].id == id):
446             print "test_find_nodes      PASSED"
447         else:
448             print "test_find_nodes      FAILED"
449         flag.set()
450     a.findNode(b.node.id, callback)
451     flag.wait()
452     
453 def test_find_value(l, quiet=0):
454     ff = threading.Event()
455     fa = threading.Event()
456     fb = threading.Event()
457     fc = threading.Event()
458     
459     n = len(l)
460     a = l[randrange(0,n)]
461     b = l[randrange(0,n)]
462     c = l[randrange(0,n)]
463     d = l[randrange(0,n)]
464     
465     key = newID()
466     value = newID()
467     if not quiet: print "inserting value..."
468     def acb(p, f=ff):
469         f.set()
470     a.storeValueForKey(key, value, acb)
471     ff.wait()
472     
473     if not quiet:
474         print "finding..."
475     
476     class cb:
477         def __init__(self, flag, value=value, port=None):
478             self.flag = flag
479             self.val = value
480             self.found = 0
481             self.port = port
482         def callback(self, values):
483                 if(len(values) == 0):
484                     if not self.found:
485                         print "find   %s           NOT FOUND" % self.port
486                     else:
487                         print "find   %s           FOUND" % self.port
488                     self.flag.set()
489                 else:
490                     if self.val in values:
491                         self.found = 1
492     
493     b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
494     fa.wait()
495     c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
496     fb.wait()
497     d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)    
498     fc.wait()
499     
500 def test_one(host, port, db='/tmp/test'):
501     import thread
502     k = Khashmir(host, port, db)
503     thread.start_new_thread(reactor.run, ())
504     return k
505     
506 if __name__ == "__main__":
507     import sys
508     n = 8
509     if len(sys.argv) > 1: n = int(sys.argv[1])
510     l = test_build_net(peers=n)
511     time.sleep(3)
512     print "finding nodes..."
513     for i in range(n):
514         test_find_nodes(l)
515     print "inserting and fetching values..."
516     for i in range(10):
517         test_find_value(l)