]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p_Khashmir/khashmir.py
Allow the actions to call the local node's remote interface.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 from copy import copy
13 import os
14
15 from twisted.internet.defer import Deferred
16 from twisted.internet import protocol, reactor
17 from twisted.python import log
18 from twisted.trial import unittest
19
20 from db import DB
21 from ktable import KTable
22 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
23 from khash import newID, newIDInRange
24 from actions import FindNode, FindValue, GetValue, StoreValue
25 from stats import StatsLogger
26 import krpc
27
28 class KhashmirBase(protocol.Factory):
29     """The base Khashmir class, with base functionality and find node, no key-value mappings.
30     
31     @type _Node: L{node.Node}
32     @ivar _Node: the knode implementation to use for this class of DHT
33     @type config: C{dictionary}
34     @ivar config: the configuration parameters for the DHT
35     @type port: C{int}
36     @ivar port: the port to listen on
37     @type store: L{db.DB}
38     @ivar store: the database to store nodes and key/value pairs in
39     @type node: L{node.Node}
40     @ivar node: this node
41     @type table: L{ktable.KTable}
42     @ivar table: the routing table
43     @type token_secrets: C{list} of C{string}
44     @ivar token_secrets: the current secrets to use to create tokens
45     @type stats: L{stats.StatsLogger}
46     @ivar stats: the statistics gatherer
47     @type udp: L{krpc.hostbroker}
48     @ivar udp: the factory for the KRPC protocol
49     @type listenport: L{twisted.internet.interfaces.IListeningPort}
50     @ivar listenport: the UDP listening port
51     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
52     @ivar next_checkpoint: the delayed call for the next checkpoint
53     """
54     
55     _Node = KNodeBase
56     
57     def __init__(self, config, cache_dir='/tmp'):
58         """Initialize the Khashmir class and call the L{setup} method.
59         
60         @type config: C{dictionary}
61         @param config: the configuration parameters for the DHT
62         @type cache_dir: C{string}
63         @param cache_dir: the directory to store all files in
64             (optional, defaults to the /tmp directory)
65         """
66         self.config = None
67         self.setup(config, cache_dir)
68         
69     def setup(self, config, cache_dir):
70         """Setup all the Khashmir sub-modules.
71         
72         @type config: C{dictionary}
73         @param config: the configuration parameters for the DHT
74         @type cache_dir: C{string}
75         @param cache_dir: the directory to store all files in
76         """
77         self.config = config
78         self.port = config['PORT']
79         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
80         self.node = self._loadSelfNode('', self.port)
81         self.table = KTable(self.node, config)
82         self.token_secrets = [newID()]
83         self.stats = StatsLogger(self.table, self.store, self.config)
84         
85         # Start listening
86         self.udp = krpc.hostbroker(self, self.stats, config)
87         self.udp.protocol = krpc.KRPC
88         self.listenport = reactor.listenUDP(self.port, self.udp)
89         
90         # Load the routing table and begin checkpointing
91         self._loadRoutingTable()
92         self.refreshTable(force = True)
93         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
94
95     def Node(self, id, host = None, port = None):
96         """Create a new node.
97         
98         @see: L{node.Node.__init__}
99         """
100         n = self._Node(id, host, port)
101         n.table = self.table
102         n.conn = self.udp.connectionForAddr((n.host, n.port))
103         return n
104     
105     def __del__(self):
106         """Stop listening for packets."""
107         self.listenport.stopListening()
108         
109     def _loadSelfNode(self, host, port):
110         """Create this node, loading any previously saved one."""
111         id = self.store.getSelfNode()
112         if not id:
113             id = newID()
114         return self._Node(id, host, port)
115         
116     def checkpoint(self):
117         """Perform some periodic maintenance operations."""
118         # Create a new token secret
119         self.token_secrets.insert(0, newID())
120         if len(self.token_secrets) > 3:
121             self.token_secrets.pop()
122             
123         # Save some parameters for reloading
124         self.store.saveSelfNode(self.node.id)
125         self.store.dumpRoutingTable(self.table.buckets)
126         
127         # DHT maintenance
128         self.store.expireValues(self.config['KEY_EXPIRE'])
129         self.refreshTable()
130         
131         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
132                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
133                                                  self.checkpoint)
134         
135     def _loadRoutingTable(self):
136         """Load the previous routing table nodes from the database.
137         
138         It's usually a good idea to call refreshTable(force = True) after
139         loading the table.
140         """
141         nodes = self.store.getRoutingTable()
142         for rec in nodes:
143             n = self.Node(rec[0], rec[1], int(rec[2]))
144             self.table.insertNode(n, contacted = False)
145             
146     #{ Local interface
147     def addContact(self, host, port, callback=None, errback=None):
148         """Ping this node and add the contact info to the table on pong.
149         
150         @type host: C{string}
151         @param host: the IP address of the node to contact
152         @type port: C{int}
153         @param port:the port of the node to contact
154         @type callback: C{method}
155         @param callback: the method to call with the results, it must take 1
156             parameter, the contact info returned by the node
157             (optional, defaults to doing nothing with the results)
158         @type errback: C{method}
159         @param errback: the method to call if an error occurs
160             (optional, defaults to calling the callback with None)
161         """
162         n = self.Node(NULL_ID, host, port)
163         self.sendJoin(n, callback=callback, errback=errback)
164
165     def findNode(self, id, callback, errback=None):
166         """Find the contact info for the K closest nodes in the global table.
167         
168         @type id: C{string}
169         @param id: the target ID to find the K closest nodes of
170         @type callback: C{method}
171         @param callback: the method to call with the results, it must take 1
172             parameter, the list of K closest nodes
173         @type errback: C{method}
174         @param errback: the method to call if an error occurs
175             (optional, defaults to doing nothing when an error occurs)
176         """
177         # Start with our node
178         nodes = [copy(self.node)]
179
180         d = Deferred()
181         if errback:
182             d.addCallbacks(callback, errback)
183         else:
184             d.addCallback(callback)
185
186         # Start the finding nodes action
187         state = FindNode(self, id, d.callback, self.config, self.stats)
188         reactor.callLater(0, state.goWithNodes, nodes)
189     
190     def insertNode(self, node, contacted = True):
191         """Try to insert a node in our local table, pinging oldest contact if necessary.
192         
193         If all you have is a host/port, then use L{addContact}, which calls this
194         method after receiving the PONG from the remote node. The reason for
195         the seperation is we can't insert a node into the table without its
196         node ID. That means of course the node passed into this method needs
197         to be a properly formed Node object with a valid ID.
198
199         @type node: L{node.Node}
200         @param node: the new node to try and insert
201         @type contacted: C{boolean}
202         @param contacted: whether the new node is known to be good, i.e.
203             responded to a request (optional, defaults to True)
204         """
205         old = self.table.insertNode(node, contacted=contacted)
206         if (old and old.id != self.node.id and
207             (datetime.now() - old.lastSeen) > 
208              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
209             
210             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
211                 """The pinged node never responded, so replace it."""
212                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
213                 log.err(err)
214                 self.table.replaceStaleNode(oldnode, newnode)
215             
216             def _notStaleNodeHandler(dict, old=old, self=self):
217                 """Got a pong from the old node, so update it."""
218                 if dict['id'] == old.id:
219                     self.table.justSeenNode(old.id)
220             
221             # Bucket is full, check to see if old node is still available
222             self.stats.startedAction('ping')
223             df = old.ping(self.node.id)
224             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
225
226     def sendJoin(self, node, callback=None, errback=None):
227         """Join the DHT by pinging a bootstrap node.
228         
229         @type node: L{node.Node}
230         @param node: the node to send the join to
231         @type callback: C{method}
232         @param callback: the method to call with the results, it must take 1
233             parameter, the contact info returned by the node
234             (optional, defaults to doing nothing with the results)
235         @type errback: C{method}
236         @param errback: the method to call if an error occurs
237             (optional, defaults to calling the callback with None)
238         """
239
240         def _pongHandler(dict, node=node, self=self, callback=callback):
241             """Node responded properly, callback with response."""
242             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
243             self.insertNode(n)
244             if callback:
245                 callback((dict['ip_addr'], dict['port']))
246
247         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
248             """Error occurred, fail node and errback or callback with error."""
249             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
250             log.err(err)
251             self.table.nodeFailed(node)
252             if errback:
253                 errback()
254             elif callback:
255                 callback(None)
256         
257         self.stats.startedAction('join')
258         df = node.join(self.node.id)
259         df.addCallbacks(_pongHandler, _defaultPong)
260
261     def findCloseNodes(self, callback=lambda a: None, errback = None):
262         """Perform a findNode on the ID one away from our own.
263
264         This will allow us to populate our table with nodes on our network
265         closest to our own. This is called as soon as we start up with an
266         empty table.
267
268         @type callback: C{method}
269         @param callback: the method to call with the results, it must take 1
270             parameter, the list of K closest nodes
271             (optional, defaults to doing nothing with the results)
272         @type errback: C{method}
273         @param errback: the method to call if an error occurs
274             (optional, defaults to doing nothing when an error occurs)
275         """
276         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
277         self.findNode(id, callback, errback)
278
279     def refreshTable(self, force = False):
280         """Check all the buckets for those that need refreshing.
281         
282         @param force: refresh all buckets regardless of last bucket access time
283             (optional, defaults to False)
284         """
285         def callback(nodes):
286             pass
287     
288         for bucket in self.table.buckets:
289             if force or (datetime.now() - bucket.lastAccessed > 
290                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
291                 # Choose a random ID in the bucket and try and find it
292                 id = newIDInRange(bucket.min, bucket.max)
293                 self.findNode(id, callback)
294
295     def shutdown(self):
296         """Closes the port and cancels pending later calls."""
297         self.listenport.stopListening()
298         try:
299             self.next_checkpoint.cancel()
300         except:
301             pass
302         self.store.close()
303     
304     def getStats(self):
305         """Gather the statistics for the DHT."""
306         return self.stats.formatHTML()
307
308     #{ Remote interface
309     def krpc_ping(self, id, _krpc_sender = None):
310         """Pong with our ID.
311         
312         @type id: C{string}
313         @param id: the node ID of the sender node
314         @type _krpc_sender: (C{string}, C{int})
315         @param _krpc_sender: the sender node's IP address and port
316         """
317         if _krpc_sender is not None:
318             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
319             self.insertNode(n, contacted = False)
320
321         return {"id" : self.node.id}
322         
323     def krpc_join(self, id, _krpc_sender = None):
324         """Add the node by responding with its address and port.
325         
326         @type id: C{string}
327         @param id: the node ID of the sender node
328         @type _krpc_sender: (C{string}, C{int})
329         @param _krpc_sender: the sender node's IP address and port
330         """
331         if _krpc_sender is not None:
332             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
333             self.insertNode(n, contacted = False)
334         else:
335             _krpc_sender = ('127.0.0.1', self.port)
336
337         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
338         
339     def krpc_find_node(self, id, target, _krpc_sender = None):
340         """Find the K closest nodes to the target in the local routing table.
341         
342         @type target: C{string}
343         @param target: the target ID to find nodes for
344         @type id: C{string}
345         @param id: the node ID of the sender node
346         @type _krpc_sender: (C{string}, C{int})
347         @param _krpc_sender: the sender node's IP address and port
348         """
349         if _krpc_sender is not None:
350             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
351             self.insertNode(n, contacted = False)
352         else:
353             _krpc_sender = ('127.0.0.1', self.port)
354
355         nodes = self.table.findNodes(target)
356         nodes = map(lambda node: node.contactInfo(), nodes)
357         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
358         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
359
360
361 class KhashmirRead(KhashmirBase):
362     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
363
364     _Node = KNodeRead
365
366     #{ Local interface
367     def findValue(self, key, callback, errback=None):
368         """Get the nodes that have values for the key from the global table.
369         
370         @type key: C{string}
371         @param key: the target key to find the values for
372         @type callback: C{method}
373         @param callback: the method to call with the results, it must take 1
374             parameter, the list of nodes with values
375         @type errback: C{method}
376         @param errback: the method to call if an error occurs
377             (optional, defaults to doing nothing when an error occurs)
378         """
379         # Start with ourself
380         nodes = [copy(self.node)]
381         
382         d = Deferred()
383         if errback:
384             d.addCallbacks(callback, errback)
385         else:
386             d.addCallback(callback)
387
388         # Search for others starting with the locally found ones
389         state = FindValue(self, key, d.callback, self.config, self.stats)
390         reactor.callLater(0, state.goWithNodes, nodes)
391
392     def valueForKey(self, key, callback, searchlocal = True):
393         """Get the values found for key in global table.
394         
395         Callback will be called with a list of values for each peer that
396         returns unique values. The final callback will be an empty list.
397
398         @type key: C{string}
399         @param key: the target key to get the values for
400         @type callback: C{method}
401         @param callback: the method to call with the results, it must take 2
402             parameters: the key, and the values found
403         @type searchlocal: C{boolean}
404         @param searchlocal: whether to also look for any local values
405         """
406
407         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
408             """Use the found nodes to send requests for values to."""
409             # Get any local values
410             if searchlocal:
411                 l = self.store.retrieveValues(key)
412                 if len(l) > 0:
413                     node = copy(self.node)
414                     node.updateNumValues(len(l))
415                     nodes = nodes + [node]
416
417             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
418             reactor.callLater(0, state.goWithNodes, nodes)
419             
420         # First lookup nodes that have values for the key
421         self.findValue(key, _getValueForKey)
422
423     #{ Remote interface
424     def krpc_find_value(self, id, key, _krpc_sender = None):
425         """Find the number of values stored locally for the key, and the K closest nodes.
426         
427         @type key: C{string}
428         @param key: the target key to find the values and nodes for
429         @type id: C{string}
430         @param id: the node ID of the sender node
431         @type _krpc_sender: (C{string}, C{int})
432         @param _krpc_sender: the sender node's IP address and port
433         """
434         if _krpc_sender is not None:
435             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
436             self.insertNode(n, contacted = False)
437     
438         nodes = self.table.findNodes(key)
439         nodes = map(lambda node: node.contactInfo(), nodes)
440         num_values = self.store.countValues(key)
441         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
442
443     def krpc_get_value(self, id, key, num, _krpc_sender = None):
444         """Retrieve the values stored locally for the key.
445         
446         @type key: C{string}
447         @param key: the target key to retrieve the values for
448         @type num: C{int}
449         @param num: the maximum number of values to retrieve, or 0 to
450             retrieve all of them
451         @type id: C{string}
452         @param id: the node ID of the sender node
453         @type _krpc_sender: (C{string}, C{int})
454         @param _krpc_sender: the sender node's IP address and port
455         """
456         if _krpc_sender is not None:
457             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
458             self.insertNode(n, contacted = False)
459     
460         l = self.store.retrieveValues(key)
461         if num == 0 or num >= len(l):
462             return {'values' : l, "id": self.node.id}
463         else:
464             shuffle(l)
465             return {'values' : l[:num], "id": self.node.id}
466
467
468 class KhashmirWrite(KhashmirRead):
469     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
470
471     _Node = KNodeWrite
472
473     #{ Local interface
474     def storeValueForKey(self, key, value, callback=None):
475         """Stores the value for the key in the global table.
476         
477         No status in this implementation, peers respond but don't indicate
478         status of storing values.
479
480         @type key: C{string}
481         @param key: the target key to store the value for
482         @type value: C{string}
483         @param value: the value to store with the key
484         @type callback: C{method}
485         @param callback: the method to call with the results, it must take 3
486             parameters: the key, the value stored, and the result of the store
487             (optional, defaults to doing nothing with the results)
488         """
489         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
490             """Use the returned K closest nodes to store the key at."""
491             if not response:
492                 def _storedValueHandler(key, value, sender):
493                     """Default callback that does nothing."""
494                     pass
495                 response = _storedValueHandler
496             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
497             reactor.callLater(0, action.goWithNodes, nodes)
498             
499         # First find the K closest nodes to operate on.
500         self.findNode(key, _storeValueForKey)
501                     
502     #{ Remote interface
503     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
504         """Store the value locally with the key.
505         
506         @type key: C{string}
507         @param key: the target key to store the value for
508         @type value: C{string}
509         @param value: the value to store with the key
510         @param token: the token to confirm that this peer contacted us previously
511         @type id: C{string}
512         @param id: the node ID of the sender node
513         @type _krpc_sender: (C{string}, C{int})
514         @param _krpc_sender: the sender node's IP address and port
515         """
516         if _krpc_sender is not None:
517             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
518             self.insertNode(n, contacted = False)
519         else:
520             _krpc_sender = ('127.0.0.1', self.port)
521
522         for secret in self.token_secrets:
523             this_token = sha(secret + _krpc_sender[0]).digest()
524             if token == this_token:
525                 self.store.storeValue(key, value)
526                 return {"id" : self.node.id}
527         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
528
529
530 class Khashmir(KhashmirWrite):
531     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
532     _Node = KNodeWrite
533
534
535 class SimpleTests(unittest.TestCase):
536     
537     timeout = 10
538     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
539                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
540                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
541                     'MAX_FAILURES': 3,
542                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
543                     'KEY_EXPIRE': 3600, 'SPEW': False, }
544
545     def setUp(self):
546         d = self.DHT_DEFAULTS.copy()
547         d['PORT'] = 4044
548         self.a = Khashmir(d)
549         d = self.DHT_DEFAULTS.copy()
550         d['PORT'] = 4045
551         self.b = Khashmir(d)
552         
553     def tearDown(self):
554         self.a.shutdown()
555         self.b.shutdown()
556         os.unlink(self.a.store.db)
557         os.unlink(self.b.store.db)
558
559     def testAddContact(self):
560         self.failUnlessEqual(len(self.a.table.buckets), 1)
561         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
562
563         self.failUnlessEqual(len(self.b.table.buckets), 1)
564         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
565
566         self.a.addContact('127.0.0.1', 4045)
567         reactor.iterate()
568         reactor.iterate()
569         reactor.iterate()
570         reactor.iterate()
571
572         self.failUnlessEqual(len(self.a.table.buckets), 1)
573         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
574         self.failUnlessEqual(len(self.b.table.buckets), 1)
575         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
576
577     def testStoreRetrieve(self):
578         self.a.addContact('127.0.0.1', 4045)
579         reactor.iterate()
580         reactor.iterate()
581         reactor.iterate()
582         reactor.iterate()
583         self.got = 0
584         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
585         reactor.iterate()
586         reactor.iterate()
587         reactor.iterate()
588         reactor.iterate()
589         reactor.iterate()
590         reactor.iterate()
591         self.a.valueForKey(sha('foo').digest(), self._cb)
592         reactor.iterate()
593         reactor.iterate()
594         reactor.iterate()
595         reactor.iterate()
596         reactor.iterate()
597         reactor.iterate()
598         reactor.iterate()
599
600     def _cb(self, key, val):
601         if not val:
602             self.failUnlessEqual(self.got, 1)
603         elif 'foobar' in val:
604             self.got = 1
605
606
607 class MultiTest(unittest.TestCase):
608     
609     timeout = 30
610     num = 20
611     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
612                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
613                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
614                     'MAX_FAILURES': 3,
615                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
616                     'KEY_EXPIRE': 3600, 'SPEW': False, }
617
618     def _done(self, val):
619         self.done = 1
620         
621     def setUp(self):
622         self.l = []
623         self.startport = 4088
624         for i in range(self.num):
625             d = self.DHT_DEFAULTS.copy()
626             d['PORT'] = self.startport + i
627             self.l.append(Khashmir(d))
628         reactor.iterate()
629         reactor.iterate()
630         
631         for i in self.l:
632             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
633             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
634             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
635             reactor.iterate()
636             reactor.iterate()
637             reactor.iterate() 
638             
639         for i in self.l:
640             self.done = 0
641             i.findCloseNodes(self._done)
642             while not self.done:
643                 reactor.iterate()
644         for i in self.l:
645             self.done = 0
646             i.findCloseNodes(self._done)
647             while not self.done:
648                 reactor.iterate()
649
650     def tearDown(self):
651         for i in self.l:
652             i.shutdown()
653             os.unlink(i.store.db)
654             
655         reactor.iterate()
656         
657     def testStoreRetrieve(self):
658         for i in range(10):
659             K = newID()
660             V = newID()
661             
662             for a in range(3):
663                 self.done = 0
664                 def _scb(key, value, result):
665                     self.done = 1
666                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
667                 while not self.done:
668                     reactor.iterate()
669
670
671                 def _rcb(key, val):
672                     if not val:
673                         self.done = 1
674                         self.failUnlessEqual(self.got, 1)
675                     elif V in val:
676                         self.got = 1
677                 for x in range(3):
678                     self.got = 0
679                     self.done = 0
680                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
681                     while not self.done:
682                         reactor.iterate()