fixed borked tab/space problems, damn ProjectBuilder doesn't come with reasonable...
authorburris <burris>
Sun, 1 Dec 2002 21:08:01 +0000 (21:08 +0000)
committerburris <burris>
Sun, 1 Dec 2002 21:08:01 +0000 (21:08 +0000)
stopped using "int" for ivars since that conflicts with a builtin, now
use node.num for getting the integer representation of a node's ID

actions.py
const.py
hash.py
khashmir.py
ktable.py
node.py

index 1c58ee0785fcc0cd90b165307893894abcce0018..01fb3c76faebb6635d90a4bdd82ae96f5638824f 100644 (file)
@@ -1,6 +1,4 @@
 from time import time
-from bencode import bdecode as loads
-from bencode import bencode as dumps
 
 from const import reactor
 import const
@@ -14,7 +12,7 @@ class ActionBase:
     def __init__(self, table, target, callback):
        self.table = table
        self.target = target
-       self.int = intify(target)
+       self.num = intify(target)
        self.found = {}
        self.queried = {}
        self.answered = {}
@@ -22,9 +20,9 @@ class ActionBase:
        self.outstanding = 0
        self.finished = 0
        
-       def sort(a, b, int=self.int):
+       def sort(a, b, num=self.num):
            """ this function is for sorting nodes relative to the ID we are looking for """
-           x, y = int ^ a.int, int ^ b.int
+           x, y = num ^ a.num, num ^ b.num
            if x > y:
                return 1
            elif x < y:
index f5feed44943b42b73dc3da668cf28d8d7048a6fe..4926f008690d5a3d8e4cb953f67e765e883ed0d5 100644 (file)
--- a/const.py
+++ b/const.py
@@ -7,6 +7,12 @@ main.installReactor(reactor)
 # magic id to use before we know a peer's id
 NULL_ID =  20 * '\0'
 
+# Kademlia "K" constant
+K = 8
+
+# SHA1 is 160 bits long
+HASH_LENGTH = 160
+
 
 ### SEARCHING/STORING
 # concurrent xmlrpc calls per find node/value request!
diff --git a/hash.py b/hash.py
index bc6868668b00c48c27f4616179b703f2428ad4c0..4de069ff6818e1c81b31f32b13b468f7adeeb63e 100644 (file)
--- a/hash.py
+++ b/hash.py
@@ -8,9 +8,9 @@ def intify(hstr):
     assert len(hstr) == 20
     return long(hstr.encode('hex'), 16)
 
-def stringify(int):
+def stringify(num):
     """long int -> 20-character string"""
-    str = hex(int)[2:]
+    str = hex(num)[2:]
     if str[-1] == 'L':
         str = str[:-1]
     if len(str) % 2 != 0:
index 36ff79f8abf02894802285f8c980af4464f8c373..b0b414d7cd4e0193be965a956b93559210d52448 100644 (file)
@@ -27,407 +27,400 @@ KhashmirDBExcept = "KhashmirDBExcept"
 
 # this is the main class!
 class Khashmir(xmlrpc.XMLRPC):
-    __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
-    def __init__(self, host, port, db='khashmir.db'):
-       self.node = Node().init(newID(), host, port)
-       self.table = KTable(self.node)
-       self.app = Application("xmlrpc")
-       self.app.listenTCP(port, server.Site(self))
-       self.findDB(db)
-       self.last = time.time()
-       KeyExpirer(store=self.store)
-
-    def findDB(self, db):
-       import os
-       try:
-           os.stat(db)
-       except OSError:
-           self.createNewDB(db)
-       else:
-           self.loadDB(db)
-           
-    def loadDB(self, db):
-       try:
-           self.store = sqlite.connect(db=db)
-            self.store.autocommit = 1
-       except:
-           import traceback
-           raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+       __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+       def __init__(self, host, port, db='khashmir.db'):
+               self.setup(host, port, db)
+       def setup(self, host, port, db='khashmir.db'):
+               self.node = Node().init(newID(), host, port)
+               self.table = KTable(self.node)
+               self.app = Application("xmlrpc")
+               self.app.listenTCP(port, server.Site(self))
+               self.findDB(db)
+               self.last = time.time()
+               KeyExpirer(store=self.store)
+
+       def findDB(self, db):
+               import os
+               try:
+                       os.stat(db)
+               except OSError:
+                       self.createNewDB(db)
+               else:
+                       self.loadDB(db)
            
