ripped out xmlrpc, experimented with xmlrpc but with bencode, finally
authorburris <burris>
Thu, 16 Jan 2003 16:28:42 +0000 (16:28 +0000)
committerburris <burris>
Thu, 16 Jan 2003 16:28:42 +0000 (16:28 +0000)
settled on bencode rpc over Airhook

14 files changed:
actions.py
airhook.py
const.py
hash.py
khashmir.py
knode.py
krpc.py [new file with mode: 0644]
ktable.py
node.py
test.py
test_airhook.py
test_krpc.py [new file with mode: 0644]
util.py
xmlrpcclient.py [deleted file]

index 5fc11b34ff9d57f3bc0bc02cb1b09f6a70d2a691..f367c533f27e44463f8fe8aec52afc9f43d91e56 100644 (file)
@@ -8,183 +8,237 @@ from knode import KNode as Node
 from ktable import KTable, K
 
 class ActionBase:
-       """ base class for some long running asynchronous proccesses like finding nodes or values """
-       def __init__(self, table, target, callback):
-               self.table = table
-               self.target = target
-               self.num = intify(target)
-               self.found = {}
-               self.queried = {}
-               self.answered = {}
-               self.callback = callback
-               self.outstanding = 0
-               self.finished = 0
-       
-               def sort(a, b, num=self.num):
-                       """ this function is for sorting nodes relative to the ID we are looking for """
-                       x, y = num ^ a.num, num ^ b.num
-                       if x > y:
-                               return 1
-                       elif x < y:
-                               return -1
-                       return 0
-               self.sort = sort
-               
-       def goWithNodes(self, t):
-               pass
-       
-       
+    """ base class for some long running asynchronous proccesses like finding nodes or values """
+    def __init__(self, table, target, callback):
+        self.table = table
+        self.target = target
+        self.num = intify(target)
+        self.found = {}
+        self.queried = {}
+        self.answered = {}
+        self.callback = callback
+        self.outstanding = 0
+        self.finished = 0
+    
+        def sort(a, b, num=self.num):
+            """ this function is for sorting nodes relative to the ID we are looking for """
+            x, y = num ^ a.num, num ^ b.num
+            if x > y:
+                return 1
+            elif x < y:
+                return -1
+            return 0
+        self.sort = sort
+        
+    def goWithNodes(self, t):
+        pass
+    
+    
 
 FIND_NODE_TIMEOUT = 15
 
 class FindNode(ActionBase):
-       """ find node action merits it's own class as it is a long running stateful process """
-       def handleGotNodes(self, args):
-               l, sender = args
-               sender = Node().initWithDict(sender)
-               self.table.table.insertNode(sender)
-               if self.finished or self.answered.has_key(sender.id):
-                       # a day late and a dollar short
-                       return
-               self.outstanding = self.outstanding - 1
-               self.answered[sender.id] = 1
-               for node in l:
-                       n = Node().initWithDict(node)
-                       if not self.found.has_key(n.id):
-                               self.found[n.id] = n
-               self.schedule()
-               
-       def schedule(self):
-               """
-                       send messages to new peers, if necessary
-               """
-               if self.finished:
-                       return
-               l = self.found.values()
-               l.sort(self.sort)
-       
-               for node in l[:K]:
-                       if node.id == self.target:
-                               self.finished=1
-                               return self.callback([node])
-                       if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
-                               #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                               df = node.findNode(self.target, self.table.node.senderDict())
-                               df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
-                               self.outstanding = self.outstanding + 1
-                               self.queried[node.id] = 1
-                       if self.outstanding >= const.CONCURRENT_REQS:
-                               break
-               assert(self.outstanding) >=0
-               if self.outstanding == 0:
-                       ## all done!!
-                       self.finished=1
-                       reactor.callFromThread(self.callback, l[:K])
-       
-       def makeMsgFailed(self, node):
-               def defaultGotNodes(err, self=self, node=node):
-                       self.table.table.nodeFailed(node)
-                       self.outstanding = self.outstanding - 1
-                       self.schedule()
-               return defaultGotNodes
-       
-       def goWithNodes(self, nodes):
-               """
-                       this starts the process, our argument is a transaction with t.extras being our list of nodes
-                       it's a transaction since we got called from the dispatcher
-               """
-               for node in nodes:
-                       if node.id == self.table.node.id:
-                               continue
-                       else:
-                               self.found[node.id] = node
-               
-               self.schedule()
-       
+    """ find node action merits it's own class as it is a long running stateful process """
+    def handleGotNodes(self, dict):
+        l = dict["nodes"]
+        sender = dict["sender"]
+        sender = Node().initWithDict(sender)
+        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+        self.table.table.insertNode(sender)
+        if self.finished or self.answered.has_key(sender.id):
+            # a day late and a dollar short
+            return
+        self.outstanding = self.outstanding - 1
+        self.answered[sender.id] = 1
+        for node in l:
+            n = Node().initWithDict(node)
+            n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+            if not self.found.has_key(n.id):
+                self.found[n.id] = n
+        self.schedule()
+        
+    def schedule(self):
+        """
+            send messages to new peers, if necessary
+        """
+        if self.finished:
+            return
+        l = self.found.values()
+        l.sort(self.sort)
+        for node in l[:K]:
+            if node.id == self.target:
+                self.finished=1
+                return self.callback([node])
+            if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
+                df = node.findNode(self.target, self.table.node.senderDict())
+                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
+                self.outstanding = self.outstanding + 1
+                self.queried[node.id] = 1
+            if self.outstanding >= const.CONCURRENT_REQS:
+                break
+        assert(self.outstanding) >=0
+        if self.outstanding == 0:
+            ## all done!!
+            self.finished=1
+            reactor.callFromThread(self.callback, l[:K])
+    
+    def makeMsgFailed(self, node):
+        def defaultGotNodes(err, self=self, node=node):
+            self.table.table.nodeFailed(node)
+            self.outstanding = self.outstanding - 1
+            self.schedule()
+        return defaultGotNodes
+    
+    def goWithNodes(self, nodes):
+        """
+            this starts the process, our argument is a transaction with t.extras being our list of nodes
+            it's a transaction since we got called from the dispatcher
+        """
+        for node in nodes:
+            if node.id == self.table.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+        
+        self.schedule()
+    
 
 GET_VALUE_TIMEOUT = 15
 class GetValue(FindNode):
     """ get value task """
-    def handleGotNodes(self, args):
-               l, sender = args
-               sender = Node().initWithDict(sender)
-               self.table.table.insertNode(sender)
-               if self.finished or self.answered.has_key(sender.id):
-                       # a day late and a dollar short
-                       return
-               self.outstanding = self.outstanding - 1
-               self.answered[sender.id] = 1
-               # go through nodes
-               # if we have any closer than what we already got, query them
-               if l.has_key('nodes'):
-                       for node in l['nodes']:
-                               n = Node().initWithDict(node)
-                               if not self.found.has_key(n.id):
-                                       self.found[n.id] = n
-               elif l.has_key('values'):
-                       def x(y, z=self.results):
-                               y = y.decode('base64')
-                               if not z.has_key(y):
-                                       z[y] = 1
-                                       return y
-                               else:
-                                       return None
-                       v = filter(None, map(x, l['values']))
-                       if(len(v)):
-                               reactor.callFromThread(self.callback, v)
-               self.schedule()
-               
+    def handleGotNodes(self, dict):
+        sender = dict["sender"]
+        sender = Node().initWithDict(sender)
+        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+        self.table.table.insertNode(sender)
+        if self.finished or self.answered.has_key(sender.id):
+            # a day late and a dollar short
+            return
+        self.outstanding = self.outstanding - 1
+        self.answered[sender.id] = 1
+        # go through nodes
+        # if we have any closer than what we already got, query them
+        if dict.has_key('nodes'):
+            for node in dict['nodes']:
+                n = Node().initWithDict(node)
+                n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+                if not self.found.has_key(n.id):
+                    self.found[n.id] = n
+        elif dict.has_key('values'):
+            def x(y, z=self.results):
+                if not z.has_key(y):
+                    z[y] = 1
+                    return y
+                else:
+                    return None
+            v = filter(None, map(x, dict['values']))
+            if(len(v)):
+                reactor.callFromThread(self.callback, v)
+        self.schedule()
+        
     ## get value
     def schedule(self):
-               if self.finished:
-                       return
-               l = self.found.values()
-               l.sort(self.sort)
-               
-               for node in l[:K]:
-                       if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
-                               #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-                               df = node.findValue(self.target, self.table.node.senderDict())
-                               df.addCallback(self.handleGotNodes)
-                               df.addErrback(self.makeMsgFailed(node))
-                               self.outstanding = self.outstanding + 1
-                               self.queried[node.id] = 1
-                       if self.outstanding >= const.CONCURRENT_REQS:
-                               break
-               assert(self.outstanding) >=0
-               if self.outstanding == 0:
-                       ## all done, didn't find it!!
-                       self.finished=1
-                       reactor.callFromThread(self.callback,[])
+        if self.finished:
+            return
+        l = self.found.values()
+        l.sort(self.sort)
+        
+        for node in l[:K]:
+            if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
+                df = node.findValue(self.target, self.table.node.senderDict())
+                df.addCallback(self.handleGotNodes)
+                df.addErrback(self.makeMsgFailed(node))
+                self.outstanding = self.outstanding + 1
+                self.queried[node.id] = 1
+            if self.outstanding >= const.CONCURRENT_REQS:
+                break
+        assert(self.outstanding) >=0
+        if self.outstanding == 0:
+            ## all done, didn't find it!!
+            self.finished=1
+            reactor.callFromThread(self.callback,[])
 
     ## get value
     def goWithNodes(self, nodes, found=None):
-               self.results = {}
-               if found:
-                       for n in found:
-                               self.results[n] = 1
-               for node in nodes:
-                       if node.id == self.table.node.id:
-                               continue
-                       else:
-                               self.found[node.id] = node
-                       
-               self.schedule()
+        self.results = {}
+        if found:
+            for n in found:
+                self.results[n] = 1
+        for node in nodes:
+            if node.id == self.table.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+            
+        self.schedule()
+
 
