## Copyright 2002 Andrew Loewenstern, All Rights Reserved
from const import reactor
+import const
+
import time
from pickle import loads, dumps
from sha import sha
from bsddb3 import db ## find this at http://pybsddb.sf.net/
from bsddb3._db import DBNotFoundError
-from base64 import decodestring as decode
-
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
+from xmlrpclib import Binary
def valueForKey(self, key, callback):
""" returns the values found for key in global table """
nodes = self.table.findNodes(key)
- # decode values, they will be base64 encoded
- def cbwrap(values, cb=callback):
- values = map(lambda x: decode(x), values)
- callback(values)
- # create our search state
- state = GetValue(self, key, cbwrap)
- reactor.callFromThread(state.goWithNodes, nodes)
+ # get locals
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ reactor.callFromThread(callback, l)
+
+ # create our search state
+ state = GetValue(self, key, callback)
+ reactor.callFromThread(state.goWithNodes, nodes, l)
+
## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
values are stored in peers on a first-come first-served basis
this will probably change so more than one value can be stored under a key
"""
- def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
if not callback:
# default callback - this will get called for each successful store value
def _storedValueHandler(sender):
pass
response=_storedValueHandler
+
for node in nodes:
+ def cb(t, table = table, node=node, resp=response):
+ self.table.insertNode(node)
+ response(t)
if node.id != self.node.id:
+ def default(err, node=node, table=table):
+ table.nodeFailed(node)
df = node.storeValue(key, value, self.node.senderDict())
- df.addCallbacks(response, default)
+ df.addCallback(cb)
# this call is asynch
self.findNode(key, _storeValueForKey)
- def insertNode(self, n):
+ def insertNode(self, n, contacted=1):
"""
insert a node in our local table, pinging oldest contact in bucket, if necessary
a node into the table without it's peer-ID. That means of course the node passed into this
method needs to be a properly formed Node object with a valid ID.
"""
- old = self.table.insertNode(n)
- if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
+ old = self.table.insertNode(n, contacted=contacted)
+ if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
# the bucket is full, check to see if old node is still around and if so, replace it
## these are the callbacks used when we ping the oldest node in a bucket
self.table.replaceStaleNode(old, newnode)
def _notStaleNodeHandler(sender, old=old):
- """ called when we get a ping from the remote node """
- if sender['id'] == old.id:
- self.table.insertNode(old)
+ """ called when we get a pong from the old node """
+ sender, conn = sender
+ sender['host'] = conn['host']
+ sender = Node().initWithDict(sender)
+ if sender.id == old.id:
+ self.table.justSeenNode(old)
df = old.ping(self.node.senderDict())
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
df = node.ping(self.node.senderDict())
## these are the callbacks we use when we issue a PING
def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
- if id != 20 * ' ' and id != sender['id']:
+ sender = sender[0]
+ if id != 20 * ' ' and id != sender['id'].data:
# whoah, got response from different peer than we were expecting
pass
else:
- #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
sender['host'] = host
sender['port'] = port
n = Node().initWithDict(sender)
table.insertNode(n)
return
- def _defaultPong(err):
- # this should probably increment a failed message counter and dump the node if it gets over a threshold
- return
+ def _defaultPong(err, node=node, table=self.table):
+ table.nodeFailed(node)
df.addCallbacks(_pongHandler,_defaultPong)
self.findNode(id, callback)
+ def retrieveValues(self, key):
+ if self.kw.has_key(key):
+ c = self.kw.cursor()
+ tup = c.set(key)
+ l = []
+ while(tup and tup[0] == key):
+ h1 = tup[1]
+ v = loads(self.store[h1])[1]
+ l.append(v)
+ tup = c.next()
+ return l
+ return []
+
#####
##### INCOMING MESSAGE HANDLERS
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
return self.node.senderDict()
def xmlrpc_find_node(self, target, sender):
- nodes = self.table.findNodes(target)
+ nodes = self.table.findNodes(target.data)
nodes = map(lambda node: node.senderDict(), nodes)
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
return nodes, self.node.senderDict()
def xmlrpc_store_value(self, key, value, sender):
- key = decode(key)
- h1 = sha(key+value).digest()
+ key = key.data
+ h1 = sha(key+value.data).digest()
t = `time.time()`
if not self.store.has_key(h1):
- v = dumps((key, value, t))
+ v = dumps((key, value.data, t))
self.store.put(h1, v)
self.itime.put(t, h1)
self.kw.put(key, h1)
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
return self.node.senderDict()
def xmlrpc_find_value(self, key, sender):
ip = self.crequest.getClientIP()
- key = decode(key)
+ key = key.data
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
- if self.kw.has_key(key):
- c = self.kw.cursor()
- tup = c.set(key)
- l = []
- while(tup):
- h1 = tup[1]
- v = loads(self.store[h1])[1]
- l.append(v)
- tup = c.next()
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ l = map(lambda v: Binary(v), l)
return {'values' : l}, self.node.senderDict()
else:
nodes = self.table.findNodes(key)
try:
if(len(values) == 0):
if not self.found:
- print "find FAILED"
+ print "find NOT FOUND"
else:
print "find FOUND"
sys.stdout.flush()
d.valueForKey(key, cb(fc).callback)
fc.wait()
-def test_one(port):
+def test_one(host, port):
import thread
- k = Khashmir('localhost', port)
+ k = Khashmir(host, port)
thread.start_new_thread(k.app.run, ())
return k
if __name__ == "__main__":
- l = test_build_net()
+ import sys
+ n = 8
+ if len(sys.argv) > 1:
+ n = int(sys.argv[1])
+ l = test_build_net(peers=n)
time.sleep(3)
print "finding nodes..."
for i in range(10):