-    def createNewDB(self, db):
-       self.store = sqlite.connect(db=db)
-        self.store.autocommit = 1
-       s = """
-           create table kv (key text, value text, time timestamp, primary key (key, value));
-           create index kv_key on kv(key);
-           create index kv_timestamp on kv(time);
+       def loadDB(self, db):
+               try:
+                       self.store = sqlite.connect(db=db)
+                       self.store.autocommit = 1
+               except:
+                       import traceback
+                       raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
            
-           create table nodes (id text primary key, host text, port number);
-           """
-       c = self.store.cursor()
-       c.execute(s)
-               
-    def render(self, request):
-       """
-           Override the built in render so we can have access to the request object!
-           note, crequest is probably only valid on the initial call (not after deferred!)
-       """
-       self.crequest = request
-       return xmlrpc.XMLRPC.render(self, request)
-
+       def createNewDB(self, db):
+               self.store = sqlite.connect(db=db)
+               self.store.autocommit = 1
+               s = """
+                       create table kv (key text, value text, time timestamp, primary key (key, value));
+                       create index kv_key on kv(key);
+                       create index kv_timestamp on kv(time);
+                       
+                       create table nodes (id text primary key, host text, port number);
+                       """
+               c = self.store.cursor()
+               c.execute(s)
+
+       def render(self, request):
+               """
+                       Override the built in render so we can have access to the request object!
+                       note, crequest is probably only valid on the initial call (not after deferred!)
+               """
+               self.crequest = request
+               return xmlrpc.XMLRPC.render(self, request)
+
+
+       #######
+       #######  LOCAL INTERFACE    - use these methods!
+       def addContact(self, host, port):
+               """
+                       ping this node and add the contact info to the table on pong!
+               """
+               n =Node().init(const.NULL_ID, host, port)  # note, we 
+               self.sendPing(n)
+
+       ## this call is async!
+       def findNode(self, id, callback, errback=None):
+               """ returns the contact info for node, or the k closest nodes, from the global table """
+               # get K nodes out of local table/cache, or the node we want
+               nodes = self.table.findNodes(id)
+               d = Deferred()
+               if errback:
+                       d.addCallbacks(callback, errback)
+               else:
+                       d.addCallback(callback)
+               if len(nodes) == 1 and nodes[0].id == id :
+                       d.callback(nodes)
+               else:
+                       # create our search state
+                       state = FindNode(self, id, d.callback)
+                       reactor.callFromThread(state.goWithNodes, nodes)
        
-    #######
-    #######  LOCAL INTERFACE    - use these methods!
-    def addContact(self, host, port):
-       """
-        ping this node and add the contact info to the table on pong!
-       """
-       n =Node().init(const.NULL_ID, host, port)  # note, we 
-       self.sendPing(n)
-
-
-    ## this call is async!
-    def findNode(self, id, callback, errback=None):
-       """ returns the contact info for node, or the k closest nodes, from the global table """
-       # get K nodes out of local table/cache, or the node we want
-       nodes = self.table.findNodes(id)
-       d = Deferred()
-       if errback:
-           d.addCallbacks(callback, errback)
-       else:
-           d.addCallback(callback)
-       if len(nodes) == 1 and nodes[0].id == id :
-           d.callback(nodes)
-       else:
-           # create our search state
-           state = FindNode(self, id, d.callback)
-           reactor.callFromThread(state.goWithNodes, nodes)
-    
-    
-    ## also async
-    def valueForKey(self, key, callback):
-       """ returns the values found for key in global table
-           callback will be called with a list of values for each peer that returns unique values
-           final callback will be an empty list - probably should change to 'more coming' arg
-       """
-       nodes = self.table.findNodes(key)
-
-       # get locals
-       l = self.retrieveValues(key)
-       if len(l) > 0:
-           reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
-
-       # create our search state
-       state = GetValue(self, key, callback)
-       reactor.callFromThread(state.goWithNodes, nodes, l)
        