+class StoreValue(ActionBase):
+    def __init__(self, table, target, value, callback):
+        ActionBase.__init__(self, table, target, callback)
+        self.value = value
+        self.stored = []
+    
+    def storedValue(self, t, node):
+        self.outstanding -= 1
+        self.table.insertNode(node)
+        if self.finished:
+            return
+        self.stored.append(t)
+        if len(self.stored) >= const.STORE_REDUNDANCY:
+            self.finished=1
+            self.callback(self.stored)
+        else:
+            if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
+                self.schedule()
+            
+    def storeFailed(self, t, node):
+        self.table.nodeFailed(node)
+        self.outstanding -= 1
+        if self.finished:
+            return
+        self.schedule()
+        
+    def schedule(self):
+        if self.finished:
+            return
+        num = const.CONCURRENT_REQS - self.outstanding
+        if num > const.STORE_REDUNDANCY:
+            num = const.STORE_REDUNDANCY
+        for i in range(num):
+            try:
+                node = self.nodes.pop()
+            except IndexError:
+                if self.outstanding == 0:
+                    self.finished = 1
+                    self.callback(self.stored)
+            else:
+                if not node.id == self.table.node.id:
+                    self.outstanding += 1
+                    df = node.storeValue(self.target, self.value, self.table.node.senderDict())
+                    df.addCallback(self.storedValue, node=node)
+                    df.addErrback(self.storeFailed, node=node)
+                    
+    def goWithNodes(self, nodes):
+        self.nodes = nodes
+        self.nodes.sort(self.sort)
+        self.schedule()
 
 
 class KeyExpirer:
-       def __init__(self, store):
-               self.store = store
-               reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
-       
-       def doExpire(self):
-               self.cut = "%0.6f" % (time() - const.KE_AGE)
-               self._expire()
-       
-       def _expire(self):
-               c = self.store.cursor()
-               s = "delete from kv where time < '%s';" % self.cut
-               c.execute(s)
-               reactor.callLater(const.KE_DELAY, self.doExpire)
-       
\ No newline at end of file
+    def __init__(self, store):
+        self.store = store
+        reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
+    
+    def doExpire(self):
+        self.cut = "%0.6f" % (time() - const.KE_AGE)
+        self._expire()
+    
+    def _expire(self):
+        c = self.store.cursor()
+        s = "delete from kv where time < '%s';" % self.cut
+        c.execute(s)
+        reactor.callLater(const.KE_DELAY, self.doExpire)
+    
\ No newline at end of file
index 96009d4a751b2fe42ab4b9feaae36a6d3a671384..3f1f80beb6a0dccb24a618b3dabe019fecfc658e 100644 (file)
@@ -36,6 +36,8 @@ class Airhook(protocol.DatagramProtocol):
         self.connections = {}
         
     def datagramReceived(self, datagram, addr):
+        #print `addr`, `datagram`
+        #if addr != self.addr:
         self.connectionForAddr(addr).datagramReceived(datagram)
 
     def connectionForAddr(self, addr):
@@ -49,7 +51,11 @@ class Airhook(protocol.DatagramProtocol):
         else:
             conn = self.connections[addr]
         return conn
-    
+#    def makeConnection(self, transport):
+#        protocol.DatagramProtocol.makeConnection(self, transport)
+#        tup = transport.getHost()
+#        self.addr = (tup[1], tup[2])
+        
 class AirhookPacket:
     def __init__(self, msg):
         self.datagram = msg
@@ -92,7 +98,7 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
         self.observed = None  # their session id
         self.sessionID = long(rand(0, 2**32))  # our session id
         
-        self.lastTransmit = -1  # time we last sent a packet with messages
+        self.lastTransmit = 0  # time we last sent a packet with messages
         self.lastReceieved = 0 # time we last received a packet with messages
         self.lastTransmitSeq = -1 # last sequence we sent a packet
         self.state = pending
@@ -103,6 +109,7 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
         self.sendSession = None  # send session/observed fields until obSeq > sendSession
         self.response = 0 # if we know we have a response now (like resending missed packets)
         self.noisy = 0
+        self.scheduled = 0 # a sendNext is scheduled, don't schedule another
         self.resetMessages()
     
     def resetMessages(self):
@@ -146,14 +153,12 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
                     self.state = pending
                     self.resetMessages()
                     self.inSeq = p.seq
-            self.response = 1
         elif self.state == confirmed:
             if p.session != None or p.observed != None :
-                if p.session != self.observed or p.observed != self.sessionID:
+                if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
                     self.state = pending
-                    if seq == 0:
-                        self.resetMessages()
-                        self.inSeq = p.seq
+                    self.resetMessages()
+                    self.inSeq = p.seq
     
         if self.state != pending:      
             msgs = []          
@@ -262,28 +267,31 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne
         self.lastTransmit = time()
         self.transport.write(packet, self.addr)
         
+        self.scheduled = 0
         self.schedule()
         
     def timeToSend(self):
         # any outstanding messages and are we not too far ahead of our counterparty?
-        if self.omsgq and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
-            return 1
+        if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
+            return (1, 0)
         # do we explicitly need to send a response?
         elif self.response:
             self.response = 0
-            return 1
+            return (1, 0)
         # have we not sent anything in a while?
         elif time() - self.lastTransmit > 1.0:
-            return 1
-        
+            return (1, 1)
+        elif self.state == pending:
+            return (1, 1)
+            
         # nothing to send
-        return 0
+        return (0, 0)
 
     def schedule(self):
-        if self.timeToSend():
-            reactor.callLater(0, self.sendNext)
-        else:
-            reactor.callLater(1, self.sendNext)
+        tts, t = self.timeToSend()
+        if tts and not self.scheduled:
+            self.scheduled = 1
+            reactor.callLater(t, self.sendNext)
         
     def write(self, data):
         # micropackets can only be 255 bytes or less
index d81ecbd45f021de20eebd6c2c3faca37240a0f94..ee1da1fc0d0d4ec46d3973fb460db9ffd394679c 100644 (file)
--- a/const.py
+++ b/const.py
@@ -8,7 +8,7 @@ main.installReactor(reactor)
 NULL_ID =  20 * '\0'
 
 # Kademlia "K" constant, this should be an even number
-K = 8
+K = 20
 
 # SHA1 is 160 bits long
 HASH_LENGTH = 160
diff --git a/hash.py b/hash.py
index 4de069ff6818e1c81b31f32b13b468f7adeeb63e..2a312461155d21137407223553251f1b59e04162 100644 (file)
--- a/hash.py
+++ b/hash.py
@@ -3,6 +3,8 @@
 from sha import sha
 import whrandom
 
+random = open('/dev/urandom', 'r') # sucks for windoze
+
 def intify(hstr):
     """20 bit hash, big-endian -> long python integer"""
     assert len(hstr) == 20
@@ -26,8 +28,7 @@ def distance(a, b):
 def newID():
     """returns a new pseudorandom globally unique ID string"""
     h = sha()
-    for i in range(20):
-        h.update(chr(whrandom.randrange(0,256)))
+    h.update(random.read(20))
     return h.digest()
 
 def newIDInRange(min, max):
@@ -35,7 +36,7 @@ def newIDInRange(min, max):
     
 def randRange(min, max):
     return min + intify(newID()) % (max - min)
-
+    
 
 ### Test Cases ###
 import unittest
@@ -49,7 +50,7 @@ class NewID(unittest.TestCase):
 
 class Intify(unittest.TestCase):
     known = [('\0' * 20, 0),
-             ('\xff' * 20, 2L**160 - 1),
+            ('\xff' * 20, 2L**160 - 1),
             ]
     def testKnown(self):
         for str, value in self.known: 
index 3ca5d7250d0cfb82c0529114606e7e62ed04ebd8..f3e410aa3087b1b8fc94772f0b3e0eeefe6a981e 100644 (file)
@@ -12,332 +12,325 @@ from knode import KNode as Node
 
 from hash import newID, newIDInRange
 
-from actions import FindNode, GetValue, KeyExpirer
-from twisted.web import xmlrpc
+from actions import FindNode, GetValue, KeyExpirer, StoreValue
+import krpc
+import airhook
+
 from twisted.internet.defer import Deferred
+from twisted.internet import protocol
 from twisted.python import threadable
 from twisted.internet.app import Application
 from twisted.web import server
 threadable.init()
+import sys
 
 import sqlite  ## find this at http://pysqlite.sourceforge.net/
 import pysqlite_exceptions
 
-KhashmirDBExcept = "KhashmirDBExcept"
+class KhashmirDBExcept(Exception):
+    pass
 
 # this is the main class!
