]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p_Khashmir/khashmir.py
When a node fails, schedule a future ping to check again.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
22
23 from db import DB
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
29 import krpc
30
31 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
34                      '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
35                      '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
36                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
37
38 class KhashmirBase(protocol.Factory):
39     """The base Khashmir class, with base functionality and find node, no key-value mappings.
40     
41     @type _Node: L{node.Node}
42     @ivar _Node: the knode implementation to use for this class of DHT
43     @type config: C{dictionary}
44     @ivar config: the configuration parameters for the DHT
45     @type pinging: C{dictionary}
46     @ivar pinging: the node's that are currently being pinged, keys are the
47         node id's, values are the Deferred or DelayedCall objects
48     @type port: C{int}
49     @ivar port: the port to listen on
50     @type store: L{db.DB}
51     @ivar store: the database to store nodes and key/value pairs in
52     @type node: L{node.Node}
53     @ivar node: this node
54     @type table: L{ktable.KTable}
55     @ivar table: the routing table
56     @type token_secrets: C{list} of C{string}
57     @ivar token_secrets: the current secrets to use to create tokens
58     @type stats: L{stats.StatsLogger}
59     @ivar stats: the statistics gatherer
60     @type udp: L{krpc.hostbroker}
61     @ivar udp: the factory for the KRPC protocol
62     @type listenport: L{twisted.internet.interfaces.IListeningPort}
63     @ivar listenport: the UDP listening port
64     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
65     @ivar next_checkpoint: the delayed call for the next checkpoint
66     """
67     
68     _Node = KNodeBase
69     
70     def __init__(self, config, cache_dir='/tmp'):
71         """Initialize the Khashmir class and call the L{setup} method.
72         
73         @type config: C{dictionary}
74         @param config: the configuration parameters for the DHT
75         @type cache_dir: C{string}
76         @param cache_dir: the directory to store all files in
77             (optional, defaults to the /tmp directory)
78         """
79         self.config = None
80         self.pinging = {}
81         self.setup(config, cache_dir)
82         
83     def setup(self, config, cache_dir):
84         """Setup all the Khashmir sub-modules.
85         
86         @type config: C{dictionary}
87         @param config: the configuration parameters for the DHT
88         @type cache_dir: C{string}
89         @param cache_dir: the directory to store all files in
90         """
91         self.config = config
92         self.port = config['PORT']
93         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
94         self.node = self._loadSelfNode('', self.port)
95         self.table = KTable(self.node, config)
96         self.token_secrets = [newID()]
97         self.stats = StatsLogger(self.table, self.store)
98         
99         # Start listening
100         self.udp = krpc.hostbroker(self, self.stats, config)
101         self.udp.protocol = krpc.KRPC
102         self.listenport = reactor.listenUDP(self.port, self.udp)
103         
104         # Load the routing table and begin checkpointing
105         self._loadRoutingTable()
106         self.refreshTable(force = True)
107         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
108
109     def Node(self, id, host = None, port = None):
110         """Create a new node.
111         
112         @see: L{node.Node.__init__}
113         """
114         n = self._Node(id, host, port)
115         n.table = self.table
116         n.conn = self.udp.connectionForAddr((n.host, n.port))
117         return n
118     
119     def __del__(self):
120         """Stop listening for packets."""
121         self.listenport.stopListening()
122         
123     def _loadSelfNode(self, host, port):
124         """Create this node, loading any previously saved one."""
125         id = self.store.getSelfNode()
126         if not id:
127             id = newID()
128         return self._Node(id, host, port)
129         
130     def checkpoint(self):
131         """Perform some periodic maintenance operations."""
132         # Create a new token secret
133         self.token_secrets.insert(0, newID())
134         if len(self.token_secrets) > 3:
135             self.token_secrets.pop()
136             
137         # Save some parameters for reloading
138         self.store.saveSelfNode(self.node.id)
139         self.store.dumpRoutingTable(self.table.buckets)
140         
141         # DHT maintenance
142         self.store.expireValues(self.config['KEY_EXPIRE'])
143         self.refreshTable()
144         
145         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
146                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
147                                                  self.checkpoint)
148         
149     def _loadRoutingTable(self):
150         """Load the previous routing table nodes from the database.
151         
152         It's usually a good idea to call refreshTable(force = True) after
153         loading the table.
154         """
155         nodes = self.store.getRoutingTable()
156         for rec in nodes:
157             n = self.Node(rec[0], rec[1], int(rec[2]))
158             self.table.insertNode(n, contacted = False)
159             
160     #{ Local interface
161     def addContact(self, host, port, callback=None, errback=None):
162         """Ping this node and add the contact info to the table on pong.
163         
164         @type host: C{string}
165         @param host: the IP address of the node to contact
166         @type port: C{int}
167         @param port:the port of the node to contact
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the contact info returned by the node
171             (optional, defaults to doing nothing with the results)
172         @type errback: C{method}
173         @param errback: the method to call if an error occurs
174             (optional, defaults to calling the callback with the error)
175         """
176         n = self.Node(NULL_ID, host, port)
177         self.sendJoin(n, callback=callback, errback=errback)
178
179     def findNode(self, id, callback):
180         """Find the contact info for the K closest nodes in the global table.
181         
182         @type id: C{string}
183         @param id: the target ID to find the K closest nodes of
184         @type callback: C{method}
185         @param callback: the method to call with the results, it must take 1
186             parameter, the list of K closest nodes
187         """
188         # Mark the bucket as having been accessed
189         self.table.touch(id)
190         
191         # Start with our node
192         nodes = [copy(self.node)]
193
194         # Start the finding nodes action
195         state = FindNode(self, id, callback, self.config, self.stats)
196         reactor.callLater(0, state.goWithNodes, nodes)
197     
198     def insertNode(self, node, contacted = True):
199         """Try to insert a node in our local table, pinging oldest contact if necessary.
200         
201         If all you have is a host/port, then use L{addContact}, which calls this
202         method after receiving the PONG from the remote node. The reason for
203         the separation is we can't insert a node into the table without its
204         node ID. That means of course the node passed into this method needs
205         to be a properly formed Node object with a valid ID.
206
207         @type node: L{node.Node}
208         @param node: the new node to try and insert
209         @type contacted: C{boolean}
210         @param contacted: whether the new node is known to be good, i.e.
211             responded to a request (optional, defaults to True)
212         """
213         # Don't add any local nodes to the routing table
214         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
215             log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
216             return
217         
218         old = self.table.insertNode(node, contacted=contacted)
219
220         if (isinstance(old, self._Node) and old.id != self.node.id and
221             (datetime.now() - old.lastSeen) > 
222              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
223             
224             # Bucket is full, check to see if old node is still available
225             df = self.sendPing(old)
226             df.addErrback(self._staleNodeHandler, old, node, contacted)
227         elif not old and not contacted:
228             # There's room, we just need to contact the node first
229             self.sendPing(node)
230
231     def _staleNodeHandler(self, err, old, node, contacted):
232         """The pinged node never responded, so replace it."""
233         self.table.invalidateNode(old)
234         self.insertNode(node, contacted)
235         return err
236     
237     def nodeFailed(self, node):
238         """Mark a node as having failed a request and schedule a future check.
239         
240         @type node: L{node.Node}
241         @param node: the new node to try and insert
242         """
243         exists = self.table.nodeFailed(node)
244         
245         # If in the table, schedule a ping, if one isn't already sent/scheduled
246         if exists and node.id not in self.pinging:
247             self.pinging[node,id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
248                                                       self.sendPing, node)
249     
250     def sendPing(self, node):
251         """Ping the node to see if it's still alive.
252         
253         @type node: L{node.Node}
254         @param node: the node to send the join to
255         """
256         # Check for a ping already underway
257         if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
258             self.pinging[node.id].active()):
259             self.pinging[node.id].cancel()
260         elif isinstance(self.pinging.get(node.id, None), Deferred):
261             return self.pinging[node.id]
262
263         self.stats.startedAction('ping')
264         df = node.ping(self.node.id)
265         self.pinging[node.id] = df
266         df.addCallbacks(self._pingHandler, self._pingError,
267                         callbackArgs = (node, datetime.now()),
268                         errbackArgs = (node, datetime.now()))
269         return df
270
271     def _pingHandler(self, dict, node, start):
272         """Node responded properly, update it and return the node object."""
273         self.stats.completedAction('ping', start)
274         del self.pinging[node.id]
275         # Create the node using the returned contact info
276         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
277         reactor.callLater(0, self.insertNode, n)
278         return n
279
280     def _pingError(self, err, node, start):
281         """Error occurred, fail node."""
282         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
283         self.stats.completedAction('ping', start)
284         del self.pinging[node.id]
285         self.nodeFailed(node)
286         return err
287         
288     def sendJoin(self, node, callback=None, errback=None):
289         """Join the DHT by pinging a bootstrap node.
290         
291         @type node: L{node.Node}
292         @param node: the node to send the join to
293         @type callback: C{method}
294         @param callback: the method to call with the results, it must take 1
295             parameter, the contact info returned by the node
296             (optional, defaults to doing nothing with the results)
297         @type errback: C{method}
298         @param errback: the method to call if an error occurs
299             (optional, defaults to calling the callback with the error)
300         """
301         if errback is None:
302             errback = callback
303         self.stats.startedAction('join')
304         df = node.join(self.node.id)
305         df.addCallbacks(self._joinHandler, self._joinError,
306                         callbackArgs = (node, datetime.now()),
307                         errbackArgs = (node, datetime.now()))
308         if callback:
309             df.addCallbacks(callback, errback)
310
311     def _joinHandler(self, dict, node, start):
312         """Node responded properly, extract the response."""
313         self.stats.completedAction('join', start)
314         # Create the node using the returned contact info
315         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
316         reactor.callLater(0, self.insertNode, n)
317         return (dict['ip_addr'], dict['port'])
318
319     def _joinError(self, err, node, start):
320         """Error occurred, fail node."""
321         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
322         self.stats.completedAction('join', start)
323         self.nodeFailed(node)
324         return err
325         
326     def findCloseNodes(self, callback=lambda a: None):
327         """Perform a findNode on the ID one away from our own.
328
329         This will allow us to populate our table with nodes on our network
330         closest to our own. This is called as soon as we start up with an
331         empty table.
332
333         @type callback: C{method}
334         @param callback: the method to call with the results, it must take 1
335             parameter, the list of K closest nodes
336             (optional, defaults to doing nothing with the results)
337         """
338         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
339         self.findNode(id, callback)
340
341     def refreshTable(self, force = False):
342         """Check all the buckets for those that need refreshing.
343         
344         @param force: refresh all buckets regardless of last bucket access time
345             (optional, defaults to False)
346         """
347         def callback(nodes):
348             pass
349     
350         for bucket in self.table.buckets:
351             if force or (datetime.now() - bucket.lastAccessed > 
352                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
353                 # Choose a random ID in the bucket and try and find it
354                 id = newIDInRange(bucket.min, bucket.max)
355                 self.findNode(id, callback)
356
357     def shutdown(self):
358         """Closes the port and cancels pending later calls."""
359         self.listenport.stopListening()
360         try:
361             self.next_checkpoint.cancel()
362         except:
363             pass
364         for call in self.pinging:
365             if isinstance(call, DelayedCall) and call.active():
366                 call.cancel()
367         self.store.close()
368     
369     def getStats(self):
370         """Gather the statistics for the DHT."""
371         return self.stats.formatHTML()
372
373     #{ Remote interface
374     def krpc_ping(self, id, _krpc_sender = None):
375         """Pong with our ID.
376         
377         @type id: C{string}
378         @param id: the node ID of the sender node
379         @type _krpc_sender: (C{string}, C{int})
380         @param _krpc_sender: the sender node's IP address and port
381         """
382         if _krpc_sender is not None:
383             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
384             reactor.callLater(0, self.insertNode, n, False)
385
386         return {"id" : self.node.id}
387         
388     def krpc_join(self, id, _krpc_sender = None):
389         """Add the node by responding with its address and port.
390         
391         @type id: C{string}
392         @param id: the node ID of the sender node
393         @type _krpc_sender: (C{string}, C{int})
394         @param _krpc_sender: the sender node's IP address and port
395         """
396         if _krpc_sender is not None:
397             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
398             reactor.callLater(0, self.insertNode, n, False)
399         else:
400             _krpc_sender = ('127.0.0.1', self.port)
401
402         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
403         
404     def krpc_find_node(self, id, target, _krpc_sender = None):
405         """Find the K closest nodes to the target in the local routing table.
406         
407         @type target: C{string}
408         @param target: the target ID to find nodes for
409         @type id: C{string}
410         @param id: the node ID of the sender node
411         @type _krpc_sender: (C{string}, C{int})
412         @param _krpc_sender: the sender node's IP address and port
413         """
414         if _krpc_sender is not None:
415             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
416             reactor.callLater(0, self.insertNode, n, False)
417         else:
418             _krpc_sender = ('127.0.0.1', self.port)
419
420         nodes = self.table.findNodes(target)
421         nodes = map(lambda node: node.contactInfo(), nodes)
422         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
423         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
424
425
426 class KhashmirRead(KhashmirBase):
427     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
428
429     _Node = KNodeRead
430
431     #{ Local interface
432     def findValue(self, key, callback):
433         """Get the nodes that have values for the key from the global table.
434         
435         @type key: C{string}
436         @param key: the target key to find the values for
437         @type callback: C{method}
438         @param callback: the method to call with the results, it must take 1
439             parameter, the list of nodes with values
440         """
441         # Mark the bucket as having been accessed
442         self.table.touch(key)
443         
444         # Start with ourself
445         nodes = [copy(self.node)]
446         
447         # Search for others starting with the locally found ones
448         state = FindValue(self, key, callback, self.config, self.stats)
449         reactor.callLater(0, state.goWithNodes, nodes)
450
451     def valueForKey(self, key, callback, searchlocal = True):
452         """Get the values found for key in global table.
453         
454         Callback will be called with a list of values for each peer that
455         returns unique values. The final callback will be an empty list.
456
457         @type key: C{string}
458         @param key: the target key to get the values for
459         @type callback: C{method}
460         @param callback: the method to call with the results, it must take 2
461             parameters: the key, and the values found
462         @type searchlocal: C{boolean}
463         @param searchlocal: whether to also look for any local values
464         """
465
466         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
467             """Use the found nodes to send requests for values to."""
468             # Get any local values
469             if searchlocal:
470                 l = self.store.retrieveValues(key)
471                 if len(l) > 0:
472                     node = copy(self.node)
473                     node.updateNumValues(len(l))
474                     nodes = nodes + [node]
475
476             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
477             reactor.callLater(0, state.goWithNodes, nodes)
478             
479         # First lookup nodes that have values for the key
480         self.findValue(key, _getValueForKey)
481
482     #{ Remote interface
483     def krpc_find_value(self, id, key, _krpc_sender = None):
484         """Find the number of values stored locally for the key, and the K closest nodes.
485         
486         @type key: C{string}
487         @param key: the target key to find the values and nodes for
488         @type id: C{string}
489         @param id: the node ID of the sender node
490         @type _krpc_sender: (C{string}, C{int})
491         @param _krpc_sender: the sender node's IP address and port
492         """
493         if _krpc_sender is not None:
494             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
495             reactor.callLater(0, self.insertNode, n, False)
496     
497         nodes = self.table.findNodes(key)
498         nodes = map(lambda node: node.contactInfo(), nodes)
499         num_values = self.store.countValues(key)
500         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
501
502     def krpc_get_value(self, id, key, num, _krpc_sender = None):
503         """Retrieve the values stored locally for the key.
504         
505         @type key: C{string}
506         @param key: the target key to retrieve the values for
507         @type num: C{int}
508         @param num: the maximum number of values to retrieve, or 0 to
509             retrieve all of them
510         @type id: C{string}
511         @param id: the node ID of the sender node
512         @type _krpc_sender: (C{string}, C{int})
513         @param _krpc_sender: the sender node's IP address and port
514         """
515         if _krpc_sender is not None:
516             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
517             reactor.callLater(0, self.insertNode, n, False)
518     
519         l = self.store.retrieveValues(key)
520         if num == 0 or num >= len(l):
521             return {'values' : l, "id": self.node.id}
522         else:
523             shuffle(l)
524             return {'values' : l[:num], "id": self.node.id}
525
526
527 class KhashmirWrite(KhashmirRead):
528     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
529
530     _Node = KNodeWrite
531
532     #{ Local interface
533     def storeValueForKey(self, key, value, callback=None):
534         """Stores the value for the key in the global table.
535         
536         No status in this implementation, peers respond but don't indicate
537         status of storing values.
538
539         @type key: C{string}
540         @param key: the target key to store the value for
541         @type value: C{string}
542         @param value: the value to store with the key
543         @type callback: C{method}
544         @param callback: the method to call with the results, it must take 3
545             parameters: the key, the value stored, and the result of the store
546             (optional, defaults to doing nothing with the results)
547         """
548         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
549             """Use the returned K closest nodes to store the key at."""
550             if not response:
551                 def _storedValueHandler(key, value, sender):
552                     """Default callback that does nothing."""
553                     pass
554                 response = _storedValueHandler
555             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
556             reactor.callLater(0, action.goWithNodes, nodes)
557             
558         # First find the K closest nodes to operate on.
559         self.findNode(key, _storeValueForKey)
560                     
561     #{ Remote interface
562     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
563         """Store the value locally with the key.
564         
565         @type key: C{string}
566         @param key: the target key to store the value for
567         @type value: C{string}
568         @param value: the value to store with the key
569         @param token: the token to confirm that this peer contacted us previously
570         @type id: C{string}
571         @param id: the node ID of the sender node
572         @type _krpc_sender: (C{string}, C{int})
573         @param _krpc_sender: the sender node's IP address and port
574         """
575         if _krpc_sender is not None:
576             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
577             reactor.callLater(0, self.insertNode, n, False)
578         else:
579             _krpc_sender = ('127.0.0.1', self.port)
580
581         for secret in self.token_secrets:
582             this_token = sha(secret + _krpc_sender[0]).digest()
583             if token == this_token:
584                 self.store.storeValue(key, value)
585                 return {"id" : self.node.id}
586         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
587
588
589 class Khashmir(KhashmirWrite):
590     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
591     _Node = KNodeWrite
592
593
594 class SimpleTests(unittest.TestCase):
595     
596     timeout = 10
597     DHT_DEFAULTS = {'PORT': 9977,
598                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
599                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
600                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
601                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
602                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
603                     'KEY_EXPIRE': 3600, 'SPEW': True, }
604
605     def setUp(self):
606         d = self.DHT_DEFAULTS.copy()
607         d['PORT'] = 4044
608         self.a = Khashmir(d)
609         d = self.DHT_DEFAULTS.copy()
610         d['PORT'] = 4045
611         self.b = Khashmir(d)
612         
613     def tearDown(self):
614         self.a.shutdown()
615         self.b.shutdown()
616         os.unlink(self.a.store.db)
617         os.unlink(self.b.store.db)
618
619     def testAddContact(self):
620         self.failUnlessEqual(len(self.a.table.buckets), 1)
621         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
622
623         self.failUnlessEqual(len(self.b.table.buckets), 1)
624         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
625
626         self.a.addContact('127.0.0.1', 4045)
627         reactor.iterate()
628         reactor.iterate()
629         reactor.iterate()
630         reactor.iterate()
631         reactor.iterate()
632         reactor.iterate()
633         reactor.iterate()
634         reactor.iterate()
635
636         self.failUnlessEqual(len(self.a.table.buckets), 1)
637         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
638         self.failUnlessEqual(len(self.b.table.buckets), 1)
639         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
640
641     def testStoreRetrieve(self):
642         self.a.addContact('127.0.0.1', 4045)
643         reactor.iterate()
644         reactor.iterate()
645         reactor.iterate()
646         reactor.iterate()
647         self.got = 0
648         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
649         reactor.iterate()
650         reactor.iterate()
651         reactor.iterate()
652         reactor.iterate()
653         reactor.iterate()
654         reactor.iterate()
655         self.a.valueForKey(sha('foo').digest(), self._cb)
656         reactor.iterate()
657         reactor.iterate()
658         reactor.iterate()
659         reactor.iterate()
660         reactor.iterate()
661         reactor.iterate()
662         reactor.iterate()
663         reactor.iterate()
664         reactor.iterate()
665         reactor.iterate()
666         reactor.iterate()
667         reactor.iterate()
668         reactor.iterate()
669         reactor.iterate()
670         reactor.iterate()
671         reactor.iterate()
672         reactor.iterate()
673         reactor.iterate()
674         reactor.iterate()
675         reactor.iterate()
676         reactor.iterate()
677
678     def _cb(self, key, val):
679         if not val:
680             self.failUnlessEqual(self.got, 1)
681         elif 'foobar' in val:
682             self.got = 1
683
684
685 class MultiTest(unittest.TestCase):
686     
687     timeout = 30
688     num = 20
689     DHT_DEFAULTS = {'PORT': 9977,
690                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
691                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
692                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
693                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
694                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
695                     'KEY_EXPIRE': 3600, 'SPEW': True, }
696
697     def _done(self, val):
698         self.done = 1
699         
700     def setUp(self):
701         self.l = []
702         self.startport = 4088
703         for i in range(self.num):
704             d = self.DHT_DEFAULTS.copy()
705             d['PORT'] = self.startport + i
706             self.l.append(Khashmir(d))
707         reactor.iterate()
708         reactor.iterate()
709         
710         for i in self.l:
711             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
712             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
713             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
714             reactor.iterate()
715             reactor.iterate()
716             reactor.iterate() 
717             reactor.iterate()
718             reactor.iterate()
719             reactor.iterate() 
720             
721         for i in self.l:
722             self.done = 0
723             i.findCloseNodes(self._done)
724             while not self.done:
725                 reactor.iterate()
726         for i in self.l:
727             self.done = 0
728             i.findCloseNodes(self._done)
729             while not self.done:
730                 reactor.iterate()
731
732     def tearDown(self):
733         for i in self.l:
734             i.shutdown()
735             os.unlink(i.store.db)
736             
737         reactor.iterate()
738         
739     def testStoreRetrieve(self):
740         for i in range(10):
741             K = newID()
742             V = newID()
743             
744             for a in range(3):
745                 self.done = 0
746                 def _scb(key, value, result):
747                     self.done = 1
748                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
749                 while not self.done:
750                     reactor.iterate()
751
752
753                 def _rcb(key, val):
754                     if not val:
755                         self.done = 1
756                         self.failUnlessEqual(self.got, 1)
757                     elif V in val:
758                         self.got = 1
759                 for x in range(3):
760                     self.got = 0
761                     self.done = 0
762                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
763                     while not self.done:
764                         reactor.iterate()