- ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
- def storeValueForKey(self, key, value, callback=None):
- """ stores the value for key in the global table, returns immediately, no status
- in this implementation, peers respond but don't indicate status to storing values
- a key can have many values
- """
- def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
- if not response:
- # default callback
- def _storedValueHandler(sender):
- pass
- response=_storedValueHandler
-
- for node in nodes[:const.STORE_REDUNDANCY]:
- def cb(t, table = table, node=node, resp=response):
- self.table.insertNode(node)
- response(t)
- if node.id != self.node.id:
- def default(err, node=node, table=table):
- table.nodeFailed(node)
- df = node.storeValue(key, value, self.node.senderDict())
- df.addCallbacks(cb, default)
- # this call is asynch
- self.findNode(key, _storeValueForKey)
-
-
- def insertNode(self, n, contacted=1):
- """
- insert a node in our local table, pinging oldest contact in bucket, if necessary
-
- If all you have is a host/port, then use addContact, which calls this method after
- receiving the PONG from the remote node. The reason for the seperation is we can't insert
- a node into the table without it's peer-ID. That means of course the node passed into this
- method needs to be a properly formed Node object with a valid ID.
- """
- old = self.table.insertNode(n, contacted=contacted)
- if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
- # the bucket is full, check to see if old node is still around and if so, replace it
-
- ## these are the callbacks used when we ping the oldest node in a bucket
- def _staleNodeHandler(oldnode=old, newnode = n):
- """ called if the pinged node never responds """
- self.table.replaceStaleNode(old, newnode)
-
- def _notStaleNodeHandler(sender, old=old):
- """ called when we get a pong from the old node """
- args, sender = sender
- sender = Node().initWithDict(sender)
- if sender.id == old.id:
- self.table.justSeenNode(old)
-
- df = old.ping(self.node.senderDict())
- df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+ def sendPing(self, node, callback=None):
+ """
+ ping a node
+ """
+ df = node.ping(self.node.id)
+ ## these are the callbacks we use when we issue a PING
+ def _pongHandler(dict, node=node, table=self.table, callback=callback):
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
+ sender = {'id' : dict['id']}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = self.Node().initWithDict(sender)
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
+ table.insertNode(n)
+ if callback:
+ callback()
+ def _defaultPong(err, node=node, table=self.table, callback=callback):
+ table.nodeFailed(node)
+ if callback:
+ callback()
+
+ df.addCallbacks(_pongHandler,_defaultPong)