5db5ed8ab48015c9c73073f9c8ceab8eac652701
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 from copy import copy
13 import os
14
15 from twisted.internet.defer import Deferred
16 from twisted.internet import protocol, reactor
17 from twisted.python import log
18 from twisted.trial import unittest
19
20 from db import DB
21 from ktable import KTable
22 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
23 from khash import newID, newIDInRange
24 from actions import FindNode, FindValue, GetValue, StoreValue
25 from stats import StatsLogger
26 import krpc
27
28 class KhashmirBase(protocol.Factory):
29     """The base Khashmir class, with base functionality and find node, no key-value mappings.
30     
31     @type _Node: L{node.Node}
32     @ivar _Node: the knode implementation to use for this class of DHT
33     @type config: C{dictionary}
34     @ivar config: the configuration parameters for the DHT
35     @type port: C{int}
36     @ivar port: the port to listen on
37     @type store: L{db.DB}
38     @ivar store: the database to store nodes and key/value pairs in
39     @type node: L{node.Node}
40     @ivar node: this node
41     @type table: L{ktable.KTable}
42     @ivar table: the routing table
43     @type token_secrets: C{list} of C{string}
44     @ivar token_secrets: the current secrets to use to create tokens
45     @type stats: L{stats.StatsLogger}
46     @ivar stats: the statistics gatherer
47     @type udp: L{krpc.hostbroker}
48     @ivar udp: the factory for the KRPC protocol
49     @type listenport: L{twisted.internet.interfaces.IListeningPort}
50     @ivar listenport: the UDP listening port
51     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
52     @ivar next_checkpoint: the delayed call for the next checkpoint
53     """
54     
55     _Node = KNodeBase
56     
57     def __init__(self, config, cache_dir='/tmp'):
58         """Initialize the Khashmir class and call the L{setup} method.
59         
60         @type config: C{dictionary}
61         @param config: the configuration parameters for the DHT
62         @type cache_dir: C{string}
63         @param cache_dir: the directory to store all files in
64             (optional, defaults to the /tmp directory)
65         """
66         self.config = None
67         self.setup(config, cache_dir)
68         
69     def setup(self, config, cache_dir):
70         """Setup all the Khashmir sub-modules.
71         
72         @type config: C{dictionary}
73         @param config: the configuration parameters for the DHT
74         @type cache_dir: C{string}
75         @param cache_dir: the directory to store all files in
76         """
77         self.config = config
78         self.port = config['PORT']
79         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
80         self.node = self._loadSelfNode('', self.port)
81         self.table = KTable(self.node, config)
82         self.token_secrets = [newID()]
83         self.stats = StatsLogger(self.table, self.store)
84         
85         # Start listening
86         self.udp = krpc.hostbroker(self, self.stats, config)
87         self.udp.protocol = krpc.KRPC
88         self.listenport = reactor.listenUDP(self.port, self.udp)
89         
90         # Load the routing table and begin checkpointing
91         self._loadRoutingTable()
92         self.refreshTable(force = True)
93         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
94
95     def Node(self, id, host = None, port = None):
96         """Create a new node.
97         
98         @see: L{node.Node.__init__}
99         """
100         n = self._Node(id, host, port)
101         n.table = self.table
102         n.conn = self.udp.connectionForAddr((n.host, n.port))
103         return n
104     
105     def __del__(self):
106         """Stop listening for packets."""
107         self.listenport.stopListening()
108         
109     def _loadSelfNode(self, host, port):
110         """Create this node, loading any previously saved one."""
111         id = self.store.getSelfNode()
112         if not id:
113             id = newID()
114         return self._Node(id, host, port)
115         
116     def checkpoint(self):
117         """Perform some periodic maintenance operations."""
118         # Create a new token secret
119         self.token_secrets.insert(0, newID())
120         if len(self.token_secrets) > 3:
121             self.token_secrets.pop()
122             
123         # Save some parameters for reloading
124         self.store.saveSelfNode(self.node.id)
125         self.store.dumpRoutingTable(self.table.buckets)
126         
127         # DHT maintenance
128         self.store.expireValues(self.config['KEY_EXPIRE'])
129         self.refreshTable()
130         
131         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
132                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
133                                                  self.checkpoint)
134         
135     def _loadRoutingTable(self):
136         """Load the previous routing table nodes from the database.
137         
138         It's usually a good idea to call refreshTable(force = True) after
139         loading the table.
140         """
141         nodes = self.store.getRoutingTable()
142         for rec in nodes:
143             n = self.Node(rec[0], rec[1], int(rec[2]))
144             self.table.insertNode(n, contacted = False)
145             
146     #{ Local interface
147     def addContact(self, host, port, callback=None, errback=None):
148         """Ping this node and add the contact info to the table on pong.
149         
150         @type host: C{string}
151         @param host: the IP address of the node to contact
152         @type port: C{int}
153         @param port:the port of the node to contact
154         @type callback: C{method}
155         @param callback: the method to call with the results, it must take 1
156             parameter, the contact info returned by the node
157             (optional, defaults to doing nothing with the results)
158         @type errback: C{method}
159         @param errback: the method to call if an error occurs
160             (optional, defaults to calling the callback with None)
161         """
162         n = self.Node(NULL_ID, host, port)
163         self.sendJoin(n, callback=callback, errback=errback)
164
165     def findNode(self, id, callback):
166         """Find the contact info for the K closest nodes in the global table.
167         
168         @type id: C{string}
169         @param id: the target ID to find the K closest nodes of
170         @type callback: C{method}
171         @param callback: the method to call with the results, it must take 1
172             parameter, the list of K closest nodes
173         """
174         # Start with our node
175         nodes = [copy(self.node)]
176
177         # Start the finding nodes action
178         state = FindNode(self, id, callback, self.config, self.stats)
179         reactor.callLater(0, state.goWithNodes, nodes)
180     
181     def insertNode(self, node, contacted = True):
182         """Try to insert a node in our local table, pinging oldest contact if necessary.
183         
184         If all you have is a host/port, then use L{addContact}, which calls this
185         method after receiving the PONG from the remote node. The reason for
186         the seperation is we can't insert a node into the table without its
187         node ID. That means of course the node passed into this method needs
188         to be a properly formed Node object with a valid ID.
189
190         @type node: L{node.Node}
191         @param node: the new node to try and insert
192         @type contacted: C{boolean}
193         @param contacted: whether the new node is known to be good, i.e.
194             responded to a request (optional, defaults to True)
195         """
196         old = self.table.insertNode(node, contacted=contacted)
197         if (old and old.id != self.node.id and
198             (datetime.now() - old.lastSeen) > 
199              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
200             
201             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
202                 """The pinged node never responded, so replace it."""
203                 log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
204                 self.stats.completedAction('ping', start)
205                 self.table.replaceStaleNode(oldnode, newnode)
206             
207             def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
208                 """Got a pong from the old node, so update it."""
209                 self.stats.completedAction('ping', start)
210                 if dict['id'] == old.id:
211                     self.table.justSeenNode(old.id)
212             
213             # Bucket is full, check to see if old node is still available
214             self.stats.startedAction('ping')
215             df = old.ping(self.node.id)
216             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
217
218     def sendJoin(self, node, callback=None, errback=None):
219         """Join the DHT by pinging a bootstrap node.
220         
221         @type node: L{node.Node}
222         @param node: the node to send the join to
223         @type callback: C{method}
224         @param callback: the method to call with the results, it must take 1
225             parameter, the contact info returned by the node
226             (optional, defaults to doing nothing with the results)
227         @type errback: C{method}
228         @param errback: the method to call if an error occurs
229             (optional, defaults to calling the callback with None)
230         """
231
232         def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
233             """Node responded properly, callback with response."""
234             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
235             self.stats.completedAction('join', start)
236             reactor.callLater(0, self.insertNode, n)
237             if callback:
238                 callback((dict['ip_addr'], dict['port']))
239
240         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
241             """Error occurred, fail node and errback or callback with error."""
242             log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
243             self.stats.completedAction('join', start)
244             self.table.nodeFailed(node)
245             if errback:
246                 errback()
247             elif callback:
248                 callback(None)
249         
250         self.stats.startedAction('join')
251         df = node.join(self.node.id)
252         df.addCallbacks(_pongHandler, _defaultPong)
253
254     def findCloseNodes(self, callback=lambda a: None):
255         """Perform a findNode on the ID one away from our own.
256
257         This will allow us to populate our table with nodes on our network
258         closest to our own. This is called as soon as we start up with an
259         empty table.
260
261         @type callback: C{method}
262         @param callback: the method to call with the results, it must take 1
263             parameter, the list of K closest nodes
264             (optional, defaults to doing nothing with the results)
265         """
266         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
267         self.findNode(id, callback)
268
269     def refreshTable(self, force = False):
270         """Check all the buckets for those that need refreshing.
271         
272         @param force: refresh all buckets regardless of last bucket access time
273             (optional, defaults to False)
274         """
275         def callback(nodes):
276             pass
277     
278         for bucket in self.table.buckets:
279             if force or (datetime.now() - bucket.lastAccessed > 
280                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
281                 # Choose a random ID in the bucket and try and find it
282                 id = newIDInRange(bucket.min, bucket.max)
283                 self.findNode(id, callback)
284
285     def shutdown(self):
286         """Closes the port and cancels pending later calls."""
287         self.listenport.stopListening()
288         try:
289             self.next_checkpoint.cancel()
290         except:
291             pass
292         self.store.close()
293     
294     def getStats(self):
295         """Gather the statistics for the DHT."""
296         return self.stats.formatHTML()
297
298     #{ Remote interface
299     def krpc_ping(self, id, _krpc_sender = None):
300         """Pong with our ID.
301         
302         @type id: C{string}
303         @param id: the node ID of the sender node
304         @type _krpc_sender: (C{string}, C{int})
305         @param _krpc_sender: the sender node's IP address and port
306         """
307         if _krpc_sender is not None:
308             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
309             reactor.callLater(0, self.insertNode, n, False)
310
311         return {"id" : self.node.id}
312         
313     def krpc_join(self, id, _krpc_sender = None):
314         """Add the node by responding with its address and port.
315         
316         @type id: C{string}
317         @param id: the node ID of the sender node
318         @type _krpc_sender: (C{string}, C{int})
319         @param _krpc_sender: the sender node's IP address and port
320         """
321         if _krpc_sender is not None:
322             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
323             reactor.callLater(0, self.insertNode, n, False)
324         else:
325             _krpc_sender = ('127.0.0.1', self.port)
326
327         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
328         
329     def krpc_find_node(self, id, target, _krpc_sender = None):
330         """Find the K closest nodes to the target in the local routing table.
331         
332         @type target: C{string}
333         @param target: the target ID to find nodes for
334         @type id: C{string}
335         @param id: the node ID of the sender node
336         @type _krpc_sender: (C{string}, C{int})
337         @param _krpc_sender: the sender node's IP address and port
338         """
339         if _krpc_sender is not None:
340             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
341             reactor.callLater(0, self.insertNode, n, False)
342         else:
343             _krpc_sender = ('127.0.0.1', self.port)
344
345         nodes = self.table.findNodes(target)
346         nodes = map(lambda node: node.contactInfo(), nodes)
347         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
348         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
349
350
351 class KhashmirRead(KhashmirBase):
352     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
353
354     _Node = KNodeRead
355
356     #{ Local interface
357     def findValue(self, key, callback):
358         """Get the nodes that have values for the key from the global table.
359         
360         @type key: C{string}
361         @param key: the target key to find the values for
362         @type callback: C{method}
363         @param callback: the method to call with the results, it must take 1
364             parameter, the list of nodes with values
365         """
366         # Start with ourself
367         nodes = [copy(self.node)]
368         
369         # Search for others starting with the locally found ones
370         state = FindValue(self, key, callback, self.config, self.stats)
371         reactor.callLater(0, state.goWithNodes, nodes)
372
373     def valueForKey(self, key, callback, searchlocal = True):
374         """Get the values found for key in global table.
375         
376         Callback will be called with a list of values for each peer that
377         returns unique values. The final callback will be an empty list.
378
379         @type key: C{string}
380         @param key: the target key to get the values for
381         @type callback: C{method}
382         @param callback: the method to call with the results, it must take 2
383             parameters: the key, and the values found
384         @type searchlocal: C{boolean}
385         @param searchlocal: whether to also look for any local values
386         """
387
388         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
389             """Use the found nodes to send requests for values to."""
390             # Get any local values
391             if searchlocal:
392                 l = self.store.retrieveValues(key)
393                 if len(l) > 0:
394                     node = copy(self.node)
395                     node.updateNumValues(len(l))
396                     nodes = nodes + [node]
397
398             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
399             reactor.callLater(0, state.goWithNodes, nodes)
400             
401         # First lookup nodes that have values for the key
402         self.findValue(key, _getValueForKey)
403
404     #{ Remote interface
405     def krpc_find_value(self, id, key, _krpc_sender = None):
406         """Find the number of values stored locally for the key, and the K closest nodes.
407         
408         @type key: C{string}
409         @param key: the target key to find the values and nodes for
410         @type id: C{string}
411         @param id: the node ID of the sender node
412         @type _krpc_sender: (C{string}, C{int})
413         @param _krpc_sender: the sender node's IP address and port
414         """
415         if _krpc_sender is not None:
416             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
417             reactor.callLater(0, self.insertNode, n, False)
418     
419         nodes = self.table.findNodes(key)
420         nodes = map(lambda node: node.contactInfo(), nodes)
421         num_values = self.store.countValues(key)
422         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
423
424     def krpc_get_value(self, id, key, num, _krpc_sender = None):
425         """Retrieve the values stored locally for the key.
426         
427         @type key: C{string}
428         @param key: the target key to retrieve the values for
429         @type num: C{int}
430         @param num: the maximum number of values to retrieve, or 0 to
431             retrieve all of them
432         @type id: C{string}
433         @param id: the node ID of the sender node
434         @type _krpc_sender: (C{string}, C{int})
435         @param _krpc_sender: the sender node's IP address and port
436         """
437         if _krpc_sender is not None:
438             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
439             reactor.callLater(0, self.insertNode, n, False)
440     
441         l = self.store.retrieveValues(key)
442         if num == 0 or num >= len(l):
443             return {'values' : l, "id": self.node.id}
444         else:
445             shuffle(l)
446             return {'values' : l[:num], "id": self.node.id}
447
448
449 class KhashmirWrite(KhashmirRead):
450     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
451
452     _Node = KNodeWrite
453
454     #{ Local interface
455     def storeValueForKey(self, key, value, callback=None):
456         """Stores the value for the key in the global table.
457         
458         No status in this implementation, peers respond but don't indicate
459         status of storing values.
460
461         @type key: C{string}
462         @param key: the target key to store the value for
463         @type value: C{string}
464         @param value: the value to store with the key
465         @type callback: C{method}
466         @param callback: the method to call with the results, it must take 3
467             parameters: the key, the value stored, and the result of the store
468             (optional, defaults to doing nothing with the results)
469         """
470         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
471             """Use the returned K closest nodes to store the key at."""
472             if not response:
473                 def _storedValueHandler(key, value, sender):
474                     """Default callback that does nothing."""
475                     pass
476                 response = _storedValueHandler
477             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
478             reactor.callLater(0, action.goWithNodes, nodes)
479             
480         # First find the K closest nodes to operate on.
481         self.findNode(key, _storeValueForKey)
482                     
483     #{ Remote interface
484     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
485         """Store the value locally with the key.
486         
487         @type key: C{string}
488         @param key: the target key to store the value for
489         @type value: C{string}
490         @param value: the value to store with the key
491         @param token: the token to confirm that this peer contacted us previously
492         @type id: C{string}
493         @param id: the node ID of the sender node
494         @type _krpc_sender: (C{string}, C{int})
495         @param _krpc_sender: the sender node's IP address and port
496         """
497         if _krpc_sender is not None:
498             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
499             reactor.callLater(0, self.insertNode, n, False)
500         else:
501             _krpc_sender = ('127.0.0.1', self.port)
502
503         for secret in self.token_secrets:
504             this_token = sha(secret + _krpc_sender[0]).digest()
505             if token == this_token:
506                 self.store.storeValue(key, value)
507                 return {"id" : self.node.id}
508         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
509
510
511 class Khashmir(KhashmirWrite):
512     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
513     _Node = KNodeWrite
514
515
516 class SimpleTests(unittest.TestCase):
517     
518     timeout = 10
519     DHT_DEFAULTS = {'PORT': 9977,
520                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
521                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
522                     'MAX_FAILURES': 3,
523                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
524                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
525                     'KEY_EXPIRE': 3600, 'SPEW': False, }
526
527     def setUp(self):
528         d = self.DHT_DEFAULTS.copy()
529         d['PORT'] = 4044
530         self.a = Khashmir(d)
531         d = self.DHT_DEFAULTS.copy()
532         d['PORT'] = 4045
533         self.b = Khashmir(d)
534         
535     def tearDown(self):
536         self.a.shutdown()
537         self.b.shutdown()
538         os.unlink(self.a.store.db)
539         os.unlink(self.b.store.db)
540
541     def testAddContact(self):
542         self.failUnlessEqual(len(self.a.table.buckets), 1)
543         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
544
545         self.failUnlessEqual(len(self.b.table.buckets), 1)
546         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
547
548         self.a.addContact('127.0.0.1', 4045)
549         reactor.iterate()
550         reactor.iterate()
551         reactor.iterate()
552         reactor.iterate()
553
554         self.failUnlessEqual(len(self.a.table.buckets), 1)
555         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
556         self.failUnlessEqual(len(self.b.table.buckets), 1)
557         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
558
559     def testStoreRetrieve(self):
560         self.a.addContact('127.0.0.1', 4045)
561         reactor.iterate()
562         reactor.iterate()
563         reactor.iterate()
564         reactor.iterate()
565         self.got = 0
566         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
567         reactor.iterate()
568         reactor.iterate()
569         reactor.iterate()
570         reactor.iterate()
571         reactor.iterate()
572         reactor.iterate()
573         self.a.valueForKey(sha('foo').digest(), self._cb)
574         reactor.iterate()
575         reactor.iterate()
576         reactor.iterate()
577         reactor.iterate()
578         reactor.iterate()
579         reactor.iterate()
580         reactor.iterate()
581
582     def _cb(self, key, val):
583         if not val:
584             self.failUnlessEqual(self.got, 1)
585         elif 'foobar' in val:
586             self.got = 1
587
588
589 class MultiTest(unittest.TestCase):
590     
591     timeout = 30
592     num = 20
593     DHT_DEFAULTS = {'PORT': 9977,
594                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
595                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
596                     'MAX_FAILURES': 3,
597                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
598                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
599                     'KEY_EXPIRE': 3600, 'SPEW': False, }
600
601     def _done(self, val):
602         self.done = 1
603         
604     def setUp(self):
605         self.l = []
606         self.startport = 4088
607         for i in range(self.num):
608             d = self.DHT_DEFAULTS.copy()
609             d['PORT'] = self.startport + i
610             self.l.append(Khashmir(d))
611         reactor.iterate()
612         reactor.iterate()
613         
614         for i in self.l:
615             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
616             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
617             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
618             reactor.iterate()
619             reactor.iterate()
620             reactor.iterate() 
621             
622         for i in self.l:
623             self.done = 0
624             i.findCloseNodes(self._done)
625             while not self.done:
626                 reactor.iterate()
627         for i in self.l:
628             self.done = 0
629             i.findCloseNodes(self._done)
630             while not self.done:
631                 reactor.iterate()
632
633     def tearDown(self):
634         for i in self.l:
635             i.shutdown()
636             os.unlink(i.store.db)
637             
638         reactor.iterate()
639         
640     def testStoreRetrieve(self):
641         for i in range(10):
642             K = newID()
643             V = newID()
644             
645             for a in range(3):
646                 self.done = 0
647                 def _scb(key, value, result):
648                     self.done = 1
649                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
650                 while not self.done:
651                     reactor.iterate()
652
653
654                 def _rcb(key, val):
655                     if not val:
656                         self.done = 1
657                         self.failUnlessEqual(self.got, 1)
658                     elif V in val:
659                         self.got = 1
660                 for x in range(3):
661                     self.got = 0
662                     self.done = 0
663                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
664                     while not self.done:
665                         reactor.iterate()