-class Khashmir(xmlrpc.XMLRPC):
-       __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
-       def __init__(self, host, port, db='khashmir.db'):
-               self.setup(host, port, db)
-               
-       def setup(self, host, port, db='khashmir.db'):
-               self._findDB(db)
-               self.node = self._loadSelfNode(host, port)
-               self.table = KTable(self.node)
-               self._loadRoutingTable()
-               self.app = Application("xmlrpc")
-               self.app.listenTCP(port, server.Site(self))
-               self.last = time.time()
-               KeyExpirer(store=self.store)
-               #self.refreshTable(force=1)
-               reactor.callLater(60, self.checkpoint, (1,))
-               
-       def _loadSelfNode(self, host, port):
-               c = self.store.cursor()
-               c.execute('select id from self where num = 0;')
-               if c.rowcount > 0:
-                       id = c.fetchone()[0].decode('base64')
-               else:
-                       id = newID()
-               return Node().init(id, host, port)
-               
-       def _saveSelfNode(self):
-               self.store.autocommit = 0
-               c = self.store.cursor()
-               c.execute('delete from self where num = 0;')
-               c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
-               self.store.commit()
-               self.store.autocommit = 1
-               
-       def checkpoint(self, auto=0):
-               self._saveSelfNode()
-               self._dumpRoutingTable()
-               if auto:
-                       reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
-               
-       def _findDB(self, db):
-               import os
-               try:
-                       os.stat(db)
-               except OSError:
-                       self._createNewDB(db)
-               else:
-                       self._loadDB(db)
-           
-       def _loadDB(self, db):
-               try:
-                       self.store = sqlite.connect(db=db)
-                       self.store.autocommit = 1
-               except:
-                       import traceback
-                       raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
-           
-       def _createNewDB(self, db):
-               self.store = sqlite.connect(db=db)
-               self.store.autocommit = 1
-               s = """
-                       create table kv (key text, value text, time timestamp, primary key (key, value));
-                       create index kv_key on kv(key);
-                       create index kv_timestamp on kv(time);
-                       
-                       create table nodes (id text primary key, host text, port number);
-                       
-                       create table self (num number primary key, id text);
-                       """
-               c = self.store.cursor()
-               c.execute(s)
-
-       def _dumpRoutingTable(self):
-               """
-                       save routing table nodes to the database
-               """
-               self.store.autocommit = 0;
-               c = self.store.cursor()
-               c.execute("delete from nodes where id not NULL;")
-               for bucket in self.table.buckets:
-                       for node in bucket.l:
-                               d = node.senderDict()
-                               c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
-               self.store.commit()
-               self.store.autocommit = 1;
-               
-       def _loadRoutingTable(self):
-               """
-                       load routing table nodes from database
-                       it's usually a good idea to call refreshTable(force=1) after loading the table
-               """
-               c = self.store.cursor()
-               c.execute("select * from nodes;")
-               for rec in c.fetchall():
-                       n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
-                       self.table.insertNode(n, contacted=0)
-                       
-       def render(self, request):
-               """
-                       Override the built in render so we can have access to the request object!
-                       note, crequest is probably only valid on the initial call (not after deferred!)
-               """
-               self.crequest = request
-               return xmlrpc.XMLRPC.render(self, request)
+class Khashmir(protocol.Factory):
+    __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
+    protocol = krpc.KRPC
+    def __init__(self, host, port, db='khashmir.db'):
+        self.setup(host, port, db)
+        
+    def setup(self, host, port, db='khashmir.db'):
+        self._findDB(db)
+        self.node = self._loadSelfNode(host, port)
+        self.table = KTable(self.node)
+        self.app = Application("krpc")
+        self.airhook = airhook.listenAirhookStream(port, self)
+        self.last = time.time()
+        self._loadRoutingTable()
+        KeyExpirer(store=self.store)
+        #self.refreshTable(force=1)
+        reactor.callLater(60, self.checkpoint, (1,))
+        
+    def _loadSelfNode(self, host, port):
+        c = self.store.cursor()
+        c.execute('select id from self where num = 0;')
+        if c.rowcount > 0:
+            id = c.fetchone()[0].decode('hex')
+        else:
+            id = newID()
+        return Node().init(id, host, port)
+        
+    def _saveSelfNode(self):
+        self.store.autocommit = 0
+        c = self.store.cursor()
+        c.execute('delete from self where num = 0;')
+        c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
+        self.store.commit()
+        self.store.autocommit = 1
+        
+    def checkpoint(self, auto=0):
+        self._saveSelfNode()
+        self._dumpRoutingTable()
+        if auto:
+            reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+        
+    def _findDB(self, db):
+        import os
+        try:
+            os.stat(db)
+        except OSError:
+            self._createNewDB(db)
+        else:
+            self._loadDB(db)
+        
+    def _loadDB(self, db):
+        try:
+            self.store = sqlite.connect(db=db)
+            self.store.autocommit = 1
+        except:
+            import traceback
+            raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+        
+    def _createNewDB(self, db):
+        self.store = sqlite.connect(db=db)
+        self.store.autocommit = 1
+        s = """
+            create table kv (key text, value text, time timestamp, primary key (key, value));
+            create index kv_key on kv(key);
+            create index kv_timestamp on kv(time);
+            
+            create table nodes (id text primary key, host text, port number);
+            
+            create table self (num number primary key, id text);
+            """
+        c = self.store.cursor()
+        c.execute(s)
 
+    def _dumpRoutingTable(self):
+        """
+            save routing table nodes to the database
+        """
+        self.store.autocommit = 0;
+        c = self.store.cursor()
+        c.execute("delete from nodes where id not NULL;")
+        for bucket in self.table.buckets:
+            for node in bucket.l:
+                d = node.senderDict()
+                c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
+        self.store.commit()
+        self.store.autocommit = 1;
+        
+    def _loadRoutingTable(self):
+        """
+            load routing table nodes from database
+            it's usually a good idea to call refreshTable(force=1) after loading the table
+        """
+        c = self.store.cursor()
+        c.execute("select * from nodes;")
+        for rec in c.fetchall():
+            n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
+            n.conn = self.airhook.connectionForAddr((n.host, n.port))
+            self.table.insertNode(n, contacted=0)
+            
 
-       #######
-       #######  LOCAL INTERFACE    - use these methods!
-       def addContact(self, host, port):
-               """
-                       ping this node and add the contact info to the table on pong!
-               """
-               n =Node().init(const.NULL_ID, host, port)  # note, we 
-               self.sendPing(n)
+    #######
+    #######  LOCAL INTERFACE    - use these methods!
+    def addContact(self, host, port, callback=None):
+        """
+            ping this node and add the contact info to the table on pong!
+        """
+        n =Node().init(const.NULL_ID, host, port) 
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.sendPing(n, callback=callback)
 
-       ## this call is async!
-       def findNode(self, id, callback, errback=None):
-               """ returns the contact info for node, or the k closest nodes, from the global table """
-               # get K nodes out of local table/cache, or the node we want
-               nodes = self.table.findNodes(id)
-               d = Deferred()
-               if errback:
-                       d.addCallbacks(callback, errback)
-               else:
-                       d.addCallback(callback)
-               if len(nodes) == 1 and nodes[0].id == id :
-                       d.callback(nodes)
-               else:
-                       # create our search state
-                       state = FindNode(self, id, d.callback)
-                       reactor.callFromThread(state.goWithNodes, nodes)
-       
-       
-       ## also async
-       def valueForKey(self, key, callback):
-               """ returns the values found for key in global table
-                       callback will be called with a list of values for each peer that returns unique values
-                       final callback will be an empty list - probably should change to 'more coming' arg
-               """
-               nodes = self.table.findNodes(key)
-               
-               # get locals
-               l = self.retrieveValues(key)
-               if len(l) > 0:
-                       reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
-               
-               # create our search state
-               state = GetValue(self, key, callback)
-               reactor.callFromThread(state.goWithNodes, nodes, l)
+    ## this call is async!
+    def findNode(self, id, callback, errback=None):
+        """ returns the contact info for node, or the k closest nodes, from the global table """
+        # get K nodes out of local table/cache, or the node we want
+        nodes = self.table.findNodes(id)
+        d = Deferred()
+        if errback:
+            d.addCallbacks(callback, errback)
+        else:
+            d.addCallback(callback)
+        if len(nodes) == 1 and nodes[0].id == id :
+            d.callback(nodes)
+        else:
+            # create our search state
+            state = FindNode(self, id, d.callback)
+            reactor.callFromThread(state.goWithNodes, nodes)
+    
+    
+    ## also async
+    def valueForKey(self, key, callback):
+        """ returns the values found for key in global table
+            callback will be called with a list of values for each peer that returns unique values
+            final callback will be an empty list - probably should change to 'more coming' arg
+        """
+        nodes = self.table.findNodes(key)
+        
+        # get locals
+        l = self.retrieveValues(key)
+        
+        # create our search state
+        state = GetValue(self, key, callback)
+        reactor.callFromThread(state.goWithNodes, nodes, l)
 
