]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/khashmir.py
Disable the deprecation warning for the khashmir tests.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 import warnings
5 warnings.simplefilter("ignore", DeprecationWarning)
6
7 from time import time
8 from random import randrange
9 from sha import sha
10 import os
11 import sqlite  ## find this at http://pysqlite.sourceforge.net/
12
13 from twisted.internet.defer import Deferred
14 from twisted.internet import protocol, reactor
15 from twisted.trial import unittest
16
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
21 import krpc
22
23 class KhashmirDBExcept(Exception):
24     pass
25
26 # this is the base class, has base functionality and find node, no key-value mappings
27 class KhashmirBase(protocol.Factory):
28     _Node = KNodeBase
29     def __init__(self, config, cache_dir='/tmp'):
30         self.config = None
31         self.setup(config, cache_dir)
32         
33     def setup(self, config, cache_dir):
34         self.config = config
35         self.port = config['PORT']
36         self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
37         self.node = self._loadSelfNode('', self.port)
38         self.table = KTable(self.node, config)
39         #self.app = service.Application("krpc")
40         self.udp = krpc.hostbroker(self)
41         self.udp.protocol = krpc.KRPC
42         self.listenport = reactor.listenUDP(self.port, self.udp)
43         self.last = time()
44         self._loadRoutingTable()
45         self.expirer = KeyExpirer(self.store, config)
46         self.refreshTable(force=1)
47         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
48
49     def Node(self):
50         n = self._Node()
51         n.table = self.table
52         return n
53     
54     def __del__(self):
55         self.listenport.stopListening()
56         
57     def _loadSelfNode(self, host, port):
58         c = self.store.cursor()
59         c.execute('select id from self where num = 0;')
60         if c.rowcount > 0:
61             id = c.fetchone()[0]
62         else:
63             id = newID()
64         return self._Node().init(id, host, port)
65         
66     def _saveSelfNode(self):
67         c = self.store.cursor()
68         c.execute('delete from self where num = 0;')
69         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
70         self.store.commit()
71         
72     def checkpoint(self, auto=0):
73         self._saveSelfNode()
74         self._dumpRoutingTable()
75         self.refreshTable()
76         if auto:
77             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
78                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
79                               self.checkpoint, (1,))
80         
81     def _findDB(self, db):
82         self.db = db
83         try:
84             os.stat(db)
85         except OSError:
86             self._createNewDB(db)
87         else:
88             self._loadDB(db)
89         
90     def _loadDB(self, db):
91         try:
92             self.store = sqlite.connect(db=db)
93             #self.store.autocommit = 0
94         except:
95             import traceback
96             raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
97         
98     def _createNewDB(self, db):
99         self.store = sqlite.connect(db=db)
100         s = """
101             create table kv (key binary, value binary, time timestamp, primary key (key, value));
102             create index kv_key on kv(key);
103             create index kv_timestamp on kv(time);
104             
105             create table nodes (id binary primary key, host text, port number);
106             
107             create table self (num number primary key, id binary);
108             """
109         c = self.store.cursor()
110         c.execute(s)
111         self.store.commit()
112
113     def _dumpRoutingTable(self):
114         """
115             save routing table nodes to the database
116         """
117         c = self.store.cursor()
118         c.execute("delete from nodes where id not NULL;")
119         for bucket in self.table.buckets:
120             for node in bucket.l:
121                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
122         self.store.commit()
123         
124     def _loadRoutingTable(self):
125         """
126             load routing table nodes from database
127             it's usually a good idea to call refreshTable(force=1) after loading the table
128         """
129         c = self.store.cursor()
130         c.execute("select * from nodes;")
131         for rec in c.fetchall():
132             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
133             n.conn = self.udp.connectionForAddr((n.host, n.port))
134             self.table.insertNode(n, contacted=0)
135             
136
137     #######
138     #######  LOCAL INTERFACE    - use these methods!
139     def addContact(self, host, port, callback=None):
140         """
141             ping this node and add the contact info to the table on pong!
142         """
143         n =self.Node().init(NULL_ID, host, port) 
144         n.conn = self.udp.connectionForAddr((n.host, n.port))
145         self.sendPing(n, callback=callback)
146
147     ## this call is async!
148     def findNode(self, id, callback, errback=None):
149         """ returns the contact info for node, or the k closest nodes, from the global table """
150         # get K nodes out of local table/cache, or the node we want
151         nodes = self.table.findNodes(id)
152         d = Deferred()
153         if errback:
154             d.addCallbacks(callback, errback)
155         else:
156             d.addCallback(callback)
157         if len(nodes) == 1 and nodes[0].id == id :
158             d.callback(nodes)
159         else:
160             # create our search state
161             state = FindNode(self, id, d.callback, self.config)
162             reactor.callLater(0, state.goWithNodes, nodes)
163     
164     def insertNode(self, n, contacted=1):
165         """
166         insert a node in our local table, pinging oldest contact in bucket, if necessary
167         
168         If all you have is a host/port, then use addContact, which calls this method after
169         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
170         a node into the table without it's peer-ID.  That means of course the node passed into this
171         method needs to be a properly formed Node object with a valid ID.
172         """
173         old = self.table.insertNode(n, contacted=contacted)
174         if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
175             # the bucket is full, check to see if old node is still around and if so, replace it
176             
177             ## these are the callbacks used when we ping the oldest node in a bucket
178             def _staleNodeHandler(oldnode=old, newnode = n):
179                 """ called if the pinged node never responds """
180                 self.table.replaceStaleNode(old, newnode)
181             
182             def _notStaleNodeHandler(dict, old=old):
183                 """ called when we get a pong from the old node """
184                 dict = dict['rsp']
185                 if dict['id'] == old.id:
186                     self.table.justSeenNode(old.id)
187             
188             df = old.ping(self.node.id)
189             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
190
191     def sendPing(self, node, callback=None):
192         """
193             ping a node
194         """
195         df = node.ping(self.node.id)
196         ## these are the callbacks we use when we issue a PING
197         def _pongHandler(dict, node=node, table=self.table, callback=callback):
198             _krpc_sender = dict['_krpc_sender']
199             dict = dict['rsp']
200             sender = {'id' : dict['id']}
201             sender['host'] = _krpc_sender[0]
202             sender['port'] = _krpc_sender[1]
203             n = self.Node().initWithDict(sender)
204             n.conn = self.udp.connectionForAddr((n.host, n.port))
205             table.insertNode(n)
206             if callback:
207                 callback()
208         def _defaultPong(err, node=node, table=self.table, callback=callback):
209             table.nodeFailed(node)
210             if callback:
211                 callback()
212         
213         df.addCallbacks(_pongHandler,_defaultPong)
214
215     def findCloseNodes(self, callback=lambda a: None):
216         """
217             This does a findNode on the ID one away from our own.  
218             This will allow us to populate our table with nodes on our network closest to our own.
219             This is called as soon as we start up with an empty table
220         """
221         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
222         self.findNode(id, callback)
223
224     def refreshTable(self, force=0):
225         """
226             force=1 will refresh table regardless of last bucket access time
227         """
228         def callback(nodes):
229             pass
230     
231         for bucket in self.table.buckets:
232             if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
233                 id = newIDInRange(bucket.min, bucket.max)
234                 self.findNode(id, callback)
235
236     def stats(self):
237         """
238         Returns (num_contacts, num_nodes)
239         num_contacts: number contacts in our routing table
240         num_nodes: number of nodes estimated in the entire dht
241         """
242         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
243         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
244         return (num_contacts, num_nodes)
245
246     def krpc_ping(self, id, _krpc_sender):
247         sender = {'id' : id}
248         sender['host'] = _krpc_sender[0]
249         sender['port'] = _krpc_sender[1]        
250         n = self.Node().initWithDict(sender)
251         n.conn = self.udp.connectionForAddr((n.host, n.port))
252         self.insertNode(n, contacted=0)
253         return {"id" : self.node.id}
254         
255     def krpc_find_node(self, target, id, _krpc_sender):
256         nodes = self.table.findNodes(target)
257         nodes = map(lambda node: node.senderDict(), nodes)
258         sender = {'id' : id}
259         sender['host'] = _krpc_sender[0]
260         sender['port'] = _krpc_sender[1]        
261         n = self.Node().initWithDict(sender)
262         n.conn = self.udp.connectionForAddr((n.host, n.port))
263         self.insertNode(n, contacted=0)
264         return {"nodes" : nodes, "id" : self.node.id}
265
266
267 ## This class provides read-only access to the DHT, valueForKey
268 ## you probably want to use this mixin and provide your own write methods
269 class KhashmirRead(KhashmirBase):
270     _Node = KNodeRead
271     def retrieveValues(self, key):
272         c = self.store.cursor()
273         c.execute("select value from kv where key = %s;", sqlite.encode(key))
274         t = c.fetchone()
275         l = []
276         while t:
277             l.append(t['value'])
278             t = c.fetchone()
279         return l
280     ## also async
281     def valueForKey(self, key, callback, searchlocal = 1):
282         """ returns the values found for key in global table
283             callback will be called with a list of values for each peer that returns unique values
284             final callback will be an empty list - probably should change to 'more coming' arg
285         """
286         nodes = self.table.findNodes(key)
287         
288         # get locals
289         if searchlocal:
290             l = self.retrieveValues(key)
291             if len(l) > 0:
292                 reactor.callLater(0, callback, (l))
293         else:
294             l = []
295         
296         # create our search state
297         state = GetValue(self, key, callback, self.config)
298         reactor.callLater(0, state.goWithNodes, nodes, l)
299
300     def krpc_find_value(self, key, id, _krpc_sender):
301         sender = {'id' : id}
302         sender['host'] = _krpc_sender[0]
303         sender['port'] = _krpc_sender[1]        
304         n = self.Node().initWithDict(sender)
305         n.conn = self.udp.connectionForAddr((n.host, n.port))
306         self.insertNode(n, contacted=0)
307     
308         l = self.retrieveValues(key)
309         if len(l) > 0:
310             return {'values' : l, "id": self.node.id}
311         else:
312             nodes = self.table.findNodes(key)
313             nodes = map(lambda node: node.senderDict(), nodes)
314             return {'nodes' : nodes, "id": self.node.id}
315
316 ###  provides a generic write method, you probably don't want to deploy something that allows
317 ###  arbitrary value storage
318 class KhashmirWrite(KhashmirRead):
319     _Node = KNodeWrite
320     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
321     def storeValueForKey(self, key, value, callback=None):
322         """ stores the value for key in the global table, returns immediately, no status 
323             in this implementation, peers respond but don't indicate status to storing values
324             a key can have many values
325         """
326         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
327             if not response:
328                 # default callback
329                 def _storedValueHandler(sender):
330                     pass
331                 response=_storedValueHandler
332             action = StoreValue(self.table, key, value, response, self.config)
333             reactor.callLater(0, action.goWithNodes, nodes)
334             
335         # this call is asynch
336         self.findNode(key, _storeValueForKey)
337                     
338     def krpc_store_value(self, key, value, id, _krpc_sender):
339         t = "%0.6f" % time()
340         c = self.store.cursor()
341         try:
342             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
343         except sqlite.IntegrityError, reason:
344             # update last insert time
345             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
346         self.store.commit()
347         sender = {'id' : id}
348         sender['host'] = _krpc_sender[0]
349         sender['port'] = _krpc_sender[1]        
350         n = self.Node().initWithDict(sender)
351         n.conn = self.udp.connectionForAddr((n.host, n.port))
352         self.insertNode(n, contacted=0)
353         return {"id" : self.node.id}
354
355 # the whole shebang, for testing
356 class Khashmir(KhashmirWrite):
357     _Node = KNodeWrite
358
359 class SimpleTests(unittest.TestCase):
360     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
361                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
362                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
363                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
364                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
365                     'KE_AGE': 3600, }
366
367     def setUp(self):
368         krpc.KRPC.noisy = 0
369         d = self.DHT_DEFAULTS.copy()
370         d['PORT'] = 4044
371         self.a = Khashmir(d)
372         d = self.DHT_DEFAULTS.copy()
373         d['PORT'] = 4045
374         self.b = Khashmir(d)
375         
376     def tearDown(self):
377         self.a.listenport.stopListening()
378         self.b.listenport.stopListening()
379         try:
380             self.a.next_checkpoint.cancel()
381         except:
382             pass
383         try:
384             self.b.next_checkpoint.cancel()
385         except:
386             pass
387         try:
388             self.a.expirer.next_expire.cancel()
389         except:
390             pass
391         try:
392             self.b.expirer.next_expire.cancel()
393         except:
394             pass
395         self.a.store.close()
396         self.b.store.close()
397         os.unlink(self.a.db)
398         os.unlink(self.b.db)
399
400     def testAddContact(self):
401         self.assertEqual(len(self.a.table.buckets), 1)
402         self.assertEqual(len(self.a.table.buckets[0].l), 0)
403
404         self.assertEqual(len(self.b.table.buckets), 1)
405         self.assertEqual(len(self.b.table.buckets[0].l), 0)
406
407         self.a.addContact('127.0.0.1', 4045)
408         reactor.iterate()
409         reactor.iterate()
410         reactor.iterate()
411         reactor.iterate()
412
413         self.assertEqual(len(self.a.table.buckets), 1)
414         self.assertEqual(len(self.a.table.buckets[0].l), 1)
415         self.assertEqual(len(self.b.table.buckets), 1)
416         self.assertEqual(len(self.b.table.buckets[0].l), 1)
417
418     def testStoreRetrieve(self):
419         self.a.addContact('127.0.0.1', 4045)
420         reactor.iterate()
421         reactor.iterate()
422         reactor.iterate()
423         reactor.iterate()
424         self.got = 0
425         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
426         reactor.iterate()
427         reactor.iterate()
428         reactor.iterate()
429         reactor.iterate()
430         reactor.iterate()
431         reactor.iterate()
432         self.a.valueForKey(sha('foo').digest(), self._cb)
433         reactor.iterate()
434         reactor.iterate()
435         reactor.iterate()
436         reactor.iterate()
437         reactor.iterate()
438         reactor.iterate()
439         reactor.iterate()
440
441     def _cb(self, val):
442         if not val:
443             self.assertEqual(self.got, 1)
444         elif 'foobar' in val:
445             self.got = 1
446
447
448 class MultiTest(unittest.TestCase):
449     num = 20
450     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
451                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
452                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
453                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
454                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
455                     'KE_AGE': 3600, }
456
457     def _done(self, val):
458         self.done = 1
459         
460     def setUp(self):
461         self.l = []
462         self.startport = 4088
463         for i in range(self.num):
464             d = self.DHT_DEFAULTS.copy()
465             d['PORT'] = self.startport + i
466             self.l.append(Khashmir(d))
467         reactor.iterate()
468         reactor.iterate()
469         
470         for i in self.l:
471             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
472             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
473             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
474             reactor.iterate()
475             reactor.iterate()
476             reactor.iterate() 
477             
478         for i in self.l:
479             self.done = 0
480             i.findCloseNodes(self._done)
481             while not self.done:
482                 reactor.iterate()
483         for i in self.l:
484             self.done = 0
485             i.findCloseNodes(self._done)
486             while not self.done:
487                 reactor.iterate()
488
489     def tearDown(self):
490         for i in self.l:
491             i.listenport.stopListening()
492             try:
493                 i.next_checkpoint.cancel()
494             except:
495                 pass
496             try:
497                 i.expirer.next_expire.cancel()
498             except:
499                 pass
500             i.store.close()
501             os.unlink(i.db)
502             
503         reactor.iterate()
504         
505     def testStoreRetrieve(self):
506         for i in range(10):
507             K = newID()
508             V = newID()
509             
510             for a in range(3):
511                 self.done = 0
512                 def _scb(val):
513                     self.done = 1
514                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
515                 while not self.done:
516                     reactor.iterate()
517
518
519                 def _rcb(val):
520                     if not val:
521                         self.done = 1
522                         self.assertEqual(self.got, 1)
523                     elif V in val:
524                         self.got = 1
525                 for x in range(3):
526                     self.got = 0
527                     self.done = 0
528                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
529                     while not self.done:
530                         reactor.iterate()