KRPC calls callback with the response dictionary by itself.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 from copy import copy
13 import os
14
15 from twisted.internet.defer import Deferred
16 from twisted.internet import protocol, reactor
17 from twisted.python import log
18 from twisted.trial import unittest
19
20 from db import DB
21 from ktable import KTable
22 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
23 from khash import newID, newIDInRange
24 from actions import FindNode, FindValue, GetValue, StoreValue
25 from stats import StatsLogger
26 import krpc
27
28 class KhashmirBase(protocol.Factory):
29     """The base Khashmir class, with base functionality and find node, no key-value mappings.
30     
31     @type _Node: L{node.Node}
32     @ivar _Node: the knode implementation to use for this class of DHT
33     @type config: C{dictionary}
34     @ivar config: the configuration parameters for the DHT
35     @type port: C{int}
36     @ivar port: the port to listen on
37     @type store: L{db.DB}
38     @ivar store: the database to store nodes and key/value pairs in
39     @type node: L{node.Node}
40     @ivar node: this node
41     @type table: L{ktable.KTable}
42     @ivar table: the routing table
43     @type token_secrets: C{list} of C{string}
44     @ivar token_secrets: the current secrets to use to create tokens
45     @type stats: L{stats.StatsLogger}
46     @ivar stats: the statistics gatherer
47     @type udp: L{krpc.hostbroker}
48     @ivar udp: the factory for the KRPC protocol
49     @type listenport: L{twisted.internet.interfaces.IListeningPort}
50     @ivar listenport: the UDP listening port
51     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
52     @ivar next_checkpoint: the delayed call for the next checkpoint
53     """
54     
55     _Node = KNodeBase
56     
57     def __init__(self, config, cache_dir='/tmp'):
58         """Initialize the Khashmir class and call the L{setup} method.
59         
60         @type config: C{dictionary}
61         @param config: the configuration parameters for the DHT
62         @type cache_dir: C{string}
63         @param cache_dir: the directory to store all files in
64             (optional, defaults to the /tmp directory)
65         """
66         self.config = None
67         self.setup(config, cache_dir)
68         
69     def setup(self, config, cache_dir):
70         """Setup all the Khashmir sub-modules.
71         
72         @type config: C{dictionary}
73         @param config: the configuration parameters for the DHT
74         @type cache_dir: C{string}
75         @param cache_dir: the directory to store all files in
76         """
77         self.config = config
78         self.port = config['PORT']
79         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
80         self.node = self._loadSelfNode('', self.port)
81         self.table = KTable(self.node, config)
82         self.token_secrets = [newID()]
83         self.stats = StatsLogger(self.table, self.store, self.config)
84         
85         # Start listening
86         self.udp = krpc.hostbroker(self, self.stats, config)
87         self.udp.protocol = krpc.KRPC
88         self.listenport = reactor.listenUDP(self.port, self.udp)
89         
90         # Load the routing table and begin checkpointing
91         self._loadRoutingTable()
92         self.refreshTable(force = True)
93         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
94
95     def Node(self, id, host = None, port = None):
96         """Create a new node.
97         
98         @see: L{node.Node.__init__}
99         """
100         n = self._Node(id, host, port)
101         n.table = self.table
102         n.conn = self.udp.connectionForAddr((n.host, n.port))
103         return n
104     
105     def __del__(self):
106         """Stop listening for packets."""
107         self.listenport.stopListening()
108         
109     def _loadSelfNode(self, host, port):
110         """Create this node, loading any previously saved one."""
111         id = self.store.getSelfNode()
112         if not id:
113             id = newID()
114         return self._Node(id, host, port)
115         
116     def checkpoint(self):
117         """Perform some periodic maintenance operations."""
118         # Create a new token secret
119         self.token_secrets.insert(0, newID())
120         if len(self.token_secrets) > 3:
121             self.token_secrets.pop()
122             
123         # Save some parameters for reloading
124         self.store.saveSelfNode(self.node.id)
125         self.store.dumpRoutingTable(self.table.buckets)
126         
127         # DHT maintenance
128         self.store.expireValues(self.config['KEY_EXPIRE'])
129         self.refreshTable()
130         
131         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
132                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
133                                                  self.checkpoint)
134         
135     def _loadRoutingTable(self):
136         """Load the previous routing table nodes from the database.
137         
138         It's usually a good idea to call refreshTable(force = True) after
139         loading the table.
140         """
141         nodes = self.store.getRoutingTable()
142         for rec in nodes:
143             n = self.Node(rec[0], rec[1], int(rec[2]))
144             self.table.insertNode(n, contacted = False)
145             
146     #{ Local interface
147     def addContact(self, host, port, callback=None, errback=None):
148         """Ping this node and add the contact info to the table on pong.
149         
150         @type host: C{string}
151         @param host: the IP address of the node to contact
152         @type port: C{int}
153         @param port:the port of the node to contact
154         @type callback: C{method}
155         @param callback: the method to call with the results, it must take 1
156             parameter, the contact info returned by the node
157             (optional, defaults to doing nothing with the results)
158         @type errback: C{method}
159         @param errback: the method to call if an error occurs
160             (optional, defaults to calling the callback with None)
161         """
162         n = self.Node(NULL_ID, host, port)
163         self.sendJoin(n, callback=callback, errback=errback)
164
165     def findNode(self, id, callback, errback=None):
166         """Find the contact info for the K closest nodes in the global table.
167         
168         @type id: C{string}
169         @param id: the target ID to find the K closest nodes of
170         @type callback: C{method}
171         @param callback: the method to call with the results, it must take 1
172             parameter, the list of K closest nodes
173         @type errback: C{method}
174         @param errback: the method to call if an error occurs
175             (optional, defaults to doing nothing when an error occurs)
176         """
177         # Get K nodes out of local table/cache
178         nodes = self.table.findNodes(id)
179         nodes = [copy(node) for node in nodes]
180         d = Deferred()
181         if errback:
182             d.addCallbacks(callback, errback)
183         else:
184             d.addCallback(callback)
185
186         # If the target ID was found
187         if len(nodes) == 1 and nodes[0].id == id:
188             d.callback(nodes)
189         else:
190             # Start the finding nodes action
191             state = FindNode(self, id, d.callback, self.config, self.stats)
192             reactor.callLater(0, state.goWithNodes, nodes)
193     
194     def insertNode(self, node, contacted = True):
195         """Try to insert a node in our local table, pinging oldest contact if necessary.
196         
197         If all you have is a host/port, then use L{addContact}, which calls this
198         method after receiving the PONG from the remote node. The reason for
199         the seperation is we can't insert a node into the table without its
200         node ID. That means of course the node passed into this method needs
201         to be a properly formed Node object with a valid ID.
202
203         @type node: L{node.Node}
204         @param node: the new node to try and insert
205         @type contacted: C{boolean}
206         @param contacted: whether the new node is known to be good, i.e.
207             responded to a request (optional, defaults to True)
208         """
209         old = self.table.insertNode(node, contacted=contacted)
210         if (old and old.id != self.node.id and
211             (datetime.now() - old.lastSeen) > 
212              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
213             
214             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
215                 """The pinged node never responded, so replace it."""
216                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
217                 log.err(err)
218                 self.table.replaceStaleNode(oldnode, newnode)
219             
220             def _notStaleNodeHandler(dict, old=old, self=self):
221                 """Got a pong from the old node, so update it."""
222                 if dict['id'] == old.id:
223                     self.table.justSeenNode(old.id)
224             
225             # Bucket is full, check to see if old node is still available
226             self.stats.startedAction('ping')
227             df = old.ping(self.node.id)
228             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
229
230     def sendJoin(self, node, callback=None, errback=None):
231         """Join the DHT by pinging a bootstrap node.
232         
233         @type node: L{node.Node}
234         @param node: the node to send the join to
235         @type callback: C{method}
236         @param callback: the method to call with the results, it must take 1
237             parameter, the contact info returned by the node
238             (optional, defaults to doing nothing with the results)
239         @type errback: C{method}
240         @param errback: the method to call if an error occurs
241             (optional, defaults to calling the callback with None)
242         """
243
244         def _pongHandler(dict, node=node, self=self, callback=callback):
245             """Node responded properly, callback with response."""
246             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
247             self.insertNode(n)
248             if callback:
249                 callback((dict['ip_addr'], dict['port']))
250
251         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
252             """Error occurred, fail node and errback or callback with error."""
253             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
254             log.err(err)
255             self.table.nodeFailed(node)
256             if errback:
257                 errback()
258             elif callback:
259                 callback(None)
260         
261         self.stats.startedAction('join')
262         df = node.join(self.node.id)
263         df.addCallbacks(_pongHandler, _defaultPong)
264
265     def findCloseNodes(self, callback=lambda a: None, errback = None):
266         """Perform a findNode on the ID one away from our own.
267
268         This will allow us to populate our table with nodes on our network
269         closest to our own. This is called as soon as we start up with an
270         empty table.
271
272         @type callback: C{method}
273         @param callback: the method to call with the results, it must take 1
274             parameter, the list of K closest nodes
275             (optional, defaults to doing nothing with the results)
276         @type errback: C{method}
277         @param errback: the method to call if an error occurs
278             (optional, defaults to doing nothing when an error occurs)
279         """
280         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
281         self.findNode(id, callback, errback)
282
283     def refreshTable(self, force = False):
284         """Check all the buckets for those that need refreshing.
285         
286         @param force: refresh all buckets regardless of last bucket access time
287             (optional, defaults to False)
288         """
289         def callback(nodes):
290             pass
291     
292         for bucket in self.table.buckets:
293             if force or (datetime.now() - bucket.lastAccessed > 
294                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
295                 # Choose a random ID in the bucket and try and find it
296                 id = newIDInRange(bucket.min, bucket.max)
297                 self.findNode(id, callback)
298
299     def shutdown(self):
300         """Closes the port and cancels pending later calls."""
301         self.listenport.stopListening()
302         try:
303             self.next_checkpoint.cancel()
304         except:
305             pass
306         self.store.close()
307     
308     def getStats(self):
309         """Gather the statistics for the DHT."""
310         return self.stats.formatHTML()
311
312     #{ Remote interface
313     def krpc_ping(self, id, _krpc_sender = None):
314         """Pong with our ID.
315         
316         @type id: C{string}
317         @param id: the node ID of the sender node
318         @type _krpc_sender: (C{string}, C{int})
319         @param _krpc_sender: the sender node's IP address and port
320         """
321         if _krpc_sender is not None:
322             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
323             self.insertNode(n, contacted = False)
324
325         return {"id" : self.node.id}
326         
327     def krpc_join(self, id, _krpc_sender = None):
328         """Add the node by responding with its address and port.
329         
330         @type id: C{string}
331         @param id: the node ID of the sender node
332         @type _krpc_sender: (C{string}, C{int})
333         @param _krpc_sender: the sender node's IP address and port
334         """
335         if _krpc_sender is not None:
336             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
337             self.insertNode(n, contacted = False)
338         else:
339             _krpc_sender = ('127.0.0.1', self.port)
340
341         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
342         
343     def krpc_find_node(self, id, target, _krpc_sender = None):
344         """Find the K closest nodes to the target in the local routing table.
345         
346         @type target: C{string}
347         @param target: the target ID to find nodes for
348         @type id: C{string}
349         @param id: the node ID of the sender node
350         @type _krpc_sender: (C{string}, C{int})
351         @param _krpc_sender: the sender node's IP address and port
352         """
353         if _krpc_sender is not None:
354             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
355             self.insertNode(n, contacted = False)
356         else:
357             _krpc_sender = ('127.0.0.1', self.port)
358
359         nodes = self.table.findNodes(target)
360         nodes = map(lambda node: node.contactInfo(), nodes)
361         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
362         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
363
364
365 class KhashmirRead(KhashmirBase):
366     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
367
368     _Node = KNodeRead
369
370     #{ Local interface
371     def findValue(self, key, callback, errback=None):
372         """Get the nodes that have values for the key from the global table.
373         
374         @type key: C{string}
375         @param key: the target key to find the values for
376         @type callback: C{method}
377         @param callback: the method to call with the results, it must take 1
378             parameter, the list of nodes with values
379         @type errback: C{method}
380         @param errback: the method to call if an error occurs
381             (optional, defaults to doing nothing when an error occurs)
382         """
383         # Get K nodes out of local table/cache
384         nodes = self.table.findNodes(key)
385         nodes = [copy(node) for node in nodes]
386         d = Deferred()
387         if errback:
388             d.addCallbacks(callback, errback)
389         else:
390             d.addCallback(callback)
391
392         # Search for others starting with the locally found ones
393         state = FindValue(self, key, d.callback, self.config, self.stats)
394         reactor.callLater(0, state.goWithNodes, nodes)
395
396     def valueForKey(self, key, callback, searchlocal = True):
397         """Get the values found for key in global table.
398         
399         Callback will be called with a list of values for each peer that
400         returns unique values. The final callback will be an empty list.
401
402         @type key: C{string}
403         @param key: the target key to get the values for
404         @type callback: C{method}
405         @param callback: the method to call with the results, it must take 2
406             parameters: the key, and the values found
407         @type searchlocal: C{boolean}
408         @param searchlocal: whether to also look for any local values
409         """
410         # Get any local values
411         if searchlocal:
412             l = self.store.retrieveValues(key)
413             if len(l) > 0:
414                 reactor.callLater(0, callback, key, l)
415         else:
416             l = []
417
418         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
419             """Use the found nodes to send requests for values to."""
420             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
421             reactor.callLater(0, state.goWithNodes, nodes)
422             
423         # First lookup nodes that have values for the key
424         self.findValue(key, _getValueForKey)
425
426     #{ Remote interface
427     def krpc_find_value(self, id, key, _krpc_sender = None):
428         """Find the number of values stored locally for the key, and the K closest nodes.
429         
430         @type key: C{string}
431         @param key: the target key to find the values and nodes for
432         @type id: C{string}
433         @param id: the node ID of the sender node
434         @type _krpc_sender: (C{string}, C{int})
435         @param _krpc_sender: the sender node's IP address and port
436         """
437         if _krpc_sender is not None:
438             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
439             self.insertNode(n, contacted = False)
440     
441         nodes = self.table.findNodes(key)
442         nodes = map(lambda node: node.contactInfo(), nodes)
443         num_values = self.store.countValues(key)
444         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
445
446     def krpc_get_value(self, id, key, num, _krpc_sender = None):
447         """Retrieve the values stored locally for the key.
448         
449         @type key: C{string}
450         @param key: the target key to retrieve the values for
451         @type num: C{int}
452         @param num: the maximum number of values to retrieve, or 0 to
453             retrieve all of them
454         @type id: C{string}
455         @param id: the node ID of the sender node
456         @type _krpc_sender: (C{string}, C{int})
457         @param _krpc_sender: the sender node's IP address and port
458         """
459         if _krpc_sender is not None:
460             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
461             self.insertNode(n, contacted = False)
462     
463         l = self.store.retrieveValues(key)
464         if num == 0 or num >= len(l):
465             return {'values' : l, "id": self.node.id}
466         else:
467             shuffle(l)
468             return {'values' : l[:num], "id": self.node.id}
469
470
471 class KhashmirWrite(KhashmirRead):
472     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
473
474     _Node = KNodeWrite
475
476     #{ Local interface
477     def storeValueForKey(self, key, value, callback=None):
478         """Stores the value for the key in the global table.
479         
480         No status in this implementation, peers respond but don't indicate
481         status of storing values.
482
483         @type key: C{string}
484         @param key: the target key to store the value for
485         @type value: C{string}
486         @param value: the value to store with the key
487         @type callback: C{method}
488         @param callback: the method to call with the results, it must take 3
489             parameters: the key, the value stored, and the result of the store
490             (optional, defaults to doing nothing with the results)
491         """
492         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
493             """Use the returned K closest nodes to store the key at."""
494             if not response:
495                 def _storedValueHandler(key, value, sender):
496                     """Default callback that does nothing."""
497                     pass
498                 response = _storedValueHandler
499             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
500             reactor.callLater(0, action.goWithNodes, nodes)
501             
502         # First find the K closest nodes to operate on.
503         self.findNode(key, _storeValueForKey)
504                     
505     #{ Remote interface
506     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
507         """Store the value locally with the key.
508         
509         @type key: C{string}
510         @param key: the target key to store the value for
511         @type value: C{string}
512         @param value: the value to store with the key
513         @param token: the token to confirm that this peer contacted us previously
514         @type id: C{string}
515         @param id: the node ID of the sender node
516         @type _krpc_sender: (C{string}, C{int})
517         @param _krpc_sender: the sender node's IP address and port
518         """
519         if _krpc_sender is not None:
520             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
521             self.insertNode(n, contacted = False)
522         else:
523             _krpc_sender = ('127.0.0.1', self.port)
524
525         for secret in self.token_secrets:
526             this_token = sha(secret + _krpc_sender[0]).digest()
527             if token == this_token:
528                 self.store.storeValue(key, value)
529                 return {"id" : self.node.id}
530         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
531
532
533 class Khashmir(KhashmirWrite):
534     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
535     _Node = KNodeWrite
536
537
538 class SimpleTests(unittest.TestCase):
539     
540     timeout = 10
541     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
542                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
543                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
544                     'MAX_FAILURES': 3,
545                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
546                     'KEY_EXPIRE': 3600, 'SPEW': False, }
547
548     def setUp(self):
549         d = self.DHT_DEFAULTS.copy()
550         d['PORT'] = 4044
551         self.a = Khashmir(d)
552         d = self.DHT_DEFAULTS.copy()
553         d['PORT'] = 4045
554         self.b = Khashmir(d)
555         
556     def tearDown(self):
557         self.a.shutdown()
558         self.b.shutdown()
559         os.unlink(self.a.store.db)
560         os.unlink(self.b.store.db)
561
562     def testAddContact(self):
563         self.failUnlessEqual(len(self.a.table.buckets), 1)
564         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
565
566         self.failUnlessEqual(len(self.b.table.buckets), 1)
567         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
568
569         self.a.addContact('127.0.0.1', 4045)
570         reactor.iterate()
571         reactor.iterate()
572         reactor.iterate()
573         reactor.iterate()
574
575         self.failUnlessEqual(len(self.a.table.buckets), 1)
576         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
577         self.failUnlessEqual(len(self.b.table.buckets), 1)
578         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
579
580     def testStoreRetrieve(self):
581         self.a.addContact('127.0.0.1', 4045)
582         reactor.iterate()
583         reactor.iterate()
584         reactor.iterate()
585         reactor.iterate()
586         self.got = 0
587         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
588         reactor.iterate()
589         reactor.iterate()
590         reactor.iterate()
591         reactor.iterate()
592         reactor.iterate()
593         reactor.iterate()
594         self.a.valueForKey(sha('foo').digest(), self._cb)
595         reactor.iterate()
596         reactor.iterate()
597         reactor.iterate()
598         reactor.iterate()
599         reactor.iterate()
600         reactor.iterate()
601         reactor.iterate()
602
603     def _cb(self, key, val):
604         if not val:
605             self.failUnlessEqual(self.got, 1)
606         elif 'foobar' in val:
607             self.got = 1
608
609
610 class MultiTest(unittest.TestCase):
611     
612     timeout = 30
613     num = 20
614     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
615                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
616                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
617                     'MAX_FAILURES': 3,
618                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
619                     'KEY_EXPIRE': 3600, 'SPEW': False, }
620
621     def _done(self, val):
622         self.done = 1
623         
624     def setUp(self):
625         self.l = []
626         self.startport = 4088
627         for i in range(self.num):
628             d = self.DHT_DEFAULTS.copy()
629             d['PORT'] = self.startport + i
630             self.l.append(Khashmir(d))
631         reactor.iterate()
632         reactor.iterate()
633         
634         for i in self.l:
635             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
636             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
637             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
638             reactor.iterate()
639             reactor.iterate()
640             reactor.iterate() 
641             
642         for i in self.l:
643             self.done = 0
644             i.findCloseNodes(self._done)
645             while not self.done:
646                 reactor.iterate()
647         for i in self.l:
648             self.done = 0
649             i.findCloseNodes(self._done)
650             while not self.done:
651                 reactor.iterate()
652
653     def tearDown(self):
654         for i in self.l:
655             i.shutdown()
656             os.unlink(i.store.db)
657             
658         reactor.iterate()
659         
660     def testStoreRetrieve(self):
661         for i in range(10):
662             K = newID()
663             V = newID()
664             
665             for a in range(3):
666                 self.done = 0
667                 def _scb(key, value, result):
668                     self.done = 1
669                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
670                 while not self.done:
671                     reactor.iterate()
672
673
674                 def _rcb(key, val):
675                     if not val:
676                         self.done = 1
677                         self.failUnlessEqual(self.got, 1)
678                     elif V in val:
679                         self.got = 1
680                 for x in range(3):
681                     self.got = 0
682                     self.done = 0
683                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
684                     while not self.done:
685                         reactor.iterate()