self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
#self.app = service.Application("krpc")
- self.udp = krpc.hostbroker(self)
+ self.udp = krpc.hostbroker(self, config)
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(self.port, self.udp)
self._loadRoutingTable()
#######
####### LOCAL INTERFACE - use these methods!
- def addContact(self, host, port, callback=None):
+ def addContact(self, host, port, callback=None, errback=None):
"""
ping this node and add the contact info to the table on pong!
"""
n = self.Node(NULL_ID, host, port)
- self.sendPing(n, callback=callback)
+ self.sendJoin(n, callback=callback, errback=errback)
## this call is async!
def findNode(self, id, callback, errback=None):
df = old.ping(self.node.id)
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
- def sendPing(self, node, callback=None):
+ def sendJoin(self, node, callback=None, errback=None):
"""
ping a node
"""
- df = node.ping(self.node.id)
+ df = node.join(self.node.id)
## these are the callbacks we use when we issue a PING
def _pongHandler(dict, node=node, self=self, callback=callback):
n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
self.insertNode(n)
if callback:
- callback()
- def _defaultPong(err, node=node, table=self.table, callback=callback):
+ callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
+ def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
table.nodeFailed(node)
- if callback:
- callback()
+ if errback:
+ errback()
+ else:
+ callback(None)
df.addCallbacks(_pongHandler,_defaultPong)
- def findCloseNodes(self, callback=lambda a: None):
+ def findCloseNodes(self, callback=lambda a: None, errback = None):
"""
This does a findNode on the ID one away from our own.
This will allow us to populate our table with nodes on our network closest to our own.
This is called as soon as we start up with an empty table
"""
id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
- self.findNode(id, callback)
+ self.findNode(id, callback, errback)
def refreshTable(self, force=0):
"""
self.insertNode(n, contacted=0)
return {"id" : self.node.id}
+ def krpc_join(self, id, _krpc_sender):
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted=0)
+ return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
+
def krpc_find_node(self, target, id, _krpc_sender):
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
class KhashmirWrite(KhashmirRead):
_Node = KNodeWrite
## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
- def storeValueForKey(self, key, value, callback=None):
- """ stores the value for key in the global table, returns immediately, no status
+ def storeValueForKey(self, key, value, originated, callback=None):
+ """ stores the value and origination time for key in the global table, returns immediately, no status
in this implementation, peers respond but don't indicate status to storing values
a key can have many values
"""
- def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table):
if not response:
# default callback
def _storedValueHandler(key, value, sender):
pass
response=_storedValueHandler
- action = StoreValue(self.table, key, value, response, self.config)
+ action = StoreValue(self.table, key, value, originated, response, self.config)
reactor.callLater(0, action.goWithNodes, nodes)
# this call is asynch
self.findNode(key, _storeValueForKey)
#### Remote Interface - called by remote nodes
- def krpc_store_value(self, key, value, id, _krpc_sender):
+ def krpc_store_value(self, key, value, originated, id, _krpc_sender):
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
- self.store.storeValue(key, value)
+ self.store.storeValue(key, value, originated)
return {"id" : self.node.id}
# the whole shebang, for testing
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KE_AGE': 3600, 'SPEW': False, }
def setUp(self):
krpc.KRPC.noisy = 0
os.unlink(self.b.store.db)
def testAddContact(self):
- self.assertEqual(len(self.a.table.buckets), 1)
- self.assertEqual(len(self.a.table.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.a.table.buckets), 1)
+ self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
- self.assertEqual(len(self.b.table.buckets), 1)
- self.assertEqual(len(self.b.table.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.b.table.buckets), 1)
+ self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
self.a.addContact('127.0.0.1', 4045)
reactor.iterate()
reactor.iterate()
reactor.iterate()
- self.assertEqual(len(self.a.table.buckets), 1)
- self.assertEqual(len(self.a.table.buckets[0].l), 1)
- self.assertEqual(len(self.b.table.buckets), 1)
- self.assertEqual(len(self.b.table.buckets[0].l), 1)
+ self.failUnlessEqual(len(self.a.table.buckets), 1)
+ self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
+ self.failUnlessEqual(len(self.b.table.buckets), 1)
+ self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
def testStoreRetrieve(self):
self.a.addContact('127.0.0.1', 4045)
reactor.iterate()
reactor.iterate()
self.got = 0
- self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+ self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow())
reactor.iterate()
reactor.iterate()
reactor.iterate()
def _cb(self, key, val):
if not val:
- self.assertEqual(self.got, 1)
+ self.failUnlessEqual(self.got, 1)
elif 'foobar' in val:
self.got = 1
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KE_AGE': 3600, 'SPEW': False, }
def _done(self, val):
self.done = 1
self.done = 0
def _scb(key, value, result):
self.done = 1
- self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
+ self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb)
while not self.done:
reactor.iterate()
def _rcb(key, val):
if not val:
self.done = 1
- self.assertEqual(self.got, 1)
+ self.failUnlessEqual(self.got, 1)
elif V in val:
self.got = 1
for x in range(3):