]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/khashmir.py
Move all the khashmir database operations to a separate module.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 import warnings
5 warnings.simplefilter("ignore", DeprecationWarning)
6
7 from time import time
8 from random import randrange
9 from sha import sha
10 import os
11
12 from twisted.internet.defer import Deferred
13 from twisted.internet import protocol, reactor
14 from twisted.trial import unittest
15
16 from db import DB
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
21 import krpc
22
23 # this is the base class, has base functionality and find node, no key-value mappings
24 class KhashmirBase(protocol.Factory):
25     _Node = KNodeBase
26     def __init__(self, config, cache_dir='/tmp'):
27         self.config = None
28         self.setup(config, cache_dir)
29         
30     def setup(self, config, cache_dir):
31         self.config = config
32         self.port = config['PORT']
33         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
34         self.node = self._loadSelfNode('', self.port)
35         self.table = KTable(self.node, config)
36         #self.app = service.Application("krpc")
37         self.udp = krpc.hostbroker(self)
38         self.udp.protocol = krpc.KRPC
39         self.listenport = reactor.listenUDP(self.port, self.udp)
40         self.last = time()
41         self._loadRoutingTable()
42         self.expirer = KeyExpirer(self.store, config)
43         self.refreshTable(force=1)
44         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
45
46     def Node(self):
47         n = self._Node()
48         n.table = self.table
49         return n
50     
51     def __del__(self):
52         self.listenport.stopListening()
53         
54     def _loadSelfNode(self, host, port):
55         id = self.store.getSelfNode()
56         if not id:
57             id = newID()
58         return self._Node().init(id, host, port)
59         
60     def checkpoint(self, auto=0):
61         self.store.saveSelfNode(self.node.id)
62         self.store.dumpRoutingTable(self.table.buckets)
63         self.refreshTable()
64         if auto:
65             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
66                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
67                               self.checkpoint, (1,))
68         
69     def _loadRoutingTable(self):
70         """
71             load routing table nodes from database
72             it's usually a good idea to call refreshTable(force=1) after loading the table
73         """
74         nodes = self.store.getRoutingTable()
75         for rec in nodes:
76             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
77             n.conn = self.udp.connectionForAddr((n.host, n.port))
78             self.table.insertNode(n, contacted=0)
79             
80
81     #######
82     #######  LOCAL INTERFACE    - use these methods!
83     def addContact(self, host, port, callback=None):
84         """
85             ping this node and add the contact info to the table on pong!
86         """
87         n =self.Node().init(NULL_ID, host, port) 
88         n.conn = self.udp.connectionForAddr((n.host, n.port))
89         self.sendPing(n, callback=callback)
90
91     ## this call is async!
92     def findNode(self, id, callback, errback=None):
93         """ returns the contact info for node, or the k closest nodes, from the global table """
94         # get K nodes out of local table/cache, or the node we want
95         nodes = self.table.findNodes(id)
96         d = Deferred()
97         if errback:
98             d.addCallbacks(callback, errback)
99         else:
100             d.addCallback(callback)
101         if len(nodes) == 1 and nodes[0].id == id :
102             d.callback(nodes)
103         else:
104             # create our search state
105             state = FindNode(self, id, d.callback, self.config)
106             reactor.callLater(0, state.goWithNodes, nodes)
107     
108     def insertNode(self, n, contacted=1):
109         """
110         insert a node in our local table, pinging oldest contact in bucket, if necessary
111         
112         If all you have is a host/port, then use addContact, which calls this method after
113         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
114         a node into the table without it's peer-ID.  That means of course the node passed into this
115         method needs to be a properly formed Node object with a valid ID.
116         """
117         old = self.table.insertNode(n, contacted=contacted)
118         if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
119             # the bucket is full, check to see if old node is still around and if so, replace it
120             
121             ## these are the callbacks used when we ping the oldest node in a bucket
122             def _staleNodeHandler(oldnode=old, newnode = n):
123                 """ called if the pinged node never responds """
124                 self.table.replaceStaleNode(old, newnode)
125             
126             def _notStaleNodeHandler(dict, old=old):
127                 """ called when we get a pong from the old node """
128                 dict = dict['rsp']
129                 if dict['id'] == old.id:
130                     self.table.justSeenNode(old.id)
131             
132             df = old.ping(self.node.id)
133             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
134
135     def sendPing(self, node, callback=None):
136         """
137             ping a node
138         """
139         df = node.ping(self.node.id)
140         ## these are the callbacks we use when we issue a PING
141         def _pongHandler(dict, node=node, table=self.table, callback=callback):
142             _krpc_sender = dict['_krpc_sender']
143             dict = dict['rsp']
144             sender = {'id' : dict['id']}
145             sender['host'] = _krpc_sender[0]
146             sender['port'] = _krpc_sender[1]
147             n = self.Node().initWithDict(sender)
148             n.conn = self.udp.connectionForAddr((n.host, n.port))
149             table.insertNode(n)
150             if callback:
151                 callback()
152         def _defaultPong(err, node=node, table=self.table, callback=callback):
153             table.nodeFailed(node)
154             if callback:
155                 callback()
156         
157         df.addCallbacks(_pongHandler,_defaultPong)
158
159     def findCloseNodes(self, callback=lambda a: None):
160         """
161             This does a findNode on the ID one away from our own.  
162             This will allow us to populate our table with nodes on our network closest to our own.
163             This is called as soon as we start up with an empty table
164         """
165         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
166         self.findNode(id, callback)
167
168     def refreshTable(self, force=0):
169         """
170             force=1 will refresh table regardless of last bucket access time
171         """
172         def callback(nodes):
173             pass
174     
175         for bucket in self.table.buckets:
176             if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
177                 id = newIDInRange(bucket.min, bucket.max)
178                 self.findNode(id, callback)
179
180     def stats(self):
181         """
182         Returns (num_contacts, num_nodes)
183         num_contacts: number contacts in our routing table
184         num_nodes: number of nodes estimated in the entire dht
185         """
186         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
187         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
188         return (num_contacts, num_nodes)
189     
190     def shutdown(self):
191         """Closes the port and cancels pending later calls."""
192         self.listenport.stopListening()
193         try:
194             self.next_checkpoint.cancel()
195         except:
196             pass
197         self.expirer.shutdown()
198         self.store.close()
199
200     def krpc_ping(self, id, _krpc_sender):
201         sender = {'id' : id}
202         sender['host'] = _krpc_sender[0]
203         sender['port'] = _krpc_sender[1]        
204         n = self.Node().initWithDict(sender)
205         n.conn = self.udp.connectionForAddr((n.host, n.port))
206         self.insertNode(n, contacted=0)
207         return {"id" : self.node.id}
208         
209     def krpc_find_node(self, target, id, _krpc_sender):
210         nodes = self.table.findNodes(target)
211         nodes = map(lambda node: node.senderDict(), nodes)
212         sender = {'id' : id}
213         sender['host'] = _krpc_sender[0]
214         sender['port'] = _krpc_sender[1]        
215         n = self.Node().initWithDict(sender)
216         n.conn = self.udp.connectionForAddr((n.host, n.port))
217         self.insertNode(n, contacted=0)
218         return {"nodes" : nodes, "id" : self.node.id}
219
220
221 ## This class provides read-only access to the DHT, valueForKey
222 ## you probably want to use this mixin and provide your own write methods
223 class KhashmirRead(KhashmirBase):
224     _Node = KNodeRead
225
226     ## also async
227     def valueForKey(self, key, callback, searchlocal = 1):
228         """ returns the values found for key in global table
229             callback will be called with a list of values for each peer that returns unique values
230             final callback will be an empty list - probably should change to 'more coming' arg
231         """
232         nodes = self.table.findNodes(key)
233         
234         # get locals
235         if searchlocal:
236             l = self.store.retrieveValues(key)
237             if len(l) > 0:
238                 reactor.callLater(0, callback, key, l)
239         else:
240             l = []
241         
242         # create our search state
243         state = GetValue(self, key, callback, self.config)
244         reactor.callLater(0, state.goWithNodes, nodes, l)
245
246     def krpc_find_value(self, key, id, _krpc_sender):
247         sender = {'id' : id}
248         sender['host'] = _krpc_sender[0]
249         sender['port'] = _krpc_sender[1]        
250         n = self.Node().initWithDict(sender)
251         n.conn = self.udp.connectionForAddr((n.host, n.port))
252         self.insertNode(n, contacted=0)
253     
254         l = self.store.retrieveValues(key)
255         if len(l) > 0:
256             return {'values' : l, "id": self.node.id}
257         else:
258             nodes = self.table.findNodes(key)
259             nodes = map(lambda node: node.senderDict(), nodes)
260             return {'nodes' : nodes, "id": self.node.id}
261
262 ###  provides a generic write method, you probably don't want to deploy something that allows
263 ###  arbitrary value storage
264 class KhashmirWrite(KhashmirRead):
265     _Node = KNodeWrite
266     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
267     def storeValueForKey(self, key, value, callback=None):
268         """ stores the value for key in the global table, returns immediately, no status 
269             in this implementation, peers respond but don't indicate status to storing values
270             a key can have many values
271         """
272         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
273             if not response:
274                 # default callback
275                 def _storedValueHandler(key, value, sender):
276                     pass
277                 response=_storedValueHandler
278             action = StoreValue(self.table, key, value, response, self.config)
279             reactor.callLater(0, action.goWithNodes, nodes)
280             
281         # this call is asynch
282         self.findNode(key, _storeValueForKey)
283                     
284     def krpc_store_value(self, key, value, id, _krpc_sender):
285         self.store.storeValue(key, value)
286         sender = {'id' : id}
287         sender['host'] = _krpc_sender[0]
288         sender['port'] = _krpc_sender[1]        
289         n = self.Node().initWithDict(sender)
290         n.conn = self.udp.connectionForAddr((n.host, n.port))
291         self.insertNode(n, contacted=0)
292         return {"id" : self.node.id}
293
294 # the whole shebang, for testing
295 class Khashmir(KhashmirWrite):
296     _Node = KNodeWrite
297
298 class SimpleTests(unittest.TestCase):
299     
300     timeout = 10
301     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
302                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
303                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
304                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
305                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
306                     'KE_AGE': 3600, }
307
308     def setUp(self):
309         krpc.KRPC.noisy = 0
310         d = self.DHT_DEFAULTS.copy()
311         d['PORT'] = 4044
312         self.a = Khashmir(d)
313         d = self.DHT_DEFAULTS.copy()
314         d['PORT'] = 4045
315         self.b = Khashmir(d)
316         
317     def tearDown(self):
318         self.a.shutdown()
319         self.b.shutdown()
320         os.unlink(self.a.store.db)
321         os.unlink(self.b.store.db)
322
323     def testAddContact(self):
324         self.assertEqual(len(self.a.table.buckets), 1)
325         self.assertEqual(len(self.a.table.buckets[0].l), 0)
326
327         self.assertEqual(len(self.b.table.buckets), 1)
328         self.assertEqual(len(self.b.table.buckets[0].l), 0)
329
330         self.a.addContact('127.0.0.1', 4045)
331         reactor.iterate()
332         reactor.iterate()
333         reactor.iterate()
334         reactor.iterate()
335
336         self.assertEqual(len(self.a.table.buckets), 1)
337         self.assertEqual(len(self.a.table.buckets[0].l), 1)
338         self.assertEqual(len(self.b.table.buckets), 1)
339         self.assertEqual(len(self.b.table.buckets[0].l), 1)
340
341     def testStoreRetrieve(self):
342         self.a.addContact('127.0.0.1', 4045)
343         reactor.iterate()
344         reactor.iterate()
345         reactor.iterate()
346         reactor.iterate()
347         self.got = 0
348         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
349         reactor.iterate()
350         reactor.iterate()
351         reactor.iterate()
352         reactor.iterate()
353         reactor.iterate()
354         reactor.iterate()
355         self.a.valueForKey(sha('foo').digest(), self._cb)
356         reactor.iterate()
357         reactor.iterate()
358         reactor.iterate()
359         reactor.iterate()
360         reactor.iterate()
361         reactor.iterate()
362         reactor.iterate()
363
364     def _cb(self, key, val):
365         if not val:
366             self.assertEqual(self.got, 1)
367         elif 'foobar' in val:
368             self.got = 1
369
370
371 class MultiTest(unittest.TestCase):
372     
373     timeout = 30
374     num = 20
375     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
376                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
377                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
378                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
379                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
380                     'KE_AGE': 3600, }
381
382     def _done(self, val):
383         self.done = 1
384         
385     def setUp(self):
386         self.l = []
387         self.startport = 4088
388         for i in range(self.num):
389             d = self.DHT_DEFAULTS.copy()
390             d['PORT'] = self.startport + i
391             self.l.append(Khashmir(d))
392         reactor.iterate()
393         reactor.iterate()
394         
395         for i in self.l:
396             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
397             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
398             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
399             reactor.iterate()
400             reactor.iterate()
401             reactor.iterate() 
402             
403         for i in self.l:
404             self.done = 0
405             i.findCloseNodes(self._done)
406             while not self.done:
407                 reactor.iterate()
408         for i in self.l:
409             self.done = 0
410             i.findCloseNodes(self._done)
411             while not self.done:
412                 reactor.iterate()
413
414     def tearDown(self):
415         for i in self.l:
416             i.shutdown()
417             os.unlink(i.store.db)
418             
419         reactor.iterate()
420         
421     def testStoreRetrieve(self):
422         for i in range(10):
423             K = newID()
424             V = newID()
425             
426             for a in range(3):
427                 self.done = 0
428                 def _scb(key, value, result):
429                     self.done = 1
430                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
431                 while not self.done:
432                     reactor.iterate()
433
434
435                 def _rcb(key, val):
436                     if not val:
437                         self.done = 1
438                         self.assertEqual(self.got, 1)
439                     elif V in val:
440                         self.got = 1
441                 for x in range(3):
442                     self.got = 0
443                     self.done = 0
444                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
445                     while not self.done:
446                         reactor.iterate()