aa1a181c167dbb2094b35b42c38ae1b347419bb5
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 from copy import copy
13 import os
14
15 from twisted.internet.defer import Deferred
16 from twisted.internet import protocol, reactor
17 from twisted.python import log
18 from twisted.trial import unittest
19
20 from db import DB
21 from ktable import KTable
22 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
23 from khash import newID, newIDInRange
24 from actions import FindNode, FindValue, GetValue, StoreValue
25 from stats import StatsLogger
26 import krpc
27
28 class KhashmirBase(protocol.Factory):
29     """The base Khashmir class, with base functionality and find node, no key-value mappings.
30     
31     @type _Node: L{node.Node}
32     @ivar _Node: the knode implementation to use for this class of DHT
33     @type config: C{dictionary}
34     @ivar config: the configuration parameters for the DHT
35     @type port: C{int}
36     @ivar port: the port to listen on
37     @type store: L{db.DB}
38     @ivar store: the database to store nodes and key/value pairs in
39     @type node: L{node.Node}
40     @ivar node: this node
41     @type table: L{ktable.KTable}
42     @ivar table: the routing table
43     @type token_secrets: C{list} of C{string}
44     @ivar token_secrets: the current secrets to use to create tokens
45     @type stats: L{stats.StatsLogger}
46     @ivar stats: the statistics gatherer
47     @type udp: L{krpc.hostbroker}
48     @ivar udp: the factory for the KRPC protocol
49     @type listenport: L{twisted.internet.interfaces.IListeningPort}
50     @ivar listenport: the UDP listening port
51     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
52     @ivar next_checkpoint: the delayed call for the next checkpoint
53     """
54     
55     _Node = KNodeBase
56     
57     def __init__(self, config, cache_dir='/tmp'):
58         """Initialize the Khashmir class and call the L{setup} method.
59         
60         @type config: C{dictionary}
61         @param config: the configuration parameters for the DHT
62         @type cache_dir: C{string}
63         @param cache_dir: the directory to store all files in
64             (optional, defaults to the /tmp directory)
65         """
66         self.config = None
67         self.setup(config, cache_dir)
68         
69     def setup(self, config, cache_dir):
70         """Setup all the Khashmir sub-modules.
71         
72         @type config: C{dictionary}
73         @param config: the configuration parameters for the DHT
74         @type cache_dir: C{string}
75         @param cache_dir: the directory to store all files in
76         """
77         self.config = config
78         self.port = config['PORT']
79         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
80         self.node = self._loadSelfNode('', self.port)
81         self.table = KTable(self.node, config)
82         self.token_secrets = [newID()]
83         self.stats = StatsLogger(self.table, self.store, self.config)
84         
85         # Start listening
86         self.udp = krpc.hostbroker(self, self.stats, config)
87         self.udp.protocol = krpc.KRPC
88         self.listenport = reactor.listenUDP(self.port, self.udp)
89         
90         # Load the routing table and begin checkpointing
91         self._loadRoutingTable()
92         self.refreshTable(force = True)
93         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
94
95     def Node(self, id, host = None, port = None):
96         """Create a new node.
97         
98         @see: L{node.Node.__init__}
99         """
100         n = self._Node(id, host, port)
101         n.table = self.table
102         n.conn = self.udp.connectionForAddr((n.host, n.port))
103         return n
104     
105     def __del__(self):
106         """Stop listening for packets."""
107         self.listenport.stopListening()
108         
109     def _loadSelfNode(self, host, port):
110         """Create this node, loading any previously saved one."""
111         id = self.store.getSelfNode()
112         if not id:
113             id = newID()
114         return self._Node(id, host, port)
115         
116     def checkpoint(self):
117         """Perform some periodic maintenance operations."""
118         # Create a new token secret
119         self.token_secrets.insert(0, newID())
120         if len(self.token_secrets) > 3:
121             self.token_secrets.pop()
122             
123         # Save some parameters for reloading
124         self.store.saveSelfNode(self.node.id)
125         self.store.dumpRoutingTable(self.table.buckets)
126         
127         # DHT maintenance
128         self.store.expireValues(self.config['KEY_EXPIRE'])
129         self.refreshTable()
130         
131         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
132                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
133                                                  self.checkpoint)
134         
135     def _loadRoutingTable(self):
136         """Load the previous routing table nodes from the database.
137         
138         It's usually a good idea to call refreshTable(force = True) after
139         loading the table.
140         """
141         nodes = self.store.getRoutingTable()
142         for rec in nodes:
143             n = self.Node(rec[0], rec[1], int(rec[2]))
144             self.table.insertNode(n, contacted = False)
145             
146     #{ Local interface
147     def addContact(self, host, port, callback=None, errback=None):
148         """Ping this node and add the contact info to the table on pong.
149         
150         @type host: C{string}
151         @param host: the IP address of the node to contact
152         @type port: C{int}
153         @param port:the port of the node to contact
154         @type callback: C{method}
155         @param callback: the method to call with the results, it must take 1
156             parameter, the contact info returned by the node
157             (optional, defaults to doing nothing with the results)
158         @type errback: C{method}
159         @param errback: the method to call if an error occurs
160             (optional, defaults to calling the callback with None)
161         """
162         n = self.Node(NULL_ID, host, port)
163         self.sendJoin(n, callback=callback, errback=errback)
164
165     def findNode(self, id, callback, errback=None):
166         """Find the contact info for the K closest nodes in the global table.
167         
168         @type id: C{string}
169         @param id: the target ID to find the K closest nodes of
170         @type callback: C{method}
171         @param callback: the method to call with the results, it must take 1
172             parameter, the list of K closest nodes
173         @type errback: C{method}
174         @param errback: the method to call if an error occurs
175             (optional, defaults to doing nothing when an error occurs)
176         """
177         # Get K nodes out of local table/cache
178         nodes = self.table.findNodes(id)
179         nodes = [copy(node) for node in nodes]
180         d = Deferred()
181         if errback:
182             d.addCallbacks(callback, errback)
183         else:
184             d.addCallback(callback)
185
186         # If the target ID was found
187         if len(nodes) == 1 and nodes[0].id == id:
188             d.callback(nodes)
189         else:
190             # Start the finding nodes action
191             state = FindNode(self, id, d.callback, self.config, self.stats)
192             reactor.callLater(0, state.goWithNodes, nodes)
193     
194     def insertNode(self, node, contacted = True):
195         """Try to insert a node in our local table, pinging oldest contact if necessary.
196         
197         If all you have is a host/port, then use L{addContact}, which calls this
198         method after receiving the PONG from the remote node. The reason for
199         the seperation is we can't insert a node into the table without its
200         node ID. That means of course the node passed into this method needs
201         to be a properly formed Node object with a valid ID.
202
203         @type node: L{node.Node}
204         @param node: the new node to try and insert
205         @type contacted: C{boolean}
206         @param contacted: whether the new node is known to be good, i.e.
207             responded to a request (optional, defaults to True)
208         """
209         old = self.table.insertNode(node, contacted=contacted)
210         if (old and old.id != self.node.id and
211             (datetime.now() - old.lastSeen) > 
212              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
213             
214             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
215                 """The pinged node never responded, so replace it."""
216                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
217                 log.err(err)
218                 self.table.replaceStaleNode(oldnode, newnode)
219             
220             def _notStaleNodeHandler(dict, old=old, self=self):
221                 """Got a pong from the old node, so update it."""
222                 dict = dict['rsp']
223                 if dict['id'] == old.id:
224                     self.table.justSeenNode(old.id)
225             
226             # Bucket is full, check to see if old node is still available
227             self.stats.startedAction('ping')
228             df = old.ping(self.node.id)
229             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
230
231     def sendJoin(self, node, callback=None, errback=None):
232         """Join the DHT by pinging a bootstrap node.
233         
234         @type node: L{node.Node}
235         @param node: the node to send the join to
236         @type callback: C{method}
237         @param callback: the method to call with the results, it must take 1
238             parameter, the contact info returned by the node
239             (optional, defaults to doing nothing with the results)
240         @type errback: C{method}
241         @param errback: the method to call if an error occurs
242             (optional, defaults to calling the callback with None)
243         """
244
245         def _pongHandler(dict, node=node, self=self, callback=callback):
246             """Node responded properly, callback with response."""
247             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
248             self.insertNode(n)
249             if callback:
250                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
251
252         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
253             """Error occurred, fail node and errback or callback with error."""
254             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
255             log.err(err)
256             self.table.nodeFailed(node)
257             if errback:
258                 errback()
259             elif callback:
260                 callback(None)
261         
262         self.stats.startedAction('join')
263         df = node.join(self.node.id)
264         df.addCallbacks(_pongHandler, _defaultPong)
265
266     def findCloseNodes(self, callback=lambda a: None, errback = None):
267         """Perform a findNode on the ID one away from our own.
268
269         This will allow us to populate our table with nodes on our network
270         closest to our own. This is called as soon as we start up with an
271         empty table.
272
273         @type callback: C{method}
274         @param callback: the method to call with the results, it must take 1
275             parameter, the list of K closest nodes
276             (optional, defaults to doing nothing with the results)
277         @type errback: C{method}
278         @param errback: the method to call if an error occurs
279             (optional, defaults to doing nothing when an error occurs)
280         """
281         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
282         self.findNode(id, callback, errback)
283
284     def refreshTable(self, force = False):
285         """Check all the buckets for those that need refreshing.
286         
287         @param force: refresh all buckets regardless of last bucket access time
288             (optional, defaults to False)
289         """
290         def callback(nodes):
291             pass
292     
293         for bucket in self.table.buckets:
294             if force or (datetime.now() - bucket.lastAccessed > 
295                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
296                 # Choose a random ID in the bucket and try and find it
297                 id = newIDInRange(bucket.min, bucket.max)
298                 self.findNode(id, callback)
299
300     def shutdown(self):
301         """Closes the port and cancels pending later calls."""
302         self.listenport.stopListening()
303         try:
304             self.next_checkpoint.cancel()
305         except:
306             pass
307         self.store.close()
308     
309     def getStats(self):
310         """Gather the statistics for the DHT."""
311         return self.stats.formatHTML()
312
313     #{ Remote interface
314     def krpc_ping(self, id, _krpc_sender):
315         """Pong with our ID.
316         
317         @type id: C{string}
318         @param id: the node ID of the sender node
319         @type _krpc_sender: (C{string}, C{int})
320         @param _krpc_sender: the sender node's IP address and port
321         """
322         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
323         self.insertNode(n, contacted = False)
324
325         return {"id" : self.node.id}
326         
327     def krpc_join(self, id, _krpc_sender):
328         """Add the node by responding with its address and port.
329         
330         @type id: C{string}
331         @param id: the node ID of the sender node
332         @type _krpc_sender: (C{string}, C{int})
333         @param _krpc_sender: the sender node's IP address and port
334         """
335         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
336         self.insertNode(n, contacted = False)
337
338         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
339         
340     def krpc_find_node(self, target, id, _krpc_sender):
341         """Find the K closest nodes to the target in the local routing table.
342         
343         @type target: C{string}
344         @param target: the target ID to find nodes for
345         @type id: C{string}
346         @param id: the node ID of the sender node
347         @type _krpc_sender: (C{string}, C{int})
348         @param _krpc_sender: the sender node's IP address and port
349         """
350         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
351         self.insertNode(n, contacted = False)
352
353         nodes = self.table.findNodes(target)
354         nodes = map(lambda node: node.contactInfo(), nodes)
355         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
356         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
357
358
359 class KhashmirRead(KhashmirBase):
360     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
361
362     _Node = KNodeRead
363
364     #{ Local interface
365     def findValue(self, key, callback, errback=None):
366         """Get the nodes that have values for the key from the global table.
367         
368         @type key: C{string}
369         @param key: the target key to find the values for
370         @type callback: C{method}
371         @param callback: the method to call with the results, it must take 1
372             parameter, the list of nodes with values
373         @type errback: C{method}
374         @param errback: the method to call if an error occurs
375             (optional, defaults to doing nothing when an error occurs)
376         """
377         # Get K nodes out of local table/cache
378         nodes = self.table.findNodes(key)
379         nodes = [copy(node) for node in nodes]
380         d = Deferred()
381         if errback:
382             d.addCallbacks(callback, errback)
383         else:
384             d.addCallback(callback)
385
386         # Search for others starting with the locally found ones
387         state = FindValue(self, key, d.callback, self.config, self.stats)
388         reactor.callLater(0, state.goWithNodes, nodes)
389
390     def valueForKey(self, key, callback, searchlocal = True):
391         """Get the values found for key in global table.
392         
393         Callback will be called with a list of values for each peer that
394         returns unique values. The final callback will be an empty list.
395
396         @type key: C{string}
397         @param key: the target key to get the values for
398         @type callback: C{method}
399         @param callback: the method to call with the results, it must take 2
400             parameters: the key, and the values found
401         @type searchlocal: C{boolean}
402         @param searchlocal: whether to also look for any local values
403         """
404         # Get any local values
405         if searchlocal:
406             l = self.store.retrieveValues(key)
407             if len(l) > 0:
408                 reactor.callLater(0, callback, key, l)
409         else:
410             l = []
411
412         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
413             """Use the found nodes to send requests for values to."""
414             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
415             reactor.callLater(0, state.goWithNodes, nodes)
416             
417         # First lookup nodes that have values for the key
418         self.findValue(key, _getValueForKey)
419
420     #{ Remote interface
421     def krpc_find_value(self, key, id, _krpc_sender):
422         """Find the number of values stored locally for the key, and the K closest nodes.
423         
424         @type key: C{string}
425         @param key: the target key to find the values and nodes for
426         @type id: C{string}
427         @param id: the node ID of the sender node
428         @type _krpc_sender: (C{string}, C{int})
429         @param _krpc_sender: the sender node's IP address and port
430         """
431         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
432         self.insertNode(n, contacted = False)
433     
434         nodes = self.table.findNodes(key)
435         nodes = map(lambda node: node.contactInfo(), nodes)
436         num_values = self.store.countValues(key)
437         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
438
439     def krpc_get_value(self, key, num, id, _krpc_sender):
440         """Retrieve the values stored locally for the key.
441         
442         @type key: C{string}
443         @param key: the target key to retrieve the values for
444         @type num: C{int}
445         @param num: the maximum number of values to retrieve, or 0 to
446             retrieve all of them
447         @type id: C{string}
448         @param id: the node ID of the sender node
449         @type _krpc_sender: (C{string}, C{int})
450         @param _krpc_sender: the sender node's IP address and port
451         """
452         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
453         self.insertNode(n, contacted = False)
454     
455         l = self.store.retrieveValues(key)
456         if num == 0 or num >= len(l):
457             return {'values' : l, "id": self.node.id}
458         else:
459             shuffle(l)
460             return {'values' : l[:num], "id": self.node.id}
461
462
463 class KhashmirWrite(KhashmirRead):
464     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
465
466     _Node = KNodeWrite
467
468     #{ Local interface
469     def storeValueForKey(self, key, value, callback=None):
470         """Stores the value for the key in the global table.
471         
472         No status in this implementation, peers respond but don't indicate
473         status of storing values.
474
475         @type key: C{string}
476         @param key: the target key to store the value for
477         @type value: C{string}
478         @param value: the value to store with the key
479         @type callback: C{method}
480         @param callback: the method to call with the results, it must take 3
481             parameters: the key, the value stored, and the result of the store
482             (optional, defaults to doing nothing with the results)
483         """
484         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
485             """Use the returned K closest nodes to store the key at."""
486             if not response:
487                 def _storedValueHandler(key, value, sender):
488                     """Default callback that does nothing."""
489                     pass
490                 response = _storedValueHandler
491             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
492             reactor.callLater(0, action.goWithNodes, nodes)
493             
494         # First find the K closest nodes to operate on.
495         self.findNode(key, _storeValueForKey)
496                     
497     #{ Remote interface
498     def krpc_store_value(self, key, value, token, id, _krpc_sender):
499         """Store the value locally with the key.
500         
501         @type key: C{string}
502         @param key: the target key to store the value for
503         @type value: C{string}
504         @param value: the value to store with the key
505         @param token: the token to confirm that this peer contacted us previously
506         @type id: C{string}
507         @param id: the node ID of the sender node
508         @type _krpc_sender: (C{string}, C{int})
509         @param _krpc_sender: the sender node's IP address and port
510         """
511         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
512         self.insertNode(n, contacted = False)
513         for secret in self.token_secrets:
514             this_token = sha(secret + _krpc_sender[0]).digest()
515             if token == this_token:
516                 self.store.storeValue(key, value)
517                 return {"id" : self.node.id}
518         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
519
520
521 class Khashmir(KhashmirWrite):
522     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
523     _Node = KNodeWrite
524
525
526 class SimpleTests(unittest.TestCase):
527     
528     timeout = 10
529     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
530                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
531                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
532                     'MAX_FAILURES': 3,
533                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
534                     'KEY_EXPIRE': 3600, 'SPEW': False, }
535
536     def setUp(self):
537         d = self.DHT_DEFAULTS.copy()
538         d['PORT'] = 4044
539         self.a = Khashmir(d)
540         d = self.DHT_DEFAULTS.copy()
541         d['PORT'] = 4045
542         self.b = Khashmir(d)
543         
544     def tearDown(self):
545         self.a.shutdown()
546         self.b.shutdown()
547         os.unlink(self.a.store.db)
548         os.unlink(self.b.store.db)
549
550     def testAddContact(self):
551         self.failUnlessEqual(len(self.a.table.buckets), 1)
552         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
553
554         self.failUnlessEqual(len(self.b.table.buckets), 1)
555         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
556
557         self.a.addContact('127.0.0.1', 4045)
558         reactor.iterate()
559         reactor.iterate()
560         reactor.iterate()
561         reactor.iterate()
562
563         self.failUnlessEqual(len(self.a.table.buckets), 1)
564         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
565         self.failUnlessEqual(len(self.b.table.buckets), 1)
566         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
567
568     def testStoreRetrieve(self):
569         self.a.addContact('127.0.0.1', 4045)
570         reactor.iterate()
571         reactor.iterate()
572         reactor.iterate()
573         reactor.iterate()
574         self.got = 0
575         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
576         reactor.iterate()
577         reactor.iterate()
578         reactor.iterate()
579         reactor.iterate()
580         reactor.iterate()
581         reactor.iterate()
582         self.a.valueForKey(sha('foo').digest(), self._cb)
583         reactor.iterate()
584         reactor.iterate()
585         reactor.iterate()
586         reactor.iterate()
587         reactor.iterate()
588         reactor.iterate()
589         reactor.iterate()
590
591     def _cb(self, key, val):
592         if not val:
593             self.failUnlessEqual(self.got, 1)
594         elif 'foobar' in val:
595             self.got = 1
596
597
598 class MultiTest(unittest.TestCase):
599     
600     timeout = 30
601     num = 20
602     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
603                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
604                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
605                     'MAX_FAILURES': 3,
606                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
607                     'KEY_EXPIRE': 3600, 'SPEW': False, }
608
609     def _done(self, val):
610         self.done = 1
611         
612     def setUp(self):
613         self.l = []
614         self.startport = 4088
615         for i in range(self.num):
616             d = self.DHT_DEFAULTS.copy()
617             d['PORT'] = self.startport + i
618             self.l.append(Khashmir(d))
619         reactor.iterate()
620         reactor.iterate()
621         
622         for i in self.l:
623             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
624             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
625             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
626             reactor.iterate()
627             reactor.iterate()
628             reactor.iterate() 
629             
630         for i in self.l:
631             self.done = 0
632             i.findCloseNodes(self._done)
633             while not self.done:
634                 reactor.iterate()
635         for i in self.l:
636             self.done = 0
637             i.findCloseNodes(self._done)
638             while not self.done:
639                 reactor.iterate()
640
641     def tearDown(self):
642         for i in self.l:
643             i.shutdown()
644             os.unlink(i.store.db)
645             
646         reactor.iterate()
647         
648     def testStoreRetrieve(self):
649         for i in range(10):
650             K = newID()
651             V = newID()
652             
653             for a in range(3):
654                 self.done = 0
655                 def _scb(key, value, result):
656                     self.done = 1
657                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
658                 while not self.done:
659                     reactor.iterate()
660
661
662                 def _rcb(key, val):
663                     if not val:
664                         self.done = 1
665                         self.failUnlessEqual(self.got, 1)
666                     elif V in val:
667                         self.got = 1
668                 for x in range(3):
669                     self.got = 0
670                     self.done = 0
671                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
672                     while not self.done:
673                         reactor.iterate()