Use the version number in the Khashmir node ID.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
22
23 from db import DB
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
29 import krpc
30
31 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
34                      '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
35                      '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
36                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
37
38 class KhashmirBase(protocol.Factory):
39     """The base Khashmir class, with base functionality and find node, no key-value mappings.
40     
41     @type _Node: L{node.Node}
42     @ivar _Node: the knode implementation to use for this class of DHT
43     @type config: C{dictionary}
44     @ivar config: the configuration parameters for the DHT
45     @type pinging: C{dictionary}
46     @ivar pinging: the node's that are currently being pinged, keys are the
47         node id's, values are the Deferred or DelayedCall objects
48     @type port: C{int}
49     @ivar port: the port to listen on
50     @type store: L{db.DB}
51     @ivar store: the database to store nodes and key/value pairs in
52     @type node: L{node.Node}
53     @ivar node: this node
54     @type table: L{ktable.KTable}
55     @ivar table: the routing table
56     @type token_secrets: C{list} of C{string}
57     @ivar token_secrets: the current secrets to use to create tokens
58     @type stats: L{stats.StatsLogger}
59     @ivar stats: the statistics gatherer
60     @type udp: L{krpc.hostbroker}
61     @ivar udp: the factory for the KRPC protocol
62     @type listenport: L{twisted.internet.interfaces.IListeningPort}
63     @ivar listenport: the UDP listening port
64     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
65     @ivar next_checkpoint: the delayed call for the next checkpoint
66     """
67     
68     _Node = KNodeBase
69     
70     def __init__(self, config, cache_dir='/tmp'):
71         """Initialize the Khashmir class and call the L{setup} method.
72         
73         @type config: C{dictionary}
74         @param config: the configuration parameters for the DHT
75         @type cache_dir: C{string}
76         @param cache_dir: the directory to store all files in
77             (optional, defaults to the /tmp directory)
78         """
79         self.config = None
80         self.pinging = {}
81         self.setup(config, cache_dir)
82         
83     def setup(self, config, cache_dir):
84         """Setup all the Khashmir sub-modules.
85         
86         @type config: C{dictionary}
87         @param config: the configuration parameters for the DHT
88         @type cache_dir: C{string}
89         @param cache_dir: the directory to store all files in
90         """
91         self.config = config
92         self.port = config['PORT']
93         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
94         self.node = self._loadSelfNode('', self.port)
95         self.table = KTable(self.node, config)
96         self.token_secrets = [newID()]
97         self.stats = StatsLogger(self.table, self.store)
98         
99         # Start listening
100         self.udp = krpc.hostbroker(self, self.stats, config)
101         self.udp.protocol = krpc.KRPC
102         self.listenport = reactor.listenUDP(self.port, self.udp)
103         
104         # Load the routing table and begin checkpointing
105         self._loadRoutingTable()
106         self.refreshTable(force = True)
107         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
108
109     def Node(self, id, host = None, port = None):
110         """Create a new node.
111         
112         @see: L{node.Node.__init__}
113         """
114         n = self._Node(id, host, port)
115         n.table = self.table
116         n.conn = self.udp.connectionForAddr((n.host, n.port))
117         return n
118     
119     def __del__(self):
120         """Stop listening for packets."""
121         self.listenport.stopListening()
122         
123     def _loadSelfNode(self, host, port):
124         """Create this node, loading any previously saved one."""
125         id = self.store.getSelfNode()
126         if not id or not id.endswith(self.config['VERSION']):
127             id = newID(self.config['VERSION'])
128         return self._Node(id, host, port)
129         
130     def checkpoint(self):
131         """Perform some periodic maintenance operations."""
132         # Create a new token secret
133         self.token_secrets.insert(0, newID())
134         if len(self.token_secrets) > 3:
135             self.token_secrets.pop()
136             
137         # Save some parameters for reloading
138         self.store.saveSelfNode(self.node.id)
139         self.store.dumpRoutingTable(self.table.buckets)
140         
141         # DHT maintenance
142         self.store.expireValues(self.config['KEY_EXPIRE'])
143         self.refreshTable()
144         
145         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
146                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
147                                                  self.checkpoint)
148         
149     def _loadRoutingTable(self):
150         """Load the previous routing table nodes from the database.
151         
152         It's usually a good idea to call refreshTable(force = True) after
153         loading the table.
154         """
155         nodes = self.store.getRoutingTable()
156         for rec in nodes:
157             n = self.Node(rec[0], rec[1], int(rec[2]))
158             self.table.insertNode(n, contacted = False)
159             
160     #{ Local interface
161     def addContact(self, host, port, callback=None, errback=None):
162         """Ping this node and add the contact info to the table on pong.
163         
164         @type host: C{string}
165         @param host: the IP address of the node to contact
166         @type port: C{int}
167         @param port:the port of the node to contact
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the contact info returned by the node
171             (optional, defaults to doing nothing with the results)
172         @type errback: C{method}
173         @param errback: the method to call if an error occurs
174             (optional, defaults to calling the callback with the error)
175         """
176         n = self.Node(NULL_ID, host, port)
177         self.sendJoin(n, callback=callback, errback=errback)
178
179     def findNode(self, id, callback):
180         """Find the contact info for the K closest nodes in the global table.
181         
182         @type id: C{string}
183         @param id: the target ID to find the K closest nodes of
184         @type callback: C{method}
185         @param callback: the method to call with the results, it must take 1
186             parameter, the list of K closest nodes
187         """
188         # Mark the bucket as having been accessed
189         self.table.touch(id)
190         
191         # Start with our node
192         nodes = [copy(self.node)]
193
194         # Start the finding nodes action
195         state = FindNode(self, id, callback, self.config, self.stats)
196         reactor.callLater(0, state.goWithNodes, nodes)
197     
198     def insertNode(self, node, contacted = True):
199         """Try to insert a node in our local table, pinging oldest contact if necessary.
200         
201         If all you have is a host/port, then use L{addContact}, which calls this
202         method after receiving the PONG from the remote node. The reason for
203         the separation is we can't insert a node into the table without its
204         node ID. That means of course the node passed into this method needs
205         to be a properly formed Node object with a valid ID.
206
207         @type node: L{node.Node}
208         @param node: the new node to try and insert
209         @type contacted: C{boolean}
210         @param contacted: whether the new node is known to be good, i.e.
211             responded to a request (optional, defaults to True)
212         """
213         # Don't add any local nodes to the routing table
214         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
215             log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
216             return
217         
218         old = self.table.insertNode(node, contacted=contacted)
219
220         if (isinstance(old, self._Node) and old.id != self.node.id and
221             (datetime.now() - old.lastSeen) > 
222              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
223             
224             # Bucket is full, check to see if old node is still available
225             df = self.sendPing(old)
226             df.addErrback(self._staleNodeHandler, old, node, contacted)
227         elif not old and not contacted:
228             # There's room, we just need to contact the node first
229             df = self.sendPing(node)
230             # Also schedule a future ping to make sure the node works
231             def rePing(newnode, self = self):
232                 if newnode.id not in self.pinging:
233                     self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
234                                                                  self.sendPing, newnode)
235                 return newnode
236             df.addCallback(rePing)
237
238     def _staleNodeHandler(self, err, old, node, contacted):
239         """The pinged node never responded, so replace it."""
240         self.table.invalidateNode(old)
241         self.insertNode(node, contacted)
242         return err
243     
244     def nodeFailed(self, node):
245         """Mark a node as having failed a request and schedule a future check.
246         
247         @type node: L{node.Node}
248         @param node: the new node to try and insert
249         """
250         exists = self.table.nodeFailed(node)
251         
252         # If in the table, schedule a ping, if one isn't already sent/scheduled
253         if exists and node.id not in self.pinging:
254             self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
255                                                       self.sendPing, node)
256     
257     def sendPing(self, node):
258         """Ping the node to see if it's still alive.
259         
260         @type node: L{node.Node}
261         @param node: the node to send the join to
262         """
263         # Check for a ping already underway
264         if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
265             self.pinging[node.id].active()):
266             self.pinging[node.id].cancel()
267         elif isinstance(self.pinging.get(node.id, None), Deferred):
268             return self.pinging[node.id]
269
270         self.stats.startedAction('ping')
271         df = node.ping(self.node.id)
272         self.pinging[node.id] = df
273         df.addCallbacks(self._pingHandler, self._pingError,
274                         callbackArgs = (node, datetime.now()),
275                         errbackArgs = (node, datetime.now()))
276         return df
277
278     def _pingHandler(self, dict, node, start):
279         """Node responded properly, update it and return the node object."""
280         self.stats.completedAction('ping', start)
281         del self.pinging[node.id]
282         # Create the node using the returned contact info
283         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
284         reactor.callLater(0, self.insertNode, n)
285         return n
286
287     def _pingError(self, err, node, start):
288         """Error occurred, fail node."""
289         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
290         self.stats.completedAction('ping', start)
291         del self.pinging[node.id]
292         self.nodeFailed(node)
293         return err
294         
295     def sendJoin(self, node, callback=None, errback=None):
296         """Join the DHT by pinging a bootstrap node.
297         
298         @type node: L{node.Node}
299         @param node: the node to send the join to
300         @type callback: C{method}
301         @param callback: the method to call with the results, it must take 1
302             parameter, the contact info returned by the node
303             (optional, defaults to doing nothing with the results)
304         @type errback: C{method}
305         @param errback: the method to call if an error occurs
306             (optional, defaults to calling the callback with the error)
307         """
308         if errback is None:
309             errback = callback
310         self.stats.startedAction('join')
311         df = node.join(self.node.id)
312         df.addCallbacks(self._joinHandler, self._joinError,
313                         callbackArgs = (node, datetime.now()),
314                         errbackArgs = (node, datetime.now()))
315         if callback:
316             df.addCallbacks(callback, errback)
317
318     def _joinHandler(self, dict, node, start):
319         """Node responded properly, extract the response."""
320         self.stats.completedAction('join', start)
321         # Create the node using the returned contact info
322         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
323         reactor.callLater(0, self.insertNode, n)
324         return (dict['ip_addr'], dict['port'])
325
326     def _joinError(self, err, node, start):
327         """Error occurred, fail node."""
328         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
329         self.stats.completedAction('join', start)
330         self.nodeFailed(node)
331         return err
332         
333     def findCloseNodes(self, callback=lambda a: None):
334         """Perform a findNode on the ID one away from our own.
335
336         This will allow us to populate our table with nodes on our network
337         closest to our own. This is called as soon as we start up with an
338         empty table.
339
340         @type callback: C{method}
341         @param callback: the method to call with the results, it must take 1
342             parameter, the list of K closest nodes
343             (optional, defaults to doing nothing with the results)
344         """
345         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
346         self.findNode(id, callback)
347
348     def refreshTable(self, force = False):
349         """Check all the buckets for those that need refreshing.
350         
351         @param force: refresh all buckets regardless of last bucket access time
352             (optional, defaults to False)
353         """
354         def callback(nodes):
355             pass
356     
357         for bucket in self.table.buckets:
358             if force or (datetime.now() - bucket.lastAccessed > 
359                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
360                 # Choose a random ID in the bucket and try and find it
361                 id = newIDInRange(bucket.min, bucket.max)
362                 self.findNode(id, callback)
363
364     def shutdown(self):
365         """Closes the port and cancels pending later calls."""
366         self.listenport.stopListening()
367         try:
368             self.next_checkpoint.cancel()
369         except:
370             pass
371         for nodeid in self.pinging.keys():
372             if isinstance(self.pinging[nodeid], DelayedCall) and self.pinging[nodeid].active():
373                 self.pinging[nodeid].cancel()
374                 del self.pinging[nodeid]
375         self.store.close()
376     
377     def getStats(self):
378         """Gather the statistics for the DHT."""
379         return self.stats.formatHTML()
380
381     #{ Remote interface
382     def krpc_ping(self, id, _krpc_sender = None):
383         """Pong with our ID.
384         
385         @type id: C{string}
386         @param id: the node ID of the sender node
387         @type _krpc_sender: (C{string}, C{int})
388         @param _krpc_sender: the sender node's IP address and port
389         """
390         if _krpc_sender is not None:
391             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
392             reactor.callLater(0, self.insertNode, n, False)
393
394         return {"id" : self.node.id}
395         
396     def krpc_join(self, id, _krpc_sender = None):
397         """Add the node by responding with its address and port.
398         
399         @type id: C{string}
400         @param id: the node ID of the sender node
401         @type _krpc_sender: (C{string}, C{int})
402         @param _krpc_sender: the sender node's IP address and port
403         """
404         if _krpc_sender is not None:
405             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
406             reactor.callLater(0, self.insertNode, n, False)
407         else:
408             _krpc_sender = ('127.0.0.1', self.port)
409
410         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
411         
412     def krpc_find_node(self, id, target, _krpc_sender = None):
413         """Find the K closest nodes to the target in the local routing table.
414         
415         @type target: C{string}
416         @param target: the target ID to find nodes for
417         @type id: C{string}
418         @param id: the node ID of the sender node
419         @type _krpc_sender: (C{string}, C{int})
420         @param _krpc_sender: the sender node's IP address and port
421         """
422         if _krpc_sender is not None:
423             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
424             reactor.callLater(0, self.insertNode, n, False)
425         else:
426             _krpc_sender = ('127.0.0.1', self.port)
427
428         nodes = self.table.findNodes(target)
429         nodes = map(lambda node: node.contactInfo(), nodes)
430         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
431         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
432
433
434 class KhashmirRead(KhashmirBase):
435     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
436
437     _Node = KNodeRead
438
439     #{ Local interface
440     def findValue(self, key, callback):
441         """Get the nodes that have values for the key from the global table.
442         
443         @type key: C{string}
444         @param key: the target key to find the values for
445         @type callback: C{method}
446         @param callback: the method to call with the results, it must take 1
447             parameter, the list of nodes with values
448         """
449         # Mark the bucket as having been accessed
450         self.table.touch(key)
451         
452         # Start with ourself
453         nodes = [copy(self.node)]
454         
455         # Search for others starting with the locally found ones
456         state = FindValue(self, key, callback, self.config, self.stats)
457         reactor.callLater(0, state.goWithNodes, nodes)
458
459     def valueForKey(self, key, callback, searchlocal = True):
460         """Get the values found for key in global table.
461         
462         Callback will be called with a list of values for each peer that
463         returns unique values. The final callback will be an empty list.
464
465         @type key: C{string}
466         @param key: the target key to get the values for
467         @type callback: C{method}
468         @param callback: the method to call with the results, it must take 2
469             parameters: the key, and the values found
470         @type searchlocal: C{boolean}
471         @param searchlocal: whether to also look for any local values
472         """
473
474         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
475             """Use the found nodes to send requests for values to."""
476             # Get any local values
477             if searchlocal:
478                 l = self.store.retrieveValues(key)
479                 if len(l) > 0:
480                     node = copy(self.node)
481                     node.updateNumValues(len(l))
482                     nodes = nodes + [node]
483
484             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
485             reactor.callLater(0, state.goWithNodes, nodes)
486             
487         # First lookup nodes that have values for the key
488         self.findValue(key, _getValueForKey)
489
490     #{ Remote interface
491     def krpc_find_value(self, id, key, _krpc_sender = None):
492         """Find the number of values stored locally for the key, and the K closest nodes.
493         
494         @type key: C{string}
495         @param key: the target key to find the values and nodes for
496         @type id: C{string}
497         @param id: the node ID of the sender node
498         @type _krpc_sender: (C{string}, C{int})
499         @param _krpc_sender: the sender node's IP address and port
500         """
501         if _krpc_sender is not None:
502             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
503             reactor.callLater(0, self.insertNode, n, False)
504     
505         nodes = self.table.findNodes(key)
506         nodes = map(lambda node: node.contactInfo(), nodes)
507         num_values = self.store.countValues(key)
508         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
509
510     def krpc_get_value(self, id, key, num, _krpc_sender = None):
511         """Retrieve the values stored locally for the key.
512         
513         @type key: C{string}
514         @param key: the target key to retrieve the values for
515         @type num: C{int}
516         @param num: the maximum number of values to retrieve, or 0 to
517             retrieve all of them
518         @type id: C{string}
519         @param id: the node ID of the sender node
520         @type _krpc_sender: (C{string}, C{int})
521         @param _krpc_sender: the sender node's IP address and port
522         """
523         if _krpc_sender is not None:
524             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
525             reactor.callLater(0, self.insertNode, n, False)
526     
527         l = self.store.retrieveValues(key)
528         if num == 0 or num >= len(l):
529             return {'values' : l, "id": self.node.id}
530         else:
531             shuffle(l)
532             return {'values' : l[:num], "id": self.node.id}
533
534
535 class KhashmirWrite(KhashmirRead):
536     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
537
538     _Node = KNodeWrite
539
540     #{ Local interface
541     def storeValueForKey(self, key, value, callback=None):
542         """Stores the value for the key in the global table.
543         
544         No status in this implementation, peers respond but don't indicate
545         status of storing values.
546
547         @type key: C{string}
548         @param key: the target key to store the value for
549         @type value: C{string}
550         @param value: the value to store with the key
551         @type callback: C{method}
552         @param callback: the method to call with the results, it must take 3
553             parameters: the key, the value stored, and the result of the store
554             (optional, defaults to doing nothing with the results)
555         """
556         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
557             """Use the returned K closest nodes to store the key at."""
558             if not response:
559                 def _storedValueHandler(key, value, sender):
560                     """Default callback that does nothing."""
561                     pass
562                 response = _storedValueHandler
563             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
564             reactor.callLater(0, action.goWithNodes, nodes)
565             
566         # First find the K closest nodes to operate on.
567         self.findNode(key, _storeValueForKey)
568                     
569     #{ Remote interface
570     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
571         """Store the value locally with the key.
572         
573         @type key: C{string}
574         @param key: the target key to store the value for
575         @type value: C{string}
576         @param value: the value to store with the key
577         @param token: the token to confirm that this peer contacted us previously
578         @type id: C{string}
579         @param id: the node ID of the sender node
580         @type _krpc_sender: (C{string}, C{int})
581         @param _krpc_sender: the sender node's IP address and port
582         """
583         if _krpc_sender is not None:
584             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
585             reactor.callLater(0, self.insertNode, n, False)
586         else:
587             _krpc_sender = ('127.0.0.1', self.port)
588
589         for secret in self.token_secrets:
590             this_token = sha(secret + _krpc_sender[0]).digest()
591             if token == this_token:
592                 self.store.storeValue(key, value)
593                 return {"id" : self.node.id}
594         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
595
596
597 class Khashmir(KhashmirWrite):
598     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
599     _Node = KNodeWrite
600
601
602 class SimpleTests(unittest.TestCase):
603     
604     timeout = 10
605     DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
606                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
607                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
608                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
609                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
610                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
611                     'KEY_EXPIRE': 3600, 'SPEW': True, }
612
613     def setUp(self):
614         d = self.DHT_DEFAULTS.copy()
615         d['PORT'] = 4044
616         self.a = Khashmir(d)
617         d = self.DHT_DEFAULTS.copy()
618         d['PORT'] = 4045
619         self.b = Khashmir(d)
620         
621     def tearDown(self):
622         self.a.shutdown()
623         self.b.shutdown()
624         os.unlink(self.a.store.db)
625         os.unlink(self.b.store.db)
626
627     def testAddContact(self):
628         self.failUnlessEqual(len(self.a.table.buckets), 1)
629         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
630
631         self.failUnlessEqual(len(self.b.table.buckets), 1)
632         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
633
634         self.a.addContact('127.0.0.1', 4045)
635         reactor.iterate()
636         reactor.iterate()
637         reactor.iterate()
638         reactor.iterate()
639         reactor.iterate()
640         reactor.iterate()
641         reactor.iterate()
642         reactor.iterate()
643
644         self.failUnlessEqual(len(self.a.table.buckets), 1)
645         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
646         self.failUnlessEqual(len(self.b.table.buckets), 1)
647         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
648
649     def testStoreRetrieve(self):
650         self.a.addContact('127.0.0.1', 4045)
651         reactor.iterate()
652         reactor.iterate()
653         reactor.iterate()
654         reactor.iterate()
655         self.got = 0
656         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
657         reactor.iterate()
658         reactor.iterate()
659         reactor.iterate()
660         reactor.iterate()
661         reactor.iterate()
662         reactor.iterate()
663         self.a.valueForKey(sha('foo').digest(), self._cb)
664         reactor.iterate()
665         reactor.iterate()
666         reactor.iterate()
667         reactor.iterate()
668         reactor.iterate()
669         reactor.iterate()
670         reactor.iterate()
671         reactor.iterate()
672         reactor.iterate()
673         reactor.iterate()
674         reactor.iterate()
675         reactor.iterate()
676         reactor.iterate()
677         reactor.iterate()
678         reactor.iterate()
679         reactor.iterate()
680         reactor.iterate()
681         reactor.iterate()
682         reactor.iterate()
683         reactor.iterate()
684         reactor.iterate()
685
686     def _cb(self, key, val):
687         if not val:
688             self.failUnlessEqual(self.got, 1)
689         elif 'foobar' in val:
690             self.got = 1
691
692
693 class MultiTest(unittest.TestCase):
694     
695     timeout = 30
696     num = 20
697     DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
698                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
699                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
700                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
701                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
702                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
703                     'KEY_EXPIRE': 3600, 'SPEW': True, }
704
705     def _done(self, val):
706         self.done = 1
707         
708     def setUp(self):
709         self.l = []
710         self.startport = 4088
711         for i in range(self.num):
712             d = self.DHT_DEFAULTS.copy()
713             d['PORT'] = self.startport + i
714             self.l.append(Khashmir(d))
715         reactor.iterate()
716         reactor.iterate()
717         
718         for i in self.l:
719             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
720             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
721             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
722             reactor.iterate()
723             reactor.iterate()
724             reactor.iterate() 
725             reactor.iterate()
726             reactor.iterate()
727             reactor.iterate() 
728             
729         for i in self.l:
730             self.done = 0
731             i.findCloseNodes(self._done)
732             while not self.done:
733                 reactor.iterate()
734         for i in self.l:
735             self.done = 0
736             i.findCloseNodes(self._done)
737             while not self.done:
738                 reactor.iterate()
739
740     def tearDown(self):
741         for i in self.l:
742             i.shutdown()
743             os.unlink(i.store.db)
744             
745         reactor.iterate()
746         
747     def testStoreRetrieve(self):
748         for i in range(10):
749             K = newID()
750             V = newID()
751             
752             for a in range(3):
753                 self.done = 0
754                 def _scb(key, value, result):
755                     self.done = 1
756                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
757                 while not self.done:
758                     reactor.iterate()
759
760
761                 def _rcb(key, val):
762                     if not val:
763                         self.done = 1
764                         self.failUnlessEqual(self.got, 1)
765                     elif V in val:
766                         self.got = 1
767                 for x in range(3):
768                     self.got = 0
769                     self.done = 0
770                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
771                     while not self.done:
772                         reactor.iterate()