-
-
-    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-    def storeValueForKey(self, key, value, callback=None):
-       """ stores the value for key in the global table, returns immediately, no status 
-           in this implementation, peers respond but don't indicate status to storing values
-           a key can have many values
-       """
-       def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
-           if not callback:
-               # default callback
-               def _storedValueHandler(sender):
-                   pass
-               response=_storedValueHandler
+       ## also async
+       def valueForKey(self, key, callback):
+               """ returns the values found for key in global table
+                       callback will be called with a list of values for each peer that returns unique values
+                       final callback will be an empty list - probably should change to 'more coming' arg
+               """
+               nodes = self.table.findNodes(key)
+               
+               # get locals
+               l = self.retrieveValues(key)
+               if len(l) > 0:
+                       reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
+               
+               # create our search state
+               state = GetValue(self, key, callback)
+               reactor.callFromThread(state.goWithNodes, nodes, l)
+
+       ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+       def storeValueForKey(self, key, value, callback=None):
+               """ stores the value for key in the global table, returns immediately, no status 
+                       in this implementation, peers respond but don't indicate status to storing values
+                       a key can have many values
+               """
+               def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+                       if not response:
+                               # default callback
+                               def _storedValueHandler(sender):
+                                       pass
+                               response=_storedValueHandler
+               
+                       for node in nodes[:const.STORE_REDUNDANCY]:
+                               def cb(t, table = table, node=node, resp=response):
+                                       self.table.insertNode(node)
+                                       response(t)
+                               if node.id != self.node.id:
+                                       def default(err, node=node, table=table):
+                                               table.nodeFailed(node)
+                                       df = node.storeValue(key, value, self.node.senderDict())
+                                       df.addCallbacks(cb, default)
+               # this call is asynch
+               self.findNode(key, _storeValueForKey)
        
-           for node in nodes[:const.STORE_REDUNDANCY]:
-               def cb(t, table = table, node=node, resp=response):
-                   self.table.insertNode(node)
-                   response(t)
-               if node.id != self.node.id:
-                   def default(err, node=node, table=table):
-                       table.nodeFailed(node)
-                   df = node.storeValue(key, value, self.node.senderDict())
-                   df.addCallbacks(cb, lambda x: None)
-       # this call is asynch
-       self.findNode(key, _storeValueForKey)
        
+       def insertNode(self, n, contacted=1):
+               """
+               insert a node in our local table, pinging oldest contact in bucket, if necessary
+               
+               If all you have is a host/port, then use addContact, which calls this method after
+               receiving the PONG from the remote node.  The reason for the seperation is we can't insert
+               a node into the table without it's peer-ID.  That means of course the node passed into this
+               method needs to be a properly formed Node object with a valid ID.
+               """
+               old = self.table.insertNode(n, contacted=contacted)
+               if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
+                       # the bucket is full, check to see if old node is still around and if so, replace it
+                       
+                       ## these are the callbacks used when we ping the oldest node in a bucket
+                       def _staleNodeHandler(oldnode=old, newnode = n):
+                               """ called if the pinged node never responds """
+                               self.table.replaceStaleNode(old, newnode)
+                       
+                       def _notStaleNodeHandler(sender, old=old):
+                               """ called when we get a pong from the old node """
+                               args, sender = sender
+                               sender = Node().initWithDict(sender)
+                               if sender.id == old.id:
+                                       self.table.justSeenNode(old)
+                       
+                       df = old.ping(self.node.senderDict())
+                       df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+
+       def sendPing(self, node):
+               """
+                       ping a node
+               """
+               df = node.ping(self.node.senderDict())
+               ## these are the callbacks we use when we issue a PING
+               def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+                       l, sender = args
+                       if id != const.NULL_ID and id != sender['id'].decode('base64'):
+                               # whoah, got response from different peer than we were expecting
+                               pass
+                       else:
+                               sender['host'] = host
+                               sender['port'] = port
+                               n = Node().initWithDict(sender)
+                               table.insertNode(n)
+                               return
+               def _defaultPong(err, node=node, table=self.table):
+                       table.nodeFailed(node)
+               
+               df.addCallbacks(_pongHandler,_defaultPong)
+
+       def findCloseNodes(self, callback=lambda a: None):
+               """
+                       This does a findNode on the ID one away from our own.  
+                       This will allow us to populate our table with nodes on our network closest to our own.
+                       This is called as soon as we start up with an empty table
+               """
+               id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
+               self.findNode(id, callback)
+
+       def refreshTable(self):
+               """
+                       
+               """
+               def callback(nodes):
+                       pass
        
