Schedule a re-ping message after adding a new node.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
22
23 from db import DB
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
29 import krpc
30
31 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
34                      '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
35                      '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
36                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
37
38 class KhashmirBase(protocol.Factory):
39     """The base Khashmir class, with base functionality and find node, no key-value mappings.
40     
41     @type _Node: L{node.Node}
42     @ivar _Node: the knode implementation to use for this class of DHT
43     @type config: C{dictionary}
44     @ivar config: the configuration parameters for the DHT
45     @type pinging: C{dictionary}
46     @ivar pinging: the node's that are currently being pinged, keys are the
47         node id's, values are the Deferred or DelayedCall objects
48     @type port: C{int}
49     @ivar port: the port to listen on
50     @type store: L{db.DB}
51     @ivar store: the database to store nodes and key/value pairs in
52     @type node: L{node.Node}
53     @ivar node: this node
54     @type table: L{ktable.KTable}
55     @ivar table: the routing table
56     @type token_secrets: C{list} of C{string}
57     @ivar token_secrets: the current secrets to use to create tokens
58     @type stats: L{stats.StatsLogger}
59     @ivar stats: the statistics gatherer
60     @type udp: L{krpc.hostbroker}
61     @ivar udp: the factory for the KRPC protocol
62     @type listenport: L{twisted.internet.interfaces.IListeningPort}
63     @ivar listenport: the UDP listening port
64     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
65     @ivar next_checkpoint: the delayed call for the next checkpoint
66     """
67     
68     _Node = KNodeBase
69     
70     def __init__(self, config, cache_dir='/tmp'):
71         """Initialize the Khashmir class and call the L{setup} method.
72         
73         @type config: C{dictionary}
74         @param config: the configuration parameters for the DHT
75         @type cache_dir: C{string}
76         @param cache_dir: the directory to store all files in
77             (optional, defaults to the /tmp directory)
78         """
79         self.config = None
80         self.pinging = {}
81         self.setup(config, cache_dir)
82         
83     def setup(self, config, cache_dir):
84         """Setup all the Khashmir sub-modules.
85         
86         @type config: C{dictionary}
87         @param config: the configuration parameters for the DHT
88         @type cache_dir: C{string}
89         @param cache_dir: the directory to store all files in
90         """
91         self.config = config
92         self.port = config['PORT']
93         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
94         self.node = self._loadSelfNode('', self.port)
95         self.table = KTable(self.node, config)
96         self.token_secrets = [newID()]
97         self.stats = StatsLogger(self.table, self.store)
98         
99         # Start listening
100         self.udp = krpc.hostbroker(self, self.stats, config)
101         self.udp.protocol = krpc.KRPC
102         self.listenport = reactor.listenUDP(self.port, self.udp)
103         
104         # Load the routing table and begin checkpointing
105         self._loadRoutingTable()
106         self.refreshTable(force = True)
107         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
108
109     def Node(self, id, host = None, port = None):
110         """Create a new node.
111         
112         @see: L{node.Node.__init__}
113         """
114         n = self._Node(id, host, port)
115         n.table = self.table
116         n.conn = self.udp.connectionForAddr((n.host, n.port))
117         return n
118     
119     def __del__(self):
120         """Stop listening for packets."""
121         self.listenport.stopListening()
122         
123     def _loadSelfNode(self, host, port):
124         """Create this node, loading any previously saved one."""
125         id = self.store.getSelfNode()
126         if not id:
127             id = newID()
128         return self._Node(id, host, port)
129         
130     def checkpoint(self):
131         """Perform some periodic maintenance operations."""
132         # Create a new token secret
133         self.token_secrets.insert(0, newID())
134         if len(self.token_secrets) > 3:
135             self.token_secrets.pop()
136             
137         # Save some parameters for reloading
138         self.store.saveSelfNode(self.node.id)
139         self.store.dumpRoutingTable(self.table.buckets)
140         
141         # DHT maintenance
142         self.store.expireValues(self.config['KEY_EXPIRE'])
143         self.refreshTable()
144         
145         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
146                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
147                                                  self.checkpoint)
148         
149     def _loadRoutingTable(self):
150         """Load the previous routing table nodes from the database.
151         
152         It's usually a good idea to call refreshTable(force = True) after
153         loading the table.
154         """
155         nodes = self.store.getRoutingTable()
156         for rec in nodes:
157             n = self.Node(rec[0], rec[1], int(rec[2]))
158             self.table.insertNode(n, contacted = False)
159             
160     #{ Local interface
161     def addContact(self, host, port, callback=None, errback=None):
162         """Ping this node and add the contact info to the table on pong.
163         
164         @type host: C{string}
165         @param host: the IP address of the node to contact
166         @type port: C{int}
167         @param port:the port of the node to contact
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the contact info returned by the node
171             (optional, defaults to doing nothing with the results)
172         @type errback: C{method}
173         @param errback: the method to call if an error occurs
174             (optional, defaults to calling the callback with the error)
175         """
176         n = self.Node(NULL_ID, host, port)
177         self.sendJoin(n, callback=callback, errback=errback)
178
179     def findNode(self, id, callback):
180         """Find the contact info for the K closest nodes in the global table.
181         
182         @type id: C{string}
183         @param id: the target ID to find the K closest nodes of
184         @type callback: C{method}
185         @param callback: the method to call with the results, it must take 1
186             parameter, the list of K closest nodes
187         """
188         # Mark the bucket as having been accessed
189         self.table.touch(id)
190         
191         # Start with our node
192         nodes = [copy(self.node)]
193
194         # Start the finding nodes action
195         state = FindNode(self, id, callback, self.config, self.stats)
196         reactor.callLater(0, state.goWithNodes, nodes)
197     
198     def insertNode(self, node, contacted = True):
199         """Try to insert a node in our local table, pinging oldest contact if necessary.
200         
201         If all you have is a host/port, then use L{addContact}, which calls this
202         method after receiving the PONG from the remote node. The reason for
203         the separation is we can't insert a node into the table without its
204         node ID. That means of course the node passed into this method needs
205         to be a properly formed Node object with a valid ID.
206
207         @type node: L{node.Node}
208         @param node: the new node to try and insert
209         @type contacted: C{boolean}
210         @param contacted: whether the new node is known to be good, i.e.
211             responded to a request (optional, defaults to True)
212         """
213         # Don't add any local nodes to the routing table
214         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
215             log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
216             return
217         
218         old = self.table.insertNode(node, contacted=contacted)
219
220         if (isinstance(old, self._Node) and old.id != self.node.id and
221             (datetime.now() - old.lastSeen) > 
222              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
223             
224             # Bucket is full, check to see if old node is still available
225             df = self.sendPing(old)
226             df.addErrback(self._staleNodeHandler, old, node, contacted)
227         elif not old and not contacted:
228             # There's room, we just need to contact the node first
229             df = self.sendPing(node)
230             # Also schedule a future ping to make sure the node works
231             def rePing(newnode, self = self):
232                 if newnode.id not in self.pinging:
233                     self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
234                                                                  self.sendPing, newnode)
235             df.addCallback(rePing)
236
237     def _staleNodeHandler(self, err, old, node, contacted):
238         """The pinged node never responded, so replace it."""
239         self.table.invalidateNode(old)
240         self.insertNode(node, contacted)
241         return err
242     
243     def nodeFailed(self, node):
244         """Mark a node as having failed a request and schedule a future check.
245         
246         @type node: L{node.Node}
247         @param node: the new node to try and insert
248         """
249         exists = self.table.nodeFailed(node)
250         
251         # If in the table, schedule a ping, if one isn't already sent/scheduled
252         if exists and node.id not in self.pinging:
253             self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
254                                                       self.sendPing, node)
255     
256     def sendPing(self, node):
257         """Ping the node to see if it's still alive.
258         
259         @type node: L{node.Node}
260         @param node: the node to send the join to
261         """
262         # Check for a ping already underway
263         if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
264             self.pinging[node.id].active()):
265             self.pinging[node.id].cancel()
266         elif isinstance(self.pinging.get(node.id, None), Deferred):
267             return self.pinging[node.id]
268
269         self.stats.startedAction('ping')
270         df = node.ping(self.node.id)
271         self.pinging[node.id] = df
272         df.addCallbacks(self._pingHandler, self._pingError,
273                         callbackArgs = (node, datetime.now()),
274                         errbackArgs = (node, datetime.now()))
275         return df
276
277     def _pingHandler(self, dict, node, start):
278         """Node responded properly, update it and return the node object."""
279         self.stats.completedAction('ping', start)
280         del self.pinging[node.id]
281         # Create the node using the returned contact info
282         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
283         reactor.callLater(0, self.insertNode, n)
284         return n
285
286     def _pingError(self, err, node, start):
287         """Error occurred, fail node."""
288         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
289         self.stats.completedAction('ping', start)
290         del self.pinging[node.id]
291         self.nodeFailed(node)
292         return err
293         
294     def sendJoin(self, node, callback=None, errback=None):
295         """Join the DHT by pinging a bootstrap node.
296         
297         @type node: L{node.Node}
298         @param node: the node to send the join to
299         @type callback: C{method}
300         @param callback: the method to call with the results, it must take 1
301             parameter, the contact info returned by the node
302             (optional, defaults to doing nothing with the results)
303         @type errback: C{method}
304         @param errback: the method to call if an error occurs
305             (optional, defaults to calling the callback with the error)
306         """
307         if errback is None:
308             errback = callback
309         self.stats.startedAction('join')
310         df = node.join(self.node.id)
311         df.addCallbacks(self._joinHandler, self._joinError,
312                         callbackArgs = (node, datetime.now()),
313                         errbackArgs = (node, datetime.now()))
314         if callback:
315             df.addCallbacks(callback, errback)
316
317     def _joinHandler(self, dict, node, start):
318         """Node responded properly, extract the response."""
319         self.stats.completedAction('join', start)
320         # Create the node using the returned contact info
321         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
322         reactor.callLater(0, self.insertNode, n)
323         return (dict['ip_addr'], dict['port'])
324
325     def _joinError(self, err, node, start):
326         """Error occurred, fail node."""
327         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
328         self.stats.completedAction('join', start)
329         self.nodeFailed(node)
330         return err
331         
332     def findCloseNodes(self, callback=lambda a: None):
333         """Perform a findNode on the ID one away from our own.
334
335         This will allow us to populate our table with nodes on our network
336         closest to our own. This is called as soon as we start up with an
337         empty table.
338
339         @type callback: C{method}
340         @param callback: the method to call with the results, it must take 1
341             parameter, the list of K closest nodes
342             (optional, defaults to doing nothing with the results)
343         """
344         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
345         self.findNode(id, callback)
346
347     def refreshTable(self, force = False):
348         """Check all the buckets for those that need refreshing.
349         
350         @param force: refresh all buckets regardless of last bucket access time
351             (optional, defaults to False)
352         """
353         def callback(nodes):
354             pass
355     
356         for bucket in self.table.buckets:
357             if force or (datetime.now() - bucket.lastAccessed > 
358                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
359                 # Choose a random ID in the bucket and try and find it
360                 id = newIDInRange(bucket.min, bucket.max)
361                 self.findNode(id, callback)
362
363     def shutdown(self):
364         """Closes the port and cancels pending later calls."""
365         self.listenport.stopListening()
366         try:
367             self.next_checkpoint.cancel()
368         except:
369             pass
370         for call in self.pinging:
371             if isinstance(call, DelayedCall) and call.active():
372                 call.cancel()
373         self.store.close()
374     
375     def getStats(self):
376         """Gather the statistics for the DHT."""
377         return self.stats.formatHTML()
378
379     #{ Remote interface
380     def krpc_ping(self, id, _krpc_sender = None):
381         """Pong with our ID.
382         
383         @type id: C{string}
384         @param id: the node ID of the sender node
385         @type _krpc_sender: (C{string}, C{int})
386         @param _krpc_sender: the sender node's IP address and port
387         """
388         if _krpc_sender is not None:
389             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
390             reactor.callLater(0, self.insertNode, n, False)
391
392         return {"id" : self.node.id}
393         
394     def krpc_join(self, id, _krpc_sender = None):
395         """Add the node by responding with its address and port.
396         
397         @type id: C{string}
398         @param id: the node ID of the sender node
399         @type _krpc_sender: (C{string}, C{int})
400         @param _krpc_sender: the sender node's IP address and port
401         """
402         if _krpc_sender is not None:
403             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
404             reactor.callLater(0, self.insertNode, n, False)
405         else:
406             _krpc_sender = ('127.0.0.1', self.port)
407
408         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
409         
410     def krpc_find_node(self, id, target, _krpc_sender = None):
411         """Find the K closest nodes to the target in the local routing table.
412         
413         @type target: C{string}
414         @param target: the target ID to find nodes for
415         @type id: C{string}
416         @param id: the node ID of the sender node
417         @type _krpc_sender: (C{string}, C{int})
418         @param _krpc_sender: the sender node's IP address and port
419         """
420         if _krpc_sender is not None:
421             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
422             reactor.callLater(0, self.insertNode, n, False)
423         else:
424             _krpc_sender = ('127.0.0.1', self.port)
425
426         nodes = self.table.findNodes(target)
427         nodes = map(lambda node: node.contactInfo(), nodes)
428         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
429         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
430
431
432 class KhashmirRead(KhashmirBase):
433     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
434
435     _Node = KNodeRead
436
437     #{ Local interface
438     def findValue(self, key, callback):
439         """Get the nodes that have values for the key from the global table.
440         
441         @type key: C{string}
442         @param key: the target key to find the values for
443         @type callback: C{method}
444         @param callback: the method to call with the results, it must take 1
445             parameter, the list of nodes with values
446         """
447         # Mark the bucket as having been accessed
448         self.table.touch(key)
449         
450         # Start with ourself
451         nodes = [copy(self.node)]
452         
453         # Search for others starting with the locally found ones
454         state = FindValue(self, key, callback, self.config, self.stats)
455         reactor.callLater(0, state.goWithNodes, nodes)
456
457     def valueForKey(self, key, callback, searchlocal = True):
458         """Get the values found for key in global table.
459         
460         Callback will be called with a list of values for each peer that
461         returns unique values. The final callback will be an empty list.
462
463         @type key: C{string}
464         @param key: the target key to get the values for
465         @type callback: C{method}
466         @param callback: the method to call with the results, it must take 2
467             parameters: the key, and the values found
468         @type searchlocal: C{boolean}
469         @param searchlocal: whether to also look for any local values
470         """
471
472         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
473             """Use the found nodes to send requests for values to."""
474             # Get any local values
475             if searchlocal:
476                 l = self.store.retrieveValues(key)
477                 if len(l) > 0:
478                     node = copy(self.node)
479                     node.updateNumValues(len(l))
480                     nodes = nodes + [node]
481
482             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
483             reactor.callLater(0, state.goWithNodes, nodes)
484             
485         # First lookup nodes that have values for the key
486         self.findValue(key, _getValueForKey)
487
488     #{ Remote interface
489     def krpc_find_value(self, id, key, _krpc_sender = None):
490         """Find the number of values stored locally for the key, and the K closest nodes.
491         
492         @type key: C{string}
493         @param key: the target key to find the values and nodes for
494         @type id: C{string}
495         @param id: the node ID of the sender node
496         @type _krpc_sender: (C{string}, C{int})
497         @param _krpc_sender: the sender node's IP address and port
498         """
499         if _krpc_sender is not None:
500             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
501             reactor.callLater(0, self.insertNode, n, False)
502     
503         nodes = self.table.findNodes(key)
504         nodes = map(lambda node: node.contactInfo(), nodes)
505         num_values = self.store.countValues(key)
506         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
507
508     def krpc_get_value(self, id, key, num, _krpc_sender = None):
509         """Retrieve the values stored locally for the key.
510         
511         @type key: C{string}
512         @param key: the target key to retrieve the values for
513         @type num: C{int}
514         @param num: the maximum number of values to retrieve, or 0 to
515             retrieve all of them
516         @type id: C{string}
517         @param id: the node ID of the sender node
518         @type _krpc_sender: (C{string}, C{int})
519         @param _krpc_sender: the sender node's IP address and port
520         """
521         if _krpc_sender is not None:
522             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
523             reactor.callLater(0, self.insertNode, n, False)
524     
525         l = self.store.retrieveValues(key)
526         if num == 0 or num >= len(l):
527             return {'values' : l, "id": self.node.id}
528         else:
529             shuffle(l)
530             return {'values' : l[:num], "id": self.node.id}
531
532
533 class KhashmirWrite(KhashmirRead):
534     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
535
536     _Node = KNodeWrite
537
538     #{ Local interface
539     def storeValueForKey(self, key, value, callback=None):
540         """Stores the value for the key in the global table.
541         
542         No status in this implementation, peers respond but don't indicate
543         status of storing values.
544
545         @type key: C{string}
546         @param key: the target key to store the value for
547         @type value: C{string}
548         @param value: the value to store with the key
549         @type callback: C{method}
550         @param callback: the method to call with the results, it must take 3
551             parameters: the key, the value stored, and the result of the store
552             (optional, defaults to doing nothing with the results)
553         """
554         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
555             """Use the returned K closest nodes to store the key at."""
556             if not response:
557                 def _storedValueHandler(key, value, sender):
558                     """Default callback that does nothing."""
559                     pass
560                 response = _storedValueHandler
561             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
562             reactor.callLater(0, action.goWithNodes, nodes)
563             
564         # First find the K closest nodes to operate on.
565         self.findNode(key, _storeValueForKey)
566                     
567     #{ Remote interface
568     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
569         """Store the value locally with the key.
570         
571         @type key: C{string}
572         @param key: the target key to store the value for
573         @type value: C{string}
574         @param value: the value to store with the key
575         @param token: the token to confirm that this peer contacted us previously
576         @type id: C{string}
577         @param id: the node ID of the sender node
578         @type _krpc_sender: (C{string}, C{int})
579         @param _krpc_sender: the sender node's IP address and port
580         """
581         if _krpc_sender is not None:
582             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
583             reactor.callLater(0, self.insertNode, n, False)
584         else:
585             _krpc_sender = ('127.0.0.1', self.port)
586
587         for secret in self.token_secrets:
588             this_token = sha(secret + _krpc_sender[0]).digest()
589             if token == this_token:
590                 self.store.storeValue(key, value)
591                 return {"id" : self.node.id}
592         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
593
594
595 class Khashmir(KhashmirWrite):
596     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
597     _Node = KNodeWrite
598
599
600 class SimpleTests(unittest.TestCase):
601     
602     timeout = 10
603     DHT_DEFAULTS = {'PORT': 9977,
604                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
605                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
606                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
607                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
608                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
609                     'KEY_EXPIRE': 3600, 'SPEW': True, }
610
611     def setUp(self):
612         d = self.DHT_DEFAULTS.copy()
613         d['PORT'] = 4044
614         self.a = Khashmir(d)
615         d = self.DHT_DEFAULTS.copy()
616         d['PORT'] = 4045
617         self.b = Khashmir(d)
618         
619     def tearDown(self):
620         self.a.shutdown()
621         self.b.shutdown()
622         os.unlink(self.a.store.db)
623         os.unlink(self.b.store.db)
624
625     def testAddContact(self):
626         self.failUnlessEqual(len(self.a.table.buckets), 1)
627         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
628
629         self.failUnlessEqual(len(self.b.table.buckets), 1)
630         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
631
632         self.a.addContact('127.0.0.1', 4045)
633         reactor.iterate()
634         reactor.iterate()
635         reactor.iterate()
636         reactor.iterate()
637         reactor.iterate()
638         reactor.iterate()
639         reactor.iterate()
640         reactor.iterate()
641
642         self.failUnlessEqual(len(self.a.table.buckets), 1)
643         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
644         self.failUnlessEqual(len(self.b.table.buckets), 1)
645         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
646
647     def testStoreRetrieve(self):
648         self.a.addContact('127.0.0.1', 4045)
649         reactor.iterate()
650         reactor.iterate()
651         reactor.iterate()
652         reactor.iterate()
653         self.got = 0
654         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
655         reactor.iterate()
656         reactor.iterate()
657         reactor.iterate()
658         reactor.iterate()
659         reactor.iterate()
660         reactor.iterate()
661         self.a.valueForKey(sha('foo').digest(), self._cb)
662         reactor.iterate()
663         reactor.iterate()
664         reactor.iterate()
665         reactor.iterate()
666         reactor.iterate()
667         reactor.iterate()
668         reactor.iterate()
669         reactor.iterate()
670         reactor.iterate()
671         reactor.iterate()
672         reactor.iterate()
673         reactor.iterate()
674         reactor.iterate()
675         reactor.iterate()
676         reactor.iterate()
677         reactor.iterate()
678         reactor.iterate()
679         reactor.iterate()
680         reactor.iterate()
681         reactor.iterate()
682         reactor.iterate()
683
684     def _cb(self, key, val):
685         if not val:
686             self.failUnlessEqual(self.got, 1)
687         elif 'foobar' in val:
688             self.got = 1
689
690
691 class MultiTest(unittest.TestCase):
692     
693     timeout = 30
694     num = 20
695     DHT_DEFAULTS = {'PORT': 9977,
696                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
697                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
698                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
699                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
700                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
701                     'KEY_EXPIRE': 3600, 'SPEW': True, }
702
703     def _done(self, val):
704         self.done = 1
705         
706     def setUp(self):
707         self.l = []
708         self.startport = 4088
709         for i in range(self.num):
710             d = self.DHT_DEFAULTS.copy()
711             d['PORT'] = self.startport + i
712             self.l.append(Khashmir(d))
713         reactor.iterate()
714         reactor.iterate()
715         
716         for i in self.l:
717             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
718             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
719             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
720             reactor.iterate()
721             reactor.iterate()
722             reactor.iterate() 
723             reactor.iterate()
724             reactor.iterate()
725             reactor.iterate() 
726             
727         for i in self.l:
728             self.done = 0
729             i.findCloseNodes(self._done)
730             while not self.done:
731                 reactor.iterate()
732         for i in self.l:
733             self.done = 0
734             i.findCloseNodes(self._done)
735             while not self.done:
736                 reactor.iterate()
737
738     def tearDown(self):
739         for i in self.l:
740             i.shutdown()
741             os.unlink(i.store.db)
742             
743         reactor.iterate()
744         
745     def testStoreRetrieve(self):
746         for i in range(10):
747             K = newID()
748             V = newID()
749             
750             for a in range(3):
751                 self.done = 0
752                 def _scb(key, value, result):
753                     self.done = 1
754                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
755                 while not self.done:
756                     reactor.iterate()
757
758
759                 def _rcb(key, val):
760                     if not val:
761                         self.done = 1
762                         self.failUnlessEqual(self.got, 1)
763                     elif V in val:
764                         self.got = 1
765                 for x in range(3):
766                     self.got = 0
767                     self.done = 0
768                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
769                     while not self.done:
770                         reactor.iterate()