-       ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-       def storeValueForKey(self, key, value, callback=None):
-               """ stores the value for key in the global table, returns immediately, no status 
-                       in this implementation, peers respond but don't indicate status to storing values
-                       a key can have many values
-               """
-               def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
-                       if not response:
-                               # default callback
-                               def _storedValueHandler(sender):
-                                       pass
-                               response=_storedValueHandler
-               
-                       for node in nodes[:const.STORE_REDUNDANCY]:
-                               def cb(t, table = table, node=node, resp=response):
-                                       self.table.insertNode(node)
-                                       response(t)
-                               if node.id != self.node.id:
-                                       def default(err, node=node, table=table):
-                                               table.nodeFailed(node)
-                                       df = node.storeValue(key, value, self.node.senderDict())
-                                       df.addCallbacks(cb, default)
-               # this call is asynch
-               self.findNode(key, _storeValueForKey)
-       
-       
-       def insertNode(self, n, contacted=1):
-               """
-               insert a node in our local table, pinging oldest contact in bucket, if necessary
-               
-               If all you have is a host/port, then use addContact, which calls this method after
-               receiving the PONG from the remote node.  The reason for the seperation is we can't insert
-               a node into the table without it's peer-ID.  That means of course the node passed into this
-               method needs to be a properly formed Node object with a valid ID.
-               """
-               old = self.table.insertNode(n, contacted=contacted)
-               if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
-                       # the bucket is full, check to see if old node is still around and if so, replace it
-                       
-                       ## these are the callbacks used when we ping the oldest node in a bucket
-                       def _staleNodeHandler(oldnode=old, newnode = n):
-                               """ called if the pinged node never responds """
-                               self.table.replaceStaleNode(old, newnode)
-                       
-                       def _notStaleNodeHandler(sender, old=old):
-                               """ called when we get a pong from the old node """
-                               args, sender = sender
-                               sender = Node().initWithDict(sender)
-                               if sender.id == old.id:
-                                       self.table.justSeenNode(old)
-                       
-                       df = old.ping(self.node.senderDict())
-                       df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+    def storeValueForKey(self, key, value, callback=None):
+        """ stores the value for key in the global table, returns immediately, no status 
+            in this implementation, peers respond but don't indicate status to storing values
+            a key can have many values
+        """
+        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+            if not response:
+                # default callback
+                def _storedValueHandler(sender):
+                    pass
+                response=_storedValueHandler
+            action = StoreValue(self.table, key, value, response)
+            reactor.callFromThread(action.goWithNodes, nodes)
+            
+        # this call is asynch
+        self.findNode(key, _storeValueForKey)
+        
+    
+    def insertNode(self, n, contacted=1):
+        """
+        insert a node in our local table, pinging oldest contact in bucket, if necessary
+        
+        If all you have is a host/port, then use addContact, which calls this method after
+        receiving the PONG from the remote node.  The reason for the seperation is we can't insert
+        a node into the table without it's peer-ID.  That means of course the node passed into this
+        method needs to be a properly formed Node object with a valid ID.
+        """
+        old = self.table.insertNode(n, contacted=contacted)
+        if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
+            # the bucket is full, check to see if old node is still around and if so, replace it
+            
+            ## these are the callbacks used when we ping the oldest node in a bucket
+            def _staleNodeHandler(oldnode=old, newnode = n):
+                """ called if the pinged node never responds """
+                self.table.replaceStaleNode(old, newnode)
+            
+            def _notStaleNodeHandler(dict, old=old):
+                """ called when we get a pong from the old node """
+                sender = dict['sender']
+                if sender['id'] == old.id:
+                    self.table.justSeenNode(old.id)
+            
+            df = old.ping(self.node.senderDict())
+            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
-       def sendPing(self, node):
-               """
-                       ping a node
-               """
-               df = node.ping(self.node.senderDict())
-               ## these are the callbacks we use when we issue a PING
-               def _pongHandler(args, node=node, table=self.table):
-                       l, sender = args
-                       if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
-                               # whoah, got response from different peer than we were expecting
-                               self.table.invalidateNode(node)
-                       else:
-                               sender['host'] = node.host
-                               sender['port'] = node.port
-                               n = Node().initWithDict(sender)
-                               table.insertNode(n)
-                               return
-               def _defaultPong(err, node=node, table=self.table):
-                       table.nodeFailed(node)
-               
-               df.addCallbacks(_pongHandler,_defaultPong)
+    def sendPing(self, node, callback=None):
+        """
+            ping a node
+        """
+        df = node.ping(self.node.senderDict())
+        ## these are the callbacks we use when we issue a PING
+        def _pongHandler(dict, node=node, table=self.table, callback=callback):
+            sender = dict['sender']
+            if node.id != const.NULL_ID and node.id != sender['id']:
+                # whoah, got response from different peer than we were expecting
+                self.table.invalidateNode(node)
+            else:
+                sender['host'] = node.host
+                sender['port'] = node.port
+                n = Node().initWithDict(sender)
+                n.conn = self.airhook.connectionForAddr((n.host, n.port))
+                table.insertNode(n)
+                if callback:
+                    callback()
+        def _defaultPong(err, node=node, table=self.table, callback=callback):
+            table.nodeFailed(node)
+            if callback:
+                callback()
+        
+        df.addCallbacks(_pongHandler,_defaultPong)
 
-       def findCloseNodes(self, callback=lambda a: None):
-               """
-                       This does a findNode on the ID one away from our own.  
-                       This will allow us to populate our table with nodes on our network closest to our own.
-                       This is called as soon as we start up with an empty table
-               """
-               id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-               self.findNode(id, callback)
+    def findCloseNodes(self, callback=lambda a: None):
+        """
+            This does a findNode on the ID one away from our own.  
+            This will allow us to populate our table with nodes on our network closest to our own.
+            This is called as soon as we start up with an empty table
+        """
+        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
+        self.findNode(id, callback)
 
-       def refreshTable(self, force=0):
-               """
-                       force=1 will refresh table regardless of last bucket access time
-               """
-               def callback(nodes):
-                       pass
-       
-               for bucket in self.table.buckets:
-                       if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
-                               id = newIDInRange(bucket.min, bucket.max)
-                               self.findNode(id, callback)
+    def refreshTable(self, force=0):
+        """
+            force=1 will refresh table regardless of last bucket access time
+        """
+        def callback(nodes):
+            pass
+    
+        for bucket in self.table.buckets:
+            if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
+                id = newIDInRange(bucket.min, bucket.max)
+                self.findNode(id, callback)
 
 
-       def retrieveValues(self, key):
-               s = "select value from kv where key = '%s';" % key.encode('base64')
-               c = self.store.cursor()
-               c.execute(s)
-               t = c.fetchone()
-               l = []
-               while t:
-                       l.append(t['value'])
-                       t = c.fetchone()
-               return l
-       
-       #####
-       ##### INCOMING MESSAGE HANDLERS
-       
-       def xmlrpc_ping(self, sender):
-               """
-                       takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
-                       returns sender dict
-               """
-               ip = self.crequest.getClientIP()
-               sender['host'] = ip
-               n = Node().initWithDict(sender)
-               self.insertNode(n, contacted=0)
-               return (), self.node.senderDict()
-               
-       def xmlrpc_find_node(self, target, sender):
-               nodes = self.table.findNodes(target.decode('base64'))
-               nodes = map(lambda node: node.senderDict(), nodes)
-               ip = self.crequest.getClientIP()
-               sender['host'] = ip
-               n = Node().initWithDict(sender)
-               self.insertNode(n, contacted=0)
-               return nodes, self.node.senderDict()
-           
-       def xmlrpc_store_value(self, key, value, sender):
-               t = "%0.6f" % time.time()
-               s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
-               c = self.store.cursor()
-               try:
-                       c.execute(s)
-               except pysqlite_exceptions.IntegrityError, reason:
-                       # update last insert time
-                       s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
-                       c.execute(s)
-               ip = self.crequest.getClientIP()
-               sender['host'] = ip
-               n = Node().initWithDict(sender)
-               self.insertNode(n, contacted=0)
-               return (), self.node.senderDict()
-       
-       def xmlrpc_find_value(self, key, sender):
-               ip = self.crequest.getClientIP()
-               key = key.decode('base64')
-               sender['host'] = ip
-               n = Node().initWithDict(sender)
-               self.insertNode(n, contacted=0)
-       
-               l = self.retrieveValues(key)
-               if len(l) > 0:
-                       return {'values' : l}, self.node.senderDict()
-               else:
-                       nodes = self.table.findNodes(key)
-                       nodes = map(lambda node: node.senderDict(), nodes)
-                       return {'nodes' : nodes}, self.node.senderDict()
+    def retrieveValues(self, key):
+        s = "select value from kv where key = '%s';" % key.encode('hex')
+        c = self.store.cursor()
+        c.execute(s)
+        t = c.fetchone()
+        l = []
+        while t:
+            l.append(t['value'].decode('base64'))
+            t = c.fetchone()
+        return l
+    
+    #####
+    ##### INCOMING MESSAGE HANDLERS
+    
+    def krpc_ping(self, sender, _krpc_sender):
+        """
+            takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
+            returns sender dict
+        """
+        sender['host'] = _krpc_sender[0]
+        n = Node().initWithDict(sender)
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"sender" : self.node.senderDict()}
+        
+    def krpc_find_node(self, target, sender, _krpc_sender):
+        nodes = self.table.findNodes(target)
+        nodes = map(lambda node: node.senderDict(), nodes)
+        sender['host'] = _krpc_sender[0]
+        n = Node().initWithDict(sender)
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"nodes" : nodes, "sender" : self.node.senderDict()}
+            
+    def krpc_store_value(self, key, value, sender, _krpc_sender):
+        t = "%0.6f" % time.time()
+        s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
+        c = self.store.cursor()
+        try:
+            c.execute(s)
+        except pysqlite_exceptions.IntegrityError, reason:
+            # update last insert time
+            s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
+            c.execute(s)
+        sender['host'] = _krpc_sender[0]
+        n = Node().initWithDict(sender)
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"sender" : self.node.senderDict()}
+    
+    def krpc_find_value(self, key, sender, _krpc_sender):
+        sender['host'] = _krpc_sender[0]
+        n = Node().initWithDict(sender)
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+    
+        l = self.retrieveValues(key)
+        if len(l) > 0:
+            return {'values' : l, "sender": self.node.senderDict()}
+        else:
+            nodes = self.table.findNodes(key)
+            nodes = map(lambda node: node.senderDict(), nodes)
+            return {'nodes' : nodes, "sender": self.node.senderDict()}
 
 ### TESTING ###
 from random import randrange
@@ -347,129 +340,151 @@ from hash import newID
 
 
 def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
