Only add nodes to the routing table that have responded to a request.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet import protocol, reactor
19 from twisted.python import log
20 from twisted.trial import unittest
21
22 from db import DB
23 from ktable import KTable
24 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
25 from khash import newID, newIDInRange
26 from actions import FindNode, FindValue, GetValue, StoreValue
27 from stats import StatsLogger
28 import krpc
29
30 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
31                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(172\.0?([1][6-9])|([2][0-9])|([3][0-1])\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
34
35 class KhashmirBase(protocol.Factory):
36     """The base Khashmir class, with base functionality and find node, no key-value mappings.
37     
38     @type _Node: L{node.Node}
39     @ivar _Node: the knode implementation to use for this class of DHT
40     @type config: C{dictionary}
41     @ivar config: the configuration parameters for the DHT
42     @type port: C{int}
43     @ivar port: the port to listen on
44     @type store: L{db.DB}
45     @ivar store: the database to store nodes and key/value pairs in
46     @type node: L{node.Node}
47     @ivar node: this node
48     @type table: L{ktable.KTable}
49     @ivar table: the routing table
50     @type token_secrets: C{list} of C{string}
51     @ivar token_secrets: the current secrets to use to create tokens
52     @type stats: L{stats.StatsLogger}
53     @ivar stats: the statistics gatherer
54     @type udp: L{krpc.hostbroker}
55     @ivar udp: the factory for the KRPC protocol
56     @type listenport: L{twisted.internet.interfaces.IListeningPort}
57     @ivar listenport: the UDP listening port
58     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
59     @ivar next_checkpoint: the delayed call for the next checkpoint
60     """
61     
62     _Node = KNodeBase
63     
64     def __init__(self, config, cache_dir='/tmp'):
65         """Initialize the Khashmir class and call the L{setup} method.
66         
67         @type config: C{dictionary}
68         @param config: the configuration parameters for the DHT
69         @type cache_dir: C{string}
70         @param cache_dir: the directory to store all files in
71             (optional, defaults to the /tmp directory)
72         """
73         self.config = None
74         self.setup(config, cache_dir)
75         
76     def setup(self, config, cache_dir):
77         """Setup all the Khashmir sub-modules.
78         
79         @type config: C{dictionary}
80         @param config: the configuration parameters for the DHT
81         @type cache_dir: C{string}
82         @param cache_dir: the directory to store all files in
83         """
84         self.config = config
85         self.port = config['PORT']
86         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
87         self.node = self._loadSelfNode('', self.port)
88         self.table = KTable(self.node, config)
89         self.token_secrets = [newID()]
90         self.stats = StatsLogger(self.table, self.store)
91         
92         # Start listening
93         self.udp = krpc.hostbroker(self, self.stats, config)
94         self.udp.protocol = krpc.KRPC
95         self.listenport = reactor.listenUDP(self.port, self.udp)
96         
97         # Load the routing table and begin checkpointing
98         self._loadRoutingTable()
99         self.refreshTable(force = True)
100         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
101
102     def Node(self, id, host = None, port = None):
103         """Create a new node.
104         
105         @see: L{node.Node.__init__}
106         """
107         n = self._Node(id, host, port)
108         n.table = self.table
109         n.conn = self.udp.connectionForAddr((n.host, n.port))
110         return n
111     
112     def __del__(self):
113         """Stop listening for packets."""
114         self.listenport.stopListening()
115         
116     def _loadSelfNode(self, host, port):
117         """Create this node, loading any previously saved one."""
118         id = self.store.getSelfNode()
119         if not id:
120             id = newID()
121         return self._Node(id, host, port)
122         
123     def checkpoint(self):
124         """Perform some periodic maintenance operations."""
125         # Create a new token secret
126         self.token_secrets.insert(0, newID())
127         if len(self.token_secrets) > 3:
128             self.token_secrets.pop()
129             
130         # Save some parameters for reloading
131         self.store.saveSelfNode(self.node.id)
132         self.store.dumpRoutingTable(self.table.buckets)
133         
134         # DHT maintenance
135         self.store.expireValues(self.config['KEY_EXPIRE'])
136         self.refreshTable()
137         
138         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
139                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
140                                                  self.checkpoint)
141         
142     def _loadRoutingTable(self):
143         """Load the previous routing table nodes from the database.
144         
145         It's usually a good idea to call refreshTable(force = True) after
146         loading the table.
147         """
148         nodes = self.store.getRoutingTable()
149         for rec in nodes:
150             n = self.Node(rec[0], rec[1], int(rec[2]))
151             self.table.insertNode(n, contacted = False)
152             
153     #{ Local interface
154     def addContact(self, host, port, callback=None, errback=None):
155         """Ping this node and add the contact info to the table on pong.
156         
157         @type host: C{string}
158         @param host: the IP address of the node to contact
159         @type port: C{int}
160         @param port:the port of the node to contact
161         @type callback: C{method}
162         @param callback: the method to call with the results, it must take 1
163             parameter, the contact info returned by the node
164             (optional, defaults to doing nothing with the results)
165         @type errback: C{method}
166         @param errback: the method to call if an error occurs
167             (optional, defaults to calling the callback with the error)
168         """
169         n = self.Node(NULL_ID, host, port)
170         self.sendJoin(n, callback=callback, errback=errback)
171
172     def findNode(self, id, callback):
173         """Find the contact info for the K closest nodes in the global table.
174         
175         @type id: C{string}
176         @param id: the target ID to find the K closest nodes of
177         @type callback: C{method}
178         @param callback: the method to call with the results, it must take 1
179             parameter, the list of K closest nodes
180         """
181         # Start with our node
182         nodes = [copy(self.node)]
183
184         # Start the finding nodes action
185         state = FindNode(self, id, callback, self.config, self.stats)
186         reactor.callLater(0, state.goWithNodes, nodes)
187     
188     def insertNode(self, node, contacted = True):
189         """Try to insert a node in our local table, pinging oldest contact if necessary.
190         
191         If all you have is a host/port, then use L{addContact}, which calls this
192         method after receiving the PONG from the remote node. The reason for
193         the separation is we can't insert a node into the table without its
194         node ID. That means of course the node passed into this method needs
195         to be a properly formed Node object with a valid ID.
196
197         @type node: L{node.Node}
198         @param node: the new node to try and insert
199         @type contacted: C{boolean}
200         @param contacted: whether the new node is known to be good, i.e.
201             responded to a request (optional, defaults to True)
202         """
203         # Don't add any local nodes to the routing table
204         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
205             log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
206             return
207         
208         old = self.table.insertNode(node, contacted=contacted)
209
210         if (isinstance(old, self._Node) and old.id != self.node.id and
211             (datetime.now() - old.lastSeen) > 
212              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
213             
214             # Bucket is full, check to see if old node is still available
215             self.stats.startedAction('ping')
216             df = old.ping(self.node.id)
217             df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
218                             callbackArgs = (old, datetime.now()),
219                             errbackArgs = (old, datetime.now(), node, contacted))
220         elif not old and not contacted:
221             # There's room, we just need to contact the node first
222             self.stats.startedAction('ping')
223             df = node.ping(self.node.id)
224             # Convert the returned contact info into a node
225             df.addCallback(self._pongHandler, datetime.now())
226             # Try adding the contacted node
227             df.addCallbacks(self.insertNode, self._pongError,
228                             errbackArgs = (node, datetime.now()))
229
230     def _freshNodeHandler(self, dict, old, start):
231         """Got a pong from the old node, so update it."""
232         self.stats.completedAction('ping', start)
233         if dict['id'] == old.id:
234             self.table.justSeenNode(old.id)
235     
236     def _staleNodeHandler(self, err, old, start, node, contacted):
237         """The pinged node never responded, so replace it."""
238         log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
239         self.stats.completedAction('ping', start)
240         self.table.invalidateNode(old)
241         self.insertNode(node, contacted)
242     
243     def _pongHandler(self, dict, start):
244         """Node responded properly, change response into a node to insert."""
245         self.stats.completedAction('ping', start)
246         # Create the node using the returned contact info
247         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
248         return n
249
250     def _pongError(self, err, node, start):
251         """Error occurred, fail node and errback or callback with error."""
252         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
253         self.stats.completedAction('ping', start)
254         self.table.nodeFailed(node)
255     
256     def sendJoin(self, node, callback=None, errback=None):
257         """Join the DHT by pinging a bootstrap node.
258         
259         @type node: L{node.Node}
260         @param node: the node to send the join to
261         @type callback: C{method}
262         @param callback: the method to call with the results, it must take 1
263             parameter, the contact info returned by the node
264             (optional, defaults to doing nothing with the results)
265         @type errback: C{method}
266         @param errback: the method to call if an error occurs
267             (optional, defaults to calling the callback with the error)
268         """
269         if errback is None:
270             errback = callback
271         self.stats.startedAction('join')
272         df = node.join(self.node.id)
273         df.addCallbacks(self._joinHandler, self._joinError,
274                         callbackArgs = (node, datetime.now()),
275                         errbackArgs = (node, datetime.now()))
276         if callback:
277             df.addCallbacks(callback, errback)
278
279     def _joinHandler(self, dict, node, start):
280         """Node responded properly, extract the response."""
281         self.stats.completedAction('join', start)
282         # Create the node using the returned contact info
283         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
284         reactor.callLater(0, self.insertNode, n)
285         return (dict['ip_addr'], dict['port'])
286
287     def _joinError(self, err, node, start):
288         """Error occurred, fail node."""
289         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
290         self.stats.completedAction('join', start)
291         self.table.nodeFailed(node)
292         return err
293         
294     def findCloseNodes(self, callback=lambda a: None):
295         """Perform a findNode on the ID one away from our own.
296
297         This will allow us to populate our table with nodes on our network
298         closest to our own. This is called as soon as we start up with an
299         empty table.
300
301         @type callback: C{method}
302         @param callback: the method to call with the results, it must take 1
303             parameter, the list of K closest nodes
304             (optional, defaults to doing nothing with the results)
305         """
306         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
307         self.findNode(id, callback)
308
309     def refreshTable(self, force = False):
310         """Check all the buckets for those that need refreshing.
311         
312         @param force: refresh all buckets regardless of last bucket access time
313             (optional, defaults to False)
314         """
315         def callback(nodes):
316             pass
317     
318         for bucket in self.table.buckets:
319             if force or (datetime.now() - bucket.lastAccessed > 
320                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
321                 # Choose a random ID in the bucket and try and find it
322                 id = newIDInRange(bucket.min, bucket.max)
323                 self.findNode(id, callback)
324
325     def shutdown(self):
326         """Closes the port and cancels pending later calls."""
327         self.listenport.stopListening()
328         try:
329             self.next_checkpoint.cancel()
330         except:
331             pass
332         self.store.close()
333     
334     def getStats(self):
335         """Gather the statistics for the DHT."""
336         return self.stats.formatHTML()
337
338     #{ Remote interface
339     def krpc_ping(self, id, _krpc_sender = None):
340         """Pong with our ID.
341         
342         @type id: C{string}
343         @param id: the node ID of the sender node
344         @type _krpc_sender: (C{string}, C{int})
345         @param _krpc_sender: the sender node's IP address and port
346         """
347         if _krpc_sender is not None:
348             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
349             reactor.callLater(0, self.insertNode, n, False)
350
351         return {"id" : self.node.id}
352         
353     def krpc_join(self, id, _krpc_sender = None):
354         """Add the node by responding with its address and port.
355         
356         @type id: C{string}
357         @param id: the node ID of the sender node
358         @type _krpc_sender: (C{string}, C{int})
359         @param _krpc_sender: the sender node's IP address and port
360         """
361         if _krpc_sender is not None:
362             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
363             reactor.callLater(0, self.insertNode, n, False)
364         else:
365             _krpc_sender = ('127.0.0.1', self.port)
366
367         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
368         
369     def krpc_find_node(self, id, target, _krpc_sender = None):
370         """Find the K closest nodes to the target in the local routing table.
371         
372         @type target: C{string}
373         @param target: the target ID to find nodes for
374         @type id: C{string}
375         @param id: the node ID of the sender node
376         @type _krpc_sender: (C{string}, C{int})
377         @param _krpc_sender: the sender node's IP address and port
378         """
379         if _krpc_sender is not None:
380             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
381             reactor.callLater(0, self.insertNode, n, False)
382         else:
383             _krpc_sender = ('127.0.0.1', self.port)
384
385         nodes = self.table.findNodes(target)
386         nodes = map(lambda node: node.contactInfo(), nodes)
387         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
388         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
389
390
391 class KhashmirRead(KhashmirBase):
392     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
393
394     _Node = KNodeRead
395
396     #{ Local interface
397     def findValue(self, key, callback):
398         """Get the nodes that have values for the key from the global table.
399         
400         @type key: C{string}
401         @param key: the target key to find the values for
402         @type callback: C{method}
403         @param callback: the method to call with the results, it must take 1
404             parameter, the list of nodes with values
405         """
406         # Start with ourself
407         nodes = [copy(self.node)]
408         
409         # Search for others starting with the locally found ones
410         state = FindValue(self, key, callback, self.config, self.stats)
411         reactor.callLater(0, state.goWithNodes, nodes)
412
413     def valueForKey(self, key, callback, searchlocal = True):
414         """Get the values found for key in global table.
415         
416         Callback will be called with a list of values for each peer that
417         returns unique values. The final callback will be an empty list.
418
419         @type key: C{string}
420         @param key: the target key to get the values for
421         @type callback: C{method}
422         @param callback: the method to call with the results, it must take 2
423             parameters: the key, and the values found
424         @type searchlocal: C{boolean}
425         @param searchlocal: whether to also look for any local values
426         """
427
428         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
429             """Use the found nodes to send requests for values to."""
430             # Get any local values
431             if searchlocal:
432                 l = self.store.retrieveValues(key)
433                 if len(l) > 0:
434                     node = copy(self.node)
435                     node.updateNumValues(len(l))
436                     nodes = nodes + [node]
437
438             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
439             reactor.callLater(0, state.goWithNodes, nodes)
440             
441         # First lookup nodes that have values for the key
442         self.findValue(key, _getValueForKey)
443
444     #{ Remote interface
445     def krpc_find_value(self, id, key, _krpc_sender = None):
446         """Find the number of values stored locally for the key, and the K closest nodes.
447         
448         @type key: C{string}
449         @param key: the target key to find the values and nodes for
450         @type id: C{string}
451         @param id: the node ID of the sender node
452         @type _krpc_sender: (C{string}, C{int})
453         @param _krpc_sender: the sender node's IP address and port
454         """
455         if _krpc_sender is not None:
456             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
457             reactor.callLater(0, self.insertNode, n, False)
458     
459         nodes = self.table.findNodes(key)
460         nodes = map(lambda node: node.contactInfo(), nodes)
461         num_values = self.store.countValues(key)
462         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
463
464     def krpc_get_value(self, id, key, num, _krpc_sender = None):
465         """Retrieve the values stored locally for the key.
466         
467         @type key: C{string}
468         @param key: the target key to retrieve the values for
469         @type num: C{int}
470         @param num: the maximum number of values to retrieve, or 0 to
471             retrieve all of them
472         @type id: C{string}
473         @param id: the node ID of the sender node
474         @type _krpc_sender: (C{string}, C{int})
475         @param _krpc_sender: the sender node's IP address and port
476         """
477         if _krpc_sender is not None:
478             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
479             reactor.callLater(0, self.insertNode, n, False)
480     
481         l = self.store.retrieveValues(key)
482         if num == 0 or num >= len(l):
483             return {'values' : l, "id": self.node.id}
484         else:
485             shuffle(l)
486             return {'values' : l[:num], "id": self.node.id}
487
488
489 class KhashmirWrite(KhashmirRead):
490     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
491
492     _Node = KNodeWrite
493
494     #{ Local interface
495     def storeValueForKey(self, key, value, callback=None):
496         """Stores the value for the key in the global table.
497         
498         No status in this implementation, peers respond but don't indicate
499         status of storing values.
500
501         @type key: C{string}
502         @param key: the target key to store the value for
503         @type value: C{string}
504         @param value: the value to store with the key
505         @type callback: C{method}
506         @param callback: the method to call with the results, it must take 3
507             parameters: the key, the value stored, and the result of the store
508             (optional, defaults to doing nothing with the results)
509         """
510         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
511             """Use the returned K closest nodes to store the key at."""
512             if not response:
513                 def _storedValueHandler(key, value, sender):
514                     """Default callback that does nothing."""
515                     pass
516                 response = _storedValueHandler
517             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
518             reactor.callLater(0, action.goWithNodes, nodes)
519             
520         # First find the K closest nodes to operate on.
521         self.findNode(key, _storeValueForKey)
522                     
523     #{ Remote interface
524     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
525         """Store the value locally with the key.
526         
527         @type key: C{string}
528         @param key: the target key to store the value for
529         @type value: C{string}
530         @param value: the value to store with the key
531         @param token: the token to confirm that this peer contacted us previously
532         @type id: C{string}
533         @param id: the node ID of the sender node
534         @type _krpc_sender: (C{string}, C{int})
535         @param _krpc_sender: the sender node's IP address and port
536         """
537         if _krpc_sender is not None:
538             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
539             reactor.callLater(0, self.insertNode, n, False)
540         else:
541             _krpc_sender = ('127.0.0.1', self.port)
542
543         for secret in self.token_secrets:
544             this_token = sha(secret + _krpc_sender[0]).digest()
545             if token == this_token:
546                 self.store.storeValue(key, value)
547                 return {"id" : self.node.id}
548         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
549
550
551 class Khashmir(KhashmirWrite):
552     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
553     _Node = KNodeWrite
554
555
556 class SimpleTests(unittest.TestCase):
557     
558     timeout = 10
559     DHT_DEFAULTS = {'PORT': 9977,
560                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
561                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
562                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
563                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
564                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
565                     'KEY_EXPIRE': 3600, 'SPEW': True, }
566
567     def setUp(self):
568         d = self.DHT_DEFAULTS.copy()
569         d['PORT'] = 4044
570         self.a = Khashmir(d)
571         d = self.DHT_DEFAULTS.copy()
572         d['PORT'] = 4045
573         self.b = Khashmir(d)
574         
575     def tearDown(self):
576         self.a.shutdown()
577         self.b.shutdown()
578         os.unlink(self.a.store.db)
579         os.unlink(self.b.store.db)
580
581     def testAddContact(self):
582         self.failUnlessEqual(len(self.a.table.buckets), 1)
583         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
584
585         self.failUnlessEqual(len(self.b.table.buckets), 1)
586         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
587
588         self.a.addContact('127.0.0.1', 4045)
589         reactor.iterate()
590         reactor.iterate()
591         reactor.iterate()
592         reactor.iterate()
593
594         self.failUnlessEqual(len(self.a.table.buckets), 1)
595         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
596         self.failUnlessEqual(len(self.b.table.buckets), 1)
597         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
598
599     def testStoreRetrieve(self):
600         self.a.addContact('127.0.0.1', 4045)
601         reactor.iterate()
602         reactor.iterate()
603         reactor.iterate()
604         reactor.iterate()
605         self.got = 0
606         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
607         reactor.iterate()
608         reactor.iterate()
609         reactor.iterate()
610         reactor.iterate()
611         reactor.iterate()
612         reactor.iterate()
613         self.a.valueForKey(sha('foo').digest(), self._cb)
614         reactor.iterate()
615         reactor.iterate()
616         reactor.iterate()
617         reactor.iterate()
618         reactor.iterate()
619         reactor.iterate()
620         reactor.iterate()
621
622     def _cb(self, key, val):
623         if not val:
624             self.failUnlessEqual(self.got, 1)
625         elif 'foobar' in val:
626             self.got = 1
627
628
629 class MultiTest(unittest.TestCase):
630     
631     timeout = 30
632     num = 20
633     DHT_DEFAULTS = {'PORT': 9977,
634                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
635                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
636                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
637                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
638                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
639                     'KEY_EXPIRE': 3600, 'SPEW': True, }
640
641     def _done(self, val):
642         self.done = 1
643         
644     def setUp(self):
645         self.l = []
646         self.startport = 4088
647         for i in range(self.num):
648             d = self.DHT_DEFAULTS.copy()
649             d['PORT'] = self.startport + i
650             self.l.append(Khashmir(d))
651         reactor.iterate()
652         reactor.iterate()
653         
654         for i in self.l:
655             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
656             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
657             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
658             reactor.iterate()
659             reactor.iterate()
660             reactor.iterate() 
661             
662         for i in self.l:
663             self.done = 0
664             i.findCloseNodes(self._done)
665             while not self.done:
666                 reactor.iterate()
667         for i in self.l:
668             self.done = 0
669             i.findCloseNodes(self._done)
670             while not self.done:
671                 reactor.iterate()
672
673     def tearDown(self):
674         for i in self.l:
675             i.shutdown()
676             os.unlink(i.store.db)
677             
678         reactor.iterate()
679         
680     def testStoreRetrieve(self):
681         for i in range(10):
682             K = newID()
683             V = newID()
684             
685             for a in range(3):
686                 self.done = 0
687                 def _scb(key, value, result):
688                     self.done = 1
689                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
690                 while not self.done:
691                     reactor.iterate()
692
693
694                 def _rcb(key, val):
695                     if not val:
696                         self.done = 1
697                         self.failUnlessEqual(self.got, 1)
698                     elif V in val:
699                         self.got = 1
700                 for x in range(3):
701                     self.got = 0
702                     self.done = 0
703                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
704                     while not self.done:
705                         reactor.iterate()