f083da4763dc25f5a96fea8b50be9ca7488a89a7
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet import protocol, reactor
19 from twisted.python import log
20 from twisted.trial import unittest
21
22 from db import DB
23 from ktable import KTable
24 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
25 from khash import newID, newIDInRange
26 from actions import FindNode, FindValue, GetValue, StoreValue
27 from stats import StatsLogger
28 import krpc
29
30 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
31                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
34                      '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
35                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
36
37 class KhashmirBase(protocol.Factory):
38     """The base Khashmir class, with base functionality and find node, no key-value mappings.
39     
40     @type _Node: L{node.Node}
41     @ivar _Node: the knode implementation to use for this class of DHT
42     @type config: C{dictionary}
43     @ivar config: the configuration parameters for the DHT
44     @type port: C{int}
45     @ivar port: the port to listen on
46     @type store: L{db.DB}
47     @ivar store: the database to store nodes and key/value pairs in
48     @type node: L{node.Node}
49     @ivar node: this node
50     @type table: L{ktable.KTable}
51     @ivar table: the routing table
52     @type token_secrets: C{list} of C{string}
53     @ivar token_secrets: the current secrets to use to create tokens
54     @type stats: L{stats.StatsLogger}
55     @ivar stats: the statistics gatherer
56     @type udp: L{krpc.hostbroker}
57     @ivar udp: the factory for the KRPC protocol
58     @type listenport: L{twisted.internet.interfaces.IListeningPort}
59     @ivar listenport: the UDP listening port
60     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
61     @ivar next_checkpoint: the delayed call for the next checkpoint
62     """
63     
64     _Node = KNodeBase
65     
66     def __init__(self, config, cache_dir='/tmp'):
67         """Initialize the Khashmir class and call the L{setup} method.
68         
69         @type config: C{dictionary}
70         @param config: the configuration parameters for the DHT
71         @type cache_dir: C{string}
72         @param cache_dir: the directory to store all files in
73             (optional, defaults to the /tmp directory)
74         """
75         self.config = None
76         self.setup(config, cache_dir)
77         
78     def setup(self, config, cache_dir):
79         """Setup all the Khashmir sub-modules.
80         
81         @type config: C{dictionary}
82         @param config: the configuration parameters for the DHT
83         @type cache_dir: C{string}
84         @param cache_dir: the directory to store all files in
85         """
86         self.config = config
87         self.port = config['PORT']
88         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
89         self.node = self._loadSelfNode('', self.port)
90         self.table = KTable(self.node, config)
91         self.token_secrets = [newID()]
92         self.stats = StatsLogger(self.table, self.store)
93         
94         # Start listening
95         self.udp = krpc.hostbroker(self, self.stats, config)
96         self.udp.protocol = krpc.KRPC
97         self.listenport = reactor.listenUDP(self.port, self.udp)
98         
99         # Load the routing table and begin checkpointing
100         self._loadRoutingTable()
101         self.refreshTable(force = True)
102         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
103
104     def Node(self, id, host = None, port = None):
105         """Create a new node.
106         
107         @see: L{node.Node.__init__}
108         """
109         n = self._Node(id, host, port)
110         n.table = self.table
111         n.conn = self.udp.connectionForAddr((n.host, n.port))
112         return n
113     
114     def __del__(self):
115         """Stop listening for packets."""
116         self.listenport.stopListening()
117         
118     def _loadSelfNode(self, host, port):
119         """Create this node, loading any previously saved one."""
120         id = self.store.getSelfNode()
121         if not id:
122             id = newID()
123         return self._Node(id, host, port)
124         
125     def checkpoint(self):
126         """Perform some periodic maintenance operations."""
127         # Create a new token secret
128         self.token_secrets.insert(0, newID())
129         if len(self.token_secrets) > 3:
130             self.token_secrets.pop()
131             
132         # Save some parameters for reloading
133         self.store.saveSelfNode(self.node.id)
134         self.store.dumpRoutingTable(self.table.buckets)
135         
136         # DHT maintenance
137         self.store.expireValues(self.config['KEY_EXPIRE'])
138         self.refreshTable()
139         
140         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
141                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
142                                                  self.checkpoint)
143         
144     def _loadRoutingTable(self):
145         """Load the previous routing table nodes from the database.
146         
147         It's usually a good idea to call refreshTable(force = True) after
148         loading the table.
149         """
150         nodes = self.store.getRoutingTable()
151         for rec in nodes:
152             n = self.Node(rec[0], rec[1], int(rec[2]))
153             self.table.insertNode(n, contacted = False)
154             
155     #{ Local interface
156     def addContact(self, host, port, callback=None, errback=None):
157         """Ping this node and add the contact info to the table on pong.
158         
159         @type host: C{string}
160         @param host: the IP address of the node to contact
161         @type port: C{int}
162         @param port:the port of the node to contact
163         @type callback: C{method}
164         @param callback: the method to call with the results, it must take 1
165             parameter, the contact info returned by the node
166             (optional, defaults to doing nothing with the results)
167         @type errback: C{method}
168         @param errback: the method to call if an error occurs
169             (optional, defaults to calling the callback with the error)
170         """
171         n = self.Node(NULL_ID, host, port)
172         self.sendJoin(n, callback=callback, errback=errback)
173
174     def findNode(self, id, callback):
175         """Find the contact info for the K closest nodes in the global table.
176         
177         @type id: C{string}
178         @param id: the target ID to find the K closest nodes of
179         @type callback: C{method}
180         @param callback: the method to call with the results, it must take 1
181             parameter, the list of K closest nodes
182         """
183         # Start with our node
184         nodes = [copy(self.node)]
185
186         # Start the finding nodes action
187         state = FindNode(self, id, callback, self.config, self.stats)
188         reactor.callLater(0, state.goWithNodes, nodes)
189     
190     def insertNode(self, node, contacted = True):
191         """Try to insert a node in our local table, pinging oldest contact if necessary.
192         
193         If all you have is a host/port, then use L{addContact}, which calls this
194         method after receiving the PONG from the remote node. The reason for
195         the separation is we can't insert a node into the table without its
196         node ID. That means of course the node passed into this method needs
197         to be a properly formed Node object with a valid ID.
198
199         @type node: L{node.Node}
200         @param node: the new node to try and insert
201         @type contacted: C{boolean}
202         @param contacted: whether the new node is known to be good, i.e.
203             responded to a request (optional, defaults to True)
204         """
205         # Don't add any local nodes to the routing table
206         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
207             log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
208             return
209         
210         old = self.table.insertNode(node, contacted=contacted)
211
212         if (isinstance(old, self._Node) and old.id != self.node.id and
213             (datetime.now() - old.lastSeen) > 
214              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
215             
216             # Bucket is full, check to see if old node is still available
217             self.stats.startedAction('ping')
218             df = old.ping(self.node.id)
219             df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
220                             callbackArgs = (old, datetime.now()),
221                             errbackArgs = (old, datetime.now(), node, contacted))
222         elif not old and not contacted:
223             # There's room, we just need to contact the node first
224             self.stats.startedAction('ping')
225             df = node.ping(self.node.id)
226             # Convert the returned contact info into a node
227             df.addCallback(self._pongHandler, datetime.now())
228             # Try adding the contacted node
229             df.addCallbacks(self.insertNode, self._pongError,
230                             errbackArgs = (node, datetime.now()))
231
232     def _freshNodeHandler(self, dict, old, start):
233         """Got a pong from the old node, so update it."""
234         self.stats.completedAction('ping', start)
235         if dict['id'] == old.id:
236             self.table.justSeenNode(old.id)
237     
238     def _staleNodeHandler(self, err, old, start, node, contacted):
239         """The pinged node never responded, so replace it."""
240         log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
241         self.stats.completedAction('ping', start)
242         self.table.invalidateNode(old)
243         self.insertNode(node, contacted)
244     
245     def _pongHandler(self, dict, start):
246         """Node responded properly, change response into a node to insert."""
247         self.stats.completedAction('ping', start)
248         # Create the node using the returned contact info
249         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
250         return n
251
252     def _pongError(self, err, node, start):
253         """Error occurred, fail node and errback or callback with error."""
254         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
255         self.stats.completedAction('ping', start)
256         self.table.nodeFailed(node)
257     
258     def sendJoin(self, node, callback=None, errback=None):
259         """Join the DHT by pinging a bootstrap node.
260         
261         @type node: L{node.Node}
262         @param node: the node to send the join to
263         @type callback: C{method}
264         @param callback: the method to call with the results, it must take 1
265             parameter, the contact info returned by the node
266             (optional, defaults to doing nothing with the results)
267         @type errback: C{method}
268         @param errback: the method to call if an error occurs
269             (optional, defaults to calling the callback with the error)
270         """
271         if errback is None:
272             errback = callback
273         self.stats.startedAction('join')
274         df = node.join(self.node.id)
275         df.addCallbacks(self._joinHandler, self._joinError,
276                         callbackArgs = (node, datetime.now()),
277                         errbackArgs = (node, datetime.now()))
278         if callback:
279             df.addCallbacks(callback, errback)
280
281     def _joinHandler(self, dict, node, start):
282         """Node responded properly, extract the response."""
283         self.stats.completedAction('join', start)
284         # Create the node using the returned contact info
285         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
286         reactor.callLater(0, self.insertNode, n)
287         return (dict['ip_addr'], dict['port'])
288
289     def _joinError(self, err, node, start):
290         """Error occurred, fail node."""
291         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
292         self.stats.completedAction('join', start)
293         self.table.nodeFailed(node)
294         return err
295         
296     def findCloseNodes(self, callback=lambda a: None):
297         """Perform a findNode on the ID one away from our own.
298
299         This will allow us to populate our table with nodes on our network
300         closest to our own. This is called as soon as we start up with an
301         empty table.
302
303         @type callback: C{method}
304         @param callback: the method to call with the results, it must take 1
305             parameter, the list of K closest nodes
306             (optional, defaults to doing nothing with the results)
307         """
308         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
309         self.findNode(id, callback)
310
311     def refreshTable(self, force = False):
312         """Check all the buckets for those that need refreshing.
313         
314         @param force: refresh all buckets regardless of last bucket access time
315             (optional, defaults to False)
316         """
317         def callback(nodes):
318             pass
319     
320         for bucket in self.table.buckets:
321             if force or (datetime.now() - bucket.lastAccessed > 
322                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
323                 # Choose a random ID in the bucket and try and find it
324                 id = newIDInRange(bucket.min, bucket.max)
325                 self.findNode(id, callback)
326
327     def shutdown(self):
328         """Closes the port and cancels pending later calls."""
329         self.listenport.stopListening()
330         try:
331             self.next_checkpoint.cancel()
332         except:
333             pass
334         self.store.close()
335     
336     def getStats(self):
337         """Gather the statistics for the DHT."""
338         return self.stats.formatHTML()
339
340     #{ Remote interface
341     def krpc_ping(self, id, _krpc_sender = None):
342         """Pong with our ID.
343         
344         @type id: C{string}
345         @param id: the node ID of the sender node
346         @type _krpc_sender: (C{string}, C{int})
347         @param _krpc_sender: the sender node's IP address and port
348         """
349         if _krpc_sender is not None:
350             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
351             reactor.callLater(0, self.insertNode, n, False)
352
353         return {"id" : self.node.id}
354         
355     def krpc_join(self, id, _krpc_sender = None):
356         """Add the node by responding with its address and port.
357         
358         @type id: C{string}
359         @param id: the node ID of the sender node
360         @type _krpc_sender: (C{string}, C{int})
361         @param _krpc_sender: the sender node's IP address and port
362         """
363         if _krpc_sender is not None:
364             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
365             reactor.callLater(0, self.insertNode, n, False)
366         else:
367             _krpc_sender = ('127.0.0.1', self.port)
368
369         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
370         
371     def krpc_find_node(self, id, target, _krpc_sender = None):
372         """Find the K closest nodes to the target in the local routing table.
373         
374         @type target: C{string}
375         @param target: the target ID to find nodes for
376         @type id: C{string}
377         @param id: the node ID of the sender node
378         @type _krpc_sender: (C{string}, C{int})
379         @param _krpc_sender: the sender node's IP address and port
380         """
381         if _krpc_sender is not None:
382             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
383             reactor.callLater(0, self.insertNode, n, False)
384         else:
385             _krpc_sender = ('127.0.0.1', self.port)
386
387         nodes = self.table.findNodes(target)
388         nodes = map(lambda node: node.contactInfo(), nodes)
389         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
390         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
391
392
393 class KhashmirRead(KhashmirBase):
394     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
395
396     _Node = KNodeRead
397
398     #{ Local interface
399     def findValue(self, key, callback):
400         """Get the nodes that have values for the key from the global table.
401         
402         @type key: C{string}
403         @param key: the target key to find the values for
404         @type callback: C{method}
405         @param callback: the method to call with the results, it must take 1
406             parameter, the list of nodes with values
407         """
408         # Start with ourself
409         nodes = [copy(self.node)]
410         
411         # Search for others starting with the locally found ones
412         state = FindValue(self, key, callback, self.config, self.stats)
413         reactor.callLater(0, state.goWithNodes, nodes)
414
415     def valueForKey(self, key, callback, searchlocal = True):
416         """Get the values found for key in global table.
417         
418         Callback will be called with a list of values for each peer that
419         returns unique values. The final callback will be an empty list.
420
421         @type key: C{string}
422         @param key: the target key to get the values for
423         @type callback: C{method}
424         @param callback: the method to call with the results, it must take 2
425             parameters: the key, and the values found
426         @type searchlocal: C{boolean}
427         @param searchlocal: whether to also look for any local values
428         """
429
430         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
431             """Use the found nodes to send requests for values to."""
432             # Get any local values
433             if searchlocal:
434                 l = self.store.retrieveValues(key)
435                 if len(l) > 0:
436                     node = copy(self.node)
437                     node.updateNumValues(len(l))
438                     nodes = nodes + [node]
439
440             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
441             reactor.callLater(0, state.goWithNodes, nodes)
442             
443         # First lookup nodes that have values for the key
444         self.findValue(key, _getValueForKey)
445
446     #{ Remote interface
447     def krpc_find_value(self, id, key, _krpc_sender = None):
448         """Find the number of values stored locally for the key, and the K closest nodes.
449         
450         @type key: C{string}
451         @param key: the target key to find the values and nodes for
452         @type id: C{string}
453         @param id: the node ID of the sender node
454         @type _krpc_sender: (C{string}, C{int})
455         @param _krpc_sender: the sender node's IP address and port
456         """
457         if _krpc_sender is not None:
458             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
459             reactor.callLater(0, self.insertNode, n, False)
460     
461         nodes = self.table.findNodes(key)
462         nodes = map(lambda node: node.contactInfo(), nodes)
463         num_values = self.store.countValues(key)
464         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
465
466     def krpc_get_value(self, id, key, num, _krpc_sender = None):
467         """Retrieve the values stored locally for the key.
468         
469         @type key: C{string}
470         @param key: the target key to retrieve the values for
471         @type num: C{int}
472         @param num: the maximum number of values to retrieve, or 0 to
473             retrieve all of them
474         @type id: C{string}
475         @param id: the node ID of the sender node
476         @type _krpc_sender: (C{string}, C{int})
477         @param _krpc_sender: the sender node's IP address and port
478         """
479         if _krpc_sender is not None:
480             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
481             reactor.callLater(0, self.insertNode, n, False)
482     
483         l = self.store.retrieveValues(key)
484         if num == 0 or num >= len(l):
485             return {'values' : l, "id": self.node.id}
486         else:
487             shuffle(l)
488             return {'values' : l[:num], "id": self.node.id}
489
490
491 class KhashmirWrite(KhashmirRead):
492     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
493
494     _Node = KNodeWrite
495
496     #{ Local interface
497     def storeValueForKey(self, key, value, callback=None):
498         """Stores the value for the key in the global table.
499         
500         No status in this implementation, peers respond but don't indicate
501         status of storing values.
502
503         @type key: C{string}
504         @param key: the target key to store the value for
505         @type value: C{string}
506         @param value: the value to store with the key
507         @type callback: C{method}
508         @param callback: the method to call with the results, it must take 3
509             parameters: the key, the value stored, and the result of the store
510             (optional, defaults to doing nothing with the results)
511         """
512         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
513             """Use the returned K closest nodes to store the key at."""
514             if not response:
515                 def _storedValueHandler(key, value, sender):
516                     """Default callback that does nothing."""
517                     pass
518                 response = _storedValueHandler
519             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
520             reactor.callLater(0, action.goWithNodes, nodes)
521             
522         # First find the K closest nodes to operate on.
523         self.findNode(key, _storeValueForKey)
524                     
525     #{ Remote interface
526     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
527         """Store the value locally with the key.
528         
529         @type key: C{string}
530         @param key: the target key to store the value for
531         @type value: C{string}
532         @param value: the value to store with the key
533         @param token: the token to confirm that this peer contacted us previously
534         @type id: C{string}
535         @param id: the node ID of the sender node
536         @type _krpc_sender: (C{string}, C{int})
537         @param _krpc_sender: the sender node's IP address and port
538         """
539         if _krpc_sender is not None:
540             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
541             reactor.callLater(0, self.insertNode, n, False)
542         else:
543             _krpc_sender = ('127.0.0.1', self.port)
544
545         for secret in self.token_secrets:
546             this_token = sha(secret + _krpc_sender[0]).digest()
547             if token == this_token:
548                 self.store.storeValue(key, value)
549                 return {"id" : self.node.id}
550         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
551
552
553 class Khashmir(KhashmirWrite):
554     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
555     _Node = KNodeWrite
556
557
558 class SimpleTests(unittest.TestCase):
559     
560     timeout = 10
561     DHT_DEFAULTS = {'PORT': 9977,
562                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
563                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
564                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
565                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
566                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
567                     'KEY_EXPIRE': 3600, 'SPEW': True, }
568
569     def setUp(self):
570         d = self.DHT_DEFAULTS.copy()
571         d['PORT'] = 4044
572         self.a = Khashmir(d)
573         d = self.DHT_DEFAULTS.copy()
574         d['PORT'] = 4045
575         self.b = Khashmir(d)
576         
577     def tearDown(self):
578         self.a.shutdown()
579         self.b.shutdown()
580         os.unlink(self.a.store.db)
581         os.unlink(self.b.store.db)
582
583     def testAddContact(self):
584         self.failUnlessEqual(len(self.a.table.buckets), 1)
585         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
586
587         self.failUnlessEqual(len(self.b.table.buckets), 1)
588         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
589
590         self.a.addContact('127.0.0.1', 4045)
591         reactor.iterate()
592         reactor.iterate()
593         reactor.iterate()
594         reactor.iterate()
595
596         self.failUnlessEqual(len(self.a.table.buckets), 1)
597         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
598         self.failUnlessEqual(len(self.b.table.buckets), 1)
599         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
600
601     def testStoreRetrieve(self):
602         self.a.addContact('127.0.0.1', 4045)
603         reactor.iterate()
604         reactor.iterate()
605         reactor.iterate()
606         reactor.iterate()
607         self.got = 0
608         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
609         reactor.iterate()
610         reactor.iterate()
611         reactor.iterate()
612         reactor.iterate()
613         reactor.iterate()
614         reactor.iterate()
615         self.a.valueForKey(sha('foo').digest(), self._cb)
616         reactor.iterate()
617         reactor.iterate()
618         reactor.iterate()
619         reactor.iterate()
620         reactor.iterate()
621         reactor.iterate()
622         reactor.iterate()
623
624     def _cb(self, key, val):
625         if not val:
626             self.failUnlessEqual(self.got, 1)
627         elif 'foobar' in val:
628             self.got = 1
629
630
631 class MultiTest(unittest.TestCase):
632     
633     timeout = 30
634     num = 20
635     DHT_DEFAULTS = {'PORT': 9977,
636                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
637                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
638                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
639                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
640                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
641                     'KEY_EXPIRE': 3600, 'SPEW': True, }
642
643     def _done(self, val):
644         self.done = 1
645         
646     def setUp(self):
647         self.l = []
648         self.startport = 4088
649         for i in range(self.num):
650             d = self.DHT_DEFAULTS.copy()
651             d['PORT'] = self.startport + i
652             self.l.append(Khashmir(d))
653         reactor.iterate()
654         reactor.iterate()
655         
656         for i in self.l:
657             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
658             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
659             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
660             reactor.iterate()
661             reactor.iterate()
662             reactor.iterate() 
663             
664         for i in self.l:
665             self.done = 0
666             i.findCloseNodes(self._done)
667             while not self.done:
668                 reactor.iterate()
669         for i in self.l:
670             self.done = 0
671             i.findCloseNodes(self._done)
672             while not self.done:
673                 reactor.iterate()
674
675     def tearDown(self):
676         for i in self.l:
677             i.shutdown()
678             os.unlink(i.store.db)
679             
680         reactor.iterate()
681         
682     def testStoreRetrieve(self):
683         for i in range(10):
684             K = newID()
685             V = newID()
686             
687             for a in range(3):
688                 self.done = 0
689                 def _scb(key, value, result):
690                     self.done = 1
691                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
692                 while not self.done:
693                     reactor.iterate()
694
695
696                 def _rcb(key, val):
697                     if not val:
698                         self.done = 1
699                         self.failUnlessEqual(self.got, 1)
700                     elif V in val:
701                         self.got = 1
702                 for x in range(3):
703                     self.got = 0
704                     self.done = 0
705                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
706                     while not self.done:
707                         reactor.iterate()