]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/khashmir.py
New DHT method 'join' like 'ping' but returns our IP and port.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 import warnings
5 warnings.simplefilter("ignore", DeprecationWarning)
6
7 from datetime import datetime, timedelta
8 from random import randrange
9 from sha import sha
10 import os
11
12 from twisted.internet.defer import Deferred
13 from twisted.internet import protocol, reactor
14 from twisted.trial import unittest
15
16 from db import DB
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
21 import krpc
22
23 # this is the base class, has base functionality and find node, no key-value mappings
24 class KhashmirBase(protocol.Factory):
25     _Node = KNodeBase
26     def __init__(self, config, cache_dir='/tmp'):
27         self.config = None
28         self.setup(config, cache_dir)
29         
30     def setup(self, config, cache_dir):
31         self.config = config
32         self.port = config['PORT']
33         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
34         self.node = self._loadSelfNode('', self.port)
35         self.table = KTable(self.node, config)
36         #self.app = service.Application("krpc")
37         self.udp = krpc.hostbroker(self, config)
38         self.udp.protocol = krpc.KRPC
39         self.listenport = reactor.listenUDP(self.port, self.udp)
40         self._loadRoutingTable()
41         self.expirer = KeyExpirer(self.store, config)
42         self.refreshTable(force=1)
43         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
44
45     def Node(self, id, host = None, port = None):
46         """Create a new node."""
47         n = self._Node(id, host, port)
48         n.table = self.table
49         n.conn = self.udp.connectionForAddr((n.host, n.port))
50         return n
51     
52     def __del__(self):
53         self.listenport.stopListening()
54         
55     def _loadSelfNode(self, host, port):
56         id = self.store.getSelfNode()
57         if not id:
58             id = newID()
59         return self._Node(id, host, port)
60         
61     def checkpoint(self, auto=0):
62         self.store.saveSelfNode(self.node.id)
63         self.store.dumpRoutingTable(self.table.buckets)
64         self.refreshTable()
65         if auto:
66             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
67                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
68                               self.checkpoint, (1,))
69         
70     def _loadRoutingTable(self):
71         """
72             load routing table nodes from database
73             it's usually a good idea to call refreshTable(force=1) after loading the table
74         """
75         nodes = self.store.getRoutingTable()
76         for rec in nodes:
77             n = self.Node(rec[0], rec[1], int(rec[2]))
78             self.table.insertNode(n, contacted=0)
79             
80
81     #######
82     #######  LOCAL INTERFACE    - use these methods!
83     def addContact(self, host, port, callback=None):
84         """
85             ping this node and add the contact info to the table on pong!
86         """
87         n = self.Node(NULL_ID, host, port)
88         self.sendJoin(n, callback=callback)
89
90     ## this call is async!
91     def findNode(self, id, callback, errback=None):
92         """ returns the contact info for node, or the k closest nodes, from the global table """
93         # get K nodes out of local table/cache, or the node we want
94         nodes = self.table.findNodes(id)
95         d = Deferred()
96         if errback:
97             d.addCallbacks(callback, errback)
98         else:
99             d.addCallback(callback)
100         if len(nodes) == 1 and nodes[0].id == id :
101             d.callback(nodes)
102         else:
103             # create our search state
104             state = FindNode(self, id, d.callback, self.config)
105             reactor.callLater(0, state.goWithNodes, nodes)
106     
107     def insertNode(self, n, contacted=1):
108         """
109         insert a node in our local table, pinging oldest contact in bucket, if necessary
110         
111         If all you have is a host/port, then use addContact, which calls this method after
112         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
113         a node into the table without it's peer-ID.  That means of course the node passed into this
114         method needs to be a properly formed Node object with a valid ID.
115         """
116         old = self.table.insertNode(n, contacted=contacted)
117         if (old and old.id != self.node.id and
118             (datetime.now() - old.lastSeen) > 
119              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
120             # the bucket is full, check to see if old node is still around and if so, replace it
121             
122             ## these are the callbacks used when we ping the oldest node in a bucket
123             def _staleNodeHandler(oldnode=old, newnode = n):
124                 """ called if the pinged node never responds """
125                 self.table.replaceStaleNode(old, newnode)
126             
127             def _notStaleNodeHandler(dict, old=old):
128                 """ called when we get a pong from the old node """
129                 dict = dict['rsp']
130                 if dict['id'] == old.id:
131                     self.table.justSeenNode(old.id)
132             
133             df = old.ping(self.node.id)
134             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
135
136     def sendJoin(self, node, callback=None):
137         """
138             ping a node
139         """
140         df = node.join(self.node.id)
141         ## these are the callbacks we use when we issue a PING
142         def _pongHandler(dict, node=node, self=self, callback=callback):
143             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
144             self.insertNode(n)
145             if callback:
146                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
147         def _defaultPong(err, node=node, table=self.table, callback=callback):
148             table.nodeFailed(node)
149             if callback:
150                 callback(None)
151         
152         df.addCallbacks(_pongHandler,_defaultPong)
153
154     def findCloseNodes(self, callback=lambda a: None):
155         """
156             This does a findNode on the ID one away from our own.  
157             This will allow us to populate our table with nodes on our network closest to our own.
158             This is called as soon as we start up with an empty table
159         """
160         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
161         self.findNode(id, callback)
162
163     def refreshTable(self, force=0):
164         """
165             force=1 will refresh table regardless of last bucket access time
166         """
167         def callback(nodes):
168             pass
169     
170         for bucket in self.table.buckets:
171             if force or (datetime.now() - bucket.lastAccessed > 
172                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
173                 id = newIDInRange(bucket.min, bucket.max)
174                 self.findNode(id, callback)
175
176     def stats(self):
177         """
178         Returns (num_contacts, num_nodes)
179         num_contacts: number contacts in our routing table
180         num_nodes: number of nodes estimated in the entire dht
181         """
182         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
183         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
184         return (num_contacts, num_nodes)
185     
186     def shutdown(self):
187         """Closes the port and cancels pending later calls."""
188         self.listenport.stopListening()
189         try:
190             self.next_checkpoint.cancel()
191         except:
192             pass
193         self.expirer.shutdown()
194         self.store.close()
195
196     #### Remote Interface - called by remote nodes
197     def krpc_ping(self, id, _krpc_sender):
198         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
199         self.insertNode(n, contacted=0)
200         return {"id" : self.node.id}
201         
202     def krpc_join(self, id, _krpc_sender):
203         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
204         self.insertNode(n, contacted=0)
205         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
206         
207     def krpc_find_node(self, target, id, _krpc_sender):
208         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
209         self.insertNode(n, contacted=0)
210         nodes = self.table.findNodes(target)
211         nodes = map(lambda node: node.senderDict(), nodes)
212         return {"nodes" : nodes, "id" : self.node.id}
213
214
215 ## This class provides read-only access to the DHT, valueForKey
216 ## you probably want to use this mixin and provide your own write methods
217 class KhashmirRead(KhashmirBase):
218     _Node = KNodeRead
219
220     ## also async
221     def valueForKey(self, key, callback, searchlocal = 1):
222         """ returns the values found for key in global table
223             callback will be called with a list of values for each peer that returns unique values
224             final callback will be an empty list - probably should change to 'more coming' arg
225         """
226         nodes = self.table.findNodes(key)
227         
228         # get locals
229         if searchlocal:
230             l = self.store.retrieveValues(key)
231             if len(l) > 0:
232                 reactor.callLater(0, callback, key, l)
233         else:
234             l = []
235         
236         # create our search state
237         state = GetValue(self, key, callback, self.config)
238         reactor.callLater(0, state.goWithNodes, nodes, l)
239
240     #### Remote Interface - called by remote nodes
241     def krpc_find_value(self, key, id, _krpc_sender):
242         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
243         self.insertNode(n, contacted=0)
244     
245         l = self.store.retrieveValues(key)
246         if len(l) > 0:
247             return {'values' : l, "id": self.node.id}
248         else:
249             nodes = self.table.findNodes(key)
250             nodes = map(lambda node: node.senderDict(), nodes)
251             return {'nodes' : nodes, "id": self.node.id}
252
253 ###  provides a generic write method, you probably don't want to deploy something that allows
254 ###  arbitrary value storage
255 class KhashmirWrite(KhashmirRead):
256     _Node = KNodeWrite
257     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
258     def storeValueForKey(self, key, value, callback=None):
259         """ stores the value for key in the global table, returns immediately, no status 
260             in this implementation, peers respond but don't indicate status to storing values
261             a key can have many values
262         """
263         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
264             if not response:
265                 # default callback
266                 def _storedValueHandler(key, value, sender):
267                     pass
268                 response=_storedValueHandler
269             action = StoreValue(self.table, key, value, response, self.config)
270             reactor.callLater(0, action.goWithNodes, nodes)
271             
272         # this call is asynch
273         self.findNode(key, _storeValueForKey)
274                     
275     #### Remote Interface - called by remote nodes
276     def krpc_store_value(self, key, value, id, _krpc_sender):
277         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
278         self.insertNode(n, contacted=0)
279         self.store.storeValue(key, value)
280         return {"id" : self.node.id}
281
282 # the whole shebang, for testing
283 class Khashmir(KhashmirWrite):
284     _Node = KNodeWrite
285
286 class SimpleTests(unittest.TestCase):
287     
288     timeout = 10
289     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
290                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
291                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
292                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
293                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
294                     'KE_AGE': 3600, 'SPEW': False, }
295
296     def setUp(self):
297         krpc.KRPC.noisy = 0
298         d = self.DHT_DEFAULTS.copy()
299         d['PORT'] = 4044
300         self.a = Khashmir(d)
301         d = self.DHT_DEFAULTS.copy()
302         d['PORT'] = 4045
303         self.b = Khashmir(d)
304         
305     def tearDown(self):
306         self.a.shutdown()
307         self.b.shutdown()
308         os.unlink(self.a.store.db)
309         os.unlink(self.b.store.db)
310
311     def testAddContact(self):
312         self.failUnlessEqual(len(self.a.table.buckets), 1)
313         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
314
315         self.failUnlessEqual(len(self.b.table.buckets), 1)
316         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
317
318         self.a.addContact('127.0.0.1', 4045)
319         reactor.iterate()
320         reactor.iterate()
321         reactor.iterate()
322         reactor.iterate()
323
324         self.failUnlessEqual(len(self.a.table.buckets), 1)
325         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
326         self.failUnlessEqual(len(self.b.table.buckets), 1)
327         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
328
329     def testStoreRetrieve(self):
330         self.a.addContact('127.0.0.1', 4045)
331         reactor.iterate()
332         reactor.iterate()
333         reactor.iterate()
334         reactor.iterate()
335         self.got = 0
336         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
337         reactor.iterate()
338         reactor.iterate()
339         reactor.iterate()
340         reactor.iterate()
341         reactor.iterate()
342         reactor.iterate()
343         self.a.valueForKey(sha('foo').digest(), self._cb)
344         reactor.iterate()
345         reactor.iterate()
346         reactor.iterate()
347         reactor.iterate()
348         reactor.iterate()
349         reactor.iterate()
350         reactor.iterate()
351
352     def _cb(self, key, val):
353         if not val:
354             self.failUnlessEqual(self.got, 1)
355         elif 'foobar' in val:
356             self.got = 1
357
358
359 class MultiTest(unittest.TestCase):
360     
361     timeout = 30
362     num = 20
363     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
364                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
365                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
366                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
367                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
368                     'KE_AGE': 3600, 'SPEW': False, }
369
370     def _done(self, val):
371         self.done = 1
372         
373     def setUp(self):
374         self.l = []
375         self.startport = 4088
376         for i in range(self.num):
377             d = self.DHT_DEFAULTS.copy()
378             d['PORT'] = self.startport + i
379             self.l.append(Khashmir(d))
380         reactor.iterate()
381         reactor.iterate()
382         
383         for i in self.l:
384             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
385             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
386             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
387             reactor.iterate()
388             reactor.iterate()
389             reactor.iterate() 
390             
391         for i in self.l:
392             self.done = 0
393             i.findCloseNodes(self._done)
394             while not self.done:
395                 reactor.iterate()
396         for i in self.l:
397             self.done = 0
398             i.findCloseNodes(self._done)
399             while not self.done:
400                 reactor.iterate()
401
402     def tearDown(self):
403         for i in self.l:
404             i.shutdown()
405             os.unlink(i.store.db)
406             
407         reactor.iterate()
408         
409     def testStoreRetrieve(self):
410         for i in range(10):
411             K = newID()
412             V = newID()
413             
414             for a in range(3):
415                 self.done = 0
416                 def _scb(key, value, result):
417                     self.done = 1
418                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
419                 while not self.done:
420                     reactor.iterate()
421
422
423                 def _rcb(key, val):
424                     if not val:
425                         self.done = 1
426                         self.failUnlessEqual(self.got, 1)
427                     elif V in val:
428                         self.got = 1
429                 for x in range(3):
430                     self.got = 0
431                     self.done = 0
432                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
433                     while not self.done:
434                         reactor.iterate()