]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/khashmir.py
Improve the creation of nodes and move all to the main khashmir class.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 import warnings
5 warnings.simplefilter("ignore", DeprecationWarning)
6
7 from datetime import datetime, timedelta
8 from random import randrange
9 from sha import sha
10 import os
11
12 from twisted.internet.defer import Deferred
13 from twisted.internet import protocol, reactor
14 from twisted.trial import unittest
15
16 from db import DB
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
21 import krpc
22
23 # this is the base class, has base functionality and find node, no key-value mappings
24 class KhashmirBase(protocol.Factory):
25     _Node = KNodeBase
26     def __init__(self, config, cache_dir='/tmp'):
27         self.config = None
28         self.setup(config, cache_dir)
29         
30     def setup(self, config, cache_dir):
31         self.config = config
32         self.port = config['PORT']
33         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
34         self.node = self._loadSelfNode('', self.port)
35         self.table = KTable(self.node, config)
36         #self.app = service.Application("krpc")
37         self.udp = krpc.hostbroker(self)
38         self.udp.protocol = krpc.KRPC
39         self.listenport = reactor.listenUDP(self.port, self.udp)
40         self._loadRoutingTable()
41         self.expirer = KeyExpirer(self.store, config)
42         self.refreshTable(force=1)
43         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
44
45     def Node(self, id, host = None, port = None):
46         """Create a new node."""
47         n = self._Node(id, host, port)
48         n.table = self.table
49         n.conn = self.udp.connectionForAddr((n.host, n.port))
50         return n
51     
52     def __del__(self):
53         self.listenport.stopListening()
54         
55     def _loadSelfNode(self, host, port):
56         id = self.store.getSelfNode()
57         if not id:
58             id = newID()
59         return self._Node(id, host, port)
60         
61     def checkpoint(self, auto=0):
62         self.store.saveSelfNode(self.node.id)
63         self.store.dumpRoutingTable(self.table.buckets)
64         self.refreshTable()
65         if auto:
66             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
67                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
68                               self.checkpoint, (1,))
69         
70     def _loadRoutingTable(self):
71         """
72             load routing table nodes from database
73             it's usually a good idea to call refreshTable(force=1) after loading the table
74         """
75         nodes = self.store.getRoutingTable()
76         for rec in nodes:
77             n = self.Node(rec[0], rec[1], int(rec[2]))
78             self.table.insertNode(n, contacted=0)
79             
80
81     #######
82     #######  LOCAL INTERFACE    - use these methods!
83     def addContact(self, host, port, callback=None):
84         """
85             ping this node and add the contact info to the table on pong!
86         """
87         n = self.Node(NULL_ID, host, port)
88         self.sendPing(n, callback=callback)
89
90     ## this call is async!
91     def findNode(self, id, callback, errback=None):
92         """ returns the contact info for node, or the k closest nodes, from the global table """
93         # get K nodes out of local table/cache, or the node we want
94         nodes = self.table.findNodes(id)
95         d = Deferred()
96         if errback:
97             d.addCallbacks(callback, errback)
98         else:
99             d.addCallback(callback)
100         if len(nodes) == 1 and nodes[0].id == id :
101             d.callback(nodes)
102         else:
103             # create our search state
104             state = FindNode(self, id, d.callback, self.config)
105             reactor.callLater(0, state.goWithNodes, nodes)
106     
107     def insertNode(self, n, contacted=1):
108         """
109         insert a node in our local table, pinging oldest contact in bucket, if necessary
110         
111         If all you have is a host/port, then use addContact, which calls this method after
112         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
113         a node into the table without it's peer-ID.  That means of course the node passed into this
114         method needs to be a properly formed Node object with a valid ID.
115         """
116         old = self.table.insertNode(n, contacted=contacted)
117         if (old and old.id != self.node.id and
118             (datetime.now() - old.lastSeen) > 
119              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
120             # the bucket is full, check to see if old node is still around and if so, replace it
121             
122             ## these are the callbacks used when we ping the oldest node in a bucket
123             def _staleNodeHandler(oldnode=old, newnode = n):
124                 """ called if the pinged node never responds """
125                 self.table.replaceStaleNode(old, newnode)
126             
127             def _notStaleNodeHandler(dict, old=old):
128                 """ called when we get a pong from the old node """
129                 dict = dict['rsp']
130                 if dict['id'] == old.id:
131                     self.table.justSeenNode(old.id)
132             
133             df = old.ping(self.node.id)
134             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
135
136     def sendPing(self, node, callback=None):
137         """
138             ping a node
139         """
140         df = node.ping(self.node.id)
141         ## these are the callbacks we use when we issue a PING
142         def _pongHandler(dict, node=node, self=self, callback=callback):
143             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
144             self.insertNode(n)
145             if callback:
146                 callback()
147         def _defaultPong(err, node=node, table=self.table, callback=callback):
148             table.nodeFailed(node)
149             if callback:
150                 callback()
151         
152         df.addCallbacks(_pongHandler,_defaultPong)
153
154     def findCloseNodes(self, callback=lambda a: None):
155         """
156             This does a findNode on the ID one away from our own.  
157             This will allow us to populate our table with nodes on our network closest to our own.
158             This is called as soon as we start up with an empty table
159         """
160         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
161         self.findNode(id, callback)
162
163     def refreshTable(self, force=0):
164         """
165             force=1 will refresh table regardless of last bucket access time
166         """
167         def callback(nodes):
168             pass
169     
170         for bucket in self.table.buckets:
171             if force or (datetime.now() - bucket.lastAccessed > 
172                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
173                 id = newIDInRange(bucket.min, bucket.max)
174                 self.findNode(id, callback)
175
176     def stats(self):
177         """
178         Returns (num_contacts, num_nodes)
179         num_contacts: number contacts in our routing table
180         num_nodes: number of nodes estimated in the entire dht
181         """
182         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
183         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
184         return (num_contacts, num_nodes)
185     
186     def shutdown(self):
187         """Closes the port and cancels pending later calls."""
188         self.listenport.stopListening()
189         try:
190             self.next_checkpoint.cancel()
191         except:
192             pass
193         self.expirer.shutdown()
194         self.store.close()
195
196     #### Remote Interface - called by remote nodes
197     def krpc_ping(self, id, _krpc_sender):
198         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
199         self.insertNode(n, contacted=0)
200         return {"id" : self.node.id}
201         
202     def krpc_find_node(self, target, id, _krpc_sender):
203         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
204         self.insertNode(n, contacted=0)
205         nodes = self.table.findNodes(target)
206         nodes = map(lambda node: node.senderDict(), nodes)
207         return {"nodes" : nodes, "id" : self.node.id}
208
209
210 ## This class provides read-only access to the DHT, valueForKey
211 ## you probably want to use this mixin and provide your own write methods
212 class KhashmirRead(KhashmirBase):
213     _Node = KNodeRead
214
215     ## also async
216     def valueForKey(self, key, callback, searchlocal = 1):
217         """ returns the values found for key in global table
218             callback will be called with a list of values for each peer that returns unique values
219             final callback will be an empty list - probably should change to 'more coming' arg
220         """
221         nodes = self.table.findNodes(key)
222         
223         # get locals
224         if searchlocal:
225             l = self.store.retrieveValues(key)
226             if len(l) > 0:
227                 reactor.callLater(0, callback, key, l)
228         else:
229             l = []
230         
231         # create our search state
232         state = GetValue(self, key, callback, self.config)
233         reactor.callLater(0, state.goWithNodes, nodes, l)
234
235     #### Remote Interface - called by remote nodes
236     def krpc_find_value(self, key, id, _krpc_sender):
237         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
238         self.insertNode(n, contacted=0)
239     
240         l = self.store.retrieveValues(key)
241         if len(l) > 0:
242             return {'values' : l, "id": self.node.id}
243         else:
244             nodes = self.table.findNodes(key)
245             nodes = map(lambda node: node.senderDict(), nodes)
246             return {'nodes' : nodes, "id": self.node.id}
247
248 ###  provides a generic write method, you probably don't want to deploy something that allows
249 ###  arbitrary value storage
250 class KhashmirWrite(KhashmirRead):
251     _Node = KNodeWrite
252     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
253     def storeValueForKey(self, key, value, callback=None):
254         """ stores the value for key in the global table, returns immediately, no status 
255             in this implementation, peers respond but don't indicate status to storing values
256             a key can have many values
257         """
258         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
259             if not response:
260                 # default callback
261                 def _storedValueHandler(key, value, sender):
262                     pass
263                 response=_storedValueHandler
264             action = StoreValue(self.table, key, value, response, self.config)
265             reactor.callLater(0, action.goWithNodes, nodes)
266             
267         # this call is asynch
268         self.findNode(key, _storeValueForKey)
269                     
270     #### Remote Interface - called by remote nodes
271     def krpc_store_value(self, key, value, id, _krpc_sender):
272         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
273         self.insertNode(n, contacted=0)
274         self.store.storeValue(key, value)
275         return {"id" : self.node.id}
276
277 # the whole shebang, for testing
278 class Khashmir(KhashmirWrite):
279     _Node = KNodeWrite
280
281 class SimpleTests(unittest.TestCase):
282     
283     timeout = 10
284     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
285                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
286                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
287                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
288                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
289                     'KE_AGE': 3600, }
290
291     def setUp(self):
292         krpc.KRPC.noisy = 0
293         d = self.DHT_DEFAULTS.copy()
294         d['PORT'] = 4044
295         self.a = Khashmir(d)
296         d = self.DHT_DEFAULTS.copy()
297         d['PORT'] = 4045
298         self.b = Khashmir(d)
299         
300     def tearDown(self):
301         self.a.shutdown()
302         self.b.shutdown()
303         os.unlink(self.a.store.db)
304         os.unlink(self.b.store.db)
305
306     def testAddContact(self):
307         self.assertEqual(len(self.a.table.buckets), 1)
308         self.assertEqual(len(self.a.table.buckets[0].l), 0)
309
310         self.assertEqual(len(self.b.table.buckets), 1)
311         self.assertEqual(len(self.b.table.buckets[0].l), 0)
312
313         self.a.addContact('127.0.0.1', 4045)
314         reactor.iterate()
315         reactor.iterate()
316         reactor.iterate()
317         reactor.iterate()
318
319         self.assertEqual(len(self.a.table.buckets), 1)
320         self.assertEqual(len(self.a.table.buckets[0].l), 1)
321         self.assertEqual(len(self.b.table.buckets), 1)
322         self.assertEqual(len(self.b.table.buckets[0].l), 1)
323
324     def testStoreRetrieve(self):
325         self.a.addContact('127.0.0.1', 4045)
326         reactor.iterate()
327         reactor.iterate()
328         reactor.iterate()
329         reactor.iterate()
330         self.got = 0
331         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
332         reactor.iterate()
333         reactor.iterate()
334         reactor.iterate()
335         reactor.iterate()
336         reactor.iterate()
337         reactor.iterate()
338         self.a.valueForKey(sha('foo').digest(), self._cb)
339         reactor.iterate()
340         reactor.iterate()
341         reactor.iterate()
342         reactor.iterate()
343         reactor.iterate()
344         reactor.iterate()
345         reactor.iterate()
346
347     def _cb(self, key, val):
348         if not val:
349             self.assertEqual(self.got, 1)
350         elif 'foobar' in val:
351             self.got = 1
352
353
354 class MultiTest(unittest.TestCase):
355     
356     timeout = 30
357     num = 20
358     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
359                     'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
360                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
361                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
362                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
363                     'KE_AGE': 3600, }
364
365     def _done(self, val):
366         self.done = 1
367         
368     def setUp(self):
369         self.l = []
370         self.startport = 4088
371         for i in range(self.num):
372             d = self.DHT_DEFAULTS.copy()
373             d['PORT'] = self.startport + i
374             self.l.append(Khashmir(d))
375         reactor.iterate()
376         reactor.iterate()
377         
378         for i in self.l:
379             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
380             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
381             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
382             reactor.iterate()
383             reactor.iterate()
384             reactor.iterate() 
385             
386         for i in self.l:
387             self.done = 0
388             i.findCloseNodes(self._done)
389             while not self.done:
390                 reactor.iterate()
391         for i in self.l:
392             self.done = 0
393             i.findCloseNodes(self._done)
394             while not self.done:
395                 reactor.iterate()
396
397     def tearDown(self):
398         for i in self.l:
399             i.shutdown()
400             os.unlink(i.store.db)
401             
402         reactor.iterate()
403         
404     def testStoreRetrieve(self):
405         for i in range(10):
406             K = newID()
407             V = newID()
408             
409             for a in range(3):
410                 self.done = 0
411                 def _scb(key, value, result):
412                     self.done = 1
413                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
414                 while not self.done:
415                     reactor.iterate()
416
417
418                 def _rcb(key, val):
419                     if not val:
420                         self.done = 1
421                         self.assertEqual(self.got, 1)
422                     elif V in val:
423                         self.got = 1
424                 for x in range(3):
425                     self.got = 0
426                     self.done = 0
427                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
428                     while not self.done:
429                         reactor.iterate()