]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/khashmir.py
Switch from the time module to the datetime module.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 import warnings
5 warnings.simplefilter("ignore", DeprecationWarning)
6
7 from datetime import datetime, timedelta
8 from random import randrange
9 from sha import sha
10 import os
11
12 from twisted.internet.defer import Deferred
13 from twisted.internet import protocol, reactor
14 from twisted.trial import unittest
15
16 from db import DB
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
21 import krpc
22
23 # this is the base class, has base functionality and find node, no key-value mappings
24 class KhashmirBase(protocol.Factory):
25     _Node = KNodeBase
26     def __init__(self, config, cache_dir='/tmp'):
27         self.config = None
28         self.setup(config, cache_dir)
29         
30     def setup(self, config, cache_dir):
31         self.config = config
32         self.port = config['PORT']
33         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
34         self.node = self._loadSelfNode('', self.port)
35         self.table = KTable(self.node, config)
36         #self.app = service.Application("krpc")
37         self.udp = krpc.hostbroker(self)
38         self.udp.protocol = krpc.KRPC
39         self.listenport = reactor.listenUDP(self.port, self.udp)
40         self._loadRoutingTable()
41         self.expirer = KeyExpirer(self.store, config)
42         self.refreshTable(force=1)
43         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
44
45     def Node(self):
46         n = self._Node()
47         n.table = self.table
48         return n
49     
50     def __del__(self):
51         self.listenport.stopListening()
52         
53     def _loadSelfNode(self, host, port):
54         id = self.store.getSelfNode()
55         if not id:
56             id = newID()
57         return self._Node().init(id, host, port)
58         
59     def checkpoint(self, auto=0):
60         self.store.saveSelfNode(self.node.id)
61         self.store.dumpRoutingTable(self.table.buckets)
62         self.refreshTable()
63         if auto:
64             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
65                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
66                               self.checkpoint, (1,))
67         
68     def _loadRoutingTable(self):
69         """
70             load routing table nodes from database
71             it's usually a good idea to call refreshTable(force=1) after loading the table
72         """
73         nodes = self.store.getRoutingTable()
74         for rec in nodes:
75             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
76             n.conn = self.udp.connectionForAddr((n.host, n.port))
77             self.table.insertNode(n, contacted=0)
78             
79
80     #######
81     #######  LOCAL INTERFACE    - use these methods!
82     def addContact(self, host, port, callback=None):
83         """
84             ping this node and add the contact info to the table on pong!
85         """
86         n =self.Node().init(NULL_ID, host, port) 
87         n.conn = self.udp.connectionForAddr((n.host, n.port))
88         self.sendPing(n, callback=callback)
89
90     ## this call is async!
91     def findNode(self, id, callback, errback=None):
92         """ returns the contact info for node, or the k closest nodes, from the global table """
93         # get K nodes out of local table/cache, or the node we want
94         nodes = self.table.findNodes(id)
95         d = Deferred()
96         if errback:
97             d.addCallbacks(callback, errback)
98         else:
99             d.addCallback(callback)
100         if len(nodes) == 1 and nodes[0].id == id :
101             d.callback(nodes)
102         else:
103             # create our search state
104             state = FindNode(self, id, d.callback, self.config)
105             reactor.callLater(0, state.goWithNodes, nodes)
106     
107     def insertNode(self, n, contacted=1):
108         """
109         insert a node in our local table, pinging oldest contact in bucket, if necessary
110         
111         If all you have is a host/port, then use addContact, which calls this method after
112         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
113         a node into the table without it's peer-ID.  That means of course the node passed into this
114         method needs to be a properly formed Node object with a valid ID.
115         """
116         old = self.table.insertNode(n, contacted=contacted)
117         if (old and old.id != self.node.id and
118             (datetime.now() - old.lastSeen) > 
119              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
120             # the bucket is full, check to see if old node is still around and if so, replace it
121             
122             ## these are the callbacks used when we ping the oldest node in a bucket
123             def _staleNodeHandler(oldnode=old, newnode = n):
124                 """ called if the pinged node never responds """
125                 self.table.replaceStaleNode(old, newnode)
126             
127             def _notStaleNodeHandler(dict, old=old):
128                 """ called when we get a pong from the old node """
129                 dict = dict['rsp']
130                 if dict['id'] == old.id:
131                     self.table.justSeenNode(old.id)
132             
133             df = old.ping(self.node.id)
134             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
135
136     def sendPing(self, node, callback=None):
137         """
138             ping a node
139         """
140         df = node.ping(self.node.id)
141         ## these are the callbacks we use when we issue a PING
142         def _pongHandler(dict, node=node, table=self.table, callback=callback):
143             _krpc_sender = dict['_krpc_sender']
144             dict = dict['rsp']
145             sender = {'id' : dict['id']}
146             sender['host'] = _krpc_sender[0]
147             sender['port'] = _krpc_sender[1]
148             n = self.Node().initWithDict(sender)
149             n.conn = self.udp.connectionForAddr((n.host, n.port))
150             table.insertNode(n)
151             if callback:
152                 callback()
153         def _defaultPong(err, node=node, table=self.table, callback=callback):
154             table.nodeFailed(node)
155             if callback:
156                 callback()
157         
158         df.addCallbacks(_pongHandler,_defaultPong)
159
160     def findCloseNodes(self, callback=lambda a: None):
161         """
162             This does a findNode on the ID one away from our own.  
163             This will allow us to populate our table with nodes on our network closest to our own.
164             This is called as soon as we start up with an empty table
165         """
166         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
167         self.findNode(id, callback)
168
169     def refreshTable(self, force=0):
170         """
171             force=1 will refresh table regardless of last bucket access time
172         """
173         def callback(nodes):
174             pass
175     
176         for bucket in self.table.buckets:
177             if force or (datetime.now() - bucket.lastAccessed > 
178                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
179                 id = newIDInRange(bucket.min, bucket.max)
180                 self.findNode(id, callback)
181
182     def stats(self):
183         """
184         Returns (num_contacts, num_nodes)
185         num_contacts: number contacts in our routing table
186         num_nodes: number of nodes estimated in the entire dht
187         """
188         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
189         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
190         return (num_contacts, num_nodes)
191     
192     def shutdown(self):
193         """Closes the port and cancels pending later calls."""
194         self.listenport.stopListening()
195         try:
196             self.next_checkpoint.cancel()
197         except:
198             pass
199         self.expirer.shutdown()
200         self.store.close()
201
202     def krpc_ping(self, id, _krpc_sender):
203         sender = {'id' : id}
204         sender['host'] = _krpc_sender[0]
205         sender['port'] = _krpc_sender[1]        
206         n = self.Node().initWithDict(sender)
207         n.conn = self.udp.connectionForAddr((n.host, n.port))
208         self.insertNode(n, contacted=0)
209         return {"id" : self.node.id}
210         
211     def krpc_find_node(self, target, id, _krpc_sender):
212         nodes = self.table.findNodes(target)
213         nodes = map(lambda node: node.senderDict(), nodes)
214         sender = {'id' : id}
215         sender['host'] = _krpc_sender[0]
216         sender['port'] = _krpc_sender[1]        
217         n = self.Node().initWithDict(sender)
218         n.conn = self.udp.connectionForAddr((n.host, n.port))
219         self.insertNode(n, contacted=0)
220         return {"nodes" : nodes, "id" : self.node.id}
221
222
223 ## This class provides read-only access to the DHT, valueForKey
224 ## you probably want to use this mixin and provide your own write methods
225 class KhashmirRead(KhashmirBase):
226     _Node = KNodeRead
227
228     ## also async
229     def valueForKey(self, key, callback, searchlocal = 1):
230         """ returns the values found for key in global table
231             callback will be called with a list of values for each peer that returns unique values
232             final callback will be an empty list - probably should change to 'more coming' arg
233         """
234         nodes = self.table.findNodes(key)
235         
236         # get locals
237         if searchlocal:
238             l = self.store.retrieveValues(key)
239             if len(l) > 0:
240                 reactor.callLater(0, callback, key, l)
241         else:
242             l = []
243         
244         # create our search state
245         state = GetValue(self, key, callback, self.config)
246         reactor.callLater(0, state.goWithNodes, nodes, l)
247
248     def krpc_find_value(self, key, id, _krpc_sender):
249         sender = {'id' : id}
250         sender['host'] = _krpc_sender[0]
251         sender['port'] = _krpc_sender[1]        
252         n = self.Node().initWithDict(sender)
253         n.conn = self.udp.connectionForAddr((n.host, n.port))
254         self.insertNode(n, contacted=0)
255     
256         l = self.store.retrieveValues(key)
257         if len(l) > 0:
258             return {'values' : l, "id": self.node.id}
259         else:
260             nodes = self.table.findNodes(key)
261             nodes = map(lambda node: node.senderDict(), nodes)
262             return {'nodes' : nodes, "id": self.node.id}
263
264 ###  provides a generic write method, you probably don't want to deploy something that allows
265 ###  arbitrary value storage
266 class KhashmirWrite(KhashmirRead):
267     _Node = KNodeWrite
268     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
269     def storeValueForKey(self, key, value, callback=None):
270         """ stores the value for key in the global table, returns immediately, no status 
271             in this implementation, peers respond but don't indicate status to storing values
272             a key can have many values
273         """
274         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
275             if not response:
276                 # default callback
277                 def _storedValueHandler(key, value, sender):
278                     pass
279                 response=_storedValueHandler
280             action = StoreValue(self.table, key, value, response, self.config)
281             reactor.callLater(0, action.goWithNodes, nodes)
282             
283         # this call is asynch
284         self.findNode(key, _storeValueForKey)
285                     
286     def krpc_store_value(self, key, value, id, _krpc_sender):
287         self.store.storeValue(key, value)
288         sender = {'id' : id}
289         sender['host'] = _krpc_sender[0]
290         sender['port'] = _krpc_sender[1]        
291         n = self.Node().initWithDict(sender)
292         n.conn = self.udp.connectionForAddr((n.host, n.port))
293         self.insertNode(n, contacted=0)
294         return {"id" : self.node.id}
295
296 # the whole shebang, for testing
297 class Khashmir(KhashmirWrite):
298     _Node = KNodeWrite
299
300 class SimpleTests(unittest.TestCase):
301     
302     timeout = 10
303     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
304                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
305                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
306                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
307                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
308                     'KE_AGE': 3600, }
309
310     def setUp(self):
311         krpc.KRPC.noisy = 0
312         d = self.DHT_DEFAULTS.copy()
313         d['PORT'] = 4044
314         self.a = Khashmir(d)
315         d = self.DHT_DEFAULTS.copy()
316         d['PORT'] = 4045
317         self.b = Khashmir(d)
318         
319     def tearDown(self):
320         self.a.shutdown()
321         self.b.shutdown()
322         os.unlink(self.a.store.db)
323         os.unlink(self.b.store.db)
324
325     def testAddContact(self):
326         self.assertEqual(len(self.a.table.buckets), 1)
327         self.assertEqual(len(self.a.table.buckets[0].l), 0)
328
329         self.assertEqual(len(self.b.table.buckets), 1)
330         self.assertEqual(len(self.b.table.buckets[0].l), 0)
331
332         self.a.addContact('127.0.0.1', 4045)
333         reactor.iterate()
334         reactor.iterate()
335         reactor.iterate()
336         reactor.iterate()
337
338         self.assertEqual(len(self.a.table.buckets), 1)
339         self.assertEqual(len(self.a.table.buckets[0].l), 1)
340         self.assertEqual(len(self.b.table.buckets), 1)
341         self.assertEqual(len(self.b.table.buckets[0].l), 1)
342
343     def testStoreRetrieve(self):
344         self.a.addContact('127.0.0.1', 4045)
345         reactor.iterate()
346         reactor.iterate()
347         reactor.iterate()
348         reactor.iterate()
349         self.got = 0
350         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
351         reactor.iterate()
352         reactor.iterate()
353         reactor.iterate()
354         reactor.iterate()
355         reactor.iterate()
356         reactor.iterate()
357         self.a.valueForKey(sha('foo').digest(), self._cb)
358         reactor.iterate()
359         reactor.iterate()
360         reactor.iterate()
361         reactor.iterate()
362         reactor.iterate()
363         reactor.iterate()
364         reactor.iterate()
365
366     def _cb(self, key, val):
367         if not val:
368             self.assertEqual(self.got, 1)
369         elif 'foobar' in val:
370             self.got = 1
371
372
373 class MultiTest(unittest.TestCase):
374     
375     timeout = 30
376     num = 20
377     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
378                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
379                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
380                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
381                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
382                     'KE_AGE': 3600, }
383
384     def _done(self, val):
385         self.done = 1
386         
387     def setUp(self):
388         self.l = []
389         self.startport = 4088
390         for i in range(self.num):
391             d = self.DHT_DEFAULTS.copy()
392             d['PORT'] = self.startport + i
393             self.l.append(Khashmir(d))
394         reactor.iterate()
395         reactor.iterate()
396         
397         for i in self.l:
398             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
399             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
400             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
401             reactor.iterate()
402             reactor.iterate()
403             reactor.iterate() 
404             
405         for i in self.l:
406             self.done = 0
407             i.findCloseNodes(self._done)
408             while not self.done:
409                 reactor.iterate()
410         for i in self.l:
411             self.done = 0
412             i.findCloseNodes(self._done)
413             while not self.done:
414                 reactor.iterate()
415
416     def tearDown(self):
417         for i in self.l:
418             i.shutdown()
419             os.unlink(i.store.db)
420             
421         reactor.iterate()
422         
423     def testStoreRetrieve(self):
424         for i in range(10):
425             K = newID()
426             V = newID()
427             
428             for a in range(3):
429                 self.done = 0
430                 def _scb(key, value, result):
431                     self.done = 1
432                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
433                 while not self.done:
434                     reactor.iterate()
435
436
437                 def _rcb(key, val):
438                     if not val:
439                         self.done = 1
440                         self.assertEqual(self.got, 1)
441                     elif V in val:
442                         self.got = 1
443                 for x in range(3):
444                     self.got = 0
445                     self.done = 0
446                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
447                     while not self.done:
448                         reactor.iterate()