## also async
def valueForKey(self, key, callback):
    """Look up all values stored under key in the global table.

    Asynchronous: callback may fire more than once as results arrive,
    starting with any values we hold locally.
    """
    # Start from the closest contacts we already know about.
    close_nodes = self.table.findNodes(key)

    # Hand any locally stored values to the caller right away.
    local_values = self.retrieveValues(key)
    if local_values:
        reactor.callFromThread(callback, local_values)

    # Kick off the distributed search from the reactor thread.
    search = GetValue(self, key, callback)
    reactor.callFromThread(search.goWithNodes, close_nodes, local_values)
-
-
-
## async, but in the current implementation there is no guarantee a store
## does anything, so there is no callback right now
def storeValueForKey(self, key, value, callback=None):
    """Store value under key in the global table; returns immediately.

    In this implementation peers respond but don't indicate status for
    stored values, so the optional callback simply fires once per peer
    that acknowledged the store.  Values are stored in peers on a
    first-come first-served basis; this will probably change so more
    than one value can be stored under a key.
    """
    def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table):
        if not response:
            # Default callback: called for each successful store.
            def _storedValueHandler(sender):
                pass
            response = _storedValueHandler

        for node in nodes:
            # Bind per-iteration state via default args so each closure
            # keeps its own node/table/response -- and actually use those
            # bound names (the original reached back through the closure,
            # leaving the defaults dead).
            def cb(t, table=table, node=node, resp=response):
                table.insertNode(node)
                resp(t)
            if node.id != self.node.id:
                def default(err, node=node, table=table):
                    table.nodeFailed(node)
                df = node.storeValue(key, value, self.node.senderDict())
                df.addCallbacks(cb, default)
    # this call is asynchronous
    self.findNode(key, _storeValueForKey)
-
-
def insertNode(self, n):
    """
    Insert a node into our local table, pinging the oldest contact in the
    bucket if necessary.

    If all you have is a host/port, then use addContact, which calls this
    method after receiving the PONG from the remote node. The reason for
    the separation is we can't insert a node into the table without its
    peer-ID. That means of course the node passed into this method needs
    to be a properly formed Node object with a valid ID.
    """
    old = self.table.insertNode(n)
    if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
        # the bucket is full, check to see if old node is still around and if so, replace it

        ## these are the callbacks used when we ping the oldest node in a bucket
        def _staleNodeHandler(failure, oldnode=old, newnode=n):
            """Errback: the pinged node never responded, so evict it.

            Takes the Twisted failure explicitly -- the original version
            had no failure parameter, so the failure object silently
            landed in the oldnode default.
            """
            self.table.replaceStaleNode(oldnode, newnode)

        def _notStaleNodeHandler(sender, oldnode=old):
            """Callback: got a pong from the old node, so keep it."""
            sender = Node().initWithDict(sender)
            if sender.id == oldnode.id:
                self.table.justSeenNode(oldnode)

        df = old.ping(self.node.senderDict())
        df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
-
-
def sendPing(self, node):
    """
    Ping a node, inserting it into the table on a valid pong.
    """
    df = node.ping(self.node.senderDict())

    ## callbacks for the outstanding PING
    def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
        if id == 20 * ' ' or id == sender['id'].data:
            # Expected peer answered: fill in its address and record it.
            sender['host'] = host
            sender['port'] = port
            table.insertNode(Node().initWithDict(sender))
        # otherwise: whoah, got a response from a different peer than we
        # were expecting -- ignore it
        return

    def _defaultPong(err, node=node, table=self.table):
        # No pong: count a failure against this node.
        table.nodeFailed(node)

    df.addCallbacks(_pongHandler, _defaultPong)
-
-
def findCloseNodes(self):
    """Populate our table with the nodes on the network closest to us.

    Does a findNode on the ID one away from our own; called as soon as we
    start up with an empty table.
    """
    def _ignore(nodes):
        pass

    bumped = (ord(self.node.id[-1]) + 1) % 256
    target = self.node.id[:-1] + chr(bumped)
    self.findNode(target, _ignore)
-
def refreshTable(self):
    """Refresh any bucket not accessed within the last hour.

    For each stale bucket, searches for a random ID inside that bucket's
    range to freshen its contacts.
    """
    def _ignore(nodes):
        pass

    for bucket in self.table.buckets:
        if time.time() - bucket.lastAccessed < 60 * 60:
            continue
        self.findNode(randRange(bucket.min, bucket.max), _ignore)
-
-
+ for bucket in self.table.buckets:
+ if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
+ id = newIDInRange(bucket.min, bucket.max)
+ self.findNode(id, callback)
+
def stats(self):
    """
    Returns (num_contacts, num_nodes)
    num_contacts: number contacts in our routing table
    num_nodes: number of nodes estimated in the entire dht
    """
    # Count contacts across every bucket's node list.
    num_contacts = sum([len(bucket.l) for bucket in self.table.buckets])
    # Each split doubles the covered keyspace; estimate K nodes per bucket.
    num_nodes = const.K * (2 ** (len(self.table.buckets) - 1))
    return (num_contacts, num_nodes)
+
def krpc_ping(self, id, _krpc_sender):
    """KRPC handler: record the pinging peer, answer with our own ID."""
    sender = {'id': id, 'host': _krpc_sender[0], 'port': _krpc_sender[1]}
    peer = self.Node().initWithDict(sender)
    peer.conn = self.udp.connectionForAddr((peer.host, peer.port))
    # contacted=0: the peer contacted us, we have not verified it ourselves
    self.insertNode(peer, contacted=0)
    return {"id": self.node.id}
+
def krpc_find_node(self, target, id, _krpc_sender):
    """KRPC handler: return the closest contacts we know for target."""
    found = [node.senderDict() for node in self.table.findNodes(target)]
    sender = {'id': id, 'host': _krpc_sender[0], 'port': _krpc_sender[1]}
    peer = self.Node().initWithDict(sender)
    peer.conn = self.udp.connectionForAddr((peer.host, peer.port))
    # contacted=0: the peer contacted us, we have not verified it ourselves
    self.insertNode(peer, contacted=0)
    return {"nodes": found, "id": self.node.id}
+
+
## This class provides read-only access to the DHT via valueForKey;
## you probably want to use this mixin and provide your own write methods.
+class KhashmirRead(KhashmirBase):
+ _Node = KNodeRead