Added .gitattributes
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
22
23 from db import DB
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
29 import krpc
30
31 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
34                      '(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'+
35                      '(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'+
36                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
37
38 class KhashmirBase(protocol.Factory):
39     """The base Khashmir class, with base functionality and find node, no key-value mappings.
40     
41     @type _Node: L{node.Node}
42     @ivar _Node: the knode implementation to use for this class of DHT
43     @type config: C{dictionary}
44     @ivar config: the configuration parameters for the DHT
45     @type pinging: C{dictionary}
46     @ivar pinging: the node's that are currently being pinged, keys are the
47         node id's, values are the Deferred or DelayedCall objects
48     @type port: C{int}
49     @ivar port: the port to listen on
50     @type store: L{db.DB}
51     @ivar store: the database to store nodes and key/value pairs in
52     @type node: L{node.Node}
53     @ivar node: this node
54     @type table: L{ktable.KTable}
55     @ivar table: the routing table
56     @type token_secrets: C{list} of C{string}
57     @ivar token_secrets: the current secrets to use to create tokens
58     @type stats: L{stats.StatsLogger}
59     @ivar stats: the statistics gatherer
60     @type udp: L{krpc.hostbroker}
61     @ivar udp: the factory for the KRPC protocol
62     @type listenport: L{twisted.internet.interfaces.IListeningPort}
63     @ivar listenport: the UDP listening port
64     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
65     @ivar next_checkpoint: the delayed call for the next checkpoint
66     """
67     
68     _Node = KNodeBase
69     
70     def __init__(self, config, cache_dir='/tmp'):
71         """Initialize the Khashmir class and call the L{setup} method.
72         
73         @type config: C{dictionary}
74         @param config: the configuration parameters for the DHT
75         @type cache_dir: C{string}
76         @param cache_dir: the directory to store all files in
77             (optional, defaults to the /tmp directory)
78         """
79         self.config = None
80         self.pinging = {}
81         self.setup(config, cache_dir)
82         
83     def setup(self, config, cache_dir):
84         """Setup all the Khashmir sub-modules.
85         
86         @type config: C{dictionary}
87         @param config: the configuration parameters for the DHT
88         @type cache_dir: C{string}
89         @param cache_dir: the directory to store all files in
90         """
91         self.config = config
92         self.port = config['PORT']
93         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
94         self.node = self._loadSelfNode('', self.port)
95         self.table = KTable(self.node, config)
96         self.token_secrets = [newID()]
97         self.stats = StatsLogger(self.table, self.store)
98         
99         # Start listening
100         self.udp = krpc.hostbroker(self, self.stats, config)
101         self.udp.protocol = krpc.KRPC
102         self.listenport = reactor.listenUDP(self.port, self.udp)
103         
104         # Load the routing table and begin checkpointing
105         self._loadRoutingTable()
106         self.refreshTable(force = True)
107         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
108
109     def Node(self, id, host = None, port = None):
110         """Create a new node.
111         
112         @see: L{node.Node.__init__}
113         """
114         n = self._Node(id, host, port)
115         n.table = self.table
116         n.conn = self.udp.connectionForAddr((n.host, n.port))
117         return n
118     
119     def __del__(self):
120         """Stop listening for packets."""
121         self.listenport.stopListening()
122         
123     def _loadSelfNode(self, host, port):
124         """Create this node, loading any previously saved one."""
125         id = self.store.getSelfNode()
126         if not id or not id.endswith(self.config['VERSION']):
127             id = newID(self.config['VERSION'])
128         return self._Node(id, host, port)
129         
130     def checkpoint(self):
131         """Perform some periodic maintenance operations."""
132         # Create a new token secret
133         self.token_secrets.insert(0, newID())
134         if len(self.token_secrets) > 3:
135             self.token_secrets.pop()
136             
137         # Save some parameters for reloading
138         self.store.saveSelfNode(self.node.id)
139         self.store.dumpRoutingTable(self.table.buckets)
140         
141         # DHT maintenance
142         self.store.expireValues(self.config['KEY_EXPIRE'])
143         self.refreshTable()
144         
145         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
146                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
147                                                  self.checkpoint)
148         
149     def _loadRoutingTable(self):
150         """Load the previous routing table nodes from the database.
151         
152         It's usually a good idea to call refreshTable(force = True) after
153         loading the table.
154         """
155         nodes = self.store.getRoutingTable()
156         for rec in nodes:
157             n = self.Node(rec[0], rec[1], int(rec[2]))
158             self.table.insertNode(n, contacted = False)
159             
160     #{ Local interface
161     def addContact(self, host, port, callback=None, errback=None):
162         """Ping this node and add the contact info to the table on pong.
163         
164         @type host: C{string}
165         @param host: the IP address of the node to contact
166         @type port: C{int}
167         @param port:the port of the node to contact
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the contact info returned by the node
171             (optional, defaults to doing nothing with the results)
172         @type errback: C{method}
173         @param errback: the method to call if an error occurs
174             (optional, defaults to calling the callback with the error)
175         """
176         n = self.Node(NULL_ID, host, port)
177         self.sendJoin(n, callback=callback, errback=errback)
178
179     def findNode(self, id, callback):
180         """Find the contact info for the K closest nodes in the global table.
181         
182         @type id: C{string}
183         @param id: the target ID to find the K closest nodes of
184         @type callback: C{method}
185         @param callback: the method to call with the results, it must take 1
186             parameter, the list of K closest nodes
187         """
188         # Mark the bucket as having been accessed
189         self.table.touch(id)
190         
191         # Start with our node
192         nodes = [copy(self.node)]
193
194         # Start the finding nodes action
195         state = FindNode(self, id, callback, self.config, self.stats)
196         reactor.callLater(0, state.goWithNodes, nodes)
197     
198     def insertNode(self, node, contacted = True):
199         """Try to insert a node in our local table, pinging oldest contact if necessary.
200         
201         If all you have is a host/port, then use L{addContact}, which calls this
202         method after receiving the PONG from the remote node. The reason for
203         the separation is we can't insert a node into the table without its
204         node ID. That means of course the node passed into this method needs
205         to be a properly formed Node object with a valid ID.
206
207         @type node: L{node.Node}
208         @param node: the new node to try and insert
209         @type contacted: C{boolean}
210         @param contacted: whether the new node is known to be good, i.e.
211             responded to a request (optional, defaults to True)
212         """
213         # Don't add any local nodes to the routing table
214         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
215             log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
216             return
217         
218         old = self.table.insertNode(node, contacted=contacted)
219
220         if (isinstance(old, self._Node) and old.id != self.node.id and
221             (datetime.now() - old.lastSeen) > 
222              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
223             
224             # Bucket is full, check to see if old node is still available
225             df = self.sendPing(old)
226             df.addErrback(self._staleNodeHandler, old, node, contacted)
227         elif not old and not contacted:
228             # There's room, we just need to contact the node first
229             df = self.sendPing(node)
230             # Also schedule a future ping to make sure the node works
231             def rePing(newnode, self = self):
232                 if newnode.id not in self.pinging:
233                     self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
234                                                                  self.sendPing, newnode)
235                 return newnode
236             df.addCallback(rePing)
237
238     def _staleNodeHandler(self, err, old, node, contacted):
239         """The pinged node never responded, so replace it."""
240         self.table.invalidateNode(old)
241         self.insertNode(node, contacted)
242         return err
243     
244     def nodeFailed(self, node):
245         """Mark a node as having failed a request and schedule a future check.
246         
247         @type node: L{node.Node}
248         @param node: the new node to try and insert
249         """
250         exists = self.table.nodeFailed(node)
251         
252         # If in the table, schedule a ping, if one isn't already sent/scheduled
253         if exists and node.id not in self.pinging:
254             self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
255                                                       self.sendPing, node)
256     
257     def sendPing(self, node):
258         """Ping the node to see if it's still alive.
259         
260         @type node: L{node.Node}
261         @param node: the node to send the join to
262         """
263         # Check for a ping already underway
264         if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
265             self.pinging[node.id].active()):
266             self.pinging[node.id].cancel()
267         elif isinstance(self.pinging.get(node.id, None), Deferred):
268             return self.pinging[node.id]
269
270         self.stats.startedAction('ping')
271         df = node.ping(self.node.id)
272         self.pinging[node.id] = df
273         df.addCallbacks(self._pingHandler, self._pingError,
274                         callbackArgs = (node, datetime.now()),
275                         errbackArgs = (node, datetime.now()))
276         return df
277
278     def _pingHandler(self, dict, node, start):
279         """Node responded properly, update it and return the node object."""
280         self.stats.completedAction('ping', start)
281         del self.pinging[node.id]
282         # Create the node using the returned contact info
283         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
284         reactor.callLater(0, self.insertNode, n)
285         return n
286
287     def _pingError(self, err, node, start):
288         """Error occurred, fail node."""
289         log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
290         self.stats.completedAction('ping', start)
291         
292         # Consume unhandled errors
293         self.pinging[node.id].addErrback(lambda ping_err: None)
294         del self.pinging[node.id]
295         
296         self.nodeFailed(node)
297         return err
298         
299     def sendJoin(self, node, callback=None, errback=None):
300         """Join the DHT by pinging a bootstrap node.
301         
302         @type node: L{node.Node}
303         @param node: the node to send the join to
304         @type callback: C{method}
305         @param callback: the method to call with the results, it must take 1
306             parameter, the contact info returned by the node
307             (optional, defaults to doing nothing with the results)
308         @type errback: C{method}
309         @param errback: the method to call if an error occurs
310             (optional, defaults to calling the callback with the error)
311         """
312         if errback is None:
313             errback = callback
314         self.stats.startedAction('join')
315         df = node.join(self.node.id)
316         df.addCallbacks(self._joinHandler, self._joinError,
317                         callbackArgs = (node, datetime.now()),
318                         errbackArgs = (node, datetime.now()))
319         if callback:
320             df.addCallbacks(callback, errback)
321
322     def _joinHandler(self, dict, node, start):
323         """Node responded properly, extract the response."""
324         self.stats.completedAction('join', start)
325         # Create the node using the returned contact info
326         n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
327         reactor.callLater(0, self.insertNode, n)
328         return (dict['ip_addr'], dict['port'])
329
330     def _joinError(self, err, node, start):
331         """Error occurred, fail node."""
332         log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
333         self.stats.completedAction('join', start)
334         self.nodeFailed(node)
335         return err
336         
337     def findCloseNodes(self, callback=lambda a: None):
338         """Perform a findNode on the ID one away from our own.
339
340         This will allow us to populate our table with nodes on our network
341         closest to our own. This is called as soon as we start up with an
342         empty table.
343
344         @type callback: C{method}
345         @param callback: the method to call with the results, it must take 1
346             parameter, the list of K closest nodes
347             (optional, defaults to doing nothing with the results)
348         """
349         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
350         self.findNode(id, callback)
351
352     def refreshTable(self, force = False):
353         """Check all the buckets for those that need refreshing.
354         
355         @param force: refresh all buckets regardless of last bucket access time
356             (optional, defaults to False)
357         """
358         def callback(nodes):
359             pass
360     
361         for bucket in self.table.buckets:
362             if force or (datetime.now() - bucket.lastAccessed > 
363                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
364                 # Choose a random ID in the bucket and try and find it
365                 id = newIDInRange(bucket.min, bucket.max)
366                 self.findNode(id, callback)
367
368     def shutdown(self):
369         """Closes the port and cancels pending later calls."""
370         self.listenport.stopListening()
371         try:
372             self.next_checkpoint.cancel()
373         except:
374             pass
375         for nodeid in self.pinging.keys():
376             if isinstance(self.pinging[nodeid], DelayedCall) and self.pinging[nodeid].active():
377                 self.pinging[nodeid].cancel()
378                 del self.pinging[nodeid]
379         self.store.close()
380     
381     def getStats(self):
382         """Gather the statistics for the DHT."""
383         return self.stats.formatHTML()
384
385     #{ Remote interface
386     def krpc_ping(self, id, _krpc_sender = None):
387         """Pong with our ID.
388         
389         @type id: C{string}
390         @param id: the node ID of the sender node
391         @type _krpc_sender: (C{string}, C{int})
392         @param _krpc_sender: the sender node's IP address and port
393         """
394         if _krpc_sender is not None:
395             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
396             reactor.callLater(0, self.insertNode, n, False)
397
398         return {"id" : self.node.id}
399         
400     def krpc_join(self, id, _krpc_sender = None):
401         """Add the node by responding with its address and port.
402         
403         @type id: C{string}
404         @param id: the node ID of the sender node
405         @type _krpc_sender: (C{string}, C{int})
406         @param _krpc_sender: the sender node's IP address and port
407         """
408         if _krpc_sender is not None:
409             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
410             reactor.callLater(0, self.insertNode, n, False)
411         else:
412             _krpc_sender = ('127.0.0.1', self.port)
413
414         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
415         
416     def krpc_find_node(self, id, target, _krpc_sender = None):
417         """Find the K closest nodes to the target in the local routing table.
418         
419         @type target: C{string}
420         @param target: the target ID to find nodes for
421         @type id: C{string}
422         @param id: the node ID of the sender node
423         @type _krpc_sender: (C{string}, C{int})
424         @param _krpc_sender: the sender node's IP address and port
425         """
426         if _krpc_sender is not None:
427             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
428             reactor.callLater(0, self.insertNode, n, False)
429         else:
430             _krpc_sender = ('127.0.0.1', self.port)
431
432         nodes = self.table.findNodes(target)
433         nodes = map(lambda node: node.contactInfo(), nodes)
434         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
435         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
436
437
438 class KhashmirRead(KhashmirBase):
439     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
440
441     _Node = KNodeRead
442
443     #{ Local interface
444     def findValue(self, key, callback):
445         """Get the nodes that have values for the key from the global table.
446         
447         @type key: C{string}
448         @param key: the target key to find the values for
449         @type callback: C{method}
450         @param callback: the method to call with the results, it must take 1
451             parameter, the list of nodes with values
452         """
453         # Mark the bucket as having been accessed
454         self.table.touch(key)
455         
456         # Start with ourself
457         nodes = [copy(self.node)]
458         
459         # Search for others starting with the locally found ones
460         state = FindValue(self, key, callback, self.config, self.stats)
461         reactor.callLater(0, state.goWithNodes, nodes)
462
463     def valueForKey(self, key, callback, searchlocal = True):
464         """Get the values found for key in global table.
465         
466         Callback will be called with a list of values for each peer that
467         returns unique values. The final callback will be an empty list.
468
469         @type key: C{string}
470         @param key: the target key to get the values for
471         @type callback: C{method}
472         @param callback: the method to call with the results, it must take 2
473             parameters: the key, and the values found
474         @type searchlocal: C{boolean}
475         @param searchlocal: whether to also look for any local values
476         """
477
478         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
479             """Use the found nodes to send requests for values to."""
480             # Get any local values
481             if searchlocal:
482                 l = self.store.retrieveValues(key)
483                 if len(l) > 0:
484                     node = copy(self.node)
485                     node.updateNumValues(len(l))
486                     nodes = nodes + [node]
487
488             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
489             reactor.callLater(0, state.goWithNodes, nodes)
490             
491         # First lookup nodes that have values for the key
492         self.findValue(key, _getValueForKey)
493
494     #{ Remote interface
495     def krpc_find_value(self, id, key, _krpc_sender = None):
496         """Find the number of values stored locally for the key, and the K closest nodes.
497         
498         @type key: C{string}
499         @param key: the target key to find the values and nodes for
500         @type id: C{string}
501         @param id: the node ID of the sender node
502         @type _krpc_sender: (C{string}, C{int})
503         @param _krpc_sender: the sender node's IP address and port
504         """
505         if _krpc_sender is not None:
506             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
507             reactor.callLater(0, self.insertNode, n, False)
508     
509         nodes = self.table.findNodes(key)
510         nodes = map(lambda node: node.contactInfo(), nodes)
511         num_values = self.store.countValues(key)
512         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
513
514     def krpc_get_value(self, id, key, num, _krpc_sender = None):
515         """Retrieve the values stored locally for the key.
516         
517         @type key: C{string}
518         @param key: the target key to retrieve the values for
519         @type num: C{int}
520         @param num: the maximum number of values to retrieve, or 0 to
521             retrieve all of them
522         @type id: C{string}
523         @param id: the node ID of the sender node
524         @type _krpc_sender: (C{string}, C{int})
525         @param _krpc_sender: the sender node's IP address and port
526         """
527         if _krpc_sender is not None:
528             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
529             reactor.callLater(0, self.insertNode, n, False)
530     
531         l = self.store.retrieveValues(key)
532         if num == 0 or num >= len(l):
533             return {'values' : l, "id": self.node.id}
534         else:
535             shuffle(l)
536             return {'values' : l[:num], "id": self.node.id}
537
538
539 class KhashmirWrite(KhashmirRead):
540     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
541
542     _Node = KNodeWrite
543
544     #{ Local interface
545     def storeValueForKey(self, key, value, callback=None):
546         """Stores the value for the key in the global table.
547         
548         No status in this implementation, peers respond but don't indicate
549         status of storing values.
550
551         @type key: C{string}
552         @param key: the target key to store the value for
553         @type value: C{string}
554         @param value: the value to store with the key
555         @type callback: C{method}
556         @param callback: the method to call with the results, it must take 3
557             parameters: the key, the value stored, and the result of the store
558             (optional, defaults to doing nothing with the results)
559         """
560         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
561             """Use the returned K closest nodes to store the key at."""
562             if not response:
563                 def _storedValueHandler(key, value, sender):
564                     """Default callback that does nothing."""
565                     pass
566                 response = _storedValueHandler
567             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
568             reactor.callLater(0, action.goWithNodes, nodes)
569             
570         # First find the K closest nodes to operate on.
571         self.findNode(key, _storeValueForKey)
572                     
573     #{ Remote interface
574     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
575         """Store the value locally with the key.
576         
577         @type key: C{string}
578         @param key: the target key to store the value for
579         @type value: C{string}
580         @param value: the value to store with the key
581         @param token: the token to confirm that this peer contacted us previously
582         @type id: C{string}
583         @param id: the node ID of the sender node
584         @type _krpc_sender: (C{string}, C{int})
585         @param _krpc_sender: the sender node's IP address and port
586         """
587         if _krpc_sender is not None:
588             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
589             reactor.callLater(0, self.insertNode, n, False)
590         else:
591             _krpc_sender = ('127.0.0.1', self.port)
592
593         for secret in self.token_secrets:
594             this_token = sha(secret + _krpc_sender[0]).digest()
595             if token == this_token:
596                 self.store.storeValue(key, value)
597                 return {"id" : self.node.id}
598         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
599
600
601 class Khashmir(KhashmirWrite):
602     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
603     _Node = KNodeWrite
604
605
606 class SimpleTests(unittest.TestCase):
607     
608     timeout = 10
609     DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
610                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
611                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
612                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
613                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
614                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
615                     'KEY_EXPIRE': 3600, 'SPEW': True, }
616
617     def setUp(self):
618         d = self.DHT_DEFAULTS.copy()
619         d['PORT'] = 4044
620         self.a = Khashmir(d)
621         d = self.DHT_DEFAULTS.copy()
622         d['PORT'] = 4045
623         self.b = Khashmir(d)
624         
625     def tearDown(self):
626         self.a.shutdown()
627         self.b.shutdown()
628         os.unlink(self.a.store.db)
629         os.unlink(self.b.store.db)
630
631     def testAddContact(self):
632         self.failUnlessEqual(len(self.a.table.buckets), 1)
633         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
634
635         self.failUnlessEqual(len(self.b.table.buckets), 1)
636         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
637
638         self.a.addContact('127.0.0.1', 4045)
639         reactor.iterate()
640         reactor.iterate()
641         reactor.iterate()
642         reactor.iterate()
643         reactor.iterate()
644         reactor.iterate()
645         reactor.iterate()
646         reactor.iterate()
647
648         self.failUnlessEqual(len(self.a.table.buckets), 1)
649         self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
650         self.failUnlessEqual(len(self.b.table.buckets), 1)
651         self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
652
653     def testStoreRetrieve(self):
654         self.a.addContact('127.0.0.1', 4045)
655         reactor.iterate()
656         reactor.iterate()
657         reactor.iterate()
658         reactor.iterate()
659         self.got = 0
660         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
661         reactor.iterate()
662         reactor.iterate()
663         reactor.iterate()
664         reactor.iterate()
665         reactor.iterate()
666         reactor.iterate()
667         self.a.valueForKey(sha('foo').digest(), self._cb)
668         reactor.iterate()
669         reactor.iterate()
670         reactor.iterate()
671         reactor.iterate()
672         reactor.iterate()
673         reactor.iterate()
674         reactor.iterate()
675         reactor.iterate()
676         reactor.iterate()
677         reactor.iterate()
678         reactor.iterate()
679         reactor.iterate()
680         reactor.iterate()
681         reactor.iterate()
682         reactor.iterate()
683         reactor.iterate()
684         reactor.iterate()
685         reactor.iterate()
686         reactor.iterate()
687         reactor.iterate()
688         reactor.iterate()
689
690     def _cb(self, key, val):
691         if not val:
692             self.failUnlessEqual(self.got, 1)
693         elif 'foobar' in val:
694             self.got = 1
695
696
697 class MultiTest(unittest.TestCase):
698     
699     timeout = 30
700     num = 20
701     DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
702                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
703                     'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
704                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
705                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
706                     'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
707                     'KEY_EXPIRE': 3600, 'SPEW': True, }
708
709     def _done(self, val):
710         self.done = 1
711         
712     def setUp(self):
713         self.l = []
714         self.startport = 4088
715         for i in range(self.num):
716             d = self.DHT_DEFAULTS.copy()
717             d['PORT'] = self.startport + i
718             self.l.append(Khashmir(d))
719         reactor.iterate()
720         reactor.iterate()
721         
722         for i in self.l:
723             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
724             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
725             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
726             reactor.iterate()
727             reactor.iterate()
728             reactor.iterate() 
729             reactor.iterate()
730             reactor.iterate()
731             reactor.iterate() 
732             
733         for i in self.l:
734             self.done = 0
735             i.findCloseNodes(self._done)
736             while not self.done:
737                 reactor.iterate()
738         for i in self.l:
739             self.done = 0
740             i.findCloseNodes(self._done)
741             while not self.done:
742                 reactor.iterate()
743
744     def tearDown(self):
745         for i in self.l:
746             i.shutdown()
747             os.unlink(i.store.db)
748             
749         reactor.iterate()
750         
751     def testStoreRetrieve(self):
752         for i in range(10):
753             K = newID()
754             V = newID()
755             
756             for a in range(3):
757                 self.done = 0
758                 def _scb(key, value, result):
759                     self.done = 1
760                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
761                 while not self.done:
762                     reactor.iterate()
763
764
765                 def _rcb(key, val):
766                     if not val:
767                         self.done = 1
768                         self.failUnlessEqual(self.got, 1)
769                     elif V in val:
770                         self.got = 1
771                 for x in range(3):
772                     self.got = 0
773                     self.done = 0
774                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
775                     while not self.done:
776                         reactor.iterate()