]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p_Khashmir/khashmir.py
Import the log module, DOH.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 import os
13
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.python import log
17 from twisted.trial import unittest
18
19 from db import DB
20 from ktable import KTable
21 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
22 from khash import newID, newIDInRange
23 from actions import FindNode, FindValue, GetValue, StoreValue
24 from stats import StatsLogger
25 import krpc
26
27 class KhashmirBase(protocol.Factory):
28     """The base Khashmir class, with base functionality and find node, no key-value mappings.
29     
30     @type _Node: L{node.Node}
31     @ivar _Node: the knode implementation to use for this class of DHT
32     @type config: C{dictionary}
33     @ivar config: the configuration parameters for the DHT
34     @type port: C{int}
35     @ivar port: the port to listen on
36     @type store: L{db.DB}
37     @ivar store: the database to store nodes and key/value pairs in
38     @type node: L{node.Node}
39     @ivar node: this node
40     @type table: L{ktable.KTable}
41     @ivar table: the routing table
42     @type token_secrets: C{list} of C{string}
43     @ivar token_secrets: the current secrets to use to create tokens
44     @type stats: L{stats.StatsLogger}
45     @ivar stats: the statistics gatherer
46     @type udp: L{krpc.hostbroker}
47     @ivar udp: the factory for the KRPC protocol
48     @type listenport: L{twisted.internet.interfaces.IListeningPort}
49     @ivar listenport: the UDP listening port
50     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
51     @ivar next_checkpoint: the delayed call for the next checkpoint
52     """
53     
54     _Node = KNodeBase
55     
56     def __init__(self, config, cache_dir='/tmp'):
57         """Initialize the Khashmir class and call the L{setup} method.
58         
59         @type config: C{dictionary}
60         @param config: the configuration parameters for the DHT
61         @type cache_dir: C{string}
62         @param cache_dir: the directory to store all files in
63             (optional, defaults to the /tmp directory)
64         """
65         self.config = None
66         self.setup(config, cache_dir)
67         
68     def setup(self, config, cache_dir):
69         """Setup all the Khashmir sub-modules.
70         
71         @type config: C{dictionary}
72         @param config: the configuration parameters for the DHT
73         @type cache_dir: C{string}
74         @param cache_dir: the directory to store all files in
75         """
76         self.config = config
77         self.port = config['PORT']
78         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
79         self.node = self._loadSelfNode('', self.port)
80         self.table = KTable(self.node, config)
81         self.token_secrets = [newID()]
82         self.stats = StatsLogger(self.table, self.store, self.config)
83         
84         # Start listening
85         self.udp = krpc.hostbroker(self, self.stats, config)
86         self.udp.protocol = krpc.KRPC
87         self.listenport = reactor.listenUDP(self.port, self.udp)
88         
89         # Load the routing table and begin checkpointing
90         self._loadRoutingTable()
91         self.refreshTable(force = True)
92         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
93
94     def Node(self, id, host = None, port = None):
95         """Create a new node.
96         
97         @see: L{node.Node.__init__}
98         """
99         n = self._Node(id, host, port)
100         n.table = self.table
101         n.conn = self.udp.connectionForAddr((n.host, n.port))
102         return n
103     
104     def __del__(self):
105         """Stop listening for packets."""
106         self.listenport.stopListening()
107         
108     def _loadSelfNode(self, host, port):
109         """Create this node, loading any previously saved one."""
110         id = self.store.getSelfNode()
111         if not id:
112             id = newID()
113         return self._Node(id, host, port)
114         
115     def checkpoint(self):
116         """Perform some periodic maintenance operations."""
117         # Create a new token secret
118         self.token_secrets.insert(0, newID())
119         if len(self.token_secrets) > 3:
120             self.token_secrets.pop()
121             
122         # Save some parameters for reloading
123         self.store.saveSelfNode(self.node.id)
124         self.store.dumpRoutingTable(self.table.buckets)
125         
126         # DHT maintenance
127         self.store.expireValues(self.config['KEY_EXPIRE'])
128         self.refreshTable()
129         
130         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
131                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
132                                                  self.checkpoint)
133         
134     def _loadRoutingTable(self):
135         """Load the previous routing table nodes from the database.
136         
137         It's usually a good idea to call refreshTable(force = True) after
138         loading the table.
139         """
140         nodes = self.store.getRoutingTable()
141         for rec in nodes:
142             n = self.Node(rec[0], rec[1], int(rec[2]))
143             self.table.insertNode(n, contacted = False)
144             
145     #{ Local interface
146     def addContact(self, host, port, callback=None, errback=None):
147         """Ping this node and add the contact info to the table on pong.
148         
149         @type host: C{string}
150         @param host: the IP address of the node to contact
151         @type port: C{int}
152         @param port:the port of the node to contact
153         @type callback: C{method}
154         @param callback: the method to call with the results, it must take 1
155             parameter, the contact info returned by the node
156             (optional, defaults to doing nothing with the results)
157         @type errback: C{method}
158         @param errback: the method to call if an error occurs
159             (optional, defaults to calling the callback with None)
160         """
161         n = self.Node(NULL_ID, host, port)
162         self.sendJoin(n, callback=callback, errback=errback)
163
164     def findNode(self, id, callback, errback=None):
165         """Find the contact info for the K closest nodes in the global table.
166         
167         @type id: C{string}
168         @param id: the target ID to find the K closest nodes of
169         @type callback: C{method}
170         @param callback: the method to call with the results, it must take 1
171             parameter, the list of K closest nodes
172         @type errback: C{method}
173         @param errback: the method to call if an error occurs
174             (optional, defaults to doing nothing when an error occurs)
175         """
176         # Get K nodes out of local table/cache
177         nodes = self.table.findNodes(id)
178         d = Deferred()
179         if errback:
180             d.addCallbacks(callback, errback)
181         else:
182             d.addCallback(callback)
183
184         # If the target ID was found
185         if len(nodes) == 1 and nodes[0].id == id:
186             d.callback(nodes)
187         else:
188             # Start the finding nodes action
189             state = FindNode(self, id, d.callback, self.config, self.stats)
190             reactor.callLater(0, state.goWithNodes, nodes)
191     
192     def insertNode(self, node, contacted = True):
193         """Try to insert a node in our local table, pinging oldest contact if necessary.
194         
195         If all you have is a host/port, then use L{addContact}, which calls this
196         method after receiving the PONG from the remote node. The reason for
197         the seperation is we can't insert a node into the table without its
198         node ID. That means of course the node passed into this method needs
199         to be a properly formed Node object with a valid ID.
200
201         @type node: L{node.Node}
202         @param node: the new node to try and insert
203         @type contacted: C{boolean}
204         @param contacted: whether the new node is known to be good, i.e.
205             responded to a request (optional, defaults to True)
206         """
207         old = self.table.insertNode(node, contacted=contacted)
208         if (old and old.id != self.node.id and
209             (datetime.now() - old.lastSeen) > 
210              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
211             
212             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
213                 """The pinged node never responded, so replace it."""
214                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
215                 log.err(err)
216                 self.table.replaceStaleNode(oldnode, newnode)
217             
218             def _notStaleNodeHandler(dict, old=old, self=self):
219                 """Got a pong from the old node, so update it."""
220                 dict = dict['rsp']
221                 if dict['id'] == old.id:
222                     self.table.justSeenNode(old.id)
223             
224             # Bucket is full, check to see if old node is still available
225             self.stats.startedAction('ping')
226             df = old.ping(self.node.id)
227             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
228
229     def sendJoin(self, node, callback=None, errback=None):
230         """Join the DHT by pinging a bootstrap node.
231         
232         @type node: L{node.Node}
233         @param node: the node to send the join to
234         @type callback: C{method}
235         @param callback: the method to call with the results, it must take 1
236             parameter, the contact info returned by the node
237             (optional, defaults to doing nothing with the results)
238         @type errback: C{method}
239         @param errback: the method to call if an error occurs
240             (optional, defaults to calling the callback with None)
241         """
242
243         def _pongHandler(dict, node=node, self=self, callback=callback):
244             """Node responded properly, callback with response."""
245             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
246             self.insertNode(n)
247             if callback:
248                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
249
250         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
251             """Error occurred, fail node and errback or callback with error."""
252             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
253             log.err(err)
254             self.table.nodeFailed(node)
255             if errback:
256                 errback()
257             elif callback:
258                 callback(None)
259         
260         self.stats.startedAction('join')
261         df = node.join(self.node.id)
262         df.addCallbacks(_pongHandler, _defaultPong)
263
264     def findCloseNodes(self, callback=lambda a: None, errback = None):
265         """Perform a findNode on the ID one away from our own.
266
267         This will allow us to populate our table with nodes on our network
268         closest to our own. This is called as soon as we start up with an
269         empty table.
270
271         @type callback: C{method}
272         @param callback: the method to call with the results, it must take 1
273             parameter, the list of K closest nodes
274             (optional, defaults to doing nothing with the results)
275         @type errback: C{method}
276         @param errback: the method to call if an error occurs
277             (optional, defaults to doing nothing when an error occurs)
278         """
279         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
280         self.findNode(id, callback, errback)
281
282     def refreshTable(self, force = False):
283         """Check all the buckets for those that need refreshing.
284         
285         @param force: refresh all buckets regardless of last bucket access time
286             (optional, defaults to False)
287         """
288         def callback(nodes):
289             pass
290     
291         for bucket in self.table.buckets:
292             if force or (datetime.now() - bucket.lastAccessed > 
293                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
294                 # Choose a random ID in the bucket and try and find it
295                 id = newIDInRange(bucket.min, bucket.max)
296                 self.findNode(id, callback)
297
298     def shutdown(self):
299         """Closes the port and cancels pending later calls."""
300         self.listenport.stopListening()
301         try:
302             self.next_checkpoint.cancel()
303         except:
304             pass
305         self.store.close()
306     
307     def getStats(self):
308         """Gather the statistics for the DHT."""
309         return self.stats.formatHTML()
310
311     #{ Remote interface
312     def krpc_ping(self, id, _krpc_sender):
313         """Pong with our ID.
314         
315         @type id: C{string}
316         @param id: the node ID of the sender node
317         @type _krpc_sender: (C{string}, C{int})
318         @param _krpc_sender: the sender node's IP address and port
319         """
320         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
321         self.insertNode(n, contacted = False)
322
323         return {"id" : self.node.id}
324         
325     def krpc_join(self, id, _krpc_sender):
326         """Add the node by responding with its address and port.
327         
328         @type id: C{string}
329         @param id: the node ID of the sender node
330         @type _krpc_sender: (C{string}, C{int})
331         @param _krpc_sender: the sender node's IP address and port
332         """
333         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
334         self.insertNode(n, contacted = False)
335
336         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
337         
338     def krpc_find_node(self, target, id, _krpc_sender):
339         """Find the K closest nodes to the target in the local routing table.
340         
341         @type target: C{string}
342         @param target: the target ID to find nodes for
343         @type id: C{string}
344         @param id: the node ID of the sender node
345         @type _krpc_sender: (C{string}, C{int})
346         @param _krpc_sender: the sender node's IP address and port
347         """
348         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
349         self.insertNode(n, contacted = False)
350
351         nodes = self.table.findNodes(target)
352         nodes = map(lambda node: node.contactInfo(), nodes)
353         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
354         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
355
356
357 class KhashmirRead(KhashmirBase):
358     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
359
360     _Node = KNodeRead
361
362     #{ Local interface
363     def findValue(self, key, callback, errback=None):
364         """Get the nodes that have values for the key from the global table.
365         
366         @type key: C{string}
367         @param key: the target key to find the values for
368         @type callback: C{method}
369         @param callback: the method to call with the results, it must take 1
370             parameter, the list of nodes with values
371         @type errback: C{method}
372         @param errback: the method to call if an error occurs
373             (optional, defaults to doing nothing when an error occurs)
374         """
375         # Get K nodes out of local table/cache
376         nodes = self.table.findNodes(key)
377         d = Deferred()
378         if errback:
379             d.addCallbacks(callback, errback)
380         else:
381             d.addCallback(callback)
382
383         # Search for others starting with the locally found ones
384         state = FindValue(self, key, d.callback, self.config, self.stats)
385         reactor.callLater(0, state.goWithNodes, nodes)
386
387     def valueForKey(self, key, callback, searchlocal = True):
388         """Get the values found for key in global table.
389         
390         Callback will be called with a list of values for each peer that
391         returns unique values. The final callback will be an empty list.
392
393         @type key: C{string}
394         @param key: the target key to get the values for
395         @type callback: C{method}
396         @param callback: the method to call with the results, it must take 2
397             parameters: the key, and the values found
398         @type searchlocal: C{boolean}
399         @param searchlocal: whether to also look for any local values
400         """
401         # Get any local values
402         if searchlocal:
403             l = self.store.retrieveValues(key)
404             if len(l) > 0:
405                 reactor.callLater(0, callback, key, l)
406         else:
407             l = []
408
409         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
410             """Use the found nodes to send requests for values to."""
411             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
412             reactor.callLater(0, state.goWithNodes, nodes)
413             
414         # First lookup nodes that have values for the key
415         self.findValue(key, _getValueForKey)
416
417     #{ Remote interface
418     def krpc_find_value(self, key, id, _krpc_sender):
419         """Find the number of values stored locally for the key, and the K closest nodes.
420         
421         @type key: C{string}
422         @param key: the target key to find the values and nodes for
423         @type id: C{string}
424         @param id: the node ID of the sender node
425         @type _krpc_sender: (C{string}, C{int})
426         @param _krpc_sender: the sender node's IP address and port
427         """
428         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
429         self.insertNode(n, contacted = False)
430     
431         nodes = self.table.findNodes(key)
432         nodes = map(lambda node: node.contactInfo(), nodes)
433         num_values = self.store.countValues(key)
434         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
435
436     def krpc_get_value(self, key, num, id, _krpc_sender):
437         """Retrieve the values stored locally for the key.
438         
439         @type key: C{string}
440         @param key: the target key to retrieve the values for
441         @type num: C{int}
442         @param num: the maximum number of values to retrieve, or 0 to
443             retrieve all of them
444         @type id: C{string}
445         @param id: the node ID of the sender node
446         @type _krpc_sender: (C{string}, C{int})
447         @param _krpc_sender: the sender node's IP address and port
448         """
449         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
450         self.insertNode(n, contacted = False)
451     
452         l = self.store.retrieveValues(key)
453         if num == 0 or num >= len(l):
454             return {'values' : l, "id": self.node.id}
455         else:
456             shuffle(l)
457             return {'values' : l[:num], "id": self.node.id}
458
459
460 class KhashmirWrite(KhashmirRead):
461     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
462
463     _Node = KNodeWrite
464
465     #{ Local interface
466     def storeValueForKey(self, key, value, callback=None):
467         """Stores the value for the key in the global table.
468         
469         No status in this implementation, peers respond but don't indicate
470         status of storing values.
471
472         @type key: C{string}
473         @param key: the target key to store the value for
474         @type value: C{string}
475         @param value: the value to store with the key
476         @type callback: C{method}
477         @param callback: the method to call with the results, it must take 3
478             parameters: the key, the value stored, and the result of the store
479             (optional, defaults to doing nothing with the results)
480         """
481         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
482             """Use the returned K closest nodes to store the key at."""
483             if not response:
484                 def _storedValueHandler(key, value, sender):
485                     """Default callback that does nothing."""
486                     pass
487                 response = _storedValueHandler
488             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
489             reactor.callLater(0, action.goWithNodes, nodes)
490             
491         # First find the K closest nodes to operate on.
492         self.findNode(key, _storeValueForKey)
493                     
494     #{ Remote interface
495     def krpc_store_value(self, key, value, token, id, _krpc_sender):
496         """Store the value locally with the key.
497         
498         @type key: C{string}
499         @param key: the target key to store the value for
500         @type value: C{string}
501         @param value: the value to store with the key
502         @param token: the token to confirm that this peer contacted us previously
503         @type id: C{string}
504         @param id: the node ID of the sender node
505         @type _krpc_sender: (C{string}, C{int})
506         @param _krpc_sender: the sender node's IP address and port
507         """
508         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
509         self.insertNode(n, contacted = False)
510         for secret in self.token_secrets:
511             this_token = sha(secret + _krpc_sender[0]).digest()
512             if token == this_token:
513                 self.store.storeValue(key, value)
514                 return {"id" : self.node.id}
515         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
516
517
518 class Khashmir(KhashmirWrite):
519     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
520     _Node = KNodeWrite
521
522
523 class SimpleTests(unittest.TestCase):
524     
525     timeout = 10
526     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
527                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
528                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
529                     'MAX_FAILURES': 3,
530                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
531                     'KEY_EXPIRE': 3600, 'SPEW': False, }
532
533     def setUp(self):
534         d = self.DHT_DEFAULTS.copy()
535         d['PORT'] = 4044
536         self.a = Khashmir(d)
537         d = self.DHT_DEFAULTS.copy()
538         d['PORT'] = 4045
539         self.b = Khashmir(d)
540         
541     def tearDown(self):
542         self.a.shutdown()
543         self.b.shutdown()
544         os.unlink(self.a.store.db)
545         os.unlink(self.b.store.db)
546
547     def testAddContact(self):
548         self.failUnlessEqual(len(self.a.table.buckets), 1)
549         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
550
551         self.failUnlessEqual(len(self.b.table.buckets), 1)
552         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
553
554         self.a.addContact('127.0.0.1', 4045)
555         reactor.iterate()
556         reactor.iterate()
557         reactor.iterate()
558         reactor.iterate()
559
560         self.failUnlessEqual(len(self.a.table.buckets), 1)
561         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
562         self.failUnlessEqual(len(self.b.table.buckets), 1)
563         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
564
565     def testStoreRetrieve(self):
566         self.a.addContact('127.0.0.1', 4045)
567         reactor.iterate()
568         reactor.iterate()
569         reactor.iterate()
570         reactor.iterate()
571         self.got = 0
572         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
573         reactor.iterate()
574         reactor.iterate()
575         reactor.iterate()
576         reactor.iterate()
577         reactor.iterate()
578         reactor.iterate()
579         self.a.valueForKey(sha('foo').digest(), self._cb)
580         reactor.iterate()
581         reactor.iterate()
582         reactor.iterate()
583         reactor.iterate()
584         reactor.iterate()
585         reactor.iterate()
586         reactor.iterate()
587
588     def _cb(self, key, val):
589         if not val:
590             self.failUnlessEqual(self.got, 1)
591         elif 'foobar' in val:
592             self.got = 1
593
594
595 class MultiTest(unittest.TestCase):
596     
597     timeout = 30
598     num = 20
599     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
600                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
601                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
602                     'MAX_FAILURES': 3,
603                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
604                     'KEY_EXPIRE': 3600, 'SPEW': False, }
605
606     def _done(self, val):
607         self.done = 1
608         
609     def setUp(self):
610         self.l = []
611         self.startport = 4088
612         for i in range(self.num):
613             d = self.DHT_DEFAULTS.copy()
614             d['PORT'] = self.startport + i
615             self.l.append(Khashmir(d))
616         reactor.iterate()
617         reactor.iterate()
618         
619         for i in self.l:
620             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
621             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
622             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
623             reactor.iterate()
624             reactor.iterate()
625             reactor.iterate() 
626             
627         for i in self.l:
628             self.done = 0
629             i.findCloseNodes(self._done)
630             while not self.done:
631                 reactor.iterate()
632         for i in self.l:
633             self.done = 0
634             i.findCloseNodes(self._done)
635             while not self.done:
636                 reactor.iterate()
637
638     def tearDown(self):
639         for i in self.l:
640             i.shutdown()
641             os.unlink(i.store.db)
642             
643         reactor.iterate()
644         
645     def testStoreRetrieve(self):
646         for i in range(10):
647             K = newID()
648             V = newID()
649             
650             for a in range(3):
651                 self.done = 0
652                 def _scb(key, value, result):
653                     self.done = 1
654                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
655                 while not self.done:
656                     reactor.iterate()
657
658
659                 def _rcb(key, val):
660                     if not val:
661                         self.done = 1
662                         self.failUnlessEqual(self.got, 1)
663                     elif V in val:
664                         self.got = 1
665                 for x in range(3):
666                     self.got = 0
667                     self.done = 0
668                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
669                     while not self.done:
670                         reactor.iterate()