-    def insertNode(self, n, contacted=1):
-       """
-       insert a node in our local table, pinging oldest contact in bucket, if necessary
+               for bucket in self.table.buckets:
+                       if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
+                               id = newIDInRange(bucket.min, bucket.max)
+                               self.findNode(id, callback)
+
+
+       def retrieveValues(self, key):
+               s = "select value from kv where key = '%s';" % key.encode('base64')
+               c = self.store.cursor()
+               c.execute(s)
+               t = c.fetchone()
+               l = []
+               while t:
+                       l.append(t['value'])
+                       t = c.fetchone()
+               return l
        
-       If all you have is a host/port, then use addContact, which calls this method after
-       receiving the PONG from the remote node.  The reason for the seperation is we can't insert
-       a node into the table without it's peer-ID.  That means of course the node passed into this
-       method needs to be a properly formed Node object with a valid ID.
-       """
-       old = self.table.insertNode(n, contacted=contacted)
-       if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
-           # the bucket is full, check to see if old node is still around and if so, replace it
-           
-           ## these are the callbacks used when we ping the oldest node in a bucket
-           def _staleNodeHandler(oldnode=old, newnode = n):
-               """ called if the pinged node never responds """
-               self.table.replaceStaleNode(old, newnode)
+       #####
+       ##### INCOMING MESSAGE HANDLERS
        
-           def _notStaleNodeHandler(sender, old=old):
-               """ called when we get a pong from the old node """
-               sender = Node().initWithDict(sender)
-               if sender.id == old.id:
-                   self.table.justSeenNode(old)
-
-           df = old.ping(self.node.senderDict())
-           df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
-
-
-    def sendPing(self, node):
-       """
-           ping a node
-       """
-       df = node.ping(self.node.senderDict())
-       ## these are the callbacks we use when we issue a PING
-       def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
-           l, sender = args
-           if id != const.NULL_ID and id != sender['id'].decode('base64'):
-               # whoah, got response from different peer than we were expecting
-               pass
-           else:
-               sender['host'] = host
-               sender['port'] = port
+       def xmlrpc_ping(self, sender):
+               """
+                       takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
+                       returns sender dict
+               """
+               ip = self.crequest.getClientIP()
+               sender['host'] = ip
                n = Node().initWithDict(sender)
-               table.insertNode(n)
-           return
-       def _defaultPong(err, node=node, table=self.table):
-               table.nodeFailed(node)
-
-       df.addCallbacks(_pongHandler,_defaultPong)
-
-
-    def findCloseNodes(self, callback=lambda a: None):
-       """
-           This does a findNode on the ID one away from our own.  
-           This will allow us to populate our table with nodes on our network closest to our own.
-           This is called as soon as we start up with an empty table
-       """
-       id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-       self.findNode(id, callback)
-
-    def refreshTable(self):
-       """
-           
-       """
-       def callback(nodes):
-           pass
-
-       for bucket in self.table.buckets:
-           if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
-               id = newIDInRange(bucket.min, bucket.max)
-               self.findNode(id, callback)
-       
-    def retrieveValues(self, key):
-       s = "select value from kv where key = '%s';" % key.encode('base64')
-       c = self.store.cursor()
-       c.execute(s)
-       t = c.fetchone()
-       l = []
-       while t:
-           l.append(t['value'])
-           t = c.fetchone()
-       return l
-       
-    #####
-    ##### INCOMING MESSAGE HANDLERS
-    
-    def xmlrpc_ping(self, sender):
-       """
-           takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
-           returns sender dict
-       """
-       ip = self.crequest.getClientIP()
-       sender['host'] = ip
-       n = Node().initWithDict(sender)
-       self.insertNode(n, contacted=0)
-       return (), self.node.senderDict()
+               self.insertNode(n, contacted=0)
+               return (), self.node.senderDict()
                
-    def xmlrpc_find_node(self, target, sender):
-       nodes = self.table.findNodes(target.decode('base64'))
-       nodes = map(lambda node: node.senderDict(), nodes)
-       ip = self.crequest.getClientIP()
-       sender['host'] = ip
-       n = Node().initWithDict(sender)
-       self.insertNode(n, contacted=0)
-       return nodes, self.node.senderDict()
+       def xmlrpc_find_node(self, target, sender):
+               nodes = self.table.findNodes(target.decode('base64'))
+               nodes = map(lambda node: node.senderDict(), nodes)
+               ip = self.crequest.getClientIP()
+               sender['host'] = ip
+               n = Node().initWithDict(sender)
+               self.insertNode(n, contacted=0)
+               return nodes, self.node.senderDict()
            
