Only touch a bucket if a find request targets it.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet import protocol, reactor
19 from twisted.python import log
20 from twisted.trial import unittest
21
22 from db import DB
23 from ktable import KTable
24 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
25 from khash import newID, newIDInRange
26 from actions import FindNode, FindValue, GetValue, StoreValue
27 from stats import StatsLogger
28 import krpc
29
30 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
31                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
34                      '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
35                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
36
37 class KhashmirBase(protocol.Factory):
38     """The base Khashmir class, with base functionality and find node, no key-value mappings.
39     
40     @type _Node: L{node.Node}
41     @ivar _Node: the knode implementation to use for this class of DHT
42     @type config: C{dictionary}
43     @ivar config: the configuration parameters for the DHT
44     @type port: C{int}
45     @ivar port: the port to listen on
46     @type store: L{db.DB}
47     @ivar store: the database to store nodes and key/value pairs in
48     @type node: L{node.Node}
49     @ivar node: this node
50     @type table: L{ktable.KTable}
51     @ivar table: the routing table
52     @type token_secrets: C{list} of C{string}
53     @ivar token_secrets: the current secrets to use to create tokens
54     @type stats: L{stats.StatsLogger}
55     @ivar stats: the statistics gatherer
56     @type udp: L{krpc.hostbroker}
57     @ivar udp: the factory for the KRPC protocol
58     @type listenport: L{twisted.internet.interfaces.IListeningPort}
59     @ivar listenport: the UDP listening port
60     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
61     @ivar next_checkpoint: the delayed call for the next checkpoint
62     """
63     
64     _Node = KNodeBase
65     
66     def __init__(self, config, cache_dir='/tmp'):
67         """Initialize the Khashmir class and call the L{setup} method.
68         
69         @type config: C{dictionary}
70         @param config: the configuration parameters for the DHT
71         @type cache_dir: C{string}
72         @param cache_dir: the directory to store all files in
73             (optional, defaults to the /tmp directory)
74         """
75         self.config = None
76         self.setup(config, cache_dir)
77         
78     def setup(self, config, cache_dir):
79         """Setup all the Khashmir sub-modules.
80         
81         @type config: C{dictionary}
82         @param config: the configuration parameters for the DHT
83         @type cache_dir: C{string}
84         @param cache_dir: the directory to store all files in
85         """
86         self.config = config
87         self.port = config['PORT']
88         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
89         self.node = self._loadSelfNode('', self.port)
90         self.table = KTable(self.node, config)
91         self.token_secrets = [newID()]
92         self.stats = StatsLogger(self.table, self.store)
93         
94         # Start listening
95         self.udp = krpc.hostbroker(self, self.stats, config)
96         self.udp.protocol = krpc.KRPC
97         self.listenport = reactor.listenUDP(self.port, self.udp)
98         
99         # Load the routing table and begin checkpointing
100         self._loadRoutingTable()
101         self.refreshTable(force = True)
102         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
103
104     def Node(self, id, host = None, port = None):
105         """Create a new node.
106         
107         @see: L{node.Node.__init__}
108         """
109         n = self._Node(id, host, port)
110         n.table = self.table
111         n.conn = self.udp.connectionForAddr((n.host, n.port))
112         return n
113     
114     def __del__(self):
115         """Stop listening for packets."""
116         self.listenport.stopListening()
117         
118     def _loadSelfNode(self, host, port):
119         """Create this node, loading any previously saved one."""
120         id = self.store.getSelfNode()
121         if not id:
122             id = newID()
123         return self._Node(id, host, port)
124         
125     def checkpoint(self):
126         """Perform some periodic maintenance operations."""
127         # Create a new token secret
128         self.token_secrets.insert(0, newID())
129         if len(self.token_secrets) > 3:
130             self.token_secrets.pop()
131             
132         # Save some parameters for reloading
133         self.store.saveSelfNode(self.node.id)
134         self.store.dumpRoutingTable(self.table.buckets)
135         
136         # DHT maintenance
137         self.store.expireValues(self.config['KEY_EXPIRE'])
138         self.refreshTable()
139         
140         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
141                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
142                                                  self.checkpoint)
143         
144     def _loadRoutingTable(self):
145         """Load the previous routing table nodes from the database.
146         
147         It's usually a good idea to call refreshTable(force = True) after
148         loading the table.
149         """
150         nodes = self.store.getRoutingTable()
151         for rec in nodes:
152             n = self.Node(rec[0], rec[1], int(rec[2]))
153             self.table.insertNode(n, contacted = False)
154             
155     #{ Local interface
156     def addContact(self, host, port, callback=None, errback=None):
157         """Ping this node and add the contact info to the table on pong.
158         
159         @type host: C{string}
160         @param host: the IP address of the node to contact
161         @type port: C{int}
162         @param port:the port of the node to contact
163         @type callback: C{method}
164         @param callback: the method to call with the results, it must take 1
165             parameter, the contact info returned by the node
166             (optional, defaults to doing nothing with the results)
167         @type errback: C{method}
168         @param errback: the method to call if an error occurs
169             (optional, defaults to calling the callback with the error)
170         """
171         n = self.Node(NULL_ID, host, port)
172         self.sendJoin(n, callback=callback, errback=errback)
173
174     def findNode(self, id, callback):
175         """Find the contact info for the K closest nodes in the global table.
176         
177         @type id: C{string}
178         @param id: the target ID to find the K closest nodes of
179         @type callback: C{method}
180         @param callback: the method to call with the results, it must take 1
181             parameter, the list of K closest nodes
182         """
183         # Mark the bucket as having been accessed
184         self.table.touch(id)
185         
186         # Start with our node
187         nodes = [copy(self.node)]
188
189         # Start the finding nodes action
190         state = FindNode(self, id, callback, self.config, self.stats)
191         reactor.callLater(0, state.goWithNodes, nodes)
192     
193     def insertNode(self, node, contacted = True):
194         """Try to insert a node in our local table, pinging oldest contact if necessary.
195         
196         If all you have is a host/port, then use L{addContact}, which calls this
197         method after receiving the PONG from the remote node. The reason for
198         the separation is we can't insert a node into the table without its
199         node ID. That means of course the node passed into this method needs
200         to be a properly formed Node object with a valid ID.
201
202         @type node: L{node.Node}
203         @param node: the new node to try and insert
204         @type contacted: C{boolean}
205         @param contacted: whether the new node is known to be good, i.e.
206             responded to a request (optional, defaults to True)
207         """
208         # Don't add any local nodes to the routing table
209         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
210             log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
211             return
212         
213         old = self.table.insertNode(node, contacted=contacted)
214
215         if (isinstance(old, self._Node) and old.id != self.node.id and
216             (datetime.now() - old.lastSeen) > 
217              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
218             
219             # Bucket is full, check to see if old node is still available
220             self.stats.startedAction('ping')
221             df = old.ping(self.node.id)
222             df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
223                             callbackArgs = (old, datetime.now()),
224                             errbackArgs = (old, datetime.now(), node, contacted))
225         elif not old and not contacted:
226             # There's room, we just need to contact the node first
227             self.stats.startedAction('ping')
228             df = node.ping(self.node.id)
229             # Convert the returned contact info into a node
230             df.addCallback(self._pongHandler, datetime.now())
231             # Try adding the contacted node
232             df.addCallbacks(self.insertNode, self._pongError,
233                             errbackArgs = (node, datetime.now()))
234
235     def _freshNodeHandler(self, dict, old, start):
236         """Got a pong from the old node, so update it."""
237         self.stats.completedAction('ping', start)
238         if dict['id'] == old.id:
239             self.table.justSeenNode(old.id)
240     
241     def _staleNodeHandler(self, err, old, start, node, contacted):
242         """The pinged node never responded, so replace it."""
243         log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
244         self.stats.completedAction('ping', start)
245         self.table.invalidateNode(old)
246         self.insertNode(node, contacted)
247     
248     def _pongHandler(self, dict, start):
249         """Node responded properly, change response into a node to insert."""
250         self.stats.completedAction('ping', start)
251         # Create the node using the returned contact info
252         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
253         return n
254
255     def _pongError(self, err, node, start):
256         """Error occurred, fail node and errback or callback with error."""
257         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
258         self.stats.completedAction('ping', start)
259         self.table.nodeFailed(node)
260     
261     def sendJoin(self, node, callback=None, errback=None):
262         """Join the DHT by pinging a bootstrap node.
263         
264         @type node: L{node.Node}
265         @param node: the node to send the join to
266         @type callback: C{method}
267         @param callback: the method to call with the results, it must take 1
268             parameter, the contact info returned by the node
269             (optional, defaults to doing nothing with the results)
270         @type errback: C{method}
271         @param errback: the method to call if an error occurs
272             (optional, defaults to calling the callback with the error)
273         """
274         if errback is None:
275             errback = callback
276         self.stats.startedAction('join')
277         df = node.join(self.node.id)
278         df.addCallbacks(self._joinHandler, self._joinError,
279                         callbackArgs = (node, datetime.now()),
280                         errbackArgs = (node, datetime.now()))
281         if callback:
282             df.addCallbacks(callback, errback)
283
284     def _joinHandler(self, dict, node, start):
285         """Node responded properly, extract the response."""
286         self.stats.completedAction('join', start)
287         # Create the node using the returned contact info
288         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
289         reactor.callLater(0, self.insertNode, n)
290         return (dict['ip_addr'], dict['port'])
291
292     def _joinError(self, err, node, start):
293         """Error occurred, fail node."""
294         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
295         self.stats.completedAction('join', start)
296         self.table.nodeFailed(node)
297         return err
298         
299     def findCloseNodes(self, callback=lambda a: None):
300         """Perform a findNode on the ID one away from our own.
301
302         This will allow us to populate our table with nodes on our network
303         closest to our own. This is called as soon as we start up with an
304         empty table.
305
306         @type callback: C{method}
307         @param callback: the method to call with the results, it must take 1
308             parameter, the list of K closest nodes
309             (optional, defaults to doing nothing with the results)
310         """
311         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
312         self.findNode(id, callback)
313
314     def refreshTable(self, force = False):
315         """Check all the buckets for those that need refreshing.
316         
317         @param force: refresh all buckets regardless of last bucket access time
318             (optional, defaults to False)
319         """
320         def callback(nodes):
321             pass
322     
323         for bucket in self.table.buckets:
324             if force or (datetime.now() - bucket.lastAccessed > 
325                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
326                 # Choose a random ID in the bucket and try and find it
327                 id = newIDInRange(bucket.min, bucket.max)
328                 self.findNode(id, callback)
329
330     def shutdown(self):
331         """Closes the port and cancels pending later calls."""
332         self.listenport.stopListening()
333         try:
334             self.next_checkpoint.cancel()
335         except:
336             pass
337         self.store.close()
338     
339     def getStats(self):
340         """Gather the statistics for the DHT."""
341         return self.stats.formatHTML()
342
343     #{ Remote interface
344     def krpc_ping(self, id, _krpc_sender = None):
345         """Pong with our ID.
346         
347         @type id: C{string}
348         @param id: the node ID of the sender node
349         @type _krpc_sender: (C{string}, C{int})
350         @param _krpc_sender: the sender node's IP address and port
351         """
352         if _krpc_sender is not None:
353             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
354             reactor.callLater(0, self.insertNode, n, False)
355
356         return {"id" : self.node.id}
357         
358     def krpc_join(self, id, _krpc_sender = None):
359         """Add the node by responding with its address and port.
360         
361         @type id: C{string}
362         @param id: the node ID of the sender node
363         @type _krpc_sender: (C{string}, C{int})
364         @param _krpc_sender: the sender node's IP address and port
365         """
366         if _krpc_sender is not None:
367             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
368             reactor.callLater(0, self.insertNode, n, False)
369         else:
370             _krpc_sender = ('127.0.0.1', self.port)
371
372         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
373         
374     def krpc_find_node(self, id, target, _krpc_sender = None):
375         """Find the K closest nodes to the target in the local routing table.
376         
377         @type target: C{string}
378         @param target: the target ID to find nodes for
379         @type id: C{string}
380         @param id: the node ID of the sender node
381         @type _krpc_sender: (C{string}, C{int})
382         @param _krpc_sender: the sender node's IP address and port
383         """
384         if _krpc_sender is not None:
385             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
386             reactor.callLater(0, self.insertNode, n, False)
387         else:
388             _krpc_sender = ('127.0.0.1', self.port)
389
390         nodes = self.table.findNodes(target)
391         nodes = map(lambda node: node.contactInfo(), nodes)
392         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
393         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
394
395
396 class KhashmirRead(KhashmirBase):
397     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
398
399     _Node = KNodeRead
400
401     #{ Local interface
402     def findValue(self, key, callback):
403         """Get the nodes that have values for the key from the global table.
404         
405         @type key: C{string}
406         @param key: the target key to find the values for
407         @type callback: C{method}
408         @param callback: the method to call with the results, it must take 1
409             parameter, the list of nodes with values
410         """
411         # Mark the bucket as having been accessed
412         self.table.touch(key)
413         
414         # Start with ourself
415         nodes = [copy(self.node)]
416         
417         # Search for others starting with the locally found ones
418         state = FindValue(self, key, callback, self.config, self.stats)
419         reactor.callLater(0, state.goWithNodes, nodes)
420
421     def valueForKey(self, key, callback, searchlocal = True):
422         """Get the values found for key in global table.
423         
424         Callback will be called with a list of values for each peer that
425         returns unique values. The final callback will be an empty list.
426
427         @type key: C{string}
428         @param key: the target key to get the values for
429         @type callback: C{method}
430         @param callback: the method to call with the results, it must take 2
431             parameters: the key, and the values found
432         @type searchlocal: C{boolean}
433         @param searchlocal: whether to also look for any local values
434         """
435
436         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
437             """Use the found nodes to send requests for values to."""
438             # Get any local values
439             if searchlocal:
440                 l = self.store.retrieveValues(key)
441                 if len(l) > 0:
442                     node = copy(self.node)
443                     node.updateNumValues(len(l))
444                     nodes = nodes + [node]
445
446             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
447             reactor.callLater(0, state.goWithNodes, nodes)
448             
449         # First lookup nodes that have values for the key
450         self.findValue(key, _getValueForKey)
451
452     #{ Remote interface
453     def krpc_find_value(self, id, key, _krpc_sender = None):
454         """Find the number of values stored locally for the key, and the K closest nodes.
455         
456         @type key: C{string}
457         @param key: the target key to find the values and nodes for
458         @type id: C{string}
459         @param id: the node ID of the sender node
460         @type _krpc_sender: (C{string}, C{int})
461         @param _krpc_sender: the sender node's IP address and port
462         """
463         if _krpc_sender is not None:
464             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
465             reactor.callLater(0, self.insertNode, n, False)
466     
467         nodes = self.table.findNodes(key)
468         nodes = map(lambda node: node.contactInfo(), nodes)
469         num_values = self.store.countValues(key)
470         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
471
472     def krpc_get_value(self, id, key, num, _krpc_sender = None):
473         """Retrieve the values stored locally for the key.
474         
475         @type key: C{string}
476         @param key: the target key to retrieve the values for
477         @type num: C{int}
478         @param num: the maximum number of values to retrieve, or 0 to
479             retrieve all of them
480         @type id: C{string}
481         @param id: the node ID of the sender node
482         @type _krpc_sender: (C{string}, C{int})
483         @param _krpc_sender: the sender node's IP address and port
484         """
485         if _krpc_sender is not None:
486             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
487             reactor.callLater(0, self.insertNode, n, False)
488     
489         l = self.store.retrieveValues(key)
490         if num == 0 or num >= len(l):
491             return {'values' : l, "id": self.node.id}
492         else:
493             shuffle(l)
494             return {'values' : l[:num], "id": self.node.id}
495
496
497 class KhashmirWrite(KhashmirRead):
498     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
499
500     _Node = KNodeWrite
501
502     #{ Local interface
503     def storeValueForKey(self, key, value, callback=None):
504         """Stores the value for the key in the global table.
505         
506         No status in this implementation, peers respond but don't indicate
507         status of storing values.
508
509         @type key: C{string}
510         @param key: the target key to store the value for
511         @type value: C{string}
512         @param value: the value to store with the key
513         @type callback: C{method}
514         @param callback: the method to call with the results, it must take 3
515             parameters: the key, the value stored, and the result of the store
516             (optional, defaults to doing nothing with the results)
517         """
518         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
519             """Use the returned K closest nodes to store the key at."""
520             if not response:
521                 def _storedValueHandler(key, value, sender):
522                     """Default callback that does nothing."""
523                     pass
524                 response = _storedValueHandler
525             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
526             reactor.callLater(0, action.goWithNodes, nodes)
527             
528         # First find the K closest nodes to operate on.
529         self.findNode(key, _storeValueForKey)
530                     
531     #{ Remote interface
532     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
533         """Store the value locally with the key.
534         
535         @type key: C{string}
536         @param key: the target key to store the value for
537         @type value: C{string}
538         @param value: the value to store with the key
539         @param token: the token to confirm that this peer contacted us previously
540         @type id: C{string}
541         @param id: the node ID of the sender node
542         @type _krpc_sender: (C{string}, C{int})
543         @param _krpc_sender: the sender node's IP address and port
544         """
545         if _krpc_sender is not None:
546             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
547             reactor.callLater(0, self.insertNode, n, False)
548         else:
549             _krpc_sender = ('127.0.0.1', self.port)
550
551         for secret in self.token_secrets:
552             this_token = sha(secret + _krpc_sender[0]).digest()
553             if token == this_token:
554                 self.store.storeValue(key, value)
555                 return {"id" : self.node.id}
556         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
557
558
559 class Khashmir(KhashmirWrite):
560     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
561     _Node = KNodeWrite
562
563
564 class SimpleTests(unittest.TestCase):
565     
566     timeout = 10
567     DHT_DEFAULTS = {'PORT': 9977,
568                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
569                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
570                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
571                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
572                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
573                     'KEY_EXPIRE': 3600, 'SPEW': True, }
574
575     def setUp(self):
576         d = self.DHT_DEFAULTS.copy()
577         d['PORT'] = 4044
578         self.a = Khashmir(d)
579         d = self.DHT_DEFAULTS.copy()
580         d['PORT'] = 4045
581         self.b = Khashmir(d)
582         
583     def tearDown(self):
584         self.a.shutdown()
585         self.b.shutdown()
586         os.unlink(self.a.store.db)
587         os.unlink(self.b.store.db)
588
589     def testAddContact(self):
590         self.failUnlessEqual(len(self.a.table.buckets), 1)
591         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
592
593         self.failUnlessEqual(len(self.b.table.buckets), 1)
594         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
595
596         self.a.addContact('127.0.0.1', 4045)
597         reactor.iterate()
598         reactor.iterate()
599         reactor.iterate()
600         reactor.iterate()
601
602         self.failUnlessEqual(len(self.a.table.buckets), 1)
603         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
604         self.failUnlessEqual(len(self.b.table.buckets), 1)
605         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
606
607     def testStoreRetrieve(self):
608         self.a.addContact('127.0.0.1', 4045)
609         reactor.iterate()
610         reactor.iterate()
611         reactor.iterate()
612         reactor.iterate()
613         self.got = 0
614         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
615         reactor.iterate()
616         reactor.iterate()
617         reactor.iterate()
618         reactor.iterate()
619         reactor.iterate()
620         reactor.iterate()
621         self.a.valueForKey(sha('foo').digest(), self._cb)
622         reactor.iterate()
623         reactor.iterate()
624         reactor.iterate()
625         reactor.iterate()
626         reactor.iterate()
627         reactor.iterate()
628         reactor.iterate()
629
630     def _cb(self, key, val):
631         if not val:
632             self.failUnlessEqual(self.got, 1)
633         elif 'foobar' in val:
634             self.got = 1
635
636
637 class MultiTest(unittest.TestCase):
638     
639     timeout = 30
640     num = 20
641     DHT_DEFAULTS = {'PORT': 9977,
642                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
643                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
644                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
645                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
646                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
647                     'KEY_EXPIRE': 3600, 'SPEW': True, }
648
649     def _done(self, val):
650         self.done = 1
651         
652     def setUp(self):
653         self.l = []
654         self.startport = 4088
655         for i in range(self.num):
656             d = self.DHT_DEFAULTS.copy()
657             d['PORT'] = self.startport + i
658             self.l.append(Khashmir(d))
659         reactor.iterate()
660         reactor.iterate()
661         
662         for i in self.l:
663             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
664             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
665             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
666             reactor.iterate()
667             reactor.iterate()
668             reactor.iterate() 
669             
670         for i in self.l:
671             self.done = 0
672             i.findCloseNodes(self._done)
673             while not self.done:
674                 reactor.iterate()
675         for i in self.l:
676             self.done = 0
677             i.findCloseNodes(self._done)
678             while not self.done:
679                 reactor.iterate()
680
681     def tearDown(self):
682         for i in self.l:
683             i.shutdown()
684             os.unlink(i.store.db)
685             
686         reactor.iterate()
687         
688     def testStoreRetrieve(self):
689         for i in range(10):
690             K = newID()
691             V = newID()
692             
693             for a in range(3):
694                 self.done = 0
695                 def _scb(key, value, result):
696                     self.done = 1
697                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
698                 while not self.done:
699                     reactor.iterate()
700
701
702                 def _rcb(key, val):
703                     if not val:
704                         self.done = 1
705                         self.failUnlessEqual(self.got, 1)
706                     elif V in val:
707                         self.got = 1
708                 for x in range(3):
709                     self.got = 0
710                     self.done = 0
711                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
712                     while not self.done:
713                         reactor.iterate()