]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
a738a97fad8048e2296aeee876a9d8a20bddfe24
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID, newIDInRange
14
15 from actions import FindNode, GetValue, KeyExpirer, StoreValue
16 import krpc
17 import airhook
18
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.python import threadable
22 from twisted.internet.app import Application
23 from twisted.web import server
24 threadable.init()
25 import sys
26
27 import sqlite  ## find this at http://pysqlite.sourceforge.net/
28 import pysqlite_exceptions
29
30 class KhashmirDBExcept(Exception):
31     pass
32
33 # this is the main class!
34 class Khashmir(protocol.Factory):
35     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
36     protocol = krpc.KRPC
37     def __init__(self, host, port, db='khashmir.db'):
38         self.setup(host, port, db)
39         
40     def setup(self, host, port, db='khashmir.db'):
41         self._findDB(db)
42         self.port = port
43         self.node = self._loadSelfNode(host, port)
44         self.table = KTable(self.node)
45         self.app = Application("krpc")
46         self.airhook = airhook.listenAirhookStream(port, self)
47         self.last = time.time()
48         self._loadRoutingTable()
49         KeyExpirer(store=self.store)
50         #self.refreshTable(force=1)
51         reactor.callLater(60, self.checkpoint, (1,))
52         
53     def _loadSelfNode(self, host, port):
54         c = self.store.cursor()
55         c.execute('select id from self where num = 0;')
56         if c.rowcount > 0:
57             id = c.fetchone()[0].decode('hex')
58         else:
59             id = newID()
60         return Node().init(id, host, port)
61         
62     def _saveSelfNode(self):
63         self.store.autocommit = 0
64         c = self.store.cursor()
65         c.execute('delete from self where num = 0;')
66         c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
67         self.store.commit()
68         self.store.autocommit = 1
69         
70     def checkpoint(self, auto=0):
71         self._saveSelfNode()
72         self._dumpRoutingTable()
73         if auto:
74             reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
75         
76     def _findDB(self, db):
77         import os
78         try:
79             os.stat(db)
80         except OSError:
81             self._createNewDB(db)
82         else:
83             self._loadDB(db)
84         
85     def _loadDB(self, db):
86         try:
87             self.store = sqlite.connect(db=db)
88             self.store.autocommit = 1
89         except:
90             import traceback
91             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
92         
93     def _createNewDB(self, db):
94         self.store = sqlite.connect(db=db)
95         self.store.autocommit = 1
96         s = """
97             create table kv (key text, value text, time timestamp, primary key (key, value));
98             create index kv_key on kv(key);
99             create index kv_timestamp on kv(time);
100             
101             create table nodes (id text primary key, host text, port number);
102             
103             create table self (num number primary key, id text);
104             """
105         c = self.store.cursor()
106         c.execute(s)
107
108     def _dumpRoutingTable(self):
109         """
110             save routing table nodes to the database
111         """
112         self.store.autocommit = 0;
113         c = self.store.cursor()
114         c.execute("delete from nodes where id not NULL;")
115         for bucket in self.table.buckets:
116             for node in bucket.l:
117                 d = node.senderDict()
118                 c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
119         self.store.commit()
120         self.store.autocommit = 1;
121         
122     def _loadRoutingTable(self):
123         """
124             load routing table nodes from database
125             it's usually a good idea to call refreshTable(force=1) after loading the table
126         """
127         c = self.store.cursor()
128         c.execute("select * from nodes;")
129         for rec in c.fetchall():
130             n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
131             n.conn = self.airhook.connectionForAddr((n.host, n.port))
132             self.table.insertNode(n, contacted=0)
133             
134
135     #######
136     #######  LOCAL INTERFACE    - use these methods!
137     def addContact(self, host, port, callback=None):
138         """
139             ping this node and add the contact info to the table on pong!
140         """
141         n =Node().init(const.NULL_ID, host, port) 
142         n.conn = self.airhook.connectionForAddr((n.host, n.port))
143         self.sendPing(n, callback=callback)
144
145     ## this call is async!
146     def findNode(self, id, callback, errback=None):
147         """ returns the contact info for node, or the k closest nodes, from the global table """
148         # get K nodes out of local table/cache, or the node we want
149         nodes = self.table.findNodes(id)
150         d = Deferred()
151         if errback:
152             d.addCallbacks(callback, errback)
153         else:
154             d.addCallback(callback)
155         if len(nodes) == 1 and nodes[0].id == id :
156             d.callback(nodes)
157         else:
158             # create our search state
159             state = FindNode(self, id, d.callback)
160             reactor.callFromThread(state.goWithNodes, nodes)
161     
162     
163     ## also async
164     def valueForKey(self, key, callback, searchlocal = 1):
165         """ returns the values found for key in global table
166             callback will be called with a list of values for each peer that returns unique values
167             final callback will be an empty list - probably should change to 'more coming' arg
168         """
169         nodes = self.table.findNodes(key)
170         
171         # get locals
172         if searchlocal:
173             l = self.retrieveValues(key)
174             if len(l) > 0:
175                 reactor.callLater(0, callback, (l))
176         else:
177             l = []
178         
179         # create our search state
180         state = GetValue(self, key, callback)
181         reactor.callFromThread(state.goWithNodes, nodes, l)
182
183     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
184     def storeValueForKey(self, key, value, callback=None):
185         """ stores the value for key in the global table, returns immediately, no status 
186             in this implementation, peers respond but don't indicate status to storing values
187             a key can have many values
188         """
189         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
190             if not response:
191                 # default callback
192                 def _storedValueHandler(sender):
193                     pass
194                 response=_storedValueHandler
195             action = StoreValue(self.table, key, value, response)
196             reactor.callFromThread(action.goWithNodes, nodes)
197             
198         # this call is asynch
199         self.findNode(key, _storeValueForKey)
200         
201     
202     def insertNode(self, n, contacted=1):
203         """
204         insert a node in our local table, pinging oldest contact in bucket, if necessary
205         
206         If all you have is a host/port, then use addContact, which calls this method after
207         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
208         a node into the table without it's peer-ID.  That means of course the node passed into this
209         method needs to be a properly formed Node object with a valid ID.
210         """
211         old = self.table.insertNode(n, contacted=contacted)
212         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
213             # the bucket is full, check to see if old node is still around and if so, replace it
214             
215             ## these are the callbacks used when we ping the oldest node in a bucket
216             def _staleNodeHandler(oldnode=old, newnode = n):
217                 """ called if the pinged node never responds """
218                 self.table.replaceStaleNode(old, newnode)
219             
220             def _notStaleNodeHandler(dict, old=old):
221                 """ called when we get a pong from the old node """
222                 _krpc_sender = dict['_krpc_sender']
223                 dict = dict['rsp']
224                 sender = dict['sender']
225                 if sender['id'] == old.id:
226                     self.table.justSeenNode(old.id)
227             
228             df = old.ping(self.node.senderDict())
229             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
230
231     def sendPing(self, node, callback=None):
232         """
233             ping a node
234         """
235         df = node.ping(self.node.senderDict())
236         ## these are the callbacks we use when we issue a PING
237         def _pongHandler(dict, node=node, table=self.table, callback=callback):
238             _krpc_sender = dict['_krpc_sender']
239             dict = dict['rsp']
240             sender = dict['sender']
241             if node.id != const.NULL_ID and node.id != sender['id']:
242                 # whoah, got response from different peer than we were expecting
243                 self.table.invalidateNode(node)
244             else:
245                 sender['host'] = node.host
246                 sender['port'] = node.port
247                 n = Node().initWithDict(sender)
248                 n.conn = self.airhook.connectionForAddr((n.host, n.port))
249                 table.insertNode(n)
250                 if callback:
251                     callback()
252         def _defaultPong(err, node=node, table=self.table, callback=callback):
253             table.nodeFailed(node)
254             if callback:
255                 callback()
256         
257         df.addCallbacks(_pongHandler,_defaultPong)
258
259     def findCloseNodes(self, callback=lambda a: None):
260         """
261             This does a findNode on the ID one away from our own.  
262             This will allow us to populate our table with nodes on our network closest to our own.
263             This is called as soon as we start up with an empty table
264         """
265         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
266         self.findNode(id, callback)
267
268     def refreshTable(self, force=0):
269         """
270             force=1 will refresh table regardless of last bucket access time
271         """
272         def callback(nodes):
273             pass
274     
275         for bucket in self.table.buckets:
276             if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
277                 id = newIDInRange(bucket.min, bucket.max)
278                 self.findNode(id, callback)
279
280
281     def retrieveValues(self, key):
282         s = "select value from kv where key = '%s';" % key.encode('hex')
283         c = self.store.cursor()
284         c.execute(s)
285         t = c.fetchone()
286         l = []
287         while t:
288             l.append(t['value'].decode('base64'))
289             t = c.fetchone()
290         return l
291     
292     #####
293     ##### INCOMING MESSAGE HANDLERS
294     
295     def krpc_ping(self, sender, _krpc_sender):
296         """
297             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
298             returns sender dict
299         """
300         sender['host'] = _krpc_sender[0]
301         sender['port'] = _krpc_sender[1]        
302         n = Node().initWithDict(sender)
303         n.conn = self.airhook.connectionForAddr((n.host, n.port))
304         self.insertNode(n, contacted=0)
305         return {"sender" : self.node.senderDict()}
306         
307     def krpc_find_node(self, target, sender, _krpc_sender):
308         nodes = self.table.findNodes(target)
309         nodes = map(lambda node: node.senderDict(), nodes)
310         sender['host'] = _krpc_sender[0]
311         sender['port'] = _krpc_sender[1]        
312         n = Node().initWithDict(sender)
313         n.conn = self.airhook.connectionForAddr((n.host, n.port))
314         self.insertNode(n, contacted=0)
315         return {"nodes" : nodes, "sender" : self.node.senderDict()}
316             
317     def krpc_store_value(self, key, value, sender, _krpc_sender):
318         t = "%0.6f" % time.time()
319         s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
320         c = self.store.cursor()
321         try:
322             c.execute(s)
323         except pysqlite_exceptions.IntegrityError, reason:
324             # update last insert time
325             s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
326             c.execute(s)
327         sender['host'] = _krpc_sender[0]
328         sender['port'] = _krpc_sender[1]        
329         n = Node().initWithDict(sender)
330         n.conn = self.airhook.connectionForAddr((n.host, n.port))
331         self.insertNode(n, contacted=0)
332         return {"sender" : self.node.senderDict()}
333     
334     def krpc_find_value(self, key, sender, _krpc_sender):
335         sender['host'] = _krpc_sender[0]
336         sender['port'] = _krpc_sender[1]        
337         n = Node().initWithDict(sender)
338         n.conn = self.airhook.connectionForAddr((n.host, n.port))
339         self.insertNode(n, contacted=0)
340     
341         l = self.retrieveValues(key)
342         if len(l) > 0:
343             return {'values' : l, "sender": self.node.senderDict()}
344         else:
345             nodes = self.table.findNodes(key)
346             nodes = map(lambda node: node.senderDict(), nodes)
347             return {'nodes' : nodes, "sender": self.node.senderDict()}
348
349 ### TESTING ###
350 from random import randrange
351 import threading, thread, sys, time
352 from sha import sha
353 from hash import newID
354
355
356 def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
357     import thread
358     l = []
359     for i in xrange(peers):
360         a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`)
361         l.append(a)
362     thread.start_new_thread(l[0].app.run, ())
363     for peer in l[1:]:
364         peer.app.run()  
365     return l
366     
367 def test_build_net(quiet=0, peers=24, host='127.0.0.1',  pause=0, startport=2001, dbprefix='/tmp/test'):
368     from whrandom import randrange
369     import threading
370     import thread
371     import sys
372     port = startport
373     l = []
374     if not quiet:
375         print "Building %s peer table." % peers
376     
377     for i in xrange(peers):
378         a = Khashmir(host, port + i, db = dbprefix +`i`)
379         l.append(a)
380     
381     
382     thread.start_new_thread(l[0].app.run, ())
383     time.sleep(1)
384     for peer in l[1:]:
385         peer.app.run()
386     #time.sleep(3)
387     
388     def spewer(frame, s, ignored):
389         from twisted.python import reflect
390         if frame.f_locals.has_key('self'):
391             se = frame.f_locals['self']
392             print 'method %s of %s at %s' % (
393                 frame.f_code.co_name, reflect.qual(se.__class__), id(se)
394                 )
395     #sys.settrace(spewer)
396
397     print "adding contacts...."
398     def makecb(flag):
399         def cb(f=flag):
400             f.set()
401         return cb
402
403     for peer in l:
404         p = l[randrange(0, len(l))]
405         if p != peer:
406             n = p.node
407             flag = threading.Event()
408             peer.addContact(host, n.port, makecb(flag))
409             flag.wait()
410         p = l[randrange(0, len(l))]
411         if p != peer:
412             n = p.node
413             flag = threading.Event()
414             peer.addContact(host, n.port, makecb(flag))
415             flag.wait()
416         p = l[randrange(0, len(l))]
417         if p != peer:
418             n = p.node
419             flag = threading.Event()
420             peer.addContact(host, n.port, makecb(flag))
421             flag.wait()
422     
423     print "finding close nodes...."
424     
425     for peer in l:
426         flag = threading.Event()
427         def cb(nodes, f=flag):
428             f.set()
429         peer.findCloseNodes(cb)
430         flag.wait()
431     #    for peer in l:
432     #   peer.refreshTable()
433     return l
434         
435 def test_find_nodes(l, quiet=0):
436     flag = threading.Event()
437     
438     n = len(l)
439     
440     a = l[randrange(0,n)]
441     b = l[randrange(0,n)]
442     
443     def callback(nodes, flag=flag, id = b.node.id):
444         if (len(nodes) >0) and (nodes[0].id == id):
445             print "test_find_nodes      PASSED"
446         else:
447             print "test_find_nodes      FAILED"
448         flag.set()
449     a.findNode(b.node.id, callback)
450     flag.wait()
451     
452 def test_find_value(l, quiet=0):
453     ff = threading.Event()
454     fa = threading.Event()
455     fb = threading.Event()
456     fc = threading.Event()
457     
458     n = len(l)
459     a = l[randrange(0,n)]
460     b = l[randrange(0,n)]
461     c = l[randrange(0,n)]
462     d = l[randrange(0,n)]
463     
464     key = newID()
465     value = newID()
466     if not quiet: print "inserting value..."
467     def acb(p, f=ff):
468         f.set()
469     a.storeValueForKey(key, value, acb)
470     ff.wait()
471     
472     if not quiet:
473         print "finding..."
474     
475     class cb:
476         def __init__(self, flag, value=value, port=None):
477             self.flag = flag
478             self.val = value
479             self.found = 0
480             self.port = port
481         def callback(self, values):
482             try:
483                 if(len(values) == 0):
484                     if not self.found:
485                         print "find   %s             NOT FOUND" % self.port
486                     else:
487                         print "find   %s           FOUND" % self.port
488                 else:
489                     if self.val in values:
490                         self.found = 1
491             finally:
492                 self.flag.set()
493     
494     b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
495     fa.wait()
496     c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
497     fb.wait()
498     d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)    
499     fc.wait()
500     
501 def test_one(host, port, db='/tmp/test'):
502     import thread
503     k = Khashmir(host, port, db)
504     thread.start_new_thread(reactor.run, ())
505     return k
506     
507 if __name__ == "__main__":
508     import sys
509     n = 8
510     if len(sys.argv) > 1: n = int(sys.argv[1])
511     l = test_build_net(peers=n)
512     time.sleep(3)
513     print "finding nodes..."
514     for i in range(n):
515         test_find_nodes(l)
516     print "inserting and fetching values..."
517     for i in range(10):
518         test_find_value(l)