94af8ae0ddc8aae044f1bad704a5ebca140cb631
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 from copy import copy
13 import os
14
15 from twisted.internet.defer import Deferred
16 from twisted.internet import protocol, reactor
17 from twisted.python import log
18 from twisted.trial import unittest
19
20 from db import DB
21 from ktable import KTable
22 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
23 from khash import newID, newIDInRange
24 from actions import FindNode, FindValue, GetValue, StoreValue
25 from stats import StatsLogger
26 import krpc
27
28 class KhashmirBase(protocol.Factory):
29     """The base Khashmir class, with base functionality and find node, no key-value mappings.
30     
31     @type _Node: L{node.Node}
32     @ivar _Node: the knode implementation to use for this class of DHT
33     @type config: C{dictionary}
34     @ivar config: the configuration parameters for the DHT
35     @type port: C{int}
36     @ivar port: the port to listen on
37     @type store: L{db.DB}
38     @ivar store: the database to store nodes and key/value pairs in
39     @type node: L{node.Node}
40     @ivar node: this node
41     @type table: L{ktable.KTable}
42     @ivar table: the routing table
43     @type token_secrets: C{list} of C{string}
44     @ivar token_secrets: the current secrets to use to create tokens
45     @type stats: L{stats.StatsLogger}
46     @ivar stats: the statistics gatherer
47     @type udp: L{krpc.hostbroker}
48     @ivar udp: the factory for the KRPC protocol
49     @type listenport: L{twisted.internet.interfaces.IListeningPort}
50     @ivar listenport: the UDP listening port
51     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
52     @ivar next_checkpoint: the delayed call for the next checkpoint
53     """
54     
55     _Node = KNodeBase
56     
57     def __init__(self, config, cache_dir='/tmp'):
58         """Initialize the Khashmir class and call the L{setup} method.
59         
60         @type config: C{dictionary}
61         @param config: the configuration parameters for the DHT
62         @type cache_dir: C{string}
63         @param cache_dir: the directory to store all files in
64             (optional, defaults to the /tmp directory)
65         """
66         self.config = None
67         self.setup(config, cache_dir)
68         
69     def setup(self, config, cache_dir):
70         """Setup all the Khashmir sub-modules.
71         
72         @type config: C{dictionary}
73         @param config: the configuration parameters for the DHT
74         @type cache_dir: C{string}
75         @param cache_dir: the directory to store all files in
76         """
77         self.config = config
78         self.port = config['PORT']
79         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
80         self.node = self._loadSelfNode('', self.port)
81         self.table = KTable(self.node, config)
82         self.token_secrets = [newID()]
83         self.stats = StatsLogger(self.table, self.store, self.config)
84         
85         # Start listening
86         self.udp = krpc.hostbroker(self, self.stats, config)
87         self.udp.protocol = krpc.KRPC
88         self.listenport = reactor.listenUDP(self.port, self.udp)
89         
90         # Load the routing table and begin checkpointing
91         self._loadRoutingTable()
92         self.refreshTable(force = True)
93         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
94
95     def Node(self, id, host = None, port = None):
96         """Create a new node.
97         
98         @see: L{node.Node.__init__}
99         """
100         n = self._Node(id, host, port)
101         n.table = self.table
102         n.conn = self.udp.connectionForAddr((n.host, n.port))
103         return n
104     
105     def __del__(self):
106         """Stop listening for packets."""
107         self.listenport.stopListening()
108         
109     def _loadSelfNode(self, host, port):
110         """Create this node, loading any previously saved one."""
111         id = self.store.getSelfNode()
112         if not id:
113             id = newID()
114         return self._Node(id, host, port)
115         
116     def checkpoint(self):
117         """Perform some periodic maintenance operations."""
118         # Create a new token secret
119         self.token_secrets.insert(0, newID())
120         if len(self.token_secrets) > 3:
121             self.token_secrets.pop()
122             
123         # Save some parameters for reloading
124         self.store.saveSelfNode(self.node.id)
125         self.store.dumpRoutingTable(self.table.buckets)
126         
127         # DHT maintenance
128         self.store.expireValues(self.config['KEY_EXPIRE'])
129         self.refreshTable()
130         
131         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
132                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
133                                                  self.checkpoint)
134         
135     def _loadRoutingTable(self):
136         """Load the previous routing table nodes from the database.
137         
138         It's usually a good idea to call refreshTable(force = True) after
139         loading the table.
140         """
141         nodes = self.store.getRoutingTable()
142         for rec in nodes:
143             n = self.Node(rec[0], rec[1], int(rec[2]))
144             self.table.insertNode(n, contacted = False)
145             
146     #{ Local interface
147     def addContact(self, host, port, callback=None, errback=None):
148         """Ping this node and add the contact info to the table on pong.
149         
150         @type host: C{string}
151         @param host: the IP address of the node to contact
152         @type port: C{int}
153         @param port:the port of the node to contact
154         @type callback: C{method}
155         @param callback: the method to call with the results, it must take 1
156             parameter, the contact info returned by the node
157             (optional, defaults to doing nothing with the results)
158         @type errback: C{method}
159         @param errback: the method to call if an error occurs
160             (optional, defaults to calling the callback with None)
161         """
162         n = self.Node(NULL_ID, host, port)
163         self.sendJoin(n, callback=callback, errback=errback)
164
165     def findNode(self, id, callback, errback=None):
166         """Find the contact info for the K closest nodes in the global table.
167         
168         @type id: C{string}
169         @param id: the target ID to find the K closest nodes of
170         @type callback: C{method}
171         @param callback: the method to call with the results, it must take 1
172             parameter, the list of K closest nodes
173         @type errback: C{method}
174         @param errback: the method to call if an error occurs
175             (optional, defaults to doing nothing when an error occurs)
176         """
177         # Get K nodes out of local table/cache
178         nodes = self.table.findNodes(id)
179         nodes = [copy(node) for node in nodes]
180         d = Deferred()
181         if errback:
182             d.addCallbacks(callback, errback)
183         else:
184             d.addCallback(callback)
185
186         # If the target ID was found
187         if len(nodes) == 1 and nodes[0].id == id:
188             d.callback(nodes)
189         else:
190             # Start the finding nodes action
191             state = FindNode(self, id, d.callback, self.config, self.stats)
192             reactor.callLater(0, state.goWithNodes, nodes)
193     
194     def insertNode(self, node, contacted = True):
195         """Try to insert a node in our local table, pinging oldest contact if necessary.
196         
197         If all you have is a host/port, then use L{addContact}, which calls this
198         method after receiving the PONG from the remote node. The reason for
199         the seperation is we can't insert a node into the table without its
200         node ID. That means of course the node passed into this method needs
201         to be a properly formed Node object with a valid ID.
202
203         @type node: L{node.Node}
204         @param node: the new node to try and insert
205         @type contacted: C{boolean}
206         @param contacted: whether the new node is known to be good, i.e.
207             responded to a request (optional, defaults to True)
208         """
209         old = self.table.insertNode(node, contacted=contacted)
210         if (old and old.id != self.node.id and
211             (datetime.now() - old.lastSeen) > 
212              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
213             
214             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
215                 """The pinged node never responded, so replace it."""
216                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
217                 log.err(err)
218                 self.table.replaceStaleNode(oldnode, newnode)
219             
220             def _notStaleNodeHandler(dict, old=old, self=self):
221                 """Got a pong from the old node, so update it."""
222                 dict = dict['rsp']
223                 if dict['id'] == old.id:
224                     self.table.justSeenNode(old.id)
225             
226             # Bucket is full, check to see if old node is still available
227             self.stats.startedAction('ping')
228             df = old.ping(self.node.id)
229             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
230
231     def sendJoin(self, node, callback=None, errback=None):
232         """Join the DHT by pinging a bootstrap node.
233         
234         @type node: L{node.Node}
235         @param node: the node to send the join to
236         @type callback: C{method}
237         @param callback: the method to call with the results, it must take 1
238             parameter, the contact info returned by the node
239             (optional, defaults to doing nothing with the results)
240         @type errback: C{method}
241         @param errback: the method to call if an error occurs
242             (optional, defaults to calling the callback with None)
243         """
244
245         def _pongHandler(dict, node=node, self=self, callback=callback):
246             """Node responded properly, callback with response."""
247             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
248             self.insertNode(n)
249             if callback:
250                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
251
252         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
253             """Error occurred, fail node and errback or callback with error."""
254             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
255             log.err(err)
256             self.table.nodeFailed(node)
257             if errback:
258                 errback()
259             elif callback:
260                 callback(None)
261         
262         self.stats.startedAction('join')
263         df = node.join(self.node.id)
264         df.addCallbacks(_pongHandler, _defaultPong)
265
266     def findCloseNodes(self, callback=lambda a: None, errback = None):
267         """Perform a findNode on the ID one away from our own.
268
269         This will allow us to populate our table with nodes on our network
270         closest to our own. This is called as soon as we start up with an
271         empty table.
272
273         @type callback: C{method}
274         @param callback: the method to call with the results, it must take 1
275             parameter, the list of K closest nodes
276             (optional, defaults to doing nothing with the results)
277         @type errback: C{method}
278         @param errback: the method to call if an error occurs
279             (optional, defaults to doing nothing when an error occurs)
280         """
281         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
282         self.findNode(id, callback, errback)
283
284     def refreshTable(self, force = False):
285         """Check all the buckets for those that need refreshing.
286         
287         @param force: refresh all buckets regardless of last bucket access time
288             (optional, defaults to False)
289         """
290         def callback(nodes):
291             pass
292     
293         for bucket in self.table.buckets:
294             if force or (datetime.now() - bucket.lastAccessed > 
295                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
296                 # Choose a random ID in the bucket and try and find it
297                 id = newIDInRange(bucket.min, bucket.max)
298                 self.findNode(id, callback)
299
300     def shutdown(self):
301         """Closes the port and cancels pending later calls."""
302         self.listenport.stopListening()
303         try:
304             self.next_checkpoint.cancel()
305         except:
306             pass
307         self.store.close()
308     
309     def getStats(self):
310         """Gather the statistics for the DHT."""
311         return self.stats.formatHTML()
312
313     #{ Remote interface
314     def krpc_ping(self, id, _krpc_sender = None):
315         """Pong with our ID.
316         
317         @type id: C{string}
318         @param id: the node ID of the sender node
319         @type _krpc_sender: (C{string}, C{int})
320         @param _krpc_sender: the sender node's IP address and port
321         """
322         if _krpc_sender is not None:
323             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
324             self.insertNode(n, contacted = False)
325
326         return {"id" : self.node.id}
327         
328     def krpc_join(self, id, _krpc_sender = None):
329         """Add the node by responding with its address and port.
330         
331         @type id: C{string}
332         @param id: the node ID of the sender node
333         @type _krpc_sender: (C{string}, C{int})
334         @param _krpc_sender: the sender node's IP address and port
335         """
336         if _krpc_sender is not None:
337             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
338             self.insertNode(n, contacted = False)
339         else:
340             _krpc_sender = ('127.0.0.1', self.port)
341
342         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
343         
344     def krpc_find_node(self, id, target, _krpc_sender = None):
345         """Find the K closest nodes to the target in the local routing table.
346         
347         @type target: C{string}
348         @param target: the target ID to find nodes for
349         @type id: C{string}
350         @param id: the node ID of the sender node
351         @type _krpc_sender: (C{string}, C{int})
352         @param _krpc_sender: the sender node's IP address and port
353         """
354         if _krpc_sender is not None:
355             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
356             self.insertNode(n, contacted = False)
357         else:
358             _krpc_sender = ('127.0.0.1', self.port)
359
360         nodes = self.table.findNodes(target)
361         nodes = map(lambda node: node.contactInfo(), nodes)
362         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
363         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
364
365
366 class KhashmirRead(KhashmirBase):
367     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
368
369     _Node = KNodeRead
370
371     #{ Local interface
372     def findValue(self, key, callback, errback=None):
373         """Get the nodes that have values for the key from the global table.
374         
375         @type key: C{string}
376         @param key: the target key to find the values for
377         @type callback: C{method}
378         @param callback: the method to call with the results, it must take 1
379             parameter, the list of nodes with values
380         @type errback: C{method}
381         @param errback: the method to call if an error occurs
382             (optional, defaults to doing nothing when an error occurs)
383         """
384         # Get K nodes out of local table/cache
385         nodes = self.table.findNodes(key)
386         nodes = [copy(node) for node in nodes]
387         d = Deferred()
388         if errback:
389             d.addCallbacks(callback, errback)
390         else:
391             d.addCallback(callback)
392
393         # Search for others starting with the locally found ones
394         state = FindValue(self, key, d.callback, self.config, self.stats)
395         reactor.callLater(0, state.goWithNodes, nodes)
396
397     def valueForKey(self, key, callback, searchlocal = True):
398         """Get the values found for key in global table.
399         
400         Callback will be called with a list of values for each peer that
401         returns unique values. The final callback will be an empty list.
402
403         @type key: C{string}
404         @param key: the target key to get the values for
405         @type callback: C{method}
406         @param callback: the method to call with the results, it must take 2
407             parameters: the key, and the values found
408         @type searchlocal: C{boolean}
409         @param searchlocal: whether to also look for any local values
410         """
411         # Get any local values
412         if searchlocal:
413             l = self.store.retrieveValues(key)
414             if len(l) > 0:
415                 reactor.callLater(0, callback, key, l)
416         else:
417             l = []
418
419         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
420             """Use the found nodes to send requests for values to."""
421             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
422             reactor.callLater(0, state.goWithNodes, nodes)
423             
424         # First lookup nodes that have values for the key
425         self.findValue(key, _getValueForKey)
426
427     #{ Remote interface
428     def krpc_find_value(self, id, key, _krpc_sender = None):
429         """Find the number of values stored locally for the key, and the K closest nodes.
430         
431         @type key: C{string}
432         @param key: the target key to find the values and nodes for
433         @type id: C{string}
434         @param id: the node ID of the sender node
435         @type _krpc_sender: (C{string}, C{int})
436         @param _krpc_sender: the sender node's IP address and port
437         """
438         if _krpc_sender is not None:
439             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
440             self.insertNode(n, contacted = False)
441     
442         nodes = self.table.findNodes(key)
443         nodes = map(lambda node: node.contactInfo(), nodes)
444         num_values = self.store.countValues(key)
445         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
446
447     def krpc_get_value(self, id, key, num, _krpc_sender = None):
448         """Retrieve the values stored locally for the key.
449         
450         @type key: C{string}
451         @param key: the target key to retrieve the values for
452         @type num: C{int}
453         @param num: the maximum number of values to retrieve, or 0 to
454             retrieve all of them
455         @type id: C{string}
456         @param id: the node ID of the sender node
457         @type _krpc_sender: (C{string}, C{int})
458         @param _krpc_sender: the sender node's IP address and port
459         """
460         if _krpc_sender is not None:
461             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
462             self.insertNode(n, contacted = False)
463     
464         l = self.store.retrieveValues(key)
465         if num == 0 or num >= len(l):
466             return {'values' : l, "id": self.node.id}
467         else:
468             shuffle(l)
469             return {'values' : l[:num], "id": self.node.id}
470
471
472 class KhashmirWrite(KhashmirRead):
473     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
474
475     _Node = KNodeWrite
476
477     #{ Local interface
478     def storeValueForKey(self, key, value, callback=None):
479         """Stores the value for the key in the global table.
480         
481         No status in this implementation, peers respond but don't indicate
482         status of storing values.
483
484         @type key: C{string}
485         @param key: the target key to store the value for
486         @type value: C{string}
487         @param value: the value to store with the key
488         @type callback: C{method}
489         @param callback: the method to call with the results, it must take 3
490             parameters: the key, the value stored, and the result of the store
491             (optional, defaults to doing nothing with the results)
492         """
493         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
494             """Use the returned K closest nodes to store the key at."""
495             if not response:
496                 def _storedValueHandler(key, value, sender):
497                     """Default callback that does nothing."""
498                     pass
499                 response = _storedValueHandler
500             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
501             reactor.callLater(0, action.goWithNodes, nodes)
502             
503         # First find the K closest nodes to operate on.
504         self.findNode(key, _storeValueForKey)
505                     
506     #{ Remote interface
507     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
508         """Store the value locally with the key.
509         
510         @type key: C{string}
511         @param key: the target key to store the value for
512         @type value: C{string}
513         @param value: the value to store with the key
514         @param token: the token to confirm that this peer contacted us previously
515         @type id: C{string}
516         @param id: the node ID of the sender node
517         @type _krpc_sender: (C{string}, C{int})
518         @param _krpc_sender: the sender node's IP address and port
519         """
520         if _krpc_sender is not None:
521             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
522             self.insertNode(n, contacted = False)
523         else:
524             _krpc_sender = ('127.0.0.1', self.port)
525
526         for secret in self.token_secrets:
527             this_token = sha(secret + _krpc_sender[0]).digest()
528             if token == this_token:
529                 self.store.storeValue(key, value)
530                 return {"id" : self.node.id}
531         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
532
533
534 class Khashmir(KhashmirWrite):
535     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
536     _Node = KNodeWrite
537
538
539 class SimpleTests(unittest.TestCase):
540     
541     timeout = 10
542     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
543                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
544                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
545                     'MAX_FAILURES': 3,
546                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
547                     'KEY_EXPIRE': 3600, 'SPEW': False, }
548
549     def setUp(self):
550         d = self.DHT_DEFAULTS.copy()
551         d['PORT'] = 4044
552         self.a = Khashmir(d)
553         d = self.DHT_DEFAULTS.copy()
554         d['PORT'] = 4045
555         self.b = Khashmir(d)
556         
557     def tearDown(self):
558         self.a.shutdown()
559         self.b.shutdown()
560         os.unlink(self.a.store.db)
561         os.unlink(self.b.store.db)
562
563     def testAddContact(self):
564         self.failUnlessEqual(len(self.a.table.buckets), 1)
565         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
566
567         self.failUnlessEqual(len(self.b.table.buckets), 1)
568         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
569
570         self.a.addContact('127.0.0.1', 4045)
571         reactor.iterate()
572         reactor.iterate()
573         reactor.iterate()
574         reactor.iterate()
575
576         self.failUnlessEqual(len(self.a.table.buckets), 1)
577         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
578         self.failUnlessEqual(len(self.b.table.buckets), 1)
579         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
580
581     def testStoreRetrieve(self):
582         self.a.addContact('127.0.0.1', 4045)
583         reactor.iterate()
584         reactor.iterate()
585         reactor.iterate()
586         reactor.iterate()
587         self.got = 0
588         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
589         reactor.iterate()
590         reactor.iterate()
591         reactor.iterate()
592         reactor.iterate()
593         reactor.iterate()
594         reactor.iterate()
595         self.a.valueForKey(sha('foo').digest(), self._cb)
596         reactor.iterate()
597         reactor.iterate()
598         reactor.iterate()
599         reactor.iterate()
600         reactor.iterate()
601         reactor.iterate()
602         reactor.iterate()
603
604     def _cb(self, key, val):
605         if not val:
606             self.failUnlessEqual(self.got, 1)
607         elif 'foobar' in val:
608             self.got = 1
609
610
611 class MultiTest(unittest.TestCase):
612     
613     timeout = 30
614     num = 20
615     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
616                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
617                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
618                     'MAX_FAILURES': 3,
619                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
620                     'KEY_EXPIRE': 3600, 'SPEW': False, }
621
622     def _done(self, val):
623         self.done = 1
624         
625     def setUp(self):
626         self.l = []
627         self.startport = 4088
628         for i in range(self.num):
629             d = self.DHT_DEFAULTS.copy()
630             d['PORT'] = self.startport + i
631             self.l.append(Khashmir(d))
632         reactor.iterate()
633         reactor.iterate()
634         
635         for i in self.l:
636             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
637             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
638             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
639             reactor.iterate()
640             reactor.iterate()
641             reactor.iterate() 
642             
643         for i in self.l:
644             self.done = 0
645             i.findCloseNodes(self._done)
646             while not self.done:
647                 reactor.iterate()
648         for i in self.l:
649             self.done = 0
650             i.findCloseNodes(self._done)
651             while not self.done:
652                 reactor.iterate()
653
654     def tearDown(self):
655         for i in self.l:
656             i.shutdown()
657             os.unlink(i.store.db)
658             
659         reactor.iterate()
660         
661     def testStoreRetrieve(self):
662         for i in range(10):
663             K = newID()
664             V = newID()
665             
666             for a in range(3):
667                 self.done = 0
668                 def _scb(key, value, result):
669                     self.done = 1
670                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
671                 while not self.done:
672                     reactor.iterate()
673
674
675                 def _rcb(key, val):
676                     if not val:
677                         self.done = 1
678                         self.failUnlessEqual(self.got, 1)
679                     elif V in val:
680                         self.got = 1
681                 for x in range(3):
682                     self.got = 0
683                     self.done = 0
684                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
685                     while not self.done:
686                         reactor.iterate()