From bb75d0a628c92d0e5602fa6be1ef9a37c3fb0d86 Mon Sep 17 00:00:00 2001 From: burris Date: Sun, 1 Dec 2002 21:08:01 +0000 Subject: [PATCH] fixed borked tab/space problems, damn ProjectBuilder doesn't come with reasonable defaults stopped using "int" for ivars since that conflicts with a builtin, now use node.num for getting the integer representation of a node's ID --- actions.py | 8 +- const.py | 6 + hash.py | 4 +- khashmir.py | 731 ++++++++++++++++++++++++++-------------------------- ktable.py | 46 ++-- node.py | 119 +++++---- 6 files changed, 455 insertions(+), 459 deletions(-) diff --git a/actions.py b/actions.py index 1c58ee0..01fb3c7 100644 --- a/actions.py +++ b/actions.py @@ -1,6 +1,4 @@ from time import time -from bencode import bdecode as loads -from bencode import bencode as dumps from const import reactor import const @@ -14,7 +12,7 @@ class ActionBase: def __init__(self, table, target, callback): self.table = table self.target = target - self.int = intify(target) + self.num = intify(target) self.found = {} self.queried = {} self.answered = {} @@ -22,9 +20,9 @@ class ActionBase: self.outstanding = 0 self.finished = 0 - def sort(a, b, int=self.int): + def sort(a, b, num=self.num): """ this function is for sorting nodes relative to the ID we are looking for """ - x, y = int ^ a.int, int ^ b.int + x, y = num ^ a.num, num ^ b.num if x > y: return 1 elif x < y: diff --git a/const.py b/const.py index f5feed4..4926f00 100644 --- a/const.py +++ b/const.py @@ -7,6 +7,12 @@ main.installReactor(reactor) # magic id to use before we know a peer's id NULL_ID = 20 * '\0' +# Kademlia "K" constant +K = 8 + +# SHA1 is 160 bits long +HASH_LENGTH = 160 + ### SEARCHING/STORING # concurrent xmlrpc calls per find node/value request! diff --git a/hash.py b/hash.py index bc68686..4de069f 100644 --- a/hash.py +++ b/hash.py @@ -8,9 +8,9 @@ def intify(hstr): assert len(hstr) == 20 return long(hstr.encode('hex'), 16) -def stringify(int): +def stringify(num): """long int -> 20-character string""" - str = hex(int)[2:] + str = hex(num)[2:] if str[-1] == 'L': str = str[:-1] if len(str) % 2 != 0: diff --git a/khashmir.py b/khashmir.py index 36ff79f..b0b414d 100644 --- a/khashmir.py +++ b/khashmir.py @@ -27,407 +27,400 @@ KhashmirDBExcept = "KhashmirDBExcept" # this is the main class! class Khashmir(xmlrpc.XMLRPC): - __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last') - def __init__(self, host, port, db='khashmir.db'): - self.node = Node().init(newID(), host, port) - self.table = KTable(self.node) - self.app = Application("xmlrpc") - self.app.listenTCP(port, server.Site(self)) - self.findDB(db) - self.last = time.time() - KeyExpirer(store=self.store) - - def findDB(self, db): - import os - try: - os.stat(db) - except OSError: - self.createNewDB(db) - else: - self.loadDB(db) - - def loadDB(self, db): - try: - self.store = sqlite.connect(db=db) - self.store.autocommit = 1 - except: - import traceback - raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback + __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last') + def __init__(self, host, port, db='khashmir.db'): + self.setup(host, port, db) + def setup(self, host, port, db='khashmir.db'): + self.node = Node().init(newID(), host, port) + self.table = KTable(self.node) + self.app = Application("xmlrpc") + self.app.listenTCP(port, server.Site(self)) + self.findDB(db) + self.last = time.time() + KeyExpirer(store=self.store) + + def findDB(self, db): + import os + try: + os.stat(db) + except OSError: + self.createNewDB(db) + else: + self.loadDB(db) - def createNewDB(self, db): - self.store = sqlite.connect(db=db) - self.store.autocommit = 1 - s = """ - create table kv (key text, value text, time timestamp, primary key (key, value)); - create index kv_key on kv(key); - create index kv_timestamp on kv(time); + def loadDB(self, db): + try: + self.store = sqlite.connect(db=db) + self.store.autocommit = 1 + except: + import traceback + raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback - create table nodes (id text primary key, host text, port number); - """ - c = self.store.cursor() - c.execute(s) - - def render(self, request): - """ - Override the built in render so we can have access to the request object! - note, crequest is probably only valid on the initial call (not after deferred!) - """ - self.crequest = request - return xmlrpc.XMLRPC.render(self, request) - + def createNewDB(self, db): + self.store = sqlite.connect(db=db) + self.store.autocommit = 1 + s = """ + create table kv (key text, value text, time timestamp, primary key (key, value)); + create index kv_key on kv(key); + create index kv_timestamp on kv(time); + + create table nodes (id text primary key, host text, port number); + """ + c = self.store.cursor() + c.execute(s) + + def render(self, request): + """ + Override the built in render so we can have access to the request object! + note, crequest is probably only valid on the initial call (not after deferred!) + """ + self.crequest = request + return xmlrpc.XMLRPC.render(self, request) + + + ####### + ####### LOCAL INTERFACE - use these methods! + def addContact(self, host, port): + """ + ping this node and add the contact info to the table on pong! + """ + n =Node().init(const.NULL_ID, host, port) # note, we + self.sendPing(n) + + ## this call is async! + def findNode(self, id, callback, errback=None): + """ returns the contact info for node, or the k closest nodes, from the global table """ + # get K nodes out of local table/cache, or the node we want + nodes = self.table.findNodes(id) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + if len(nodes) == 1 and nodes[0].id == id : + d.callback(nodes) + else: + # create our search state + state = FindNode(self, id, d.callback) + reactor.callFromThread(state.goWithNodes, nodes) - ####### - ####### LOCAL INTERFACE - use these methods! - def addContact(self, host, port): - """ - ping this node and add the contact info to the table on pong! - """ - n =Node().init(const.NULL_ID, host, port) # note, we - self.sendPing(n) - - - ## this call is async! - def findNode(self, id, callback, errback=None): - """ returns the contact info for node, or the k closest nodes, from the global table """ - # get K nodes out of local table/cache, or the node we want - nodes = self.table.findNodes(id) - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) - if len(nodes) == 1 and nodes[0].id == id : - d.callback(nodes) - else: - # create our search state - state = FindNode(self, id, d.callback) - reactor.callFromThread(state.goWithNodes, nodes) - - - ## also async - def valueForKey(self, key, callback): - """ returns the values found for key in global table - callback will be called with a list of values for each peer that returns unique values - final callback will be an empty list - probably should change to 'more coming' arg - """ - nodes = self.table.findNodes(key) - - # get locals - l = self.retrieveValues(key) - if len(l) > 0: - reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l)) - - # create our search state - state = GetValue(self, key, callback) - reactor.callFromThread(state.goWithNodes, nodes, l) - - - ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) - def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - a key can have many values - """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): - if not callback: - # default callback - def _storedValueHandler(sender): - pass - response=_storedValueHandler + ## also async + def valueForKey(self, key, callback): + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg + """ + nodes = self.table.findNodes(key) + + # get locals + l = self.retrieveValues(key) + if len(l) > 0: + reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l)) + + # create our search state + state = GetValue(self, key, callback) + reactor.callFromThread(state.goWithNodes, nodes, l) + + ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) + def storeValueForKey(self, key, value, callback=None): + """ stores the value for key in the global table, returns immediately, no status + in this implementation, peers respond but don't indicate status to storing values + a key can have many values + """ + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + if not response: + # default callback + def _storedValueHandler(sender): + pass + response=_storedValueHandler + + for node in nodes[:const.STORE_REDUNDANCY]: + def cb(t, table = table, node=node, resp=response): + self.table.insertNode(node) + response(t) + if node.id != self.node.id: + def default(err, node=node, table=table): + table.nodeFailed(node) + df = node.storeValue(key, value, self.node.senderDict()) + df.addCallbacks(cb, default) + # this call is asynch + self.findNode(key, _storeValueForKey) - for node in nodes[:const.STORE_REDUNDANCY]: - def cb(t, table = table, node=node, resp=response): - self.table.insertNode(node) - response(t) - if node.id != self.node.id: - def default(err, node=node, table=table): - table.nodeFailed(node) - df = node.storeValue(key, value, self.node.senderDict()) - df.addCallbacks(cb, lambda x: None) - # this call is asynch - self.findNode(key, _storeValueForKey) + def insertNode(self, n, contacted=1): + """ + insert a node in our local table, pinging oldest contact in bucket, if necessary + + If all you have is a host/port, then use addContact, which calls this method after + receiving the PONG from the remote node. The reason for the seperation is we can't insert + a node into the table without it's peer-ID. That means of course the node passed into this + method needs to be a properly formed Node object with a valid ID. + """ + old = self.table.insertNode(n, contacted=contacted) + if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: + # the bucket is full, check to see if old node is still around and if so, replace it + + ## these are the callbacks used when we ping the oldest node in a bucket + def _staleNodeHandler(oldnode=old, newnode = n): + """ called if the pinged node never responds """ + self.table.replaceStaleNode(old, newnode) + + def _notStaleNodeHandler(sender, old=old): + """ called when we get a pong from the old node """ + args, sender = sender + sender = Node().initWithDict(sender) + if sender.id == old.id: + self.table.justSeenNode(old) + + df = old.ping(self.node.senderDict()) + df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) + + def sendPing(self, node): + """ + ping a node + """ + df = node.ping(self.node.senderDict()) + ## these are the callbacks we use when we issue a PING + def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table): + l, sender = args + if id != const.NULL_ID and id != sender['id'].decode('base64'): + # whoah, got response from different peer than we were expecting + pass + else: + sender['host'] = host + sender['port'] = port + n = Node().initWithDict(sender) + table.insertNode(n) + return + def _defaultPong(err, node=node, table=self.table): + table.nodeFailed(node) + + df.addCallbacks(_pongHandler,_defaultPong) + + def findCloseNodes(self, callback=lambda a: None): + """ + This does a findNode on the ID one away from our own. + This will allow us to populate our table with nodes on our network closest to our own. + This is called as soon as we start up with an empty table + """ + id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) + self.findNode(id, callback) + + def refreshTable(self): + """ + + """ + def callback(nodes): + pass - def insertNode(self, n, contacted=1): - """ - insert a node in our local table, pinging oldest contact in bucket, if necessary + for bucket in self.table.buckets: + if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS: + id = newIDInRange(bucket.min, bucket.max) + self.findNode(id, callback) + + + def retrieveValues(self, key): + s = "select value from kv where key = '%s';" % key.encode('base64') + c = self.store.cursor() + c.execute(s) + t = c.fetchone() + l = [] + while t: + l.append(t['value']) + t = c.fetchone() + return l - If all you have is a host/port, then use addContact, which calls this method after - receiving the PONG from the remote node. The reason for the seperation is we can't insert - a node into the table without it's peer-ID. That means of course the node passed into this - method needs to be a properly formed Node object with a valid ID. - """ - old = self.table.insertNode(n, contacted=contacted) - if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: - # the bucket is full, check to see if old node is still around and if so, replace it - - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(oldnode=old, newnode = n): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(old, newnode) + ##### + ##### INCOMING MESSAGE HANDLERS - def _notStaleNodeHandler(sender, old=old): - """ called when we get a pong from the old node """ - sender = Node().initWithDict(sender) - if sender.id == old.id: - self.table.justSeenNode(old) - - df = old.ping(self.node.senderDict()) - df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) - - - def sendPing(self, node): - """ - ping a node - """ - df = node.ping(self.node.senderDict()) - ## these are the callbacks we use when we issue a PING - def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table): - l, sender = args - if id != const.NULL_ID and id != sender['id'].decode('base64'): - # whoah, got response from different peer than we were expecting - pass - else: - sender['host'] = host - sender['port'] = port + def xmlrpc_ping(self, sender): + """ + takes sender dict = {'id', , 'port', port} optional keys = 'ip' + returns sender dict + """ + ip = self.crequest.getClientIP() + sender['host'] = ip n = Node().initWithDict(sender) - table.insertNode(n) - return - def _defaultPong(err, node=node, table=self.table): - table.nodeFailed(node) - - df.addCallbacks(_pongHandler,_defaultPong) - - - def findCloseNodes(self, callback=lambda a: None): - """ - This does a findNode on the ID one away from our own. - This will allow us to populate our table with nodes on our network closest to our own. - This is called as soon as we start up with an empty table - """ - id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - self.findNode(id, callback) - - def refreshTable(self): - """ - - """ - def callback(nodes): - pass - - for bucket in self.table.buckets: - if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS: - id = newIDInRange(bucket.min, bucket.max) - self.findNode(id, callback) - - - def retrieveValues(self, key): - s = "select value from kv where key = '%s';" % key.encode('base64') - c = self.store.cursor() - c.execute(s) - t = c.fetchone() - l = [] - while t: - l.append(t['value']) - t = c.fetchone() - return l - - ##### - ##### INCOMING MESSAGE HANDLERS - - def xmlrpc_ping(self, sender): - """ - takes sender dict = {'id', , 'port', port} optional keys = 'ip' - returns sender dict - """ - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return (), self.node.senderDict() + self.insertNode(n, contacted=0) + return (), self.node.senderDict() - def xmlrpc_find_node(self, target, sender): - nodes = self.table.findNodes(target.decode('base64')) - nodes = map(lambda node: node.senderDict(), nodes) - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return nodes, self.node.senderDict() + def xmlrpc_find_node(self, target, sender): + nodes = self.table.findNodes(target.decode('base64')) + nodes = map(lambda node: node.senderDict(), nodes) + ip = self.crequest.getClientIP() + sender['host'] = ip + n = Node().initWithDict(sender) + self.insertNode(n, contacted=0) + return nodes, self.node.senderDict() - def xmlrpc_store_value(self, key, value, sender): - t = "%0.6f" % time.time() - s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t) - c = self.store.cursor() - try: - c.execute(s) - except pysqlite_exceptions.IntegrityError, reason: - # update last insert time - s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value) - c.execute(s) - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return (), self.node.senderDict() + def xmlrpc_store_value(self, key, value, sender): + t = "%0.6f" % time.time() + s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t) + c = self.store.cursor() + try: + c.execute(s) + except pysqlite_exceptions.IntegrityError, reason: + # update last insert time + s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value) + c.execute(s) + ip = self.crequest.getClientIP() + sender['host'] = ip + n = Node().initWithDict(sender) + self.insertNode(n, contacted=0) + return (), self.node.senderDict() - def xmlrpc_find_value(self, key, sender): - ip = self.crequest.getClientIP() - key = key.decode('base64') - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - - l = self.retrieveValues(key) - if len(l) > 0: - return {'values' : l}, self.node.senderDict() - else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes}, self.node.senderDict() - - - - - + def xmlrpc_find_value(self, key, sender): + ip = self.crequest.getClientIP() + key = key.decode('base64') + sender['host'] = ip + n = Node().initWithDict(sender) + self.insertNode(n, contacted=0) + + l = self.retrieveValues(key) + if len(l) > 0: + return {'values' : l}, self.node.senderDict() + else: + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.senderDict(), nodes) + return {'nodes' : nodes}, self.node.senderDict() #------ testing -def test_build_net(quiet=0, peers=24, host='localhost', pause=1): - from whrandom import randrange - import threading - import thread - port = 2001 - l = [] - - if not quiet: - print "Building %s peer table." % peers +def test_build_net(quiet=0, peers=24, host='localhost', pause=0): + from whrandom import randrange + import threading + import thread + port = 2001 + l = [] + + if not quiet: + print "Building %s peer table." % peers - for i in xrange(peers): - a = Khashmir(host, port + i, db = '/tmp/test'+`i`) - l.append(a) - - - thread.start_new_thread(l[0].app.run, ()) - time.sleep(1) - for peer in l[1:]: - peer.app.run() - time.sleep(10) - - print "adding contacts...." - - for peer in l[1:]: - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) + for i in xrange(peers): + a = Khashmir(host, port + i, db = '/tmp/test'+`i`) + l.append(a) + + + thread.start_new_thread(l[0].app.run, ()) + time.sleep(1) + for peer in l[1:]: + peer.app.run() + time.sleep(10) + + print "adding contacts...." + + for peer in l[1:]: + n = l[randrange(0, len(l))].node + peer.addContact(host, n.port) + n = l[randrange(0, len(l))].node + peer.addContact(host, n.port) + n = l[randrange(0, len(l))].node + peer.addContact(host, n.port) if pause: - time.sleep(.33) + time.sleep(.33) - time.sleep(10) - print "finding close nodes...." - - for peer in l: - flag = threading.Event() - def cb(nodes, f=flag): - f.set() - peer.findCloseNodes(cb) - flag.wait() - -# for peer in l: -# peer.refreshTable() - return l + time.sleep(10) + print "finding close nodes...." + + for peer in l: + flag = threading.Event() + def cb(nodes, f=flag): + f.set() + peer.findCloseNodes(cb) + flag.wait() + + # for peer in l: + # peer.refreshTable() + return l def test_find_nodes(l, quiet=0): - import threading, sys - from whrandom import randrange - flag = threading.Event() - - n = len(l) - - a = l[randrange(0,n)] - b = l[randrange(0,n)] - - def callback(nodes, flag=flag, id = b.node.id): - if (len(nodes) >0) and (nodes[0].id == id): - print "test_find_nodes PASSED" - else: - print "test_find_nodes FAILED" - flag.set() - a.findNode(b.node.id, callback) - flag.wait() + import threading, sys + from whrandom import randrange + flag = threading.Event() + + n = len(l) + + a = l[randrange(0,n)] + b = l[randrange(0,n)] + + def callback(nodes, flag=flag, id = b.node.id): + if (len(nodes) >0) and (nodes[0].id == id): + print "test_find_nodes PASSED" + else: + print "test_find_nodes FAILED" + flag.set() + a.findNode(b.node.id, callback) + flag.wait() def test_find_value(l, quiet=0): - from whrandom import randrange - from sha import sha - from hash import newID - import time, threading, sys - - fa = threading.Event() - fb = threading.Event() - fc = threading.Event() - - n = len(l) - a = l[randrange(0,n)] - b = l[randrange(0,n)] - c = l[randrange(0,n)] - d = l[randrange(0,n)] - - key = newID() - value = newID() - if not quiet: - print "inserting value..." - sys.stdout.flush() - a.storeValueForKey(key, value) - time.sleep(3) - print "finding..." - sys.stdout.flush() - - class cb: - def __init__(self, flag, value=value): - self.flag = flag - self.val = value - self.found = 0 - def callback(self, values): - try: - if(len(values) == 0): - if not self.found: - print "find NOT FOUND" - else: - print "find FOUND" - sys.stdout.flush() - - else: - if self.val in values: - self.found = 1 - finally: - self.flag.set() - - b.valueForKey(key, cb(fa).callback) - fa.wait() - c.valueForKey(key, cb(fb).callback) - fb.wait() - d.valueForKey(key, cb(fc).callback) - fc.wait() + from whrandom import randrange + from sha import sha + from hash import newID + import time, threading, sys + + fa = threading.Event() + fb = threading.Event() + fc = threading.Event() + + n = len(l) + a = l[randrange(0,n)] + b = l[randrange(0,n)] + c = l[randrange(0,n)] + d = l[randrange(0,n)] + + key = newID() + value = newID() + if not quiet: + print "inserting value..." + sys.stdout.flush() + a.storeValueForKey(key, value) + time.sleep(3) + if not quiet: + print "finding..." + sys.stdout.flush() + + class cb: + def __init__(self, flag, value=value): + self.flag = flag + self.val = value + self.found = 0 + def callback(self, values): + try: + if(len(values) == 0): + if not self.found: + print "find NOT FOUND" + else: + print "find FOUND" + sys.stdout.flush() + else: + if self.val in values: + self.found = 1 + finally: + self.flag.set() + + b.valueForKey(key, cb(fa).callback) + fa.wait() + c.valueForKey(key, cb(fb).callback) + fb.wait() + d.valueForKey(key, cb(fc).callback) + fc.wait() def test_one(host, port, db='/tmp/test'): - import thread - k = Khashmir(host, port, db) - thread.start_new_thread(k.app.run, ()) - return k + import thread + k = Khashmir(host, port, db) + thread.start_new_thread(k.app.run, ()) + return k if __name__ == "__main__": import sys n = 8 if len(sys.argv) > 1: - n = int(sys.argv[1]) + n = int(sys.argv[1]) l = test_build_net(peers=n) time.sleep(3) print "finding nodes..." for i in range(10): - test_find_nodes(l) + test_find_nodes(l) print "inserting and fetching values..." for i in range(10): - test_find_value(l) + test_find_value(l) diff --git a/ktable.py b/ktable.py index 2f1cf31..2fd401e 100644 --- a/ktable.py +++ b/ktable.py @@ -17,9 +17,9 @@ class KTable: self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)] self.insertNode(node) - def _bucketIndexForInt(self, int): + def _bucketIndexForInt(self, num): """the index of the bucket that should hold int""" - return bisect_left(self.buckets, int) + return bisect_left(self.buckets, num) def findNodes(self, id): """k nodes in our own local table closest to the ID. @@ -28,20 +28,20 @@ class KTable: to not send messages to yourself if it matters.""" if isinstance(id, str): - int = hash.intify(id) + num = hash.intify(id) elif isinstance(id, Node): - int = id.int + num = id.num elif isinstance(id, int) or isinstance(id, long): - int = id + num = id else: raise TypeError, "findNodes requires an int, string, or Node" nodes = [] - i = self._bucketIndexForInt(int) + i = self._bucketIndexForInt(num) # if this node is already in our table then return it try: - index = self.buckets[i].l.index(int) + index = self.buckets[i].l.index(num) except ValueError: pass else: @@ -60,7 +60,7 @@ class KTable: min = min - 1 max = max + 1 - nodes.sort(lambda a, b, int=int: cmp(int ^ a.int, int ^ b.int)) + nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num)) return nodes[:K] def _splitBucket(self, a): @@ -70,16 +70,16 @@ class KTable: a.max = a.max - diff # transfer nodes to new bucket for anode in a.l[:]: - if anode.int >= a.max: + if anode.num >= a.max: a.l.remove(anode) b.l.append(anode) def replaceStaleNode(self, stale, new): """this is used by clients to replace a node returned by insertNode after it fails to respond to a Pong message""" - i = self._bucketIndexForInt(stale.int) + i = self._bucketIndexForInt(stale.num) try: - it = self.buckets[i].l.index(stale.int) + it = self.buckets[i].l.index(stale.num) except ValueError: return @@ -96,10 +96,10 @@ class KTable: assert node.id != " "*20 if node.id == self.node.id: return # get the bucket for this node - i = self. _bucketIndexForInt(node.int) + i = self. _bucketIndexForInt(node.num) # check to see if node is in the bucket already try: - it = self.buckets[i].l.index(node.int) + it = self.buckets[i].l.index(node.num) except ValueError: # no pass @@ -143,7 +143,7 @@ class KTable: """call this any time you get a message from a node it will update it in the table if it's there """ try: - n = self.findNodes(node.int)[0] + n = self.findNodes(node.num)[0] except IndexError: return None else: @@ -154,7 +154,7 @@ class KTable: def nodeFailed(self, node): """ call this when a node fails to respond to a message, to invalidate that node """ try: - n = self.findNodes(node.int)[0] + n = self.findNodes(node.num)[0] except IndexError: return None else: @@ -172,8 +172,8 @@ class KBucket: def touch(self): self.lastAccessed = time.time() - def getNodeWithInt(self, int): - if int in self.l: return int + def getNodeWithInt(self, num): + if num in self.l: return num else: raise ValueError def __repr__(self): @@ -183,22 +183,22 @@ class KBucket: # necessary for bisecting list of buckets with a hash expressed as an integer or a distance # compares integer or node object with the bucket's range def __lt__(self, a): - if isinstance(a, Node): a = a.int + if isinstance(a, Node): a = a.num return self.max <= a def __le__(self, a): - if isinstance(a, Node): a = a.int + if isinstance(a, Node): a = a.num return self.min < a def __gt__(self, a): - if isinstance(a, Node): a = a.int + if isinstance(a, Node): a = a.num return self.min > a def __ge__(self, a): - if isinstance(a, Node): a = a.int + if isinstance(a, Node): a = a.num return self.max >= a def __eq__(self, a): - if isinstance(a, Node): a = a.int + if isinstance(a, Node): a = a.num return self.min <= a and self.max > a def __ne__(self, a): - if isinstance(a, Node): a = a.int + if isinstance(a, Node): a = a.num return self.min >= a or self.max < a diff --git a/node.py b/node.py index 078ef3c..22a4dd9 100644 --- a/node.py +++ b/node.py @@ -4,77 +4,76 @@ from types import * from xmlrpclib import Binary class Node: - """encapsulate contact info""" - - def __init__(self): - self.fails = 0 - self.lastSeen = 0 - self.id = self.host = self.port = '' + """encapsulate contact info""" + def __init__(self): + self.fails = 0 + self.lastSeen = 0 + self.id = self.host = self.port = '' - def init(self, id, host, port): - self.id = id - self.int = hash.intify(id) - self.host = host - self.port = port - self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host} - return self + def init(self, id, host, port): + self.id = id + self.num = hash.intify(id) + self.host = host + self.port = port + self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host} + return self - def initWithDict(self, dict): - self._senderDict = dict - self.id = dict['id'].decode('base64') - self.int = hash.intify(self.id) - self.port = dict['port'] - self.host = dict['host'] - return self + def initWithDict(self, dict): + self._senderDict = dict + self.id = dict['id'].decode('base64') + self.num = hash.intify(self.id) + self.port = dict['port'] + self.host = dict['host'] + return self - def updateLastSeen(self): - self.lastSeen = time.time() - self.fails = 0 + def updateLastSeen(self): + self.lastSeen = time.time() + self.fails = 0 - def msgFailed(self): - self.fails = self.fails + 1 - return self.fails + def msgFailed(self): + self.fails = self.fails + 1 + return self.fails - def senderDict(self): - return self._senderDict + def senderDict(self): + return self._senderDict - def __repr__(self): - return `(self.id, self.host, self.port)` + def __repr__(self): + return `(self.id, self.host, self.port)` ## these comparators let us bisect/index a list full of nodes with either a node or an int/long - def __lt__(self, a): - if type(a) == InstanceType: - a = a.int - return self.int < a - def __le__(self, a): - if type(a) == InstanceType: - a = a.int - return self.int <= a - def __gt__(self, a): - if type(a) == InstanceType: - a = a.int - return self.int > a - def __ge__(self, a): - if type(a) == InstanceType: - a = a.int - return self.int >= a - def __eq__(self, a): - if type(a) == InstanceType: - a = a.int - return self.int == a - def __ne__(self, a): - if type(a) == InstanceType: - a = a.int - return self.int != a + def __lt__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num < a + def __le__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num <= a + def __gt__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num > a + def __ge__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num >= a + def __eq__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num == a + def __ne__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num != a import unittest class TestNode(unittest.TestCase): - def setUp(self): - self.node = Node().init(hash.newID(), 'localhost', 2002) - def testUpdateLastSeen(self): - t = self.node.lastSeen - self.node.updateLastSeen() - assert t < self.node.lastSeen + def setUp(self): + self.node = Node().init(hash.newID(), 'localhost', 2002) + def testUpdateLastSeen(self): + t = self.node.lastSeen + self.node.updateLastSeen() + assert t < self.node.lastSeen \ No newline at end of file -- 2.39.5