-    def xmlrpc_store_value(self, key, value, sender):
-       t = "%0.6f" % time.time()
-       s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
-       c = self.store.cursor()
-       try:
-           c.execute(s)
-       except pysqlite_exceptions.IntegrityError, reason:
-           # update last insert time
-           s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
-           c.execute(s)
-       ip = self.crequest.getClientIP()
-       sender['host'] = ip
-       n = Node().initWithDict(sender)
-       self.insertNode(n, contacted=0)
-       return (), self.node.senderDict()
+       def xmlrpc_store_value(self, key, value, sender):
+               t = "%0.6f" % time.time()
+               s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
+               c = self.store.cursor()
+               try:
+                       c.execute(s)
+               except pysqlite_exceptions.IntegrityError, reason:
+                       # update last insert time
+                       s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
+                       c.execute(s)
+               ip = self.crequest.getClientIP()
+               sender['host'] = ip
+               n = Node().initWithDict(sender)
+               self.insertNode(n, contacted=0)
+               return (), self.node.senderDict()
        
-    def xmlrpc_find_value(self, key, sender):
-       ip = self.crequest.getClientIP()
-       key = key.decode('base64')
-       sender['host'] = ip
-       n = Node().initWithDict(sender)
-       self.insertNode(n, contacted=0)
-
-       l = self.retrieveValues(key)
-       if len(l) > 0:
-           return {'values' : l}, self.node.senderDict()
-       else:
-           nodes = self.table.findNodes(key)
-           nodes = map(lambda node: node.senderDict(), nodes)
-           return {'nodes' : nodes}, self.node.senderDict()
-
-
-
-
-
+       def xmlrpc_find_value(self, key, sender):
+               ip = self.crequest.getClientIP()
+               key = key.decode('base64')
+               sender['host'] = ip
+               n = Node().initWithDict(sender)
+               self.insertNode(n, contacted=0)
+       
+               l = self.retrieveValues(key)
+               if len(l) > 0:
+                       return {'values' : l}, self.node.senderDict()
+               else:
+                       nodes = self.table.findNodes(key)
+                       nodes = map(lambda node: node.senderDict(), nodes)
+                       return {'nodes' : nodes}, self.node.senderDict()
 #------ testing
 
-def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
-    from whrandom import randrange
-    import threading
-    import thread
-    port = 2001
-    l = []
-        
-    if not quiet:
-       print "Building %s peer table." % peers
+def test_build_net(quiet=0, peers=24, host='localhost',  pause=0):
+       from whrandom import randrange
+       import threading
+       import thread
+       port = 2001
+       l = []
+               
+       if not quiet:
+               print "Building %s peer table." % peers
        
-    for i in xrange(peers):
-       a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
-       l.append(a)
-    
-
-    thread.start_new_thread(l[0].app.run, ())
-    time.sleep(1)
-    for peer in l[1:]:
-       peer.app.run()
-    time.sleep(10)
-
-    print "adding contacts...."
-
-    for peer in l[1:]:
-       n = l[randrange(0, len(l))].node
-       peer.addContact(host, n.port)
-       n = l[randrange(0, len(l))].node
-       peer.addContact(host, n.port)
-       n = l[randrange(0, len(l))].node
-       peer.addContact(host, n.port)
+       for i in xrange(peers):
+               a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
+               l.append(a)
+       
+       
+       thread.start_new_thread(l[0].app.run, ())
+       time.sleep(1)
+       for peer in l[1:]:
+               peer.app.run()
+       time.sleep(10)
+       
+       print "adding contacts...."
+       
+       for peer in l[1:]:
+               n = l[randrange(0, len(l))].node
+               peer.addContact(host, n.port)
+               n = l[randrange(0, len(l))].node
+               peer.addContact(host, n.port)
+               n = l[randrange(0, len(l))].node
+               peer.addContact(host, n.port)
        if pause:
-           time.sleep(.33)
+               time.sleep(.33)
        
-    time.sleep(10)
-    print "finding close nodes...."
-
-    for peer in l:
-       flag = threading.Event()
-       def cb(nodes, f=flag):
-           f.set()
-       peer.findCloseNodes(cb)
-       flag.wait()
-
-#    for peer in l:
-#      peer.refreshTable()
-    return l
+       time.sleep(10)
+       print "finding close nodes...."
+       
+       for peer in l:
+               flag = threading.Event()
+               def cb(nodes, f=flag):
+                       f.set()
+               peer.findCloseNodes(cb)
+               flag.wait()
+       
+       #    for peer in l:
+       #       peer.refreshTable()
+       return l
         
 def test_find_nodes(l, quiet=0):
-    import threading, sys
-    from whrandom import randrange
-    flag = threading.Event()
-    
-    n = len(l)
-    
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    
-    def callback(nodes, flag=flag, id = b.node.id):
-       if (len(nodes) >0) and (nodes[0].id == id):
-           print "test_find_nodes      PASSED"
-       else:
-           print "test_find_nodes      FAILED"
-       flag.set()
-    a.findNode(b.node.id, callback)
-    flag.wait()
+       import threading, sys
+       from whrandom import randrange
+       flag = threading.Event()
+       
+       n = len(l)
+       
+       a = l[randrange(0,n)]
+       b = l[randrange(0,n)]
+       
+       def callback(nodes, flag=flag, id = b.node.id):
+               if (len(nodes) >0) and (nodes[0].id == id):
+                       print "test_find_nodes  PASSED"
+               else:
+                       print "test_find_nodes  FAILED"
+               flag.set()
+       a.findNode(b.node.id, callback)
+       flag.wait()
     
 def test_find_value(l, quiet=0):
-    from whrandom import randrange
-    from sha import sha
-    from hash import newID
-    import time, threading, sys
-    
-    fa = threading.Event()
-    fb = threading.Event()
-    fc = threading.Event()
-    
-    n = len(l)
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    c = l[randrange(0,n)]
-    d = l[randrange(0,n)]
-
-    key = newID()
-    value = newID()
-    if not quiet:
-       print "inserting value..."
-       sys.stdout.flush()
-    a.storeValueForKey(key, value)
-    time.sleep(3)
-    print "finding..."
-    sys.stdout.flush()
-    
-    class cb:
-       def __init__(self, flag, value=value):
-           self.flag = flag
-           self.val = value
-           self.found = 0
-       def callback(self, values):
-           try:
-               if(len(values) == 0):
-                   if not self.found:
-                       print "find                NOT FOUND"
-                   else:
-                       print "find                FOUND"
-                   sys.stdout.flush()
-
-               else:
-                   if self.val in values:
-                       self.found = 1
-           finally:
-               self.flag.set()
-
-    b.valueForKey(key, cb(fa).callback)
-    fa.wait()
-    c.valueForKey(key, cb(fb).callback)
-    fb.wait()
-    d.valueForKey(key, cb(fc).callback)    
-    fc.wait()
+       from whrandom import randrange
+       from sha import sha
+       from hash import newID
+       import time, threading, sys
+       
+       fa = threading.Event()
+       fb = threading.Event()
+       fc = threading.Event()
+       
+       n = len(l)
+       a = l[randrange(0,n)]
+       b = l[randrange(0,n)]
+       c = l[randrange(0,n)]
+       d = l[randrange(0,n)]
+       
+       key = newID()
+       value = newID()
+       if not quiet:
+               print "inserting value..."
+               sys.stdout.flush()
+       a.storeValueForKey(key, value)
+       time.sleep(3)
+       if not quiet:
+               print "finding..."
+               sys.stdout.flush()
+       
+       class cb:
+               def __init__(self, flag, value=value):
+                       self.flag = flag
+                       self.val = value
+                       self.found = 0
+               def callback(self, values):
+                       try:
+                               if(len(values) == 0):
+                                       if not self.found:
+                                               print "find                NOT FOUND"
+                                       else:
+                                               print "find                FOUND"
+                                       sys.stdout.flush()
+                               else:
+                                       if self.val in values:
+                                               self.found = 1
+                       finally:
+                               self.flag.set()
+       
+       b.valueForKey(key, cb(fa).callback)
+       fa.wait()
+       c.valueForKey(key, cb(fb).callback)
+       fb.wait()
+       d.valueForKey(key, cb(fc).callback)    
+       fc.wait()
     
 def test_one(host, port, db='/tmp/test'):
-    import thread
-    k = Khashmir(host, port, db)
-    thread.start_new_thread(k.app.run, ())
-    return k
+       import thread
+       k = Khashmir(host, port, db)
+       thread.start_new_thread(k.app.run, ())
+       return k
     
 if __name__ == "__main__":
     import sys
     n = 8
     if len(sys.argv) > 1:
-       n = int(sys.argv[1])
+               n = int(sys.argv[1])
     l = test_build_net(peers=n)
     time.sleep(3)
     print "finding nodes..."
     for i in range(10):
-       test_find_nodes(l)
+               test_find_nodes(l)
     print "inserting and fetching values..."
     for i in range(10):
-       test_find_value(l)
+               test_find_value(l)
index 2f1cf314598ab50a104946d97e60df0aa147ebf4..2fd401e1fa0b75148e26ed71942757749d0438f0 100644 (file)
--- a/ktable.py
+++ b/ktable.py
@@ -17,9 +17,9 @@ class KTable:
         self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
         self.insertNode(node)
         
-    def _bucketIndexForInt(self, int):
+    def _bucketIndexForInt(self, num):
         """the index of the bucket that should hold int"""
-        return bisect_left(self.buckets, int)
+        return bisect_left(self.buckets, num)
     
     def findNodes(self, id):
         """k nodes in our own local table closest to the ID.
@@ -28,20 +28,20 @@ class KTable:
         to not send messages to yourself if it matters."""
         
         if isinstance(id, str):
-            int = hash.intify(id)
+            num = hash.intify(id)
         elif isinstance(id, Node):
-            int = id.int
+            num = id.num
         elif isinstance(id, int) or isinstance(id, long):
-            int = id
+            num = id
         else:
             raise TypeError, "findNodes requires an int, string, or Node"
             
         nodes = []
-        i = self._bucketIndexForInt(int)
+        i = self._bucketIndexForInt(num)
         
         # if this node is already in our table then return it
         try:
-            index = self.buckets[i].l.index(int)
+            index = self.buckets[i].l.index(num)
         except ValueError:
             pass
         else:
@@ -60,7 +60,7 @@ class KTable:
                 min = min - 1
                 max = max + 1
 
-        nodes.sort(lambda a, b, int=int: cmp(int ^ a.int, int ^ b.int))
+        nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
         return nodes[:K]
         
     def _splitBucket(self, a):
@@ -70,16 +70,16 @@ class KTable:
         a.max = a.max - diff
         # transfer nodes to new bucket
         for anode in a.l[:]:
-            if anode.int >= a.max:
+            if anode.num >= a.max:
                 a.l.remove(anode)
                 b.l.append(anode)
 
     def replaceStaleNode(self, stale, new):
         """this is used by clients to replace a node returned by insertNode after
         it fails to respond to a Pong message"""
-        i = self._bucketIndexForInt(stale.int)
+        i = self._bucketIndexForInt(stale.num)
         try:
-            it = self.buckets[i].l.index(stale.int)
+            it = self.buckets[i].l.index(stale.num)
         except ValueError:
             return
 
@@ -96,10 +96,10 @@ class KTable:
         assert node.id != " "*20
         if node.id == self.node.id: return
         # get the bucket for this node
-        i = self. _bucketIndexForInt(node.int)
+        i = self. _bucketIndexForInt(node.num)
         # check to see if node is in the bucket already
         try:
-            it = self.buckets[i].l.index(node.int)
+            it = self.buckets[i].l.index(node.num)
         except ValueError:
             # no
             pass
@@ -143,7 +143,7 @@ class KTable:
         """call this any time you get a message from a node
         it will update it in the table if it's there """
         try:
-            n = self.findNodes(node.int)[0]
+            n = self.findNodes(node.num)[0]
         except IndexError:
             return None
         else:
@@ -154,7 +154,7 @@ class KTable:
     def nodeFailed(self, node):
         """ call this when a node fails to respond to a message, to invalidate that node """
         try:
-            n = self.findNodes(node.int)[0]
+            n = self.findNodes(node.num)[0]
         except IndexError:
             return None
         else:
@@ -172,8 +172,8 @@ class KBucket:
     def touch(self):
         self.lastAccessed = time.time()
 
-    def getNodeWithInt(self, int):
-        if int in self.l: return int
+    def getNodeWithInt(self, num):
+        if num in self.l: return num
         else: raise ValueError
         
     def __repr__(self):
@@ -183,22 +183,22 @@ class KBucket:
     # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
     # compares integer or node object with the bucket's range
     def __lt__(self, a):
-        if isinstance(a, Node): a = a.int
+        if isinstance(a, Node): a = a.num
         return self.max <= a
     def __le__(self, a):
-        if isinstance(a, Node): a = a.int
+        if isinstance(a, Node): a = a.num
         return self.min < a
     def __gt__(self, a):
-        if isinstance(a, Node): a = a.int
+        if isinstance(a, Node): a = a.num
         return self.min > a
     def __ge__(self, a):
-        if isinstance(a, Node): a = a.int
+        if isinstance(a, Node): a = a.num
         return self.max >= a
     def __eq__(self, a):
