from bsddb3 import db
from const import reactor
+import const
from hash import intify
from knode import KNode as Node
def handleGotNodes(self, args):
l, sender = args
sender = Node().initWithDict(sender)
+ self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
return
if not self.queried.has_key(node.id) and node.id != self.table.node.id:
#xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
df = node.findNode(self.target, self.table.node.senderDict())
- df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+ df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
if self.outstanding >= N:
self.finished=1
reactor.callFromThread(self.callback, l[:K])
- def defaultGotNodes(self, t):
- if self.finished:
- return
- self.outstanding = self.outstanding - 1
- self.schedule()
-
+ def makeMsgFailed(self, node):
+ def defaultGotNodes(err, self=self, node=node):
+ self.table.table.nodeFailed(node)
+ if self.finished:
+ return
+ self.outstanding = self.outstanding - 1
+ self.schedule()
+ return defaultGotNodes
def goWithNodes(self, nodes):
"""
self.found[node.id] = node
#xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
df = node.findNode(self.target, self.table.node.senderDict())
- df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+ df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
if self.outstanding == 0:
def handleGotNodes(self, args):
l, sender = args
sender = Node().initWithDict(sender)
+ self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
return
if not z.has_key(y):
z[y] = 1
return y
+ else:
+ return None
v = filter(None, map(x, l['values']))
if(len(v)):
reactor.callFromThread(self.callback, v)
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
df = node.findValue(self.target, self.table.node.senderDict())
df.addCallback(self.handleGotNodes)
- df.addErrback(self.defaultGotNodes)
+ df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
if self.outstanding >= N:
#xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
df = node.findValue(self.target, self.table.node.senderDict())
df.addCallback(self.handleGotNodes)
- df.addErrback(self.defaultGotNodes)
+ df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
if self.outstanding == 0:
reactor.callFromThread(self.callback, [])
-KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
-KE_DELAY = 60 * 60 # 1 hour
-KE_AGE = KEINITIAL_DELAY
class KeyExpirer:
def __init__(self, store, itime, kw):
self.store = store
self.itime = itime
self.kw = kw
- reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+ reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
def doExpire(self):
- self.cut = `time() - KE_AGE`
+ self.cut = `time() - const.KE_AGE`
self._expire()
def _expire(self):
break
irec = f()
- reactor.callLater(KE_DELAY, self.doExpire)
+ reactor.callLater(const.KE_DELAY, self.doExpire)
if(i > 0):
print ">>>KE: done expiring %d" % i
\ No newline at end of file
reactor = SelectReactor(installSignalHandlers=0)
from twisted.internet import main
-main.installReactor(reactor)
\ No newline at end of file
+main.installReactor(reactor)
+
+# how many times a node can fail to respond before it's booted from the routing table
+MAX_FAILURES = 3
+
+# time before expirer starts running
+KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
+
+# time between expirer runs
+KE_DELAY = 60 * 60 # 1 hour
+
+# expire entries older than this
+KE_AGE = KEINITIAL_DELAY
# create our search state
state = GetValue(self, key, callback)
- reactor.callFromThread(state.goWithNodes, nodes, {'found' : l})
+ reactor.callFromThread(state.goWithNodes, nodes, l)
values are stored in peers on a first-come first-served basis
this will probably change so more than one value can be stored under a key
"""
- def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
if not callback:
# default callback - this will get called for each successful store value
def _storedValueHandler(sender):
pass
response=_storedValueHandler
+
for node in nodes:
+ def cb(t, table = table, node=node, resp=response):
+ self.table.insertNode(node)
+ response(t)
if node.id != self.node.id:
+ def default(err, node=node, table=table):
+ table.nodeFailed(node)
df = node.storeValue(key, value, self.node.senderDict())
- df.addCallbacks(response, default)
+ df.addCallbacks(cb, default)
# this call is asynch
self.findNode(key, _storeValueForKey)
self.table.replaceStaleNode(old, newnode)
def _notStaleNodeHandler(sender, old=old):
- """ called when we get a ping from the remote node """
+ """ called when we get a pong from the old node """
sender = Node().initWithDict(sender)
if sender.id == old.id:
- self.table.insertNode(old)
+ self.table.justSeenNode(old)
df = old.ping(self.node.senderDict())
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
n = Node().initWithDict(sender)
table.insertNode(n)
return
- def _defaultPong(err):
- # this should probably increment a failed message counter and dump the node if it gets over a threshold
- return
+ def _defaultPong(err, node=node, table=self.table):
+ table.nodeFailed(node)
df.addCallbacks(_pongHandler,_defaultPong)
from const import reactor
from xmlrpclib import Binary
-class KNode(Node):
+
+class KNode(Node):
def ping(self, sender):
df = Deferred()
f = factory('ping', (sender,), df.callback, df.errback)
import time
from types import *
+import const
from node import Node
# The all-powerful, magical Kademlia "k" constant, bucket depth
return
del(self.buckets[i].l[it])
- self.buckets[i].l.append(new)
+ if new:
+ self.buckets[i].l.append(new)
def insertNode(self, node):
"""
n.updateLastSeen()
return tstamp
-
+ def nodeFailed(self, node):
+ """ call this when a node fails to respond to a message, to invalidate that node """
+ try:
+ n = self.findNodes(node.int)[0]
+ except IndexError:
+ return None
+ else:
+ if(n.msgFailed() >= const.MAX_FAILURES):
+ self.replaceStaleNode(n, None)
+
class KBucket:
__slots = ['min', 'max', 'lastAccessed']
def __init__(self, contents, min, max):
class Node:
"""encapsulate contact info"""
+
+ def __init__(self):
+ self.fails = 0
+ self.lastSeen = time.time()
+
def init(self, id, host, port):
self.id = id
self.int = hash.intify(id)
self.host = host
self.port = port
- self.lastSeen = time.time()
self._senderDict = {'id': Binary(self.id), 'port' : self.port, 'host' : self.host}
return self
self.int = hash.intify(self.id)
self.port = dict['port']
self.host = dict['host']
- self.lastSeen = time.time()
return self
def updateLastSeen(self):
self.lastSeen = time.time()
-
+ self.fails = 0
+
+ def msgFailed(self):
+ self.fails = self.fails + 1
+ return self.fails
+
def senderDict(self):
return self._senderDict