]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/khashmir.py
Add timeouts to some unittests.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 import warnings
5 warnings.simplefilter("ignore", DeprecationWarning)
6
7 from time import time
8 from random import randrange
9 from sha import sha
10 import os
11 import sqlite  ## find this at http://pysqlite.sourceforge.net/
12
13 from twisted.internet.defer import Deferred
14 from twisted.internet import protocol, reactor
15 from twisted.trial import unittest
16
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
21 import krpc
22
23 class KhashmirDBExcept(Exception):
24     pass
25
26 # this is the base class, has base functionality and find node, no key-value mappings
27 class KhashmirBase(protocol.Factory):
28     _Node = KNodeBase
29     def __init__(self, config, cache_dir='/tmp'):
30         self.config = None
31         self.setup(config, cache_dir)
32         
33     def setup(self, config, cache_dir):
34         self.config = config
35         self.port = config['PORT']
36         self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
37         self.node = self._loadSelfNode('', self.port)
38         self.table = KTable(self.node, config)
39         #self.app = service.Application("krpc")
40         self.udp = krpc.hostbroker(self)
41         self.udp.protocol = krpc.KRPC
42         self.listenport = reactor.listenUDP(self.port, self.udp)
43         self.last = time()
44         self._loadRoutingTable()
45         self.expirer = KeyExpirer(self.store, config)
46         self.refreshTable(force=1)
47         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
48
49     def Node(self):
50         n = self._Node()
51         n.table = self.table
52         return n
53     
54     def __del__(self):
55         self.listenport.stopListening()
56         
57     def _loadSelfNode(self, host, port):
58         c = self.store.cursor()
59         c.execute('select id from self where num = 0;')
60         if c.rowcount > 0:
61             id = c.fetchone()[0]
62         else:
63             id = newID()
64         return self._Node().init(id, host, port)
65         
66     def _saveSelfNode(self):
67         c = self.store.cursor()
68         c.execute('delete from self where num = 0;')
69         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
70         self.store.commit()
71         
72     def checkpoint(self, auto=0):
73         self._saveSelfNode()
74         self._dumpRoutingTable()
75         self.refreshTable()
76         if auto:
77             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
78                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
79                               self.checkpoint, (1,))
80         
81     def _findDB(self, db):
82         self.db = db
83         try:
84             os.stat(db)
85         except OSError:
86             self._createNewDB(db)
87         else:
88             self._loadDB(db)
89         
90     def _loadDB(self, db):
91         try:
92             self.store = sqlite.connect(db=db)
93             #self.store.autocommit = 0
94         except:
95             import traceback
96             raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
97         
98     def _createNewDB(self, db):
99         self.store = sqlite.connect(db=db)
100         s = """
101             create table kv (key binary, value binary, time timestamp, primary key (key, value));
102             create index kv_key on kv(key);
103             create index kv_timestamp on kv(time);
104             
105             create table nodes (id binary primary key, host text, port number);
106             
107             create table self (num number primary key, id binary);
108             """
109         c = self.store.cursor()
110         c.execute(s)
111         self.store.commit()
112
113     def _dumpRoutingTable(self):
114         """
115             save routing table nodes to the database
116         """
117         c = self.store.cursor()
118         c.execute("delete from nodes where id not NULL;")
119         for bucket in self.table.buckets:
120             for node in bucket.l:
121                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
122         self.store.commit()
123         
124     def _loadRoutingTable(self):
125         """
126             load routing table nodes from database
127             it's usually a good idea to call refreshTable(force=1) after loading the table
128         """
129         c = self.store.cursor()
130         c.execute("select * from nodes;")
131         for rec in c.fetchall():
132             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
133             n.conn = self.udp.connectionForAddr((n.host, n.port))
134             self.table.insertNode(n, contacted=0)
135             
136
137     #######
138     #######  LOCAL INTERFACE    - use these methods!
139     def addContact(self, host, port, callback=None):
140         """
141             ping this node and add the contact info to the table on pong!
142         """
143         n =self.Node().init(NULL_ID, host, port) 
144         n.conn = self.udp.connectionForAddr((n.host, n.port))
145         self.sendPing(n, callback=callback)
146
147     ## this call is async!
148     def findNode(self, id, callback, errback=None):
149         """ returns the contact info for node, or the k closest nodes, from the global table """
150         # get K nodes out of local table/cache, or the node we want
151         nodes = self.table.findNodes(id)
152         d = Deferred()
153         if errback:
154             d.addCallbacks(callback, errback)
155         else:
156             d.addCallback(callback)
157         if len(nodes) == 1 and nodes[0].id == id :
158             d.callback(nodes)
159         else:
160             # create our search state
161             state = FindNode(self, id, d.callback, self.config)
162             reactor.callLater(0, state.goWithNodes, nodes)
163     
164     def insertNode(self, n, contacted=1):
165         """
166         insert a node in our local table, pinging oldest contact in bucket, if necessary
167         
168         If all you have is a host/port, then use addContact, which calls this method after
169         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
170         a node into the table without it's peer-ID.  That means of course the node passed into this
171         method needs to be a properly formed Node object with a valid ID.
172         """
173         old = self.table.insertNode(n, contacted=contacted)
174         if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
175             # the bucket is full, check to see if old node is still around and if so, replace it
176             
177             ## these are the callbacks used when we ping the oldest node in a bucket
178             def _staleNodeHandler(oldnode=old, newnode = n):
179                 """ called if the pinged node never responds """
180                 self.table.replaceStaleNode(old, newnode)
181             
182             def _notStaleNodeHandler(dict, old=old):
183                 """ called when we get a pong from the old node """
184                 dict = dict['rsp']
185                 if dict['id'] == old.id:
186                     self.table.justSeenNode(old.id)
187             
188             df = old.ping(self.node.id)
189             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
190
191     def sendPing(self, node, callback=None):
192         """
193             ping a node
194         """
195         df = node.ping(self.node.id)
196         ## these are the callbacks we use when we issue a PING
197         def _pongHandler(dict, node=node, table=self.table, callback=callback):
198             _krpc_sender = dict['_krpc_sender']
199             dict = dict['rsp']
200             sender = {'id' : dict['id']}
201             sender['host'] = _krpc_sender[0]
202             sender['port'] = _krpc_sender[1]
203             n = self.Node().initWithDict(sender)
204             n.conn = self.udp.connectionForAddr((n.host, n.port))
205             table.insertNode(n)
206             if callback:
207                 callback()
208         def _defaultPong(err, node=node, table=self.table, callback=callback):
209             table.nodeFailed(node)
210             if callback:
211                 callback()
212         
213         df.addCallbacks(_pongHandler,_defaultPong)
214
215     def findCloseNodes(self, callback=lambda a: None):
216         """
217             This does a findNode on the ID one away from our own.  
218             This will allow us to populate our table with nodes on our network closest to our own.
219             This is called as soon as we start up with an empty table
220         """
221         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
222         self.findNode(id, callback)
223
224     def refreshTable(self, force=0):
225         """
226             force=1 will refresh table regardless of last bucket access time
227         """
228         def callback(nodes):
229             pass
230     
231         for bucket in self.table.buckets:
232             if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
233                 id = newIDInRange(bucket.min, bucket.max)
234                 self.findNode(id, callback)
235
236     def stats(self):
237         """
238         Returns (num_contacts, num_nodes)
239         num_contacts: number contacts in our routing table
240         num_nodes: number of nodes estimated in the entire dht
241         """
242         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
243         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
244         return (num_contacts, num_nodes)
245
246     def krpc_ping(self, id, _krpc_sender):
247         sender = {'id' : id}
248         sender['host'] = _krpc_sender[0]
249         sender['port'] = _krpc_sender[1]        
250         n = self.Node().initWithDict(sender)
251         n.conn = self.udp.connectionForAddr((n.host, n.port))
252         self.insertNode(n, contacted=0)
253         return {"id" : self.node.id}
254         
255     def krpc_find_node(self, target, id, _krpc_sender):
256         nodes = self.table.findNodes(target)
257         nodes = map(lambda node: node.senderDict(), nodes)
258         sender = {'id' : id}
259         sender['host'] = _krpc_sender[0]
260         sender['port'] = _krpc_sender[1]        
261         n = self.Node().initWithDict(sender)
262         n.conn = self.udp.connectionForAddr((n.host, n.port))
263         self.insertNode(n, contacted=0)
264         return {"nodes" : nodes, "id" : self.node.id}
265
266
267 ## This class provides read-only access to the DHT, valueForKey
268 ## you probably want to use this mixin and provide your own write methods
269 class KhashmirRead(KhashmirBase):
270     _Node = KNodeRead
271     def retrieveValues(self, key):
272         c = self.store.cursor()
273         c.execute("select value from kv where key = %s;", sqlite.encode(key))
274         t = c.fetchone()
275         l = []
276         while t:
277             l.append(t['value'])
278             t = c.fetchone()
279         return l
280     ## also async
281     def valueForKey(self, key, callback, searchlocal = 1):
282         """ returns the values found for key in global table
283             callback will be called with a list of values for each peer that returns unique values
284             final callback will be an empty list - probably should change to 'more coming' arg
285         """
286         nodes = self.table.findNodes(key)
287         
288         # get locals
289         if searchlocal:
290             l = self.retrieveValues(key)
291             if len(l) > 0:
292                 reactor.callLater(0, callback, (l))
293         else:
294             l = []
295         
296         # create our search state
297         state = GetValue(self, key, callback, self.config)
298         reactor.callLater(0, state.goWithNodes, nodes, l)
299
300     def krpc_find_value(self, key, id, _krpc_sender):
301         sender = {'id' : id}
302         sender['host'] = _krpc_sender[0]
303         sender['port'] = _krpc_sender[1]        
304         n = self.Node().initWithDict(sender)
305         n.conn = self.udp.connectionForAddr((n.host, n.port))
306         self.insertNode(n, contacted=0)
307     
308         l = self.retrieveValues(key)
309         if len(l) > 0:
310             return {'values' : l, "id": self.node.id}
311         else:
312             nodes = self.table.findNodes(key)
313             nodes = map(lambda node: node.senderDict(), nodes)
314             return {'nodes' : nodes, "id": self.node.id}
315
316 ###  provides a generic write method, you probably don't want to deploy something that allows
317 ###  arbitrary value storage
318 class KhashmirWrite(KhashmirRead):
319     _Node = KNodeWrite
320     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
321     def storeValueForKey(self, key, value, callback=None):
322         """ stores the value for key in the global table, returns immediately, no status 
323             in this implementation, peers respond but don't indicate status to storing values
324             a key can have many values
325         """
326         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
327             if not response:
328                 # default callback
329                 def _storedValueHandler(sender):
330                     pass
331                 response=_storedValueHandler
332             action = StoreValue(self.table, key, value, response, self.config)
333             reactor.callLater(0, action.goWithNodes, nodes)
334             
335         # this call is asynch
336         self.findNode(key, _storeValueForKey)
337                     
338     def krpc_store_value(self, key, value, id, _krpc_sender):
339         t = "%0.6f" % time()
340         c = self.store.cursor()
341         try:
342             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
343         except sqlite.IntegrityError, reason:
344             # update last insert time
345             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
346         self.store.commit()
347         sender = {'id' : id}
348         sender['host'] = _krpc_sender[0]
349         sender['port'] = _krpc_sender[1]        
350         n = self.Node().initWithDict(sender)
351         n.conn = self.udp.connectionForAddr((n.host, n.port))
352         self.insertNode(n, contacted=0)
353         return {"id" : self.node.id}
354
355 # the whole shebang, for testing
356 class Khashmir(KhashmirWrite):
357     _Node = KNodeWrite
358
359 class SimpleTests(unittest.TestCase):
360     
361     timeout = 10
362     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
363                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
364                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
365                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
366                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
367                     'KE_AGE': 3600, }
368
369     def setUp(self):
370         krpc.KRPC.noisy = 0
371         d = self.DHT_DEFAULTS.copy()
372         d['PORT'] = 4044
373         self.a = Khashmir(d)
374         d = self.DHT_DEFAULTS.copy()
375         d['PORT'] = 4045
376         self.b = Khashmir(d)
377         
378     def tearDown(self):
379         self.a.listenport.stopListening()
380         self.b.listenport.stopListening()
381         try:
382             self.a.next_checkpoint.cancel()
383         except:
384             pass
385         try:
386             self.b.next_checkpoint.cancel()
387         except:
388             pass
389         try:
390             self.a.expirer.next_expire.cancel()
391         except:
392             pass
393         try:
394             self.b.expirer.next_expire.cancel()
395         except:
396             pass
397         self.a.store.close()
398         self.b.store.close()
399         os.unlink(self.a.db)
400         os.unlink(self.b.db)
401
402     def testAddContact(self):
403         self.assertEqual(len(self.a.table.buckets), 1)
404         self.assertEqual(len(self.a.table.buckets[0].l), 0)
405
406         self.assertEqual(len(self.b.table.buckets), 1)
407         self.assertEqual(len(self.b.table.buckets[0].l), 0)
408
409         self.a.addContact('127.0.0.1', 4045)
410         reactor.iterate()
411         reactor.iterate()
412         reactor.iterate()
413         reactor.iterate()
414
415         self.assertEqual(len(self.a.table.buckets), 1)
416         self.assertEqual(len(self.a.table.buckets[0].l), 1)
417         self.assertEqual(len(self.b.table.buckets), 1)
418         self.assertEqual(len(self.b.table.buckets[0].l), 1)
419
420     def testStoreRetrieve(self):
421         self.a.addContact('127.0.0.1', 4045)
422         reactor.iterate()
423         reactor.iterate()
424         reactor.iterate()
425         reactor.iterate()
426         self.got = 0
427         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
428         reactor.iterate()
429         reactor.iterate()
430         reactor.iterate()
431         reactor.iterate()
432         reactor.iterate()
433         reactor.iterate()
434         self.a.valueForKey(sha('foo').digest(), self._cb)
435         reactor.iterate()
436         reactor.iterate()
437         reactor.iterate()
438         reactor.iterate()
439         reactor.iterate()
440         reactor.iterate()
441         reactor.iterate()
442
443     def _cb(self, val):
444         if not val:
445             self.assertEqual(self.got, 1)
446         elif 'foobar' in val:
447             self.got = 1
448
449
450 class MultiTest(unittest.TestCase):
451     
452     timeout = 30
453     num = 20
454     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
455                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
456                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
457                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
458                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
459                     'KE_AGE': 3600, }
460
461     def _done(self, val):
462         self.done = 1
463         
464     def setUp(self):
465         self.l = []
466         self.startport = 4088
467         for i in range(self.num):
468             d = self.DHT_DEFAULTS.copy()
469             d['PORT'] = self.startport + i
470             self.l.append(Khashmir(d))
471         reactor.iterate()
472         reactor.iterate()
473         
474         for i in self.l:
475             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
476             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
477             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
478             reactor.iterate()
479             reactor.iterate()
480             reactor.iterate() 
481             
482         for i in self.l:
483             self.done = 0
484             i.findCloseNodes(self._done)
485             while not self.done:
486                 reactor.iterate()
487         for i in self.l:
488             self.done = 0
489             i.findCloseNodes(self._done)
490             while not self.done:
491                 reactor.iterate()
492
493     def tearDown(self):
494         for i in self.l:
495             i.listenport.stopListening()
496             try:
497                 i.next_checkpoint.cancel()
498             except:
499                 pass
500             try:
501                 i.expirer.next_expire.cancel()
502             except:
503                 pass
504             i.store.close()
505             os.unlink(i.db)
506             
507         reactor.iterate()
508         
509     def testStoreRetrieve(self):
510         for i in range(10):
511             K = newID()
512             V = newID()
513             
514             for a in range(3):
515                 self.done = 0
516                 def _scb(val):
517                     self.done = 1
518                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
519                 while not self.done:
520                     reactor.iterate()
521
522
523                 def _rcb(val):
524                     if not val:
525                         self.done = 1
526                         self.assertEqual(self.got, 1)
527                     elif V in val:
528                         self.got = 1
529                 for x in range(3):
530                     self.got = 0
531                     self.done = 0
532                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
533                     while not self.done:
534                         reactor.iterate()