294940c545c1cb5fa13af1fb3aa68ea7d1a747f9
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program.
3
4 @var isLocal: a compiled regular expression suitable for testing if an
5     IP address is from a known local or private range
6 """
7
8 import warnings
9 warnings.simplefilter("ignore", DeprecationWarning)
10
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
13 from sha import sha
14 from copy import copy
15 import os, re
16
17 from twisted.internet.defer import Deferred
18 from twisted.internet import protocol, reactor
19 from twisted.python import log
20 from twisted.trial import unittest
21
22 from db import DB
23 from ktable import KTable
24 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
25 from khash import newID, newIDInRange
26 from actions import FindNode, FindValue, GetValue, StoreValue
27 from stats import StatsLogger
28 import krpc
29
30 isLocal = re.compile('^(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'+
31                      '(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'+
32                      '(172\.0?([1][6-9])|([2][0-9])|([3][0-1])\.[0-9]{1,3}\.[0-9]{1,3})|'+
33                      '(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$')
34
35 class KhashmirBase(protocol.Factory):
36     """The base Khashmir class, with base functionality and find node, no key-value mappings.
37     
38     @type _Node: L{node.Node}
39     @ivar _Node: the knode implementation to use for this class of DHT
40     @type config: C{dictionary}
41     @ivar config: the configuration parameters for the DHT
42     @type port: C{int}
43     @ivar port: the port to listen on
44     @type store: L{db.DB}
45     @ivar store: the database to store nodes and key/value pairs in
46     @type node: L{node.Node}
47     @ivar node: this node
48     @type table: L{ktable.KTable}
49     @ivar table: the routing table
50     @type token_secrets: C{list} of C{string}
51     @ivar token_secrets: the current secrets to use to create tokens
52     @type stats: L{stats.StatsLogger}
53     @ivar stats: the statistics gatherer
54     @type udp: L{krpc.hostbroker}
55     @ivar udp: the factory for the KRPC protocol
56     @type listenport: L{twisted.internet.interfaces.IListeningPort}
57     @ivar listenport: the UDP listening port
58     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
59     @ivar next_checkpoint: the delayed call for the next checkpoint
60     """
61     
62     _Node = KNodeBase
63     
64     def __init__(self, config, cache_dir='/tmp'):
65         """Initialize the Khashmir class and call the L{setup} method.
66         
67         @type config: C{dictionary}
68         @param config: the configuration parameters for the DHT
69         @type cache_dir: C{string}
70         @param cache_dir: the directory to store all files in
71             (optional, defaults to the /tmp directory)
72         """
73         self.config = None
74         self.setup(config, cache_dir)
75         
76     def setup(self, config, cache_dir):
77         """Setup all the Khashmir sub-modules.
78         
79         @type config: C{dictionary}
80         @param config: the configuration parameters for the DHT
81         @type cache_dir: C{string}
82         @param cache_dir: the directory to store all files in
83         """
84         self.config = config
85         self.port = config['PORT']
86         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
87         self.node = self._loadSelfNode('', self.port)
88         self.table = KTable(self.node, config)
89         self.token_secrets = [newID()]
90         self.stats = StatsLogger(self.table, self.store)
91         
92         # Start listening
93         self.udp = krpc.hostbroker(self, self.stats, config)
94         self.udp.protocol = krpc.KRPC
95         self.listenport = reactor.listenUDP(self.port, self.udp)
96         
97         # Load the routing table and begin checkpointing
98         self._loadRoutingTable()
99         self.refreshTable(force = True)
100         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
101
102     def Node(self, id, host = None, port = None):
103         """Create a new node.
104         
105         @see: L{node.Node.__init__}
106         """
107         n = self._Node(id, host, port)
108         n.table = self.table
109         n.conn = self.udp.connectionForAddr((n.host, n.port))
110         return n
111     
112     def __del__(self):
113         """Stop listening for packets."""
114         self.listenport.stopListening()
115         
116     def _loadSelfNode(self, host, port):
117         """Create this node, loading any previously saved one."""
118         id = self.store.getSelfNode()
119         if not id:
120             id = newID()
121         return self._Node(id, host, port)
122         
123     def checkpoint(self):
124         """Perform some periodic maintenance operations."""
125         # Create a new token secret
126         self.token_secrets.insert(0, newID())
127         if len(self.token_secrets) > 3:
128             self.token_secrets.pop()
129             
130         # Save some parameters for reloading
131         self.store.saveSelfNode(self.node.id)
132         self.store.dumpRoutingTable(self.table.buckets)
133         
134         # DHT maintenance
135         self.store.expireValues(self.config['KEY_EXPIRE'])
136         self.refreshTable()
137         
138         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
139                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
140                                                  self.checkpoint)
141         
142     def _loadRoutingTable(self):
143         """Load the previous routing table nodes from the database.
144         
145         It's usually a good idea to call refreshTable(force = True) after
146         loading the table.
147         """
148         nodes = self.store.getRoutingTable()
149         for rec in nodes:
150             n = self.Node(rec[0], rec[1], int(rec[2]))
151             self.table.insertNode(n, contacted = False)
152             
153     #{ Local interface
154     def addContact(self, host, port, callback=None, errback=None):
155         """Ping this node and add the contact info to the table on pong.
156         
157         @type host: C{string}
158         @param host: the IP address of the node to contact
159         @type port: C{int}
160         @param port:the port of the node to contact
161         @type callback: C{method}
162         @param callback: the method to call with the results, it must take 1
163             parameter, the contact info returned by the node
164             (optional, defaults to doing nothing with the results)
165         @type errback: C{method}
166         @param errback: the method to call if an error occurs
167             (optional, defaults to calling the callback with None)
168         """
169         n = self.Node(NULL_ID, host, port)
170         self.sendJoin(n, callback=callback, errback=errback)
171
172     def findNode(self, id, callback):
173         """Find the contact info for the K closest nodes in the global table.
174         
175         @type id: C{string}
176         @param id: the target ID to find the K closest nodes of
177         @type callback: C{method}
178         @param callback: the method to call with the results, it must take 1
179             parameter, the list of K closest nodes
180         """
181         # Start with our node
182         nodes = [copy(self.node)]
183
184         # Start the finding nodes action
185         state = FindNode(self, id, callback, self.config, self.stats)
186         reactor.callLater(0, state.goWithNodes, nodes)
187     
188     def insertNode(self, node, contacted = True):
189         """Try to insert a node in our local table, pinging oldest contact if necessary.
190         
191         If all you have is a host/port, then use L{addContact}, which calls this
192         method after receiving the PONG from the remote node. The reason for
193         the seperation is we can't insert a node into the table without its
194         node ID. That means of course the node passed into this method needs
195         to be a properly formed Node object with a valid ID.
196
197         @type node: L{node.Node}
198         @param node: the new node to try and insert
199         @type contacted: C{boolean}
200         @param contacted: whether the new node is known to be good, i.e.
201             responded to a request (optional, defaults to True)
202         """
203         # Don't add any local nodes to the routing table
204         if not self.config['LOCAL_OK'] and isLocal.match(node.host):
205             return
206
207         old = self.table.insertNode(node, contacted=contacted)
208         if (old and old.id != self.node.id and
209             (datetime.now() - old.lastSeen) > 
210              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
211             
212             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
213                 """The pinged node never responded, so replace it."""
214                 log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
215                 self.stats.completedAction('ping', start)
216                 self.table.replaceStaleNode(oldnode, newnode)
217             
218             def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
219                 """Got a pong from the old node, so update it."""
220                 self.stats.completedAction('ping', start)
221                 if dict['id'] == old.id:
222                     self.table.justSeenNode(old.id)
223             
224             # Bucket is full, check to see if old node is still available
225             self.stats.startedAction('ping')
226             df = old.ping(self.node.id)
227             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
228
229     def sendJoin(self, node, callback=None, errback=None):
230         """Join the DHT by pinging a bootstrap node.
231         
232         @type node: L{node.Node}
233         @param node: the node to send the join to
234         @type callback: C{method}
235         @param callback: the method to call with the results, it must take 1
236             parameter, the contact info returned by the node
237             (optional, defaults to doing nothing with the results)
238         @type errback: C{method}
239         @param errback: the method to call if an error occurs
240             (optional, defaults to calling the callback with None)
241         """
242
243         def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
244             """Node responded properly, callback with response."""
245             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
246             self.stats.completedAction('join', start)
247             reactor.callLater(0, self.insertNode, n)
248             if callback:
249                 callback((dict['ip_addr'], dict['port']))
250
251         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
252             """Error occurred, fail node and errback or callback with error."""
253             log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
254             self.stats.completedAction('join', start)
255             self.table.nodeFailed(node)
256             if errback:
257                 errback()
258             elif callback:
259                 callback(None)
260         
261         self.stats.startedAction('join')
262         df = node.join(self.node.id)
263         df.addCallbacks(_pongHandler, _defaultPong)
264
265     def findCloseNodes(self, callback=lambda a: None):
266         """Perform a findNode on the ID one away from our own.
267
268         This will allow us to populate our table with nodes on our network
269         closest to our own. This is called as soon as we start up with an
270         empty table.
271
272         @type callback: C{method}
273         @param callback: the method to call with the results, it must take 1
274             parameter, the list of K closest nodes
275             (optional, defaults to doing nothing with the results)
276         """
277         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
278         self.findNode(id, callback)
279
280     def refreshTable(self, force = False):
281         """Check all the buckets for those that need refreshing.
282         
283         @param force: refresh all buckets regardless of last bucket access time
284             (optional, defaults to False)
285         """
286         def callback(nodes):
287             pass
288     
289         for bucket in self.table.buckets:
290             if force or (datetime.now() - bucket.lastAccessed > 
291                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
292                 # Choose a random ID in the bucket and try and find it
293                 id = newIDInRange(bucket.min, bucket.max)
294                 self.findNode(id, callback)
295
296     def shutdown(self):
297         """Closes the port and cancels pending later calls."""
298         self.listenport.stopListening()
299         try:
300             self.next_checkpoint.cancel()
301         except:
302             pass
303         self.store.close()
304     
305     def getStats(self):
306         """Gather the statistics for the DHT."""
307         return self.stats.formatHTML()
308
309     #{ Remote interface
310     def krpc_ping(self, id, _krpc_sender = None):
311         """Pong with our ID.
312         
313         @type id: C{string}
314         @param id: the node ID of the sender node
315         @type _krpc_sender: (C{string}, C{int})
316         @param _krpc_sender: the sender node's IP address and port
317         """
318         if _krpc_sender is not None:
319             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
320             reactor.callLater(0, self.insertNode, n, False)
321
322         return {"id" : self.node.id}
323         
324     def krpc_join(self, id, _krpc_sender = None):
325         """Add the node by responding with its address and port.
326         
327         @type id: C{string}
328         @param id: the node ID of the sender node
329         @type _krpc_sender: (C{string}, C{int})
330         @param _krpc_sender: the sender node's IP address and port
331         """
332         if _krpc_sender is not None:
333             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
334             reactor.callLater(0, self.insertNode, n, False)
335         else:
336             _krpc_sender = ('127.0.0.1', self.port)
337
338         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
339         
340     def krpc_find_node(self, id, target, _krpc_sender = None):
341         """Find the K closest nodes to the target in the local routing table.
342         
343         @type target: C{string}
344         @param target: the target ID to find nodes for
345         @type id: C{string}
346         @param id: the node ID of the sender node
347         @type _krpc_sender: (C{string}, C{int})
348         @param _krpc_sender: the sender node's IP address and port
349         """
350         if _krpc_sender is not None:
351             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
352             reactor.callLater(0, self.insertNode, n, False)
353         else:
354             _krpc_sender = ('127.0.0.1', self.port)
355
356         nodes = self.table.findNodes(target)
357         nodes = map(lambda node: node.contactInfo(), nodes)
358         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
359         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
360
361
362 class KhashmirRead(KhashmirBase):
363     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
364
365     _Node = KNodeRead
366
367     #{ Local interface
368     def findValue(self, key, callback):
369         """Get the nodes that have values for the key from the global table.
370         
371         @type key: C{string}
372         @param key: the target key to find the values for
373         @type callback: C{method}
374         @param callback: the method to call with the results, it must take 1
375             parameter, the list of nodes with values
376         """
377         # Start with ourself
378         nodes = [copy(self.node)]
379         
380         # Search for others starting with the locally found ones
381         state = FindValue(self, key, callback, self.config, self.stats)
382         reactor.callLater(0, state.goWithNodes, nodes)
383
384     def valueForKey(self, key, callback, searchlocal = True):
385         """Get the values found for key in global table.
386         
387         Callback will be called with a list of values for each peer that
388         returns unique values. The final callback will be an empty list.
389
390         @type key: C{string}
391         @param key: the target key to get the values for
392         @type callback: C{method}
393         @param callback: the method to call with the results, it must take 2
394             parameters: the key, and the values found
395         @type searchlocal: C{boolean}
396         @param searchlocal: whether to also look for any local values
397         """
398
399         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
400             """Use the found nodes to send requests for values to."""
401             # Get any local values
402             if searchlocal:
403                 l = self.store.retrieveValues(key)
404                 if len(l) > 0:
405                     node = copy(self.node)
406                     node.updateNumValues(len(l))
407                     nodes = nodes + [node]
408
409             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
410             reactor.callLater(0, state.goWithNodes, nodes)
411             
412         # First lookup nodes that have values for the key
413         self.findValue(key, _getValueForKey)
414
415     #{ Remote interface
416     def krpc_find_value(self, id, key, _krpc_sender = None):
417         """Find the number of values stored locally for the key, and the K closest nodes.
418         
419         @type key: C{string}
420         @param key: the target key to find the values and nodes for
421         @type id: C{string}
422         @param id: the node ID of the sender node
423         @type _krpc_sender: (C{string}, C{int})
424         @param _krpc_sender: the sender node's IP address and port
425         """
426         if _krpc_sender is not None:
427             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
428             reactor.callLater(0, self.insertNode, n, False)
429     
430         nodes = self.table.findNodes(key)
431         nodes = map(lambda node: node.contactInfo(), nodes)
432         num_values = self.store.countValues(key)
433         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
434
435     def krpc_get_value(self, id, key, num, _krpc_sender = None):
436         """Retrieve the values stored locally for the key.
437         
438         @type key: C{string}
439         @param key: the target key to retrieve the values for
440         @type num: C{int}
441         @param num: the maximum number of values to retrieve, or 0 to
442             retrieve all of them
443         @type id: C{string}
444         @param id: the node ID of the sender node
445         @type _krpc_sender: (C{string}, C{int})
446         @param _krpc_sender: the sender node's IP address and port
447         """
448         if _krpc_sender is not None:
449             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
450             reactor.callLater(0, self.insertNode, n, False)
451     
452         l = self.store.retrieveValues(key)
453         if num == 0 or num >= len(l):
454             return {'values' : l, "id": self.node.id}
455         else:
456             shuffle(l)
457             return {'values' : l[:num], "id": self.node.id}
458
459
460 class KhashmirWrite(KhashmirRead):
461     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
462
463     _Node = KNodeWrite
464
465     #{ Local interface
466     def storeValueForKey(self, key, value, callback=None):
467         """Stores the value for the key in the global table.
468         
469         No status in this implementation, peers respond but don't indicate
470         status of storing values.
471
472         @type key: C{string}
473         @param key: the target key to store the value for
474         @type value: C{string}
475         @param value: the value to store with the key
476         @type callback: C{method}
477         @param callback: the method to call with the results, it must take 3
478             parameters: the key, the value stored, and the result of the store
479             (optional, defaults to doing nothing with the results)
480         """
481         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
482             """Use the returned K closest nodes to store the key at."""
483             if not response:
484                 def _storedValueHandler(key, value, sender):
485                     """Default callback that does nothing."""
486                     pass
487                 response = _storedValueHandler
488             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
489             reactor.callLater(0, action.goWithNodes, nodes)
490             
491         # First find the K closest nodes to operate on.
492         self.findNode(key, _storeValueForKey)
493                     
494     #{ Remote interface
495     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
496         """Store the value locally with the key.
497         
498         @type key: C{string}
499         @param key: the target key to store the value for
500         @type value: C{string}
501         @param value: the value to store with the key
502         @param token: the token to confirm that this peer contacted us previously
503         @type id: C{string}
504         @param id: the node ID of the sender node
505         @type _krpc_sender: (C{string}, C{int})
506         @param _krpc_sender: the sender node's IP address and port
507         """
508         if _krpc_sender is not None:
509             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
510             reactor.callLater(0, self.insertNode, n, False)
511         else:
512             _krpc_sender = ('127.0.0.1', self.port)
513
514         for secret in self.token_secrets:
515             this_token = sha(secret + _krpc_sender[0]).digest()
516             if token == this_token:
517                 self.store.storeValue(key, value)
518                 return {"id" : self.node.id}
519         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
520
521
522 class Khashmir(KhashmirWrite):
523     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
524     _Node = KNodeWrite
525
526
527 class SimpleTests(unittest.TestCase):
528     
529     timeout = 10
530     DHT_DEFAULTS = {'PORT': 9977,
531                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
532                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
533                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
534                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
535                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
536                     'KEY_EXPIRE': 3600, 'SPEW': False, }
537
538     def setUp(self):
539         d = self.DHT_DEFAULTS.copy()
540         d['PORT'] = 4044
541         self.a = Khashmir(d)
542         d = self.DHT_DEFAULTS.copy()
543         d['PORT'] = 4045
544         self.b = Khashmir(d)
545         
546     def tearDown(self):
547         self.a.shutdown()
548         self.b.shutdown()
549         os.unlink(self.a.store.db)
550         os.unlink(self.b.store.db)
551
552     def testAddContact(self):
553         self.failUnlessEqual(len(self.a.table.buckets), 1)
554         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
555
556         self.failUnlessEqual(len(self.b.table.buckets), 1)
557         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
558
559         self.a.addContact('127.0.0.1', 4045)
560         reactor.iterate()
561         reactor.iterate()
562         reactor.iterate()
563         reactor.iterate()
564
565         self.failUnlessEqual(len(self.a.table.buckets), 1)
566         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
567         self.failUnlessEqual(len(self.b.table.buckets), 1)
568         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
569
570     def testStoreRetrieve(self):
571         self.a.addContact('127.0.0.1', 4045)
572         reactor.iterate()
573         reactor.iterate()
574         reactor.iterate()
575         reactor.iterate()
576         self.got = 0
577         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
578         reactor.iterate()
579         reactor.iterate()
580         reactor.iterate()
581         reactor.iterate()
582         reactor.iterate()
583         reactor.iterate()
584         self.a.valueForKey(sha('foo').digest(), self._cb)
585         reactor.iterate()
586         reactor.iterate()
587         reactor.iterate()
588         reactor.iterate()
589         reactor.iterate()
590         reactor.iterate()
591         reactor.iterate()
592
593     def _cb(self, key, val):
594         if not val:
595             self.failUnlessEqual(self.got, 1)
596         elif 'foobar' in val:
597             self.got = 1
598
599
600 class MultiTest(unittest.TestCase):
601     
602     timeout = 30
603     num = 20
604     DHT_DEFAULTS = {'PORT': 9977,
605                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
606                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
607                     'MAX_FAILURES': 3, 'LOCAL_OK': True,
608                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
609                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
610                     'KEY_EXPIRE': 3600, 'SPEW': False, }
611
612     def _done(self, val):
613         self.done = 1
614         
615     def setUp(self):
616         self.l = []
617         self.startport = 4088
618         for i in range(self.num):
619             d = self.DHT_DEFAULTS.copy()
620             d['PORT'] = self.startport + i
621             self.l.append(Khashmir(d))
622         reactor.iterate()
623         reactor.iterate()
624         
625         for i in self.l:
626             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
627             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
628             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
629             reactor.iterate()
630             reactor.iterate()
631             reactor.iterate() 
632             
633         for i in self.l:
634             self.done = 0
635             i.findCloseNodes(self._done)
636             while not self.done:
637                 reactor.iterate()
638         for i in self.l:
639             self.done = 0
640             i.findCloseNodes(self._done)
641             while not self.done:
642                 reactor.iterate()
643
644     def tearDown(self):
645         for i in self.l:
646             i.shutdown()
647             os.unlink(i.store.db)
648             
649         reactor.iterate()
650         
651     def testStoreRetrieve(self):
652         for i in range(10):
653             K = newID()
654             V = newID()
655             
656             for a in range(3):
657                 self.done = 0
658                 def _scb(key, value, result):
659                     self.done = 1
660                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
661                 while not self.done:
662                     reactor.iterate()
663
664
665                 def _rcb(key, val):
666                     if not val:
667                         self.done = 1
668                         self.failUnlessEqual(self.got, 1)
669                     elif V in val:
670                         self.got = 1
671                 for x in range(3):
672                     self.got = 0
673                     self.done = 0
674                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
675                     while not self.done:
676                         reactor.iterate()