]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p_Khashmir/khashmir.py
Workaround old sqlite not having 'select count(distinct key)'.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 import os
13
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
17
18 from db import DB
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 from stats import StatsLogger
24 import krpc
25
26 class KhashmirBase(protocol.Factory):
27     """The base Khashmir class, with base functionality and find node, no key-value mappings.
28     
29     @type _Node: L{node.Node}
30     @ivar _Node: the knode implementation to use for this class of DHT
31     @type config: C{dictionary}
32     @ivar config: the configuration parameters for the DHT
33     @type port: C{int}
34     @ivar port: the port to listen on
35     @type store: L{db.DB}
36     @ivar store: the database to store nodes and key/value pairs in
37     @type node: L{node.Node}
38     @ivar node: this node
39     @type table: L{ktable.KTable}
40     @ivar table: the routing table
41     @type token_secrets: C{list} of C{string}
42     @ivar token_secrets: the current secrets to use to create tokens
43     @type stats: L{stats.StatsLogger}
44     @ivar stats: the statistics gatherer
45     @type udp: L{krpc.hostbroker}
46     @ivar udp: the factory for the KRPC protocol
47     @type listenport: L{twisted.internet.interfaces.IListeningPort}
48     @ivar listenport: the UDP listening port
49     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
50     @ivar next_checkpoint: the delayed call for the next checkpoint
51     """
52     
53     _Node = KNodeBase
54     
55     def __init__(self, config, cache_dir='/tmp'):
56         """Initialize the Khashmir class and call the L{setup} method.
57         
58         @type config: C{dictionary}
59         @param config: the configuration parameters for the DHT
60         @type cache_dir: C{string}
61         @param cache_dir: the directory to store all files in
62             (optional, defaults to the /tmp directory)
63         """
64         self.config = None
65         self.setup(config, cache_dir)
66         
67     def setup(self, config, cache_dir):
68         """Setup all the Khashmir sub-modules.
69         
70         @type config: C{dictionary}
71         @param config: the configuration parameters for the DHT
72         @type cache_dir: C{string}
73         @param cache_dir: the directory to store all files in
74         """
75         self.config = config
76         self.port = config['PORT']
77         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
78         self.node = self._loadSelfNode('', self.port)
79         self.table = KTable(self.node, config)
80         self.token_secrets = [newID()]
81         self.stats = StatsLogger(self.table, self.store, self.config)
82         
83         # Start listening
84         self.udp = krpc.hostbroker(self, self.stats, config)
85         self.udp.protocol = krpc.KRPC
86         self.listenport = reactor.listenUDP(self.port, self.udp)
87         
88         # Load the routing table and begin checkpointing
89         self._loadRoutingTable()
90         self.refreshTable(force = True)
91         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
92
93     def Node(self, id, host = None, port = None):
94         """Create a new node.
95         
96         @see: L{node.Node.__init__}
97         """
98         n = self._Node(id, host, port)
99         n.table = self.table
100         n.conn = self.udp.connectionForAddr((n.host, n.port))
101         return n
102     
103     def __del__(self):
104         """Stop listening for packets."""
105         self.listenport.stopListening()
106         
107     def _loadSelfNode(self, host, port):
108         """Create this node, loading any previously saved one."""
109         id = self.store.getSelfNode()
110         if not id:
111             id = newID()
112         return self._Node(id, host, port)
113         
114     def checkpoint(self):
115         """Perform some periodic maintenance operations."""
116         # Create a new token secret
117         self.token_secrets.insert(0, newID())
118         if len(self.token_secrets) > 3:
119             self.token_secrets.pop()
120             
121         # Save some parameters for reloading
122         self.store.saveSelfNode(self.node.id)
123         self.store.dumpRoutingTable(self.table.buckets)
124         
125         # DHT maintenance
126         self.store.expireValues(self.config['KEY_EXPIRE'])
127         self.refreshTable()
128         
129         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
130                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
131                                                  self.checkpoint)
132         
133     def _loadRoutingTable(self):
134         """Load the previous routing table nodes from the database.
135         
136         It's usually a good idea to call refreshTable(force = True) after
137         loading the table.
138         """
139         nodes = self.store.getRoutingTable()
140         for rec in nodes:
141             n = self.Node(rec[0], rec[1], int(rec[2]))
142             self.table.insertNode(n, contacted = False)
143             
144     #{ Local interface
145     def addContact(self, host, port, callback=None, errback=None):
146         """Ping this node and add the contact info to the table on pong.
147         
148         @type host: C{string}
149         @param host: the IP address of the node to contact
150         @type port: C{int}
151         @param port:the port of the node to contact
152         @type callback: C{method}
153         @param callback: the method to call with the results, it must take 1
154             parameter, the contact info returned by the node
155             (optional, defaults to doing nothing with the results)
156         @type errback: C{method}
157         @param errback: the method to call if an error occurs
158             (optional, defaults to calling the callback with None)
159         """
160         n = self.Node(NULL_ID, host, port)
161         self.sendJoin(n, callback=callback, errback=errback)
162
163     def findNode(self, id, callback, errback=None):
164         """Find the contact info for the K closest nodes in the global table.
165         
166         @type id: C{string}
167         @param id: the target ID to find the K closest nodes of
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the list of K closest nodes
171         @type errback: C{method}
172         @param errback: the method to call if an error occurs
173             (optional, defaults to doing nothing when an error occurs)
174         """
175         # Get K nodes out of local table/cache
176         nodes = self.table.findNodes(id)
177         d = Deferred()
178         if errback:
179             d.addCallbacks(callback, errback)
180         else:
181             d.addCallback(callback)
182
183         # If the target ID was found
184         if len(nodes) == 1 and nodes[0].id == id:
185             d.callback(nodes)
186         else:
187             # Start the finding nodes action
188             state = FindNode(self, id, d.callback, self.config, self.stats)
189             reactor.callLater(0, state.goWithNodes, nodes)
190     
191     def insertNode(self, node, contacted = True):
192         """Try to insert a node in our local table, pinging oldest contact if necessary.
193         
194         If all you have is a host/port, then use L{addContact}, which calls this
195         method after receiving the PONG from the remote node. The reason for
196         the seperation is we can't insert a node into the table without its
197         node ID. That means of course the node passed into this method needs
198         to be a properly formed Node object with a valid ID.
199
200         @type node: L{node.Node}
201         @param node: the new node to try and insert
202         @type contacted: C{boolean}
203         @param contacted: whether the new node is known to be good, i.e.
204             responded to a request (optional, defaults to True)
205         """
206         old = self.table.insertNode(node, contacted=contacted)
207         if (old and old.id != self.node.id and
208             (datetime.now() - old.lastSeen) > 
209              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
210             
211             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
212                 """The pinged node never responded, so replace it."""
213                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
214                 log.err(err)
215                 self.table.replaceStaleNode(oldnode, newnode)
216             
217             def _notStaleNodeHandler(dict, old=old, self=self):
218                 """Got a pong from the old node, so update it."""
219                 dict = dict['rsp']
220                 if dict['id'] == old.id:
221                     self.table.justSeenNode(old.id)
222             
223             # Bucket is full, check to see if old node is still available
224             self.stats.startedAction('ping')
225             df = old.ping(self.node.id)
226             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
227
228     def sendJoin(self, node, callback=None, errback=None):
229         """Join the DHT by pinging a bootstrap node.
230         
231         @type node: L{node.Node}
232         @param node: the node to send the join to
233         @type callback: C{method}
234         @param callback: the method to call with the results, it must take 1
235             parameter, the contact info returned by the node
236             (optional, defaults to doing nothing with the results)
237         @type errback: C{method}
238         @param errback: the method to call if an error occurs
239             (optional, defaults to calling the callback with None)
240         """
241
242         def _pongHandler(dict, node=node, self=self, callback=callback):
243             """Node responded properly, callback with response."""
244             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
245             self.insertNode(n)
246             if callback:
247                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
248
249         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
250             """Error occurred, fail node and errback or callback with error."""
251             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
252             log.err(err)
253             self.table.nodeFailed(node)
254             if errback:
255                 errback()
256             elif callback:
257                 callback(None)
258         
259         self.stats.startedAction('join')
260         df = node.join(self.node.id)
261         df.addCallbacks(_pongHandler, _defaultPong)
262
263     def findCloseNodes(self, callback=lambda a: None, errback = None):
264         """Perform a findNode on the ID one away from our own.
265
266         This will allow us to populate our table with nodes on our network
267         closest to our own. This is called as soon as we start up with an
268         empty table.
269
270         @type callback: C{method}
271         @param callback: the method to call with the results, it must take 1
272             parameter, the list of K closest nodes
273             (optional, defaults to doing nothing with the results)
274         @type errback: C{method}
275         @param errback: the method to call if an error occurs
276             (optional, defaults to doing nothing when an error occurs)
277         """
278         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
279         self.findNode(id, callback, errback)
280
281     def refreshTable(self, force = False):
282         """Check all the buckets for those that need refreshing.
283         
284         @param force: refresh all buckets regardless of last bucket access time
285             (optional, defaults to False)
286         """
287         def callback(nodes):
288             pass
289     
290         for bucket in self.table.buckets:
291             if force or (datetime.now() - bucket.lastAccessed > 
292                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
293                 # Choose a random ID in the bucket and try and find it
294                 id = newIDInRange(bucket.min, bucket.max)
295                 self.findNode(id, callback)
296
297     def shutdown(self):
298         """Closes the port and cancels pending later calls."""
299         self.listenport.stopListening()
300         try:
301             self.next_checkpoint.cancel()
302         except:
303             pass
304         self.store.close()
305     
306     def getStats(self):
307         """Gather the statistics for the DHT."""
308         return self.stats.formatHTML()
309
310     #{ Remote interface
311     def krpc_ping(self, id, _krpc_sender):
312         """Pong with our ID.
313         
314         @type id: C{string}
315         @param id: the node ID of the sender node
316         @type _krpc_sender: (C{string}, C{int})
317         @param _krpc_sender: the sender node's IP address and port
318         """
319         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
320         self.insertNode(n, contacted = False)
321
322         return {"id" : self.node.id}
323         
324     def krpc_join(self, id, _krpc_sender):
325         """Add the node by responding with its address and port.
326         
327         @type id: C{string}
328         @param id: the node ID of the sender node
329         @type _krpc_sender: (C{string}, C{int})
330         @param _krpc_sender: the sender node's IP address and port
331         """
332         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
333         self.insertNode(n, contacted = False)
334
335         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
336         
337     def krpc_find_node(self, target, id, _krpc_sender):
338         """Find the K closest nodes to the target in the local routing table.
339         
340         @type target: C{string}
341         @param target: the target ID to find nodes for
342         @type id: C{string}
343         @param id: the node ID of the sender node
344         @type _krpc_sender: (C{string}, C{int})
345         @param _krpc_sender: the sender node's IP address and port
346         """
347         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
348         self.insertNode(n, contacted = False)
349
350         nodes = self.table.findNodes(target)
351         nodes = map(lambda node: node.contactInfo(), nodes)
352         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
353         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
354
355
356 class KhashmirRead(KhashmirBase):
357     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
358
359     _Node = KNodeRead
360
361     #{ Local interface
362     def findValue(self, key, callback, errback=None):
363         """Get the nodes that have values for the key from the global table.
364         
365         @type key: C{string}
366         @param key: the target key to find the values for
367         @type callback: C{method}
368         @param callback: the method to call with the results, it must take 1
369             parameter, the list of nodes with values
370         @type errback: C{method}
371         @param errback: the method to call if an error occurs
372             (optional, defaults to doing nothing when an error occurs)
373         """
374         # Get K nodes out of local table/cache
375         nodes = self.table.findNodes(key)
376         d = Deferred()
377         if errback:
378             d.addCallbacks(callback, errback)
379         else:
380             d.addCallback(callback)
381
382         # Search for others starting with the locally found ones
383         state = FindValue(self, key, d.callback, self.config, self.stats)
384         reactor.callLater(0, state.goWithNodes, nodes)
385
386     def valueForKey(self, key, callback, searchlocal = True):
387         """Get the values found for key in global table.
388         
389         Callback will be called with a list of values for each peer that
390         returns unique values. The final callback will be an empty list.
391
392         @type key: C{string}
393         @param key: the target key to get the values for
394         @type callback: C{method}
395         @param callback: the method to call with the results, it must take 2
396             parameters: the key, and the values found
397         @type searchlocal: C{boolean}
398         @param searchlocal: whether to also look for any local values
399         """
400         # Get any local values
401         if searchlocal:
402             l = self.store.retrieveValues(key)
403             if len(l) > 0:
404                 reactor.callLater(0, callback, key, l)
405         else:
406             l = []
407
408         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
409             """Use the found nodes to send requests for values to."""
410             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
411             reactor.callLater(0, state.goWithNodes, nodes)
412             
413         # First lookup nodes that have values for the key
414         self.findValue(key, _getValueForKey)
415
416     #{ Remote interface
417     def krpc_find_value(self, key, id, _krpc_sender):
418         """Find the number of values stored locally for the key, and the K closest nodes.
419         
420         @type key: C{string}
421         @param key: the target key to find the values and nodes for
422         @type id: C{string}
423         @param id: the node ID of the sender node
424         @type _krpc_sender: (C{string}, C{int})
425         @param _krpc_sender: the sender node's IP address and port
426         """
427         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
428         self.insertNode(n, contacted = False)
429     
430         nodes = self.table.findNodes(key)
431         nodes = map(lambda node: node.contactInfo(), nodes)
432         num_values = self.store.countValues(key)
433         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
434
435     def krpc_get_value(self, key, num, id, _krpc_sender):
436         """Retrieve the values stored locally for the key.
437         
438         @type key: C{string}
439         @param key: the target key to retrieve the values for
440         @type num: C{int}
441         @param num: the maximum number of values to retrieve, or 0 to
442             retrieve all of them
443         @type id: C{string}
444         @param id: the node ID of the sender node
445         @type _krpc_sender: (C{string}, C{int})
446         @param _krpc_sender: the sender node's IP address and port
447         """
448         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
449         self.insertNode(n, contacted = False)
450     
451         l = self.store.retrieveValues(key)
452         if num == 0 or num >= len(l):
453             return {'values' : l, "id": self.node.id}
454         else:
455             shuffle(l)
456             return {'values' : l[:num], "id": self.node.id}
457
458
459 class KhashmirWrite(KhashmirRead):
460     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
461
462     _Node = KNodeWrite
463
464     #{ Local interface
465     def storeValueForKey(self, key, value, callback=None):
466         """Stores the value for the key in the global table.
467         
468         No status in this implementation, peers respond but don't indicate
469         status of storing values.
470
471         @type key: C{string}
472         @param key: the target key to store the value for
473         @type value: C{string}
474         @param value: the value to store with the key
475         @type callback: C{method}
476         @param callback: the method to call with the results, it must take 3
477             parameters: the key, the value stored, and the result of the store
478             (optional, defaults to doing nothing with the results)
479         """
480         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
481             """Use the returned K closest nodes to store the key at."""
482             if not response:
483                 def _storedValueHandler(key, value, sender):
484                     """Default callback that does nothing."""
485                     pass
486                 response = _storedValueHandler
487             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
488             reactor.callLater(0, action.goWithNodes, nodes)
489             
490         # First find the K closest nodes to operate on.
491         self.findNode(key, _storeValueForKey)
492                     
493     #{ Remote interface
494     def krpc_store_value(self, key, value, token, id, _krpc_sender):
495         """Store the value locally with the key.
496         
497         @type key: C{string}
498         @param key: the target key to store the value for
499         @type value: C{string}
500         @param value: the value to store with the key
501         @param token: the token to confirm that this peer contacted us previously
502         @type id: C{string}
503         @param id: the node ID of the sender node
504         @type _krpc_sender: (C{string}, C{int})
505         @param _krpc_sender: the sender node's IP address and port
506         """
507         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
508         self.insertNode(n, contacted = False)
509         for secret in self.token_secrets:
510             this_token = sha(secret + _krpc_sender[0]).digest()
511             if token == this_token:
512                 self.store.storeValue(key, value)
513                 return {"id" : self.node.id}
514         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
515
516
517 class Khashmir(KhashmirWrite):
518     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
519     _Node = KNodeWrite
520
521
522 class SimpleTests(unittest.TestCase):
523     
524     timeout = 10
525     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
526                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
527                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
528                     'MAX_FAILURES': 3,
529                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
530                     'KEY_EXPIRE': 3600, 'SPEW': False, }
531
532     def setUp(self):
533         d = self.DHT_DEFAULTS.copy()
534         d['PORT'] = 4044
535         self.a = Khashmir(d)
536         d = self.DHT_DEFAULTS.copy()
537         d['PORT'] = 4045
538         self.b = Khashmir(d)
539         
540     def tearDown(self):
541         self.a.shutdown()
542         self.b.shutdown()
543         os.unlink(self.a.store.db)
544         os.unlink(self.b.store.db)
545
546     def testAddContact(self):
547         self.failUnlessEqual(len(self.a.table.buckets), 1)
548         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
549
550         self.failUnlessEqual(len(self.b.table.buckets), 1)
551         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
552
553         self.a.addContact('127.0.0.1', 4045)
554         reactor.iterate()
555         reactor.iterate()
556         reactor.iterate()
557         reactor.iterate()
558
559         self.failUnlessEqual(len(self.a.table.buckets), 1)
560         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
561         self.failUnlessEqual(len(self.b.table.buckets), 1)
562         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
563
564     def testStoreRetrieve(self):
565         self.a.addContact('127.0.0.1', 4045)
566         reactor.iterate()
567         reactor.iterate()
568         reactor.iterate()
569         reactor.iterate()
570         self.got = 0
571         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
572         reactor.iterate()
573         reactor.iterate()
574         reactor.iterate()
575         reactor.iterate()
576         reactor.iterate()
577         reactor.iterate()
578         self.a.valueForKey(sha('foo').digest(), self._cb)
579         reactor.iterate()
580         reactor.iterate()
581         reactor.iterate()
582         reactor.iterate()
583         reactor.iterate()
584         reactor.iterate()
585         reactor.iterate()
586
587     def _cb(self, key, val):
588         if not val:
589             self.failUnlessEqual(self.got, 1)
590         elif 'foobar' in val:
591             self.got = 1
592
593
594 class MultiTest(unittest.TestCase):
595     
596     timeout = 30
597     num = 20
598     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
599                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
600                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
601                     'MAX_FAILURES': 3,
602                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
603                     'KEY_EXPIRE': 3600, 'SPEW': False, }
604
605     def _done(self, val):
606         self.done = 1
607         
608     def setUp(self):
609         self.l = []
610         self.startport = 4088
611         for i in range(self.num):
612             d = self.DHT_DEFAULTS.copy()
613             d['PORT'] = self.startport + i
614             self.l.append(Khashmir(d))
615         reactor.iterate()
616         reactor.iterate()
617         
618         for i in self.l:
619             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
620             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
621             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
622             reactor.iterate()
623             reactor.iterate()
624             reactor.iterate() 
625             
626         for i in self.l:
627             self.done = 0
628             i.findCloseNodes(self._done)
629             while not self.done:
630                 reactor.iterate()
631         for i in self.l:
632             self.done = 0
633             i.findCloseNodes(self._done)
634             while not self.done:
635                 reactor.iterate()
636
637     def tearDown(self):
638         for i in self.l:
639             i.shutdown()
640             os.unlink(i.store.db)
641             
642         reactor.iterate()
643         
644     def testStoreRetrieve(self):
645         for i in range(10):
646             K = newID()
647             V = newID()
648             
649             for a in range(3):
650                 self.done = 0
651                 def _scb(key, value, result):
652                     self.done = 1
653                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
654                 while not self.done:
655                     reactor.iterate()
656
657
658                 def _rcb(key, val):
659                     if not val:
660                         self.done = 1
661                         self.failUnlessEqual(self.got, 1)
662                     elif V in val:
663                         self.got = 1
664                 for x in range(3):
665                     self.got = 0
666                     self.done = 0
667                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
668                     while not self.done:
669                         reactor.iterate()