Always return the result at the end of a callback function.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
22
23 from db import DB
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
29 import krpc
30
31 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
34                      '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
35                      '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
36                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
37
38 class KhashmirBase(protocol.Factory):
39     """The base Khashmir class, with base functionality and find node, no key-value mappings.
40     
41     @type _Node: L{node.Node}
42     @ivar _Node: the knode implementation to use for this class of DHT
43     @type config: C{dictionary}
44     @ivar config: the configuration parameters for the DHT
45     @type pinging: C{dictionary}
46     @ivar pinging: the node's that are currently being pinged, keys are the
47         node id's, values are the Deferred or DelayedCall objects
48     @type port: C{int}
49     @ivar port: the port to listen on
50     @type store: L{db.DB}
51     @ivar store: the database to store nodes and key/value pairs in
52     @type node: L{node.Node}
53     @ivar node: this node
54     @type table: L{ktable.KTable}
55     @ivar table: the routing table
56     @type token_secrets: C{list} of C{string}
57     @ivar token_secrets: the current secrets to use to create tokens
58     @type stats: L{stats.StatsLogger}
59     @ivar stats: the statistics gatherer
60     @type udp: L{krpc.hostbroker}
61     @ivar udp: the factory for the KRPC protocol
62     @type listenport: L{twisted.internet.interfaces.IListeningPort}
63     @ivar listenport: the UDP listening port
64     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
65     @ivar next_checkpoint: the delayed call for the next checkpoint
66     """
67     
68     _Node = KNodeBase
69     
70     def __init__(self, config, cache_dir='/tmp'):
71         """Initialize the Khashmir class and call the L{setup} method.
72         
73         @type config: C{dictionary}
74         @param config: the configuration parameters for the DHT
75         @type cache_dir: C{string}
76         @param cache_dir: the directory to store all files in
77             (optional, defaults to the /tmp directory)
78         """
79         self.config = None
80         self.pinging = {}
81         self.setup(config, cache_dir)
82         
83     def setup(self, config, cache_dir):
84         """Setup all the Khashmir sub-modules.
85         
86         @type config: C{dictionary}
87         @param config: the configuration parameters for the DHT
88         @type cache_dir: C{string}
89         @param cache_dir: the directory to store all files in
90         """
91         self.config = config
92         self.port = config['PORT']
93         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
94         self.node = self._loadSelfNode('', self.port)
95         self.table = KTable(self.node, config)
96         self.token_secrets = [newID()]
97         self.stats = StatsLogger(self.table, self.store)
98         
99         # Start listening
100         self.udp = krpc.hostbroker(self, self.stats, config)
101         self.udp.protocol = krpc.KRPC
102         self.listenport = reactor.listenUDP(self.port, self.udp)
103         
104         # Load the routing table and begin checkpointing
105         self._loadRoutingTable()
106         self.refreshTable(force = True)
107         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
108
109     def Node(self, id, host = None, port = None):
110         """Create a new node.
111         
112         @see: L{node.Node.__init__}
113         """
114         n = self._Node(id, host, port)
115         n.table = self.table
116         n.conn = self.udp.connectionForAddr((n.host, n.port))
117         return n
118     
119     def __del__(self):
120         """Stop listening for packets."""
121         self.listenport.stopListening()
122         
123     def _loadSelfNode(self, host, port):
124         """Create this node, loading any previously saved one."""
125         id = self.store.getSelfNode()
126         if not id:
127             id = newID()
128         return self._Node(id, host, port)
129         
130     def checkpoint(self):
131         """Perform some periodic maintenance operations."""
132         # Create a new token secret
133         self.token_secrets.insert(0, newID())
134         if len(self.token_secrets) > 3:
135             self.token_secrets.pop()
136             
137         # Save some parameters for reloading
138         self.store.saveSelfNode(self.node.id)
139         self.store.dumpRoutingTable(self.table.buckets)
140         
141         # DHT maintenance
142         self.store.expireValues(self.config['KEY_EXPIRE'])
143         self.refreshTable()
144         
145         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
146                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
147                                                  self.checkpoint)
148         
149     def _loadRoutingTable(self):
150         """Load the previous routing table nodes from the database.
151         
152         It's usually a good idea to call refreshTable(force = True) after
153         loading the table.
154         """
155         nodes = self.store.getRoutingTable()
156         for rec in nodes:
157             n = self.Node(rec[0], rec[1], int(rec[2]))
158             self.table.insertNode(n, contacted = False)
159             
160     #{ Local interface
161     def addContact(self, host, port, callback=None, errback=None):
162         """Ping this node and add the contact info to the table on pong.
163         
164         @type host: C{string}
165         @param host: the IP address of the node to contact
166         @type port: C{int}
167         @param port:the port of the node to contact
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the contact info returned by the node
171             (optional, defaults to doing nothing with the results)
172         @type errback: C{method}
173         @param errback: the method to call if an error occurs
174             (optional, defaults to calling the callback with the error)
175         """
176         n = self.Node(NULL_ID, host, port)
177         self.sendJoin(n, callback=callback, errback=errback)
178
179     def findNode(self, id, callback):
180         """Find the contact info for the K closest nodes in the global table.
181         
182         @type id: C{string}
183         @param id: the target ID to find the K closest nodes of
184         @type callback: C{method}
185         @param callback: the method to call with the results, it must take 1
186             parameter, the list of K closest nodes
187         """
188         # Mark the bucket as having been accessed
189         self.table.touch(id)
190         
191         # Start with our node
192         nodes = [copy(self.node)]
193
194         # Start the finding nodes action
195         state = FindNode(self, id, callback, self.config, self.stats)
196         reactor.callLater(0, state.goWithNodes, nodes)
197     
198     def insertNode(self, node, contacted = True):
199         """Try to insert a node in our local table, pinging oldest contact if necessary.
200         
201         If all you have is a host/port, then use L{addContact}, which calls this
202         method after receiving the PONG from the remote node. The reason for
203         the separation is we can't insert a node into the table without its
204         node ID. That means of course the node passed into this method needs
205         to be a properly formed Node object with a valid ID.
206
207         @type node: L{node.Node}
208         @param node: the new node to try and insert
209         @type contacted: C{boolean}
210         @param contacted: whether the new node is known to be good, i.e.
211             responded to a request (optional, defaults to True)
212         """
213         # Don't add any local nodes to the routing table
214         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
215             log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
216             return
217         
218         old = self.table.insertNode(node, contacted=contacted)
219
220         if (isinstance(old, self._Node) and old.id != self.node.id and
221             (datetime.now() - old.lastSeen) > 
222              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
223             
224             # Bucket is full, check to see if old node is still available
225             df = self.sendPing(old)
226             df.addErrback(self._staleNodeHandler, old, node, contacted)
227         elif not old and not contacted:
228             # There's room, we just need to contact the node first
229             df = self.sendPing(node)
230             # Also schedule a future ping to make sure the node works
231             def rePing(newnode, self = self):
232                 if newnode.id not in self.pinging:
233                     self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
234                                                                  self.sendPing, newnode)
235                 return newnode
236             df.addCallback(rePing)
237
238     def _staleNodeHandler(self, err, old, node, contacted):
239         """The pinged node never responded, so replace it."""
240         self.table.invalidateNode(old)
241         self.insertNode(node, contacted)
242         return err
243     
244     def nodeFailed(self, node):
245         """Mark a node as having failed a request and schedule a future check.
246         
247         @type node: L{node.Node}
248         @param node: the new node to try and insert
249         """
250         exists = self.table.nodeFailed(node)
251         
252         # If in the table, schedule a ping, if one isn't already sent/scheduled
253         if exists and node.id not in self.pinging:
254             self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
255                                                       self.sendPing, node)
256     
257     def sendPing(self, node):
258         """Ping the node to see if it's still alive.
259         
260         @type node: L{node.Node}
261         @param node: the node to send the join to
262         """
263         # Check for a ping already underway
264         if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
265             self.pinging[node.id].active()):
266             self.pinging[node.id].cancel()
267         elif isinstance(self.pinging.get(node.id, None), Deferred):
268             return self.pinging[node.id]
269
270         self.stats.startedAction('ping')
271         df = node.ping(self.node.id)
272         self.pinging[node.id] = df
273         df.addCallbacks(self._pingHandler, self._pingError,
274                         callbackArgs = (node, datetime.now()),
275                         errbackArgs = (node, datetime.now()))
276         return df
277
278     def _pingHandler(self, dict, node, start):
279         """Node responded properly, update it and return the node object."""
280         self.stats.completedAction('ping', start)
281         del self.pinging[node.id]
282         # Create the node using the returned contact info
283         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
284         reactor.callLater(0, self.insertNode, n)
285         return n
286
287     def _pingError(self, err, node, start):
288         """Error occurred, fail node."""
289         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
290         self.stats.completedAction('ping', start)
291         del self.pinging[node.id]
292         self.nodeFailed(node)
293         return err
294         
295     def sendJoin(self, node, callback=None, errback=None):
296         """Join the DHT by pinging a bootstrap node.
297         
298         @type node: L{node.Node}
299         @param node: the node to send the join to
300         @type callback: C{method}
301         @param callback: the method to call with the results, it must take 1
302             parameter, the contact info returned by the node
303             (optional, defaults to doing nothing with the results)
304         @type errback: C{method}
305         @param errback: the method to call if an error occurs
306             (optional, defaults to calling the callback with the error)
307         """
308         if errback is None:
309             errback = callback
310         self.stats.startedAction('join')
311         df = node.join(self.node.id)
312         df.addCallbacks(self._joinHandler, self._joinError,
313                         callbackArgs = (node, datetime.now()),
314                         errbackArgs = (node, datetime.now()))
315         if callback:
316             df.addCallbacks(callback, errback)
317
318     def _joinHandler(self, dict, node, start):
319         """Node responded properly, extract the response."""
320         self.stats.completedAction('join', start)
321         # Create the node using the returned contact info
322         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
323         reactor.callLater(0, self.insertNode, n)
324         return (dict['ip_addr'], dict['port'])
325
326     def _joinError(self, err, node, start):
327         """Error occurred, fail node."""
328         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
329         self.stats.completedAction('join', start)
330         self.nodeFailed(node)
331         return err
332         
333     def findCloseNodes(self, callback=lambda a: None):
334         """Perform a findNode on the ID one away from our own.
335
336         This will allow us to populate our table with nodes on our network
337         closest to our own. This is called as soon as we start up with an
338         empty table.
339
340         @type callback: C{method}
341         @param callback: the method to call with the results, it must take 1
342             parameter, the list of K closest nodes
343             (optional, defaults to doing nothing with the results)
344         """
345         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
346         self.findNode(id, callback)
347
348     def refreshTable(self, force = False):
349         """Check all the buckets for those that need refreshing.
350         
351         @param force: refresh all buckets regardless of last bucket access time
352             (optional, defaults to False)
353         """
354         def callback(nodes):
355             pass
356     
357         for bucket in self.table.buckets:
358             if force or (datetime.now() - bucket.lastAccessed > 
359                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
360                 # Choose a random ID in the bucket and try and find it
361                 id = newIDInRange(bucket.min, bucket.max)
362                 self.findNode(id, callback)
363
364     def shutdown(self):
365         """Closes the port and cancels pending later calls."""
366         self.listenport.stopListening()
367         try:
368             self.next_checkpoint.cancel()
369         except:
370             pass
371         for call in self.pinging:
372             if isinstance(call, DelayedCall) and call.active():
373                 call.cancel()
374         self.store.close()
375     
376     def getStats(self):
377         """Gather the statistics for the DHT."""
378         return self.stats.formatHTML()
379
380     #{ Remote interface
381     def krpc_ping(self, id, _krpc_sender = None):
382         """Pong with our ID.
383         
384         @type id: C{string}
385         @param id: the node ID of the sender node
386         @type _krpc_sender: (C{string}, C{int})
387         @param _krpc_sender: the sender node's IP address and port
388         """
389         if _krpc_sender is not None:
390             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
391             reactor.callLater(0, self.insertNode, n, False)
392
393         return {"id" : self.node.id}
394         
395     def krpc_join(self, id, _krpc_sender = None):
396         """Add the node by responding with its address and port.
397         
398         @type id: C{string}
399         @param id: the node ID of the sender node
400         @type _krpc_sender: (C{string}, C{int})
401         @param _krpc_sender: the sender node's IP address and port
402         """
403         if _krpc_sender is not None:
404             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
405             reactor.callLater(0, self.insertNode, n, False)
406         else:
407             _krpc_sender = ('127.0.0.1', self.port)
408
409         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
410         
411     def krpc_find_node(self, id, target, _krpc_sender = None):
412         """Find the K closest nodes to the target in the local routing table.
413         
414         @type target: C{string}
415         @param target: the target ID to find nodes for
416         @type id: C{string}
417         @param id: the node ID of the sender node
418         @type _krpc_sender: (C{string}, C{int})
419         @param _krpc_sender: the sender node's IP address and port
420         """
421         if _krpc_sender is not None:
422             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
423             reactor.callLater(0, self.insertNode, n, False)
424         else:
425             _krpc_sender = ('127.0.0.1', self.port)
426
427         nodes = self.table.findNodes(target)
428         nodes = map(lambda node: node.contactInfo(), nodes)
429         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
430         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
431
432
433 class KhashmirRead(KhashmirBase):
434     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
435
436     _Node = KNodeRead
437
438     #{ Local interface
439     def findValue(self, key, callback):
440         """Get the nodes that have values for the key from the global table.
441         
442         @type key: C{string}
443         @param key: the target key to find the values for
444         @type callback: C{method}
445         @param callback: the method to call with the results, it must take 1
446             parameter, the list of nodes with values
447         """
448         # Mark the bucket as having been accessed
449         self.table.touch(key)
450         
451         # Start with ourself
452         nodes = [copy(self.node)]
453         
454         # Search for others starting with the locally found ones
455         state = FindValue(self, key, callback, self.config, self.stats)
456         reactor.callLater(0, state.goWithNodes, nodes)
457
458     def valueForKey(self, key, callback, searchlocal = True):
459         """Get the values found for key in global table.
460         
461         Callback will be called with a list of values for each peer that
462         returns unique values. The final callback will be an empty list.
463
464         @type key: C{string}
465         @param key: the target key to get the values for
466         @type callback: C{method}
467         @param callback: the method to call with the results, it must take 2
468             parameters: the key, and the values found
469         @type searchlocal: C{boolean}
470         @param searchlocal: whether to also look for any local values
471         """
472
473         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
474             """Use the found nodes to send requests for values to."""
475             # Get any local values
476             if searchlocal:
477                 l = self.store.retrieveValues(key)
478                 if len(l) > 0:
479                     node = copy(self.node)
480                     node.updateNumValues(len(l))
481                     nodes = nodes + [node]
482
483             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
484             reactor.callLater(0, state.goWithNodes, nodes)
485             
486         # First lookup nodes that have values for the key
487         self.findValue(key, _getValueForKey)
488
489     #{ Remote interface
490     def krpc_find_value(self, id, key, _krpc_sender = None):
491         """Find the number of values stored locally for the key, and the K closest nodes.
492         
493         @type key: C{string}
494         @param key: the target key to find the values and nodes for
495         @type id: C{string}
496         @param id: the node ID of the sender node
497         @type _krpc_sender: (C{string}, C{int})
498         @param _krpc_sender: the sender node's IP address and port
499         """
500         if _krpc_sender is not None:
501             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
502             reactor.callLater(0, self.insertNode, n, False)
503     
504         nodes = self.table.findNodes(key)
505         nodes = map(lambda node: node.contactInfo(), nodes)
506         num_values = self.store.countValues(key)
507         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
508
509     def krpc_get_value(self, id, key, num, _krpc_sender = None):
510         """Retrieve the values stored locally for the key.
511         
512         @type key: C{string}
513         @param key: the target key to retrieve the values for
514         @type num: C{int}
515         @param num: the maximum number of values to retrieve, or 0 to
516             retrieve all of them
517         @type id: C{string}
518         @param id: the node ID of the sender node
519         @type _krpc_sender: (C{string}, C{int})
520         @param _krpc_sender: the sender node's IP address and port
521         """
522         if _krpc_sender is not None:
523             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
524             reactor.callLater(0, self.insertNode, n, False)
525     
526         l = self.store.retrieveValues(key)
527         if num == 0 or num >= len(l):
528             return {'values' : l, "id": self.node.id}
529         else:
530             shuffle(l)
531             return {'values' : l[:num], "id": self.node.id}
532
533
534 class KhashmirWrite(KhashmirRead):
535     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
536
537     _Node = KNodeWrite
538
539     #{ Local interface
540     def storeValueForKey(self, key, value, callback=None):
541         """Stores the value for the key in the global table.
542         
543         No status in this implementation, peers respond but don't indicate
544         status of storing values.
545
546         @type key: C{string}
547         @param key: the target key to store the value for
548         @type value: C{string}
549         @param value: the value to store with the key
550         @type callback: C{method}
551         @param callback: the method to call with the results, it must take 3
552             parameters: the key, the value stored, and the result of the store
553             (optional, defaults to doing nothing with the results)
554         """
555         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
556             """Use the returned K closest nodes to store the key at."""
557             if not response:
558                 def _storedValueHandler(key, value, sender):
559                     """Default callback that does nothing."""
560                     pass
561                 response = _storedValueHandler
562             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
563             reactor.callLater(0, action.goWithNodes, nodes)
564             
565         # First find the K closest nodes to operate on.
566         self.findNode(key, _storeValueForKey)
567                     
568     #{ Remote interface
569     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
570         """Store the value locally with the key.
571         
572         @type key: C{string}
573         @param key: the target key to store the value for
574         @type value: C{string}
575         @param value: the value to store with the key
576         @param token: the token to confirm that this peer contacted us previously
577         @type id: C{string}
578         @param id: the node ID of the sender node
579         @type _krpc_sender: (C{string}, C{int})
580         @param _krpc_sender: the sender node's IP address and port
581         """
582         if _krpc_sender is not None:
583             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
584             reactor.callLater(0, self.insertNode, n, False)
585         else:
586             _krpc_sender = ('127.0.0.1', self.port)
587
588         for secret in self.token_secrets:
589             this_token = sha(secret + _krpc_sender[0]).digest()
590             if token == this_token:
591                 self.store.storeValue(key, value)
592                 return {"id" : self.node.id}
593         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
594
595
596 class Khashmir(KhashmirWrite):
597     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
598     _Node = KNodeWrite
599
600
601 class SimpleTests(unittest.TestCase):
602     
603     timeout = 10
604     DHT_DEFAULTS = {'PORT': 9977,
605                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
606                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
607                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
608                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
609                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
610                     'KEY_EXPIRE': 3600, 'SPEW': True, }
611
612     def setUp(self):
613         d = self.DHT_DEFAULTS.copy()
614         d['PORT'] = 4044
615         self.a = Khashmir(d)
616         d = self.DHT_DEFAULTS.copy()
617         d['PORT'] = 4045
618         self.b = Khashmir(d)
619         
620     def tearDown(self):
621         self.a.shutdown()
622         self.b.shutdown()
623         os.unlink(self.a.store.db)
624         os.unlink(self.b.store.db)
625
626     def testAddContact(self):
627         self.failUnlessEqual(len(self.a.table.buckets), 1)
628         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
629
630         self.failUnlessEqual(len(self.b.table.buckets), 1)
631         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
632
633         self.a.addContact('127.0.0.1', 4045)
634         reactor.iterate()
635         reactor.iterate()
636         reactor.iterate()
637         reactor.iterate()
638         reactor.iterate()
639         reactor.iterate()
640         reactor.iterate()
641         reactor.iterate()
642
643         self.failUnlessEqual(len(self.a.table.buckets), 1)
644         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
645         self.failUnlessEqual(len(self.b.table.buckets), 1)
646         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
647
648     def testStoreRetrieve(self):
649         self.a.addContact('127.0.0.1', 4045)
650         reactor.iterate()
651         reactor.iterate()
652         reactor.iterate()
653         reactor.iterate()
654         self.got = 0
655         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
656         reactor.iterate()
657         reactor.iterate()
658         reactor.iterate()
659         reactor.iterate()
660         reactor.iterate()
661         reactor.iterate()
662         self.a.valueForKey(sha('foo').digest(), self._cb)
663         reactor.iterate()
664         reactor.iterate()
665         reactor.iterate()
666         reactor.iterate()
667         reactor.iterate()
668         reactor.iterate()
669         reactor.iterate()
670         reactor.iterate()
671         reactor.iterate()
672         reactor.iterate()
673         reactor.iterate()
674         reactor.iterate()
675         reactor.iterate()
676         reactor.iterate()
677         reactor.iterate()
678         reactor.iterate()
679         reactor.iterate()
680         reactor.iterate()
681         reactor.iterate()
682         reactor.iterate()
683         reactor.iterate()
684
685     def _cb(self, key, val):
686         if not val:
687             self.failUnlessEqual(self.got, 1)
688         elif 'foobar' in val:
689             self.got = 1
690
691
692 class MultiTest(unittest.TestCase):
693     
694     timeout = 30
695     num = 20
696     DHT_DEFAULTS = {'PORT': 9977,
697                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
698                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
699                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
700                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
701                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
702                     'KEY_EXPIRE': 3600, 'SPEW': True, }
703
704     def _done(self, val):
705         self.done = 1
706         
707     def setUp(self):
708         self.l = []
709         self.startport = 4088
710         for i in range(self.num):
711             d = self.DHT_DEFAULTS.copy()
712             d['PORT'] = self.startport + i
713             self.l.append(Khashmir(d))
714         reactor.iterate()
715         reactor.iterate()
716         
717         for i in self.l:
718             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
719             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
720             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
721             reactor.iterate()
722             reactor.iterate()
723             reactor.iterate() 
724             reactor.iterate()
725             reactor.iterate()
726             reactor.iterate() 
727             
728         for i in self.l:
729             self.done = 0
730             i.findCloseNodes(self._done)
731             while not self.done:
732                 reactor.iterate()
733         for i in self.l:
734             self.done = 0
735             i.findCloseNodes(self._done)
736             while not self.done:
737                 reactor.iterate()
738
739     def tearDown(self):
740         for i in self.l:
741             i.shutdown()
742             os.unlink(i.store.db)
743             
744         reactor.iterate()
745         
746     def testStoreRetrieve(self):
747         for i in range(10):
748             K = newID()
749             V = newID()
750             
751             for a in range(3):
752                 self.done = 0
753                 def _scb(key, value, result):
754                     self.done = 1
755                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
756                 while not self.done:
757                     reactor.iterate()
758
759
760                 def _rcb(key, val):
761                     if not val:
762                         self.done = 1
763                         self.failUnlessEqual(self.got, 1)
764                     elif V in val:
765                         self.got = 1
766                 for x in range(3):
767                     self.got = 0
768                     self.done = 0
769                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
770                     while not self.done:
771                         reactor.iterate()