-       import thread
-       l = []
-       for i in xrange(peers):
-               a = Khashmir('localhost', startport + i, db = dbprefix+`i`)
-               l.append(a)
-       thread.start_new_thread(l[0].app.run, ())
-       for peer in l[1:]:
-               peer.app.run()  
-       return l
-       
-def test_build_net(quiet=0, peers=24, host='localhost',  pause=0, startport=2001, dbprefix='/tmp/test'):
-       from whrandom import randrange
-       import threading
-       import thread
-       import sys
-       port = startport
-       l = []
-       if not quiet:
-               print "Building %s peer table." % peers
-       
-       for i in xrange(peers):
-               a = Khashmir(host, port + i, db = dbprefix +`i`)
-               l.append(a)
-       
-       
-       thread.start_new_thread(l[0].app.run, ())
-       time.sleep(1)
-       for peer in l[1:]:
-               peer.app.run()
-       time.sleep(3)
-       
-       print "adding contacts...."
-       
-       for peer in l:
-               n = l[randrange(0, len(l))].node
-               peer.addContact(host, n.port)
-               n = l[randrange(0, len(l))].node
-               peer.addContact(host, n.port)
-               n = l[randrange(0, len(l))].node
-               peer.addContact(host, n.port)
-               if pause:
-                       time.sleep(.33)
-       
-       time.sleep(10)
-       print "finding close nodes...."
-       
-       for peer in l:
-               flag = threading.Event()
-               def cb(nodes, f=flag):
-                       f.set()
-               peer.findCloseNodes(cb)
-               flag.wait()
-       #    for peer in l:
-       #       peer.refreshTable()
-       return l
+    import thread
+    l = []
+    for i in xrange(peers):
+        a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`)
+        l.append(a)
+    thread.start_new_thread(l[0].app.run, ())
+    for peer in l[1:]:
+        peer.app.run() 
+    return l
+    
+def test_build_net(quiet=0, peers=24, host='127.0.0.1',  pause=0, startport=2001, dbprefix='/tmp/test'):
+    from whrandom import randrange
+    import threading
+    import thread
+    import sys
+    port = startport
+    l = []
+    if not quiet:
+        print "Building %s peer table." % peers
+    
+    for i in xrange(peers):
+        a = Khashmir(host, port + i, db = dbprefix +`i`)
+        l.append(a)
+    
+    
+    thread.start_new_thread(l[0].app.run, ())
+    time.sleep(1)
+    for peer in l[1:]:
+        peer.app.run()
+    #time.sleep(3)
+    
+    def spewer(frame, s, ignored):
+        from twisted.python import reflect
+        if frame.f_locals.has_key('self'):
+            se = frame.f_locals['self']
+            print 'method %s of %s at %s' % (
+                frame.f_code.co_name, reflect.qual(se.__class__), id(se)
+                )
+    #sys.settrace(spewer)
+
+    print "adding contacts...."
+    def makecb(flag):
+        def cb(f=flag):
+            f.set()
+        return cb
+
+    for peer in l:
+        p = l[randrange(0, len(l))]
+        if p != peer:
+            n = p.node
+            flag = threading.Event()
+            peer.addContact(host, n.port, makecb(flag))
+            flag.wait()
+        p = l[randrange(0, len(l))]
+        if p != peer:
+            n = p.node
+            flag = threading.Event()
+            peer.addContact(host, n.port, makecb(flag))
+            flag.wait()
+        p = l[randrange(0, len(l))]
+        if p != peer:
+            n = p.node
+            flag = threading.Event()
+            peer.addContact(host, n.port, makecb(flag))
+            flag.wait()
+    
+    print "finding close nodes...."
+    
+    for peer in l:
+        flag = threading.Event()
+        def cb(nodes, f=flag):
+            f.set()
+        peer.findCloseNodes(cb)
+        flag.wait()
+    #    for peer in l:
+    #  peer.refreshTable()
+    return l
         
 def test_find_nodes(l, quiet=0):
-       flag = threading.Event()
-       
-       n = len(l)
-       
-       a = l[randrange(0,n)]
-       b = l[randrange(0,n)]
-       
-       def callback(nodes, flag=flag, id = b.node.id):
-               if (len(nodes) >0) and (nodes[0].id == id):
-                       print "test_find_nodes  PASSED"
-               else:
-                       print "test_find_nodes  FAILED"
-               flag.set()
-       a.findNode(b.node.id, callback)
-       flag.wait()
+    flag = threading.Event()
+    
+    n = len(l)
+    
+    a = l[randrange(0,n)]
+    b = l[randrange(0,n)]
+    
+    def callback(nodes, flag=flag, id = b.node.id):
+        if (len(nodes) >0) and (nodes[0].id == id):
+            print "test_find_nodes     PASSED"
+        else:
+            print "test_find_nodes     FAILED"
+        flag.set()
+    a.findNode(b.node.id, callback)
+    flag.wait()
     
 def test_find_value(l, quiet=0):
-       
-       fa = threading.Event()
-       fb = threading.Event()
-       fc = threading.Event()
-       
-       n = len(l)
-       a = l[randrange(0,n)]
-       b = l[randrange(0,n)]
-       c = l[randrange(0,n)]
-       d = l[randrange(0,n)]
-       
-       key = newID()
-       value = newID()
-       if not quiet: print "inserting value..."
-       a.storeValueForKey(key, value)
-       time.sleep(3)
-       if not quiet:
-               print "finding..."
-       
-       class cb:
-               def __init__(self, flag, value=value):
-                       self.flag = flag
-                       self.val = value
-                       self.found = 0
-               def callback(self, values):
-                       try:
-                               if(len(values) == 0):
-                                       if not self.found:
-                                               print "find                NOT FOUND"
-                                       else:
-                                               print "find                FOUND"
-                               else:
-                                       if self.val in values:
-                                               self.found = 1
-                       finally:
-                               self.flag.set()
-       
-       b.valueForKey(key, cb(fa).callback)
-       fa.wait()
-       c.valueForKey(key, cb(fb).callback)
-       fb.wait()
-       d.valueForKey(key, cb(fc).callback)    
-       fc.wait()
+    
+    fa = threading.Event()
+    fb = threading.Event()
+    fc = threading.Event()
+    
+    n = len(l)
+    a = l[randrange(0,n)]
+    b = l[randrange(0,n)]
+    c = l[randrange(0,n)]
+    d = l[randrange(0,n)]
+    
+    key = newID()
+    value = newID()
+    if not quiet: print "inserting value..."
+    a.storeValueForKey(key, value)
+    time.sleep(3)
+    if not quiet:
+        print "finding..."
+    
+    class cb:
+        def __init__(self, flag, value=value):
+            self.flag = flag
+            self.val = value
+            self.found = 0
+        def callback(self, values):
+            try:
+                if(len(values) == 0):
+                    if not self.found:
+                        print "find                NOT FOUND"
+                    else:
+                        print "find                FOUND"
+                else:
+                    if self.val in values:
+                        self.found = 1
+            finally:
+                self.flag.set()
+    
+    b.valueForKey(key, cb(fa).callback)
+    fa.wait()
+    c.valueForKey(key, cb(fb).callback)
+    fb.wait()
+    d.valueForKey(key, cb(fc).callback)    
+    fc.wait()
     
 def test_one(host, port, db='/tmp/test'):
-       import thread
-       k = Khashmir(host, port, db)
-       thread.start_new_thread(k.app.run, ())
-       return k
+    import thread
+    k = Khashmir(host, port, db)
+    thread.start_new_thread(reactor.run, ())
+    return k
     
 if __name__ == "__main__":
     import sys
@@ -479,7 +494,7 @@ if __name__ == "__main__":
     time.sleep(3)
     print "finding nodes..."
     for i in range(10):
-               test_find_nodes(l)
+        test_find_nodes(l)
     print "inserting and fetching values..."
     for i in range(10):
-               test_find_value(l)
+        test_find_value(l)
index ef8226034f993926411fe6427d0a29eb60c09e6a..b5ecedcc0bfd6d70b25b2b53a6d689d877f01f94 100644 (file)
--- a/knode.py
+++ b/knode.py
@@ -1,39 +1,27 @@
 from node import Node
 from twisted.internet.defer import Deferred
-from xmlrpcclient import XMLRPCClientFactory as factory
 from const import reactor, NULL_ID
 
 class KNode(Node):
-       def makeResponse(self, df):
-               """ Make our callback cover that checks to make sure the id of the response is the same as what we are expecting """
-               def _callback(args, d=df):
-                       try:
-                               l, sender = args
-                       except:
-                               d.callback(args)
-                       else:
-                               if self.id != NULL_ID and sender['id'] != self._senderDict['id']:
-                                       d.errback()
-                               else:
-                                       d.callback(args)
-               return _callback
-       def ping(self, sender):
-               df = Deferred()
-               f = factory('ping', (sender,), self.makeResponse(df), df.errback)
-               reactor.connectTCP(self.host, self.port, f)
-               return df
-       def findNode(self, target, sender):
-               df = Deferred()
-               f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback)
-               reactor.connectTCP(self.host, self.port, f)
-               return df
-       def storeValue(self, key, value, sender):
-               df = Deferred()
-               f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback)
-               reactor.connectTCP(self.host, self.port, f)
-               return df
-       def findValue(self, key, sender):
-               df = Deferred()
-               f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback)
-               reactor.connectTCP(self.host, self.port, f)
-               return df
+    def makeResponse(self, df):
+        """ Make our callback cover that checks to make sure the id of the response is the same as what we are expecting """
+        def _callback(dict, d=df):
+            try:
+                senderid = dict['sender']['id']
+            except KeyError:
+                d.errback()
+            else:
+                if self.id != NULL_ID and senderid != self._senderDict['id']:
+                    d.errback()
+                else:
+                    d.callback(dict)
+        return _callback
+        
+    def ping(self, sender):
+        return self.conn.protocol.sendRequest('ping', {"sender":sender})
+    def findNode(self, target, sender):
+        return self.conn.protocol.sendRequest('find_node', {"target" : target, "sender": sender})
+    def storeValue(self, key, value, sender):
+        return self.conn.protocol.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
+    def findValue(self, key, sender):
+        return self.conn.protocol.sendRequest('find_value', {"key" : key, "sender" : sender})
\ No newline at end of file
diff --git a/krpc.py b/krpc.py
new file mode 100644 (file)
index 0000000..8569bbf
--- /dev/null
+++ b/krpc.py
@@ -0,0 +1,101 @@
+import airhook
+from twisted.internet.defer import Deferred
+from twisted.protocols import basic
+from bencode import bencode, bdecode
+from twisted.internet import reactor
+
+import hash
+
+KRPC_TIMEOUT = 30
+
+KRPC_ERROR = 1
+KRPC_ERROR_METHOD_UNKNOWN = 2
+KRPC_ERROR_RECEIVED_UNKNOWN = 3
+KRPC_ERROR_TIMEOUT = 4
+
+class KRPC(basic.NetstringReceiver):
+    noisy = 1
+    def __init__(self):
+        self.tids = {}
+
+    def stringReceived(self, str):
+        # bdecode
+        try:
+            msg = bdecode(str)
+        except Exception, e:
+            print "response decode error: " + `e`
+            self.d.errback()
+        else:
+            # look at msg type
+            if msg['typ']  == 'req':
+                ilen = len(str)
+                # if request
+                #      tell factory to handle
+                f = getattr(self.factory ,"krpc_" + msg['req'], None)
+                if f and callable(f):
+                    msg['arg']['_krpc_sender'] =  self.transport.addr
+                    try:
+                        ret = apply(f, (), msg['arg'])
+                    except Exception, e:
+                        ## send error
+                        str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
+                        olen = len(str)
+                        self.sendString(str)
+                    else:
+                        if ret:
+                            #  make response
+                            str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+                        else:
+                            str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : []})
+                        #      send response
+                        olen = len(str)
+                        self.sendString(str)
+
+                else:
+                    # unknown method
+                    str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
+                    olen = len(str)
+                    self.sendString(str)
+                if self.noisy:
+                    print "%s >>> (%s, %s) - %s %s %s" % (self.transport.addr, self.factory.node.host, self.factory.node.port, 
+                                                    ilen, msg['req'], olen)
+            elif msg['typ'] == 'rsp':
+                # if response
+                #      lookup tid
+                if self.tids.has_key(msg['tid']):
+                    df = self.tids[msg['tid']]
+                    #  callback
+                    df.callback(msg['rsp'])
+                    del(self.tids[msg['tid']])
+                # no tid, perhaps this transaction timed out already...
+            elif msg['typ'] == 'err':
+                # if error
+                #      lookup tid
+                df = self.tids[msg['tid']]
+                #      callback
+                df.errback(msg['err'])
+                del(self.tids[msg['tid']])
+            else:
+                # unknown message type
+                df = self.tids[msg['tid']]
+                #      callback
+                df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
+                del(self.tids[msg['tid']])
+                
+    def sendRequest(self, method, args):
+        # make message
+        # send it
+        msg = {'tid' : hash.newID(), 'typ' : 'req',  'req' : method, 'arg' : args}
+        str = bencode(msg)
+        self.sendString(str)
+        d = Deferred()
+        self.tids[msg['tid']] = d
+        
+        def timeOut(tids = self.tids, id = msg['tid']):
+            if tids.has_key(id):
+                df = tids[id]
+                del(tids[id])
+                df.errback(KRPC_ERROR_TIMEOUT)
+        reactor.callLater(KRPC_TIMEOUT, timeOut)
+        return d
\ No newline at end of file
index bd89ef76adb14c887fa6e9b73e7bab947c73c3c0..05bd51cefb0f2f5b9c76ef1d506c56f922325289 100644 (file)
--- a/ktable.py
+++ b/ktable.py
@@ -6,207 +6,207 @@ from types import *
 
 import hash
 import const
-from const import K, HASH_LENGTH
+from const import K, HASH_LENGTH, NULL_ID
 from node import Node
 
 class KTable:
-       """local routing table for a kademlia like distributed hash table"""
-       def __init__(self, node):
-               # this is the root node, a.k.a. US!
-               self.node = node
-               self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
-               self.insertNode(node)
-               
-       def _bucketIndexForInt(self, num):
-               """the index of the bucket that should hold int"""
-               return bisect_left(self.buckets, num)
-       
-       def findNodes(self, id):
-               """
-                       return K nodes in our own local table closest to the ID.
-               """
-               
-               if isinstance(id, str):
-                       num = hash.intify(id)
-               elif isinstance(id, Node):
-                       num = id.num
-               elif isinstance(id, int) or isinstance(id, long):
-                       num = id
-               else:
-                       raise TypeError, "findNodes requires an int, string, or Node"
-                       
-               nodes = []
-               i = self._bucketIndexForInt(num)
-               
-               # if this node is already in our table then return it
-               try:
-                       index = self.buckets[i].l.index(num)
-               except ValueError:
-                       pass
-               else:
-                       return [self.buckets[i].l[index]]
-                       
-               # don't have the node, get the K closest nodes
-               nodes = nodes + self.buckets[i].l
-               if len(nodes) < K:
-                       # need more nodes
-                       min = i - 1
-                       max = i + 1
-                       while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
-                               #ASw: note that this requires K be even
-                               if min >= 0:
-                                       nodes = nodes + self.buckets[min].l
-                               if max < len(self.buckets):
-                                       nodes = nodes + self.buckets[max].l
-                               min = min - 1
-                               max = max + 1
-       
-               nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
-               return nodes[:K]
-               
-       def _splitBucket(self, a):
-               diff = (a.max - a.min) / 2
-               b = KBucket([], a.max - diff, a.max)
-               self.buckets.insert(self.buckets.index(a.min) + 1, b)
-               a.max = a.max - diff
-               # transfer nodes to new bucket
-               for anode in a.l[:]:
-                       if anode.num >= a.max:
-                               a.l.remove(anode)
-                               b.l.append(anode)
-       
-       def replaceStaleNode(self, stale, new):
-               """this is used by clients to replace a node returned by insertNode after
-               it fails to respond to a Pong message"""
-               i = self._bucketIndexForInt(stale.num)
-               try:
-                       it = self.buckets[i].l.index(stale.num)
-               except ValueError:
-                       return
-       
-               del(self.buckets[i].l[it])
-               if new:
-                       self.buckets[i].l.append(new)
-       
-       def insertNode(self, node, contacted=1):
-               """ 
-               this insert the node, returning None if successful, returns the oldest node in the bucket if it's full
-               the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!!
-               contacted means that yes, we contacted THEM and we know the node is reachable
-               """
-               assert node.id != " "*20
-               if node.id == self.node.id: return
-               # get the bucket for this node
-               i = self. _bucketIndexForInt(node.num)
-               # check to see if node is in the bucket already
-               try:
-                       it = self.buckets[i].l.index(node.num)
-               except ValueError:
-                       # no
-                       pass
-               else:
-                       if contacted:
-                               node.updateLastSeen()
-                               # move node to end of bucket
-                               xnode = self.buckets[i].l[it]
-                               del(self.buckets[i].l[it])
-                               # note that we removed the original and replaced it with the new one
-                               # utilizing this nodes new contact info
-                               self.buckets[i].l.append(xnode)
-                               self.buckets[i].touch()
-                       return
-               
-               # we don't have this node, check to see if the bucket is full
-               if len(self.buckets[i].l) < K:
-                       # no, append this node and return
-                       if contacted:
-                               node.updateLastSeen()
-                       self.buckets[i].l.append(node)
-                       self.buckets[i].touch()
-                       return
-                       
-               # bucket is full, check to see if self.node is in the bucket
-               if not (self.buckets[i].min <= self.node < self.buckets[i].max):
-                       return self.buckets[i].l[0]
-               
-               # this bucket is full and contains our node, split the bucket
-               if len(self.buckets) >= HASH_LENGTH:
-                       # our table is FULL, this is really unlikely
-                       print "Hash Table is FULL!  Increase K!"
-                       return
-                       
-               self._splitBucket(self.buckets[i])
-               
-               # now that the bucket is split and balanced, try to insert the node again
-               return self.insertNode(node)
-       
-       def justSeenNode(self, node):
-               """call this any time you get a message from a node
-               it will update it in the table if it's there """
-               try:
-                       n = self.findNodes(node.num)[0]
-               except IndexError:
-                       return None
-               else:
-                       tstamp = n.lastSeen
-                       n.updateLastSeen()
-                       return tstamp
-       
-       def invalidateNode(self, n):
-               """
-                       forget about node n - use when you know that node is invalid
-               """
-               self.replaceStaleNode(n, None)
-       
-       def nodeFailed(self, node):
-               """ call this when a node fails to respond to a message, to invalidate that node """
-               try:
-                       n = self.findNodes(node.num)[0]
-               except IndexError:
-                       return None
-               else:
-                       if n.msgFailed() >= const.MAX_FAILURES:
-                               self.invalidateNode(n)
-                                       
+    """local routing table for a kademlia like distributed hash table"""
+    def __init__(self, node):
+        # this is the root node, a.k.a. US!
+        self.node = node
+        self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
+        self.insertNode(node)
+        
+    def _bucketIndexForInt(self, num):
+        """the index of the bucket that should hold int"""
+        return bisect_left(self.buckets, num)
+    
+    def findNodes(self, id):
+        """
+            return K nodes in our own local table closest to the ID.
+        """
+        
+        if isinstance(id, str):
+            num = hash.intify(id)
+        elif isinstance(id, Node):
+            num = id.num
+        elif isinstance(id, int) or isinstance(id, long):
+            num = id
+        else:
+            raise TypeError, "findNodes requires an int, string, or Node"
+            
+        nodes = []
+        i = self._bucketIndexForInt(num)
+        
+        # if this node is already in our table then return it
+        try:
+            index = self.buckets[i].l.index(num)
+        except ValueError:
+            pass
+        else:
+            return [self.buckets[i].l[index]]
+            
+        # don't have the node, get the K closest nodes
+        nodes = nodes + self.buckets[i].l
+        if len(nodes) < K:
+            # need more nodes
+            min = i - 1
+            max = i + 1
+            while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
+                #ASw: note that this requires K be even
+                if min >= 0:
+                    nodes = nodes + self.buckets[min].l
+                if max < len(self.buckets):
+                    nodes = nodes + self.buckets[max].l
+                min = min - 1
+                max = max + 1
+    
+        nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
+        return nodes[:K]
+        
+    def _splitBucket(self, a):
+        diff = (a.max - a.min) / 2
+        b = KBucket([], a.max - diff, a.max)
+        self.buckets.insert(self.buckets.index(a.min) + 1, b)
+        a.max = a.max - diff
+        # transfer nodes to new bucket
+        for anode in a.l[:]:
+            if anode.num >= a.max:
+                a.l.remove(anode)
+                b.l.append(anode)
+    
+    def replaceStaleNode(self, stale, new):
+        """this is used by clients to replace a node returned by insertNode after
+        it fails to respond to a Pong message"""
+        i = self._bucketIndexForInt(stale.num)
+        try:
+            it = self.buckets[i].l.index(stale.num)
+        except ValueError:
+            return
+    
+        del(self.buckets[i].l[it])
+        if new:
+            self.buckets[i].l.append(new)
+    
+    def insertNode(self, node, contacted=1):
+        """ 
+        this insert the node, returning None if successful, returns the oldest node in the bucket if it's full
+        the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!!
+        contacted means that yes, we contacted THEM and we know the node is reachable
+        """
+        assert node.id != NULL_ID
+        if node.id == self.node.id: return
+        # get the bucket for this node
+        i = self. _bucketIndexForInt(node.num)
+        # check to see if node is in the bucket already
+        try:
+            it = self.buckets[i].l.index(node.num)
+        except ValueError:
+            # no
+            pass
+        else:
+            if contacted:
+                node.updateLastSeen()
+                # move node to end of bucket
+                xnode = self.buckets[i].l[it]
+                del(self.buckets[i].l[it])
+                # note that we removed the original and replaced it with the new one
+                # utilizing this nodes new contact info
+                self.buckets[i].l.append(xnode)
+                self.buckets[i].touch()
+            return
+        
+        # we don't have this node, check to see if the bucket is full
+        if len(self.buckets[i].l) < K:
+            # no, append this node and return
+            if contacted:
+                node.updateLastSeen()
+            self.buckets[i].l.append(node)
+            self.buckets[i].touch()
+            return
+            
+        # bucket is full, check to see if self.node is in the bucket
+        if not (self.buckets[i].min <= self.node < self.buckets[i].max):
+            return self.buckets[i].l[0]
+        
+        # this bucket is full and contains our node, split the bucket
+        if len(self.buckets) >= HASH_LENGTH:
+            # our table is FULL, this is really unlikely
+            print "Hash Table is FULL!  Increase K!"
+            return
+            
+        self._splitBucket(self.buckets[i])
+        
+        # now that the bucket is split and balanced, try to insert the node again
+        return self.insertNode(node)
+    
+    def justSeenNode(self, id):
+        """call this any time you get a message from a node
+        it will update it in the table if it's there """
+        try:
+            n = self.findNodes(id)[0]
+        except IndexError:
+            return None
+        else:
+            tstamp = n.lastSeen
+            n.updateLastSeen()
+            return tstamp
+    
+    def invalidateNode(self, n):
+        """
+            forget about node n - use when you know that node is invalid
+        """
+        self.replaceStaleNode(n, None)
+    
+    def nodeFailed(self, node):
+        """ call this when a node fails to respond to a message, to invalidate that node """
+        try:
+            n = self.findNodes(node.num)[0]
+        except IndexError:
+            return None
+        else:
+            if n.msgFailed() >= const.MAX_FAILURES:
+                self.invalidateNode(n)
+                        
 class KBucket:
-       __slots__ = ('min', 'max', 'lastAccessed')
-       def __init__(self, contents, min, max):
-               self.l = contents
-               self.min = min
-               self.max = max
-               self.lastAccessed = time.time()
-               
-       def touch(self):
-               self.lastAccessed = time.time()
-       
-       def getNodeWithInt(self, num):
-               if num in self.l: return num
-               else: raise ValueError
-               
-       def __repr__(self):
-               return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
-       
-       ## Comparators    
-       # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
-       # compares integer or node object with the bucket's range
-       def __lt__(self, a):
-               if isinstance(a, Node): a = a.num
-               return self.max <= a
-       def __le__(self, a):
-               if isinstance(a, Node): a = a.num
-               return self.min < a
-       def __gt__(self, a):
-               if isinstance(a, Node): a = a.num
-               return self.min > a
-       def __ge__(self, a):
-               if isinstance(a, Node): a = a.num
-               return self.max >= a
-       def __eq__(self, a):
-               if isinstance(a, Node): a = a.num
-               return self.min <= a and self.max > a
-       def __ne__(self, a):
-               if isinstance(a, Node): a = a.num
-               return self.min >= a or self.max < a
+    __slots__ = ('min', 'max', 'lastAccessed')
+    def __init__(self, contents, min, max):
+        self.l = contents
+        self.min = min
+        self.max = max
+        self.lastAccessed = time.time()
+        
+    def touch(self):
+        self.lastAccessed = time.time()
+    
+    def getNodeWithInt(self, num):
+        if num in self.l: return num
+        else: raise ValueError
+        
+    def __repr__(self):
+        return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
+    
+    ## Comparators    
+    # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
+    # compares integer or node object with the bucket's range
+    def __lt__(self, a):
+        if isinstance(a, Node): a = a.num
+        return self.max <= a
+    def __le__(self, a):
+        if isinstance(a, Node): a = a.num
+        return self.min < a
+    def __gt__(self, a):
+        if isinstance(a, Node): a = a.num
+        return self.min > a
+    def __ge__(self, a):
+        if isinstance(a, Node): a = a.num
+        return self.max >= a
+    def __eq__(self, a):
+        if isinstance(a, Node): a = a.num
+        return self.min <= a and self.max > a
+    def __ne__(self, a):
+        if isinstance(a, Node): a = a.num
+        return self.min >= a or self.max < a
 
 
 ### UNIT TESTS ###
@@ -216,13 +216,28 @@ class TestKTable(unittest.TestCase):
     def setUp(self):
         self.a = Node().init(hash.newID(), 'localhost', 2002)
         self.t = KTable(self.a)
-        print self.t.buckets[0].l
 
-    def test_replace_stale_node(self):
+    def testAddNode(self):
         self.b = Node().init(hash.newID(), 'localhost', 2003)
-        self.t.replaceStaleNode(self.a, self.b)
-        assert len(self.t.buckets[0].l) == 1
-        assert self.t.buckets[0].l[0].id == self.b.id
+        self.t.insertNode(self.b)
+        self.assertEqual(len(self.t.buckets[0].l), 1)
+        self.assertEqual(self.t.buckets[0].l[0], self.b)
+
+    def testRemove(self):
+        self.testAddNode()
+        self.t.invalidateNode(self.b)
+        self.assertEqual(len(self.t.buckets[0].l), 0)
+
+    def testFail(self):
+        self.testAddNode()
+        for i in range(const.MAX_FAILURES - 1):
+            self.t.nodeFailed(self.b)
+            self.assertEqual(len(self.t.buckets[0].l), 1)
+            self.assertEqual(self.t.buckets[0].l[0], self.b)
+            
+        self.t.nodeFailed(self.b)
+        self.assertEqual(len(self.t.buckets[0].l), 0)
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/node.py b/node.py
index 22a4dd9b12b69acfe936021bcfb9b459e309f420..93e16052c2e9d92ddc9f45f8a89c80e4f4d08767 100644 (file)
--- a/node.py
+++ b/node.py
@@ -1,79 +1,78 @@
 import hash
 import time
 from types import *
-from xmlrpclib import Binary
 
 class Node:
-       """encapsulate contact info"""
-       def __init__(self):
-               self.fails = 0
-               self.lastSeen = 0
-               self.id = self.host = self.port = ''
-       
-       def init(self, id, host, port):
-               self.id = id
-               self.num = hash.intify(id)
-               self.host = host
-               self.port = port
-               self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host}
-               return self
-       
-       def initWithDict(self, dict):
-               self._senderDict = dict
-               self.id = dict['id'].decode('base64')
-               self.num = hash.intify(self.id)
-               self.port = dict['port']
-               self.host = dict['host']
-               return self
-       
-       def updateLastSeen(self):
-               self.lastSeen = time.time()
-               self.fails = 0
-       
-       def msgFailed(self):
-               self.fails = self.fails + 1
-               return self.fails
-       
-       def senderDict(self):
-               return self._senderDict
-       
-       def __repr__(self):
-               return `(self.id, self.host, self.port)`
-       
+    """encapsulate contact info"""
+    def __init__(self):
+        self.fails = 0
+        self.lastSeen = 0
+        self.id = self.host = self.port = ''
+    
+    def init(self, id, host, port):
+        self.id = id
+        self.num = hash.intify(id)
+        self.host = host
+        self.port = port
+        self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host}
+        return self
+    
+    def initWithDict(self, dict):
+        self._senderDict = dict
+        self.id = dict['id']
+        self.num = hash.intify(self.id)
+        self.port = dict['port']
+        self.host = dict['host']
+        return self
+    
+    def updateLastSeen(self):
+        self.lastSeen = time.time()
+        self.fails = 0
+    
+    def msgFailed(self):
+        self.fails = self.fails + 1
+        return self.fails
+    
+    def senderDict(self):
+        return self._senderDict
+    
+    def __repr__(self):
+        return `(self.id, self.host, self.port)`
+    
     ## these comparators let us bisect/index a list full of nodes with either a node or an int/long
-       def __lt__(self, a):
-               if type(a) == InstanceType:
-                       a = a.num
-               return self.num < a
-       def __le__(self, a):
-               if type(a) == InstanceType:
-                       a = a.num
-               return self.num <= a
-       def __gt__(self, a):
-               if type(a) == InstanceType:
-                       a = a.num
-               return self.num > a
-       def __ge__(self, a):
-               if type(a) == InstanceType:
-                       a = a.num
-               return self.num >= a
-       def __eq__(self, a):
-               if type(a) == InstanceType:
-                       a = a.num
-               return self.num == a
-       def __ne__(self, a):
-               if type(a) == InstanceType:
-                       a = a.num
-               return self.num != a
+    def __lt__(self, a):
+        if type(a) == InstanceType:
+            a = a.num
+        return self.num < a
+    def __le__(self, a):
+        if type(a) == InstanceType:
+            a = a.num
+        return self.num <= a
+    def __gt__(self, a):
+        if type(a) == InstanceType:
+            a = a.num
+        return self.num > a
+    def __ge__(self, a):
+        if type(a) == InstanceType:
+            a = a.num
+        return self.num >= a
+    def __eq__(self, a):
+        if type(a) == InstanceType:
+            a = a.num
+        return self.num == a
+    def __ne__(self, a):
+        if type(a) == InstanceType:
+            a = a.num
+        return self.num != a
 
 
 import unittest
 
 class TestNode(unittest.TestCase):
-       def setUp(self):
-               self.node = Node().init(hash.newID(), 'localhost', 2002)
-       def testUpdateLastSeen(self):
-               t = self.node.lastSeen
-               self.node.updateLastSeen()
-               assert t < self.node.lastSeen
-       
\ No newline at end of file
+    def setUp(self):
+        self.node = Node().init(hash.newID(), 'localhost', 2002)
+    def testUpdateLastSeen(self):
+        t = self.node.lastSeen
+        self.node.updateLastSeen()
+        assert t < self.node.lastSeen
+    
\ No newline at end of file
diff --git a/test.py b/test.py
index 2dc44dd461ae3ee69f3dbf9185adbf29a030a3fe..0dd3f419b2037ea9df867a5da253b32cabe53fac 100644 (file)
--- a/test.py
+++ b/test.py
@@ -2,8 +2,9 @@ import unittest
 
 import ktable, khashmir
 import hash, node, knode
-import actions, xmlrpcclient
+import actions
 import btemplate
+import test_airhook
 
-tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'btemplate', 'actions',  'ktable', 'xmlrpcclient'])
+tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions',  'ktable', 'test_airhook'])
 result = unittest.TextTestRunner().run(tests)
index aabaebc0befb924d406f3ca12b636c0050ea739e..e491d840232cf5f53c228d8fce6abdb01315039a 100644 (file)
@@ -27,28 +27,19 @@ class StreamReceiver(protocol.Protocol):
         self.buf = ""
     def dataReceived(self, data):
         self.buf += data
-        
-class EchoFactory(protocol.Factory):
-    def buildProtocol(self, addr):
-        return Echo()
-class NoisyFactory(protocol.Factory):
-    def buildProtocol(self, addr):
-        return Noisy()
-class ReceiverFactory(protocol.Factory):
-    def buildProtocol(self, addr):
-        return Receiver()
-class StreamReceiverFactory(protocol.Factory):
-    def buildProtocol(self, addr):
-        return StreamReceiver()
-        
+                
 def makeEcho(port):
-    return listenAirhookStream(port, EchoFactory())
+    f = protocol.Factory(); f.protocol = Echo
+    return listenAirhookStream(port, f)
 def makeNoisy(port):
-    return listenAirhookStream(port, NoisyFactory())
+    f = protocol.Factory(); f.protocol = Noisy
+    return listenAirhookStream(port, f)
 def makeReceiver(port):
-    return listenAirhookStream(port, ReceiverFactory())
+    f = protocol.Factory(); f.protocol = Receiver
+    return listenAirhookStream(port, f)
 def makeStreamReceiver(port):
-    return listenAirhookStream(port, StreamReceiverFactory())
+    f = protocol.Factory(); f.protocol = StreamReceiver
+    return listenAirhookStream(port, f)
 
 class DummyTransport:
     def __init__(self):
@@ -662,18 +653,15 @@ class EchoReactorStreamBig(unittest.TestCase):
         self.noisy = 0
         self.a = makeStreamReceiver(2028)
         self.b = makeEcho(2029)
-        self.ac = self.a.connectionForAddr(('127.0.0.1', 2028))
-        self.bc = self.b.connectionForAddr(('127.0.0.1', 2029))
+        self.ac = self.a.connectionForAddr(('127.0.0.1', 2029))
     def testBig(self):
-        msg = open('/dev/urandom').read(4096)
+        msg = open('/dev/urandom').read(256)
         self.ac.write(msg)
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
-        reactor.iterate()
-        reactor.iterate()
         self.assertEqual(self.ac.protocol.buf, msg)
 
         
\ No newline at end of file
diff --git a/test_krpc.py b/test_krpc.py
new file mode 100644 (file)
index 0000000..5f2c5b7
--- /dev/null
@@ -0,0 +1,140 @@
+from unittest import *
+from krpc import *
+from airhook import *
+
+import sys
+
+if __name__ =="__main__":
+    tests = unittest.defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
+    result = unittest.TextTestRunner().run(tests)
+
+
+def connectionForAddr(host, port):
+    return host
+    
+class Receiver(protocol.Factory):
+    protocol = KRPC
+    def __init__(self):
+        self.buf = []
+    def krpc_store(self, msg, _krpc_sender):
+        self.buf += [msg]
+    def krpc_echo(self, msg, _krpc_sender):
+        return msg
+
+class SimpleTest(TestCase):
+    def setUp(self):
+        self.noisy = 0
+        
+        self.af = Receiver()
+        self.bf = Receiver()        
+        self.a = listenAirhookStream(4040, self.af)
+        self.b = listenAirhookStream(4041, self.bf)
+        
+    def testSimpleMessage(self):
+        self.noisy = 1
+        self.a.connectionForAddr(('127.0.0.1', 4041)).protocol.sendRequest('store', {'msg' : "This is a test."})
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.bf.buf, ["This is a test."])
+
+class SimpleTest(TestCase):
+    def setUp(self):
+        self.noisy = 0
+        
+        self.af = Receiver()
+        self.bf = Receiver()        
+        self.a = listenAirhookStream(4050, self.af)
+        self.b = listenAirhookStream(4051, self.bf)
+        
+    def testSimpleMessage(self):
+        self.noisy = 1
+        self.a.connectionForAddr(('127.0.0.1', 4051)).protocol.sendRequest('store', {'msg' : "This is a test."})
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.bf.buf, ["This is a test."])
+
+class EchoTest(TestCase):
+    def setUp(self):
+        self.noisy = 0
+        self.msg = None
+        
+        self.af = Receiver()
+        self.bf = Receiver()        
+        self.a = listenAirhookStream(4042, self.af)
+        self.b = listenAirhookStream(4043, self.bf)
+        
+    def testEcho(self):
+        self.noisy = 1
+        df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+        df.addCallback(self.gotMsg)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.msg, "This is a test.")
+
+    def gotMsg(self, msg):
+        self.msg = msg
+
+class MultiEchoTest(TestCase):
+    def setUp(self):
+        self.noisy = 0
+        self.msg = None
+        
+        self.af = Receiver()
+        self.bf = Receiver()        
+        self.a = listenAirhookStream(4048, self.af)
+        self.b = listenAirhookStream(4049, self.bf)
+        
+    def testMultiEcho(self):
+        self.noisy = 1
+        df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+        df.addCallback(self.gotMsg)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.msg, "This is a test.")
+
+        df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is another test."})
+        df.addCallback(self.gotMsg)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.msg, "This is another test.")
+
+        df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
+        df.addCallback(self.gotMsg)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.msg, "This is yet another test.")
+
+    def gotMsg(self, msg):
+        self.msg = msg
+
+class UnknownMethErrTest(TestCase):
+    def setUp(self):
+        self.noisy = 0
+        self.err = None
+        self.af = Receiver()
+        self.bf = Receiver()        
+        self.a = listenAirhookStream(4044, self.af)
+        self.b = listenAirhookStream(4045, self.bf)
+        
+    def testUnknownMeth(self):
+        self.noisy = 1
+        df = self.a.connectionForAddr(('127.0.0.1', 4045)).protocol.sendRequest('blahblah', {'msg' : "This is a test."})
+        df.addErrback(self.gotErr)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.assertEqual(self.err, KRPC_ERROR_METHOD_UNKNOWN)
+
+    def gotErr(self, err):
+        self.err = err.value
diff --git a/util.py b/util.py
index ea1e7f27d9bd3b6108bb261d174d310001dc4fd2..633772ad8a159db7a4833ee700e276367bb8bd62 100644 (file)
--- a/util.py
+++ b/util.py
@@ -1,20 +1,20 @@
 def bucket_stats(l):
-       """given a list of khashmir instances, finds min, max, and average number of nodes in tables"""
-       max = avg = 0
-       min = None
-       def count(buckets):
-               c = 0
-               for bucket in buckets:
-                       c = c + len(bucket.l)
-               return c
-       for node in l:
-               c = count(node.table.buckets)
-               if min == None:
-                       min = c
-               elif c < min:
-                       min = c
-               if c > max:
-                       max = c
-               avg = avg + c
-       avg = avg / len(l)
-       return {'min':min, 'max':max, 'avg':avg}
+    """given a list of khashmir instances, finds min, max, and average number of nodes in tables"""
+    max = avg = 0
+    min = None
+    def count(buckets):
+        c = 0
+        for bucket in buckets:
+            c = c + len(bucket.l)
+        return c
+    for node in l:
+        c = count(node.table.buckets)
+        if min == None:
+            min = c
+        elif c < min:
+            min = c
+        if c > max:
+            max = c
+        avg = avg + c
+    avg = avg / len(l)
+    return {'min':min, 'max':max, 'avg':avg}
diff --git a/xmlrpcclient.py b/xmlrpcclient.py
deleted file mode 100644 (file)
index f8d33b2..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-from twisted.internet.protocol import ClientFactory
-from twisted.protocols.http import HTTPClient
-from twisted.internet.defer import Deferred
-
-from xmlrpclib import loads, dumps
-import socket
-
-USER_AGENT = 'Python/Twisted XMLRPC 0.1'
-class XMLRPCClient(HTTPClient):
-    def connectionMade(self):
-       payload = dumps(self.args, self.method)
-       self.sendCommand('POST', '/RPC2')
-       self.sendHeader('User-Agent', USER_AGENT)
-       self.sendHeader('Content-Type', 'text/xml')
-       self.sendHeader('Content-Length', len(payload))
-       self.endHeaders()
-       self.transport.write(payload)
-        self.transport.write('\r\n')
-       
-    def handleResponse(self, buf):
-       try:
-           args, name = loads(buf)
-       except Exception, e:
-           print "response decode error: " + `e`
-           self.d.errback()
-       else:
-           apply(self.d.callback, args)
-
-class XMLRPCClientFactory(ClientFactory):
-    def __init__(self, method, args, callback=None, errback=None):
-       self.method = method
-       self.args = args
-       self.d = Deferred()
-       if callback:
-           self.d.addCallback(callback)
-       if errback:
-           self.d.addErrback(errback)
-        self.noisy = 0
-        
-    def buildProtocol(self, addr):
-        prot =  XMLRPCClient()
-       prot.method = self.method
-       prot.args = self.args
-       prot.d = self.d
-       return prot
-
-    def clientConnectionFailed(self, connector, reason):
-       self.d.errback()
\ No newline at end of file