-        if isinstance(a, Node): a = a.int
+        if isinstance(a, Node): a = a.num
         return self.min <= a and self.max > a
     def __ne__(self, a):
-        if isinstance(a, Node): a = a.int
+        if isinstance(a, Node): a = a.num
         return self.min >= a or self.max < a
 
 
diff --git a/node.py b/node.py
index 078ef3cc992c8b1bcfa44b7e8a52c9032b333aa5..22a4dd9b12b69acfe936021bcfb9b459e309f420 100644 (file)
--- a/node.py
+++ b/node.py
@@ -4,77 +4,76 @@ from types import *
 from xmlrpclib import Binary
 
 class Node:
-    """encapsulate contact info"""
-    
-    def __init__(self):
-       self.fails = 0
-       self.lastSeen = 0
-       self.id = self.host = self.port = ''
+       """encapsulate contact info"""
+       def __init__(self):
+               self.fails = 0
+               self.lastSeen = 0
+               self.id = self.host = self.port = ''
        
-    def init(self, id, host, port):
-       self.id = id
-       self.int = hash.intify(id)
-       self.host = host
-       self.port = port
-       self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host}
-       return self
+       def init(self, id, host, port):
+               self.id = id
+               self.num = hash.intify(id)
+               self.host = host
+               self.port = port
+               self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host}
+               return self
        
-    def initWithDict(self, dict):
-       self._senderDict = dict
-       self.id = dict['id'].decode('base64')
-       self.int = hash.intify(self.id)
-       self.port = dict['port']
-       self.host = dict['host']
-       return self
+       def initWithDict(self, dict):
+               self._senderDict = dict
+               self.id = dict['id'].decode('base64')
+               self.num = hash.intify(self.id)
+               self.port = dict['port']
+               self.host = dict['host']
+               return self
        
-    def updateLastSeen(self):
-       self.lastSeen = time.time()
-       self.fails = 0
+       def updateLastSeen(self):
+               self.lastSeen = time.time()
+               self.fails = 0
        
-    def msgFailed(self):
-       self.fails = self.fails + 1
-       return self.fails
+       def msgFailed(self):
+               self.fails = self.fails + 1
+               return self.fails
        
-    def senderDict(self):
-       return self._senderDict
+       def senderDict(self):
+               return self._senderDict
        
-    def __repr__(self):
-       return `(self.id, self.host, self.port)`
+       def __repr__(self):
+               return `(self.id, self.host, self.port)`
        
     ## these comparators let us bisect/index a list full of nodes with either a node or an int/long
-    def __lt__(self, a):
-       if type(a) == InstanceType:
-           a = a.int
-       return self.int < a
-    def __le__(self, a):
-       if type(a) == InstanceType:
-           a = a.int
-       return self.int <= a
-    def __gt__(self, a):
-       if type(a) == InstanceType:
-           a = a.int
-       return self.int > a
-    def __ge__(self, a):
-       if type(a) == InstanceType:
-           a = a.int
-       return self.int >= a
-    def __eq__(self, a):
-       if type(a) == InstanceType:
-           a = a.int
-       return self.int == a
-    def __ne__(self, a):
-       if type(a) == InstanceType:
-           a = a.int
-       return self.int != a
+       def __lt__(self, a):
+               if type(a) == InstanceType:
+                       a = a.num
+               return self.num < a
+       def __le__(self, a):
+               if type(a) == InstanceType:
+                       a = a.num
+               return self.num <= a
+       def __gt__(self, a):
+               if type(a) == InstanceType:
+                       a = a.num
+               return self.num > a
+       def __ge__(self, a):
+               if type(a) == InstanceType:
+                       a = a.num
+               return self.num >= a
+       def __eq__(self, a):
+               if type(a) == InstanceType:
+                       a = a.num
+               return self.num == a
+       def __ne__(self, a):
+               if type(a) == InstanceType:
+                       a = a.num
+               return self.num != a
 
 
 import unittest
 
 class TestNode(unittest.TestCase):
-    def setUp(self):
-       self.node = Node().init(hash.newID(), 'localhost', 2002)
-    def testUpdateLastSeen(self):
-       t = self.node.lastSeen
-       self.node.updateLastSeen()
-       assert t < self.node.lastSeen
+       def setUp(self):
+               self.node = Node().init(hash.newID(), 'localhost', 2002)
+       def testUpdateLastSeen(self):
+               t = self.node.lastSeen
+               self.node.updateLastSeen()
+               assert t < self.node.lastSeen
        
\ No newline at end of file