ripped out xmlrpc, experimented with xmlrpc but with bencode, finally
[quix0rs-apt-p2p.git] / khashmir.py
index 3ca5d7250d0cfb82c0529114606e7e62ed04ebd8..f3e410aa3087b1b8fc94772f0b3e0eeefe6a981e 100644 (file)
@@ -12,332 +12,325 @@ from knode import KNode as Node
 
 from hash import newID, newIDInRange
 
-from actions import FindNode, GetValue, KeyExpirer
-from twisted.web import xmlrpc
+from actions import FindNode, GetValue, KeyExpirer, StoreValue
+import krpc
+import airhook
+
 from twisted.internet.defer import Deferred
+from twisted.internet import protocol
 from twisted.python import threadable
 from twisted.internet.app import Application
 from twisted.web import server
 threadable.init()
+import sys
 
 import sqlite  ## find this at http://pysqlite.sourceforge.net/
 import pysqlite_exceptions
 
-KhashmirDBExcept = "KhashmirDBExcept"
+class KhashmirDBExcept(Exception):
+    pass
 
 # this is the main class!
-class Khashmir(xmlrpc.XMLRPC):
-       __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
-       def __init__(self, host, port, db='khashmir.db'):
-               self.setup(host, port, db)
-               
-       def setup(self, host, port, db='khashmir.db'):
-               self._findDB(db)
-               self.node = self._loadSelfNode(host, port)
-               self.table = KTable(self.node)
-               self._loadRoutingTable()
-               self.app = Application("xmlrpc")
-               self.app.listenTCP(port, server.Site(self))
-               self.last = time.time()
-               KeyExpirer(store=self.store)
-               #self.refreshTable(force=1)
-               reactor.callLater(60, self.checkpoint, (1,))
-               
-       def _loadSelfNode(self, host, port):
-               c = self.store.cursor()
-               c.execute('select id from self where num = 0;')
-               if c.rowcount > 0:
-                       id = c.fetchone()[0].decode('base64')
-               else:
-                       id = newID()
-               return Node().init(id, host, port)
-               
-       def _saveSelfNode(self):
-               self.store.autocommit = 0
-               c = self.store.cursor()
-               c.execute('delete from self where num = 0;')
-               c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
-               self.store.commit()
-               self.store.autocommit = 1
-               
-       def checkpoint(self, auto=0):
-               self._saveSelfNode()
-               self._dumpRoutingTable()
-               if auto:
-                       reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
-               
-       def _findDB(self, db):
-               import os
-               try:
-                       os.stat(db)
-               except OSError:
-                       self._createNewDB(db)
-               else:
-                       self._loadDB(db)
-           
-       def _loadDB(self, db):
-               try:
-                       self.store = sqlite.connect(db=db)
-                       self.store.autocommit = 1
-               except:
-                       import traceback
-                       raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
-           
-       def _createNewDB(self, db):
-               self.store = sqlite.connect(db=db)
-               self.store.autocommit = 1
-               s = """
-                       create table kv (key text, value text, time timestamp, primary key (key, value));
-                       create index kv_key on kv(key);
-                       create index kv_timestamp on kv(time);
-                       
-                       create table nodes (id text primary key, host text, port number);
-                       
-                       create table self (num number primary key, id text);
-                       """
-               c = self.store.cursor()
-               c.execute(s)
-
-       def _dumpRoutingTable(self):
-               """
-                       save routing table nodes to the database
-               """
-               self.store.autocommit = 0;
-               c = self.store.cursor()
-               c.execute("delete from nodes where id not NULL;")
-               for bucket in self.table.buckets:
-                       for node in bucket.l:
-                               d = node.senderDict()
-                               c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
-               self.store.commit()
-               self.store.autocommit = 1;
-               
-       def _loadRoutingTable(self):
-               """
-                       load routing table nodes from database
-                       it's usually a good idea to call refreshTable(force=1) after loading the table
-               """
-               c = self.store.cursor()
-               c.execute("select * from nodes;")
-               for rec in c.fetchall():
-                       n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
-                       self.table.insertNode(n, contacted=0)
-                       
-       def render(self, request):
-               """
-                       Override the built in render so we can have access to the request object!
-                       note, crequest is probably only valid on the initial call (not after deferred!)
-               """
-               self.crequest = request
-               return xmlrpc.XMLRPC.render(self, request)
+class Khashmir(protocol.Factory):
+    __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
+    protocol = krpc.KRPC
+    def __init__(self, host, port, db='khashmir.db'):
+        self.setup(host, port, db)
+        
+    def setup(self, host, port, db='khashmir.db'):
+        self._findDB(db)
+        self.node = self._loadSelfNode(host, port)
+        self.table = KTable(self.node)
+        self.app = Application("krpc")
+        self.airhook = airhook.listenAirhookStream(port, self)
+        self.last = time.time()
+        self._loadRoutingTable()
+        KeyExpirer(store=self.store)
+        #self.refreshTable(force=1)
+        reactor.callLater(60, self.checkpoint, (1,))
+        
+    def _loadSelfNode(self, host, port):
+        c = self.store.cursor()
+        c.execute('select id from self where num = 0;')
+        if c.rowcount > 0:
+            id = c.fetchone()[0].decode('hex')
+        else:
+            id = newID()
+        return Node().init(id, host, port)
+        
+    def _saveSelfNode(self):
+        self.store.autocommit = 0
+        c = self.store.cursor()
+        c.execute('delete from self where num = 0;')
+        c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
+        self.store.commit()
+        self.store.autocommit = 1
+        
+    def checkpoint(self, auto=0):
+        self._saveSelfNode()
+        self._dumpRoutingTable()
+        if auto:
+            reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+        
+    def _findDB(self, db):
+        import os
+        try:
+            os.stat(db)
+        except OSError:
+            self._createNewDB(db)
+        else:
+            self._loadDB(db)
+        
+    def _loadDB(self, db):
+        try:
+            self.store = sqlite.connect(db=db)
+            self.store.autocommit = 1
+        except:
+            import traceback
+            raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+        
+    def _createNewDB(self, db):
+        self.store = sqlite.connect(db=db)
+        self.store.autocommit = 1
+        s = """
+            create table kv (key text, value text, time timestamp, primary key (key, value));
+            create index kv_key on kv(key);
+            create index kv_timestamp on kv(time);
+            
+            create table nodes (id text primary key, host text, port number);
+            
+            create table self (num number primary key, id text);
+            """
+        c = self.store.cursor()
+        c.execute(s)
 
+    def _dumpRoutingTable(self):
+        """
+            save routing table nodes to the database
+        """
+        self.store.autocommit = 0;
+        c = self.store.cursor()
+        c.execute("delete from nodes where id not NULL;")
+        for bucket in self.table.buckets:
+            for node in bucket.l:
+                d = node.senderDict()
+                c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
+        self.store.commit()
+        self.store.autocommit = 1;
+        
+    def _loadRoutingTable(self):
+        """
+            load routing table nodes from database
+            it's usually a good idea to call refreshTable(force=1) after loading the table
+        """
+        c = self.store.cursor()
+        c.execute("select * from nodes;")
+        for rec in c.fetchall():
+            n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
+            n.conn = self.airhook.connectionForAddr((n.host, n.port))
+            self.table.insertNode(n, contacted=0)
+            
 
-       #######
-       #######  LOCAL INTERFACE    - use these methods!
-       def addContact(self, host, port):
-               """
-                       ping this node and add the contact info to the table on pong!
-               """
-               n =Node().init(const.NULL_ID, host, port)  # note, we 
-               self.sendPing(n)
+    #######
+    #######  LOCAL INTERFACE    - use these methods!
+    def addContact(self, host, port, callback=None):
+        """
+            ping this node and add the contact info to the table on pong!
+        """
+        n =Node().init(const.NULL_ID, host, port) 
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.sendPing(n, callback=callback)
 
-       ## this call is async!
-       def findNode(self, id, callback, errback=None):
-               """ returns the contact info for node, or the k closest nodes, from the global table """
-               # get K nodes out of local table/cache, or the node we want
-               nodes = self.table.findNodes(id)
-               d = Deferred()
-               if errback:
-                       d.addCallbacks(callback, errback)
-               else:
-                       d.addCallback(callback)
-               if len(nodes) == 1 and nodes[0].id == id :
-                       d.callback(nodes)
-               else:
-                       # create our search state
-                       state = FindNode(self, id, d.callback)
-                       reactor.callFromThread(state.goWithNodes, nodes)
-       
-       
-       ## also async
-       def valueForKey(self, key, callback):
-               """ returns the values found for key in global table
-                       callback will be called with a list of values for each peer that returns unique values
-                       final callback will be an empty list - probably should change to 'more coming' arg
-               """
-               nodes = self.table.findNodes(key)
-               
-               # get locals
-               l = self.retrieveValues(key)
-               if len(l) > 0:
-                       reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
-               
-               # create our search state
-               state = GetValue(self, key, callback)
-               reactor.callFromThread(state.goWithNodes, nodes, l)
+    ## this call is async!
+    def findNode(self, id, callback, errback=None):
+        """ returns the contact info for node, or the k closest nodes, from the global table """
+        # get K nodes out of local table/cache, or the node we want
+        nodes = self.table.findNodes(id)
+        d = Deferred()
+        if errback:
+            d.addCallbacks(callback, errback)
+        else:
+            d.addCallback(callback)
+        if len(nodes) == 1 and nodes[0].id == id :
+            d.callback(nodes)
+        else:
+            # create our search state
+            state = FindNode(self, id, d.callback)
+            reactor.callFromThread(state.goWithNodes, nodes)
+    
+    
+    ## also async
+    def valueForKey(self, key, callback):
+        """ returns the values found for key in global table
+            callback will be called with a list of values for each peer that returns unique values
+            final callback will be an empty list - probably should change to 'more coming' arg
+        """
+        nodes = self.table.findNodes(key)
+        
+        # get locals
+        l = self.retrieveValues(key)
+        
+        # create our search state
+        state = GetValue(self, key, callback)
+        reactor.callFromThread(state.goWithNodes, nodes, l)
 
-       ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-       def storeValueForKey(self, key, value, callback=None):
-               """ stores the value for key in the global table, returns immediately, no status 
-                       in this implementation, peers respond but don't indicate status to storing values
-                       a key can have many values
-               """
-               def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
-                       if not response:
-                               # default callback
-                               def _storedValueHandler(sender):
-                                       pass
-                               response=_storedValueHandler
-               
-                       for node in nodes[:const.STORE_REDUNDANCY]:
-                               def cb(t, table = table, node=node, resp=response):
-                                       self.table.insertNode(node)
-                                       response(t)
-                               if node.id != self.node.id:
-                                       def default(err, node=node, table=table):
-                                               table.nodeFailed(node)
-                                       df = node.storeValue(key, value, self.node.senderDict())
-                                       df.addCallbacks(cb, default)
-               # this call is asynch
-               self.findNode(key, _storeValueForKey)
-       
-       
-       def insertNode(self, n, contacted=1):
-               """
-               insert a node in our local table, pinging oldest contact in bucket, if necessary
-               
-               If all you have is a host/port, then use addContact, which calls this method after
-               receiving the PONG from the remote node.  The reason for the seperation is we can't insert
-               a node into the table without it's peer-ID.  That means of course the node passed into this
-               method needs to be a properly formed Node object with a valid ID.
-               """
-               old = self.table.insertNode(n, contacted=contacted)
-               if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
-                       # the bucket is full, check to see if old node is still around and if so, replace it
-                       
-                       ## these are the callbacks used when we ping the oldest node in a bucket
-                       def _staleNodeHandler(oldnode=old, newnode = n):
-                               """ called if the pinged node never responds """
-                               self.table.replaceStaleNode(old, newnode)
-                       
-                       def _notStaleNodeHandler(sender, old=old):
-                               """ called when we get a pong from the old node """
-                               args, sender = sender
-                               sender = Node().initWithDict(sender)
-                               if sender.id == old.id:
-                                       self.table.justSeenNode(old)
-                       
-                       df = old.ping(self.node.senderDict())
-                       df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+    def storeValueForKey(self, key, value, callback=None):
+        """ stores the value for key in the global table, returns immediately, no status 
+            in this implementation, peers respond but don't indicate status to storing values
+            a key can have many values
+        """
+        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+            if not response:
+                # default callback
+                def _storedValueHandler(sender):
+                    pass
+                response=_storedValueHandler
+            action = StoreValue(self.table, key, value, response)
+            reactor.callFromThread(action.goWithNodes, nodes)
+            
+        # this call is asynch
+        self.findNode(key, _storeValueForKey)
+        
+    
+    def insertNode(self, n, contacted=1):
+        """
+        insert a node in our local table, pinging oldest contact in bucket, if necessary
+        
+        If all you have is a host/port, then use addContact, which calls this method after
+        receiving the PONG from the remote node.  The reason for the seperation is we can't insert
+        a node into the table without it's peer-ID.  That means of course the node passed into this
+        method needs to be a properly formed Node object with a valid ID.
+        """
+        old = self.table.insertNode(n, contacted=contacted)
+        if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
+            # the bucket is full, check to see if old node is still around and if so, replace it
+            
+            ## these are the callbacks used when we ping the oldest node in a bucket
+            def _staleNodeHandler(oldnode=old, newnode = n):
+                """ called if the pinged node never responds """
+                self.table.replaceStaleNode(old, newnode)
+            
+            def _notStaleNodeHandler(dict, old=old):
+                """ called when we get a pong from the old node """
+                sender = dict['sender']
+                if sender['id'] == old.id:
+                    self.table.justSeenNode(old.id)
+            
+            df = old.ping(self.node.senderDict())
+            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
-       def sendPing(self, node):
-               """
-                       ping a node
-               """
-               df = node.ping(self.node.senderDict())
-               ## these are the callbacks we use when we issue a PING
-               def _pongHandler(args, node=node, table=self.table):
-                       l, sender = args
-                       if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
-                               # whoah, got response from different peer than we were expecting
-                               self.table.invalidateNode(node)
-                       else:
-                               sender['host'] = node.host
-                               sender['port'] = node.port
-                               n = Node().initWithDict(sender)
-                               table.insertNode(n)
-                               return
-               def _defaultPong(err, node=node, table=self.table):
-                       table.nodeFailed(node)
-               
-               df.addCallbacks(_pongHandler,_defaultPong)
+    def sendPing(self, node, callback=None):
+        """
+            ping a node
+        """
+        df = node.ping(self.node.senderDict())
+        ## these are the callbacks we use when we issue a PING
+        def _pongHandler(dict, node=node, table=self.table, callback=callback):
+            sender = dict['sender']
+            if node.id != const.NULL_ID and node.id != sender['id']:
+                # whoah, got response from different peer than we were expecting
+                self.table.invalidateNode(node)
+            else:
+                sender['host'] = node.host
+                sender['port'] = node.port
+                n = Node().initWithDict(sender)
+                n.conn = self.airhook.connectionForAddr((n.host, n.port))
+                table.insertNode(n)
+                if callback:
+                    callback()
+        def _defaultPong(err, node=node, table=self.table, callback=callback):
+            table.nodeFailed(node)
+            if callback:
+                callback()
+        
+        df.addCallbacks(_pongHandler,_defaultPong)
 
-       def findCloseNodes(self, callback=lambda a: None):
-               """
-                       This does a findNode on the ID one away from our own.  
-                       This will allow us to populate our table with nodes on our network closest to our own.
-                       This is called as soon as we start up with an empty table
-               """
-               id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-               self.findNode(id, callback)
+    def findCloseNodes(self, callback=lambda a: None):
+        """
+            This does a findNode on the ID one away from our own.  
+            This will allow us to populate our table with nodes on our network closest to our own.
+            This is called as soon as we start up with an empty table
+        """
+        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
+        self.findNode(id, callback)
 
-       def refreshTable(self, force=0):
-               """
-                       force=1 will refresh table regardless of last bucket access time
-               """
-               def callback(nodes):
-                       pass
-       
-               for bucket in self.table.buckets:
-                       if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
-                               id = newIDInRange(bucket.min, bucket.max)
-                               self.findNode(id, callback)
+    def refreshTable(self, force=0):
+        """
+            force=1 will refresh table regardless of last bucket access time
+        """
+        def callback(nodes):
+            pass
+    
+        for bucket in self.table.buckets:
+            if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
+                id = newIDInRange(bucket.min, bucket.max)
+                self.findNode(id, callback)
 
 
-       def retrieveValues(self, key):
-               s = "select value from kv where key = '%s';" % key.encode('base64')
-               c = self.store.cursor()
-               c.execute(s)
-               t = c.fetchone()
-               l = []
-               while t:
-                       l.append(t['value'])
-                       t = c.fetchone()
-               return l
-       
-       #####
-       ##### INCOMING MESSAGE HANDLERS
-       
-       def xmlrpc_ping(self, sender):
-               """
-                       takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
-                       returns sender dict
-               """
-               ip = self.crequest.getClientIP()
-               sender['host'] = ip
-               n = Node().initWithDict(sender)
-               self.insertNode(n, contacted=0)
-               return (), self.node.senderDict()
-               
-       def xmlrpc_find_node(self, target, sender):
-               nodes = self.table.findNodes(target.decode('base64'))
-               nodes = map(lambda node: node.senderDict(), nodes)
-               ip = self.crequest.getClientIP()
-               sender['host'] = ip
-               n = Node().initWithDict(sender)
-               self.insertNode(n, contacted=0)
-               return nodes, self.node.senderDict()
-           
-       def xmlrpc_store_value(self, key, value, sender):
-               t = "%0.6f" % time.time()
-               s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
-               c = self.store.cursor()
-               try:
-                       c.execute(s)
-               except pysqlite_exceptions.IntegrityError, reason:
-                       # update last insert time
-                       s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
-                       c.execute(s)
-               ip = self.crequest.getClientIP()
-               sender['host'] = ip
-               n = Node().initWithDict(sender)
-               self.insertNode(n, contacted=0)
-               return (), self.node.senderDict()
-       
-       def xmlrpc_find_value(self, key, sender):
-               ip = self.crequest.getClientIP()
-               key = key.decode('base64')
-               sender['host'] = ip
-               n = Node().initWithDict(sender)
-               self.insertNode(n, contacted=0)
-       
-               l = self.retrieveValues(key)
-               if len(l) > 0:
-                       return {'values' : l}, self.node.senderDict()
-               else:
-                       nodes = self.table.findNodes(key)
-                       nodes = map(lambda node: node.senderDict(), nodes)
-                       return {'nodes' : nodes}, self.node.senderDict()
+    def retrieveValues(self, key):
+        s = "select value from kv where key = '%s';" % key.encode('hex')
+        c = self.store.cursor()
+        c.execute(s)
+        t = c.fetchone()
+        l = []
+        while t:
+            l.append(t['value'].decode('base64'))
+            t = c.fetchone()
+        return l
+    
+    #####
+    ##### INCOMING MESSAGE HANDLERS
+    
+    def krpc_ping(self, sender, _krpc_sender):
+        """
+            takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
+            returns sender dict
+        """
+        sender['host'] = _krpc_sender[0]
+        n = Node().initWithDict(sender)
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"sender" : self.node.senderDict()}
+        
+    def krpc_find_node(self, target, sender, _krpc_sender):
+        nodes = self.table.findNodes(target)
+        nodes = map(lambda node: node.senderDict(), nodes)
+        sender['host'] = _krpc_sender[0]
+        n = Node().initWithDict(sender)
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"nodes" : nodes, "sender" : self.node.senderDict()}
+            
+    def krpc_store_value(self, key, value, sender, _krpc_sender):
+        t = "%0.6f" % time.time()
+        s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
+        c = self.store.cursor()
+        try:
+            c.execute(s)
+        except pysqlite_exceptions.IntegrityError, reason:
+            # update last insert time
+            s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
+            c.execute(s)
+        sender['host'] = _krpc_sender[0]
+        n = Node().initWithDict(sender)
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"sender" : self.node.senderDict()}
+    
+    def krpc_find_value(self, key, sender, _krpc_sender):
+        sender['host'] = _krpc_sender[0]
+        n = Node().initWithDict(sender)
+        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+    
+        l = self.retrieveValues(key)
+        if len(l) > 0:
+            return {'values' : l, "sender": self.node.senderDict()}
+        else:
+            nodes = self.table.findNodes(key)
+            nodes = map(lambda node: node.senderDict(), nodes)
+            return {'nodes' : nodes, "sender": self.node.senderDict()}
 
 ### TESTING ###
 from random import randrange
@@ -347,129 +340,151 @@ from hash import newID
 
 
 def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
-       import thread
-       l = []
-       for i in xrange(peers):
-               a = Khashmir('localhost', startport + i, db = dbprefix+`i`)
-               l.append(a)
-       thread.start_new_thread(l[0].app.run, ())
-       for peer in l[1:]:
-               peer.app.run()  
-       return l
-       
-def test_build_net(quiet=0, peers=24, host='localhost',  pause=0, startport=2001, dbprefix='/tmp/test'):
-       from whrandom import randrange
-       import threading
-       import thread
-       import sys
-       port = startport
-       l = []
-       if not quiet:
-               print "Building %s peer table." % peers
-       
-       for i in xrange(peers):
-               a = Khashmir(host, port + i, db = dbprefix +`i`)
-               l.append(a)
-       
-       
-       thread.start_new_thread(l[0].app.run, ())
-       time.sleep(1)
-       for peer in l[1:]:
-               peer.app.run()
-       time.sleep(3)
-       
-       print "adding contacts...."
-       
-       for peer in l:
-               n = l[randrange(0, len(l))].node
-               peer.addContact(host, n.port)
-               n = l[randrange(0, len(l))].node
-               peer.addContact(host, n.port)
-               n = l[randrange(0, len(l))].node
-               peer.addContact(host, n.port)
-               if pause:
-                       time.sleep(.33)
-       
-       time.sleep(10)
-       print "finding close nodes...."
-       
-       for peer in l:
-               flag = threading.Event()
-               def cb(nodes, f=flag):
-                       f.set()
-               peer.findCloseNodes(cb)
-               flag.wait()
-       #    for peer in l:
-       #       peer.refreshTable()
-       return l
+    import thread
+    l = []
+    for i in xrange(peers):
+        a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`)
+        l.append(a)
+    thread.start_new_thread(l[0].app.run, ())
+    for peer in l[1:]:
+        peer.app.run() 
+    return l
+    
+def test_build_net(quiet=0, peers=24, host='127.0.0.1',  pause=0, startport=2001, dbprefix='/tmp/test'):
+    from whrandom import randrange
+    import threading
+    import thread
+    import sys
+    port = startport
+    l = []
+    if not quiet:
+        print "Building %s peer table." % peers
+    
+    for i in xrange(peers):
+        a = Khashmir(host, port + i, db = dbprefix +`i`)
+        l.append(a)
+    
+    
+    thread.start_new_thread(l[0].app.run, ())
+    time.sleep(1)
+    for peer in l[1:]:
+        peer.app.run()
+    #time.sleep(3)
+    
+    def spewer(frame, s, ignored):
+        from twisted.python import reflect
+        if frame.f_locals.has_key('self'):
+            se = frame.f_locals['self']
+            print 'method %s of %s at %s' % (
+                frame.f_code.co_name, reflect.qual(se.__class__), id(se)
+                )
+    #sys.settrace(spewer)
+
+    print "adding contacts...."
+    def makecb(flag):
+        def cb(f=flag):
+            f.set()
+        return cb
+
+    for peer in l:
+        p = l[randrange(0, len(l))]
+        if p != peer:
+            n = p.node
+            flag = threading.Event()
+            peer.addContact(host, n.port, makecb(flag))
+            flag.wait()
+        p = l[randrange(0, len(l))]
+        if p != peer:
+            n = p.node
+            flag = threading.Event()
+            peer.addContact(host, n.port, makecb(flag))
+            flag.wait()
+        p = l[randrange(0, len(l))]
+        if p != peer:
+            n = p.node
+            flag = threading.Event()
+            peer.addContact(host, n.port, makecb(flag))
+            flag.wait()
+    
+    print "finding close nodes...."
+    
+    for peer in l:
+        flag = threading.Event()
+        def cb(nodes, f=flag):
+            f.set()
+        peer.findCloseNodes(cb)
+        flag.wait()
+    #    for peer in l:
+    #  peer.refreshTable()
+    return l
         
 def test_find_nodes(l, quiet=0):
-       flag = threading.Event()
-       
-       n = len(l)
-       
-       a = l[randrange(0,n)]
-       b = l[randrange(0,n)]
-       
-       def callback(nodes, flag=flag, id = b.node.id):
-               if (len(nodes) >0) and (nodes[0].id == id):
-                       print "test_find_nodes  PASSED"
-               else:
-                       print "test_find_nodes  FAILED"
-               flag.set()
-       a.findNode(b.node.id, callback)
-       flag.wait()
+    flag = threading.Event()
+    
+    n = len(l)
+    
+    a = l[randrange(0,n)]
+    b = l[randrange(0,n)]
+    
+    def callback(nodes, flag=flag, id = b.node.id):
+        if (len(nodes) >0) and (nodes[0].id == id):
+            print "test_find_nodes     PASSED"
+        else:
+            print "test_find_nodes     FAILED"
+        flag.set()
+    a.findNode(b.node.id, callback)
+    flag.wait()
     
 def test_find_value(l, quiet=0):
-       
-       fa = threading.Event()
-       fb = threading.Event()
-       fc = threading.Event()
-       
-       n = len(l)
-       a = l[randrange(0,n)]
-       b = l[randrange(0,n)]
-       c = l[randrange(0,n)]
-       d = l[randrange(0,n)]
-       
-       key = newID()
-       value = newID()
-       if not quiet: print "inserting value..."
-       a.storeValueForKey(key, value)
-       time.sleep(3)
-       if not quiet:
-               print "finding..."
-       
-       class cb:
-               def __init__(self, flag, value=value):
-                       self.flag = flag
-                       self.val = value
-                       self.found = 0
-               def callback(self, values):
-                       try:
-                               if(len(values) == 0):
-                                       if not self.found:
-                                               print "find                NOT FOUND"
-                                       else:
-                                               print "find                FOUND"
-                               else:
-                                       if self.val in values:
-                                               self.found = 1
-                       finally:
-                               self.flag.set()
-       
-       b.valueForKey(key, cb(fa).callback)
-       fa.wait()
-       c.valueForKey(key, cb(fb).callback)
-       fb.wait()
-       d.valueForKey(key, cb(fc).callback)    
-       fc.wait()
+    
+    fa = threading.Event()
+    fb = threading.Event()
+    fc = threading.Event()
+    
+    n = len(l)
+    a = l[randrange(0,n)]
+    b = l[randrange(0,n)]
+    c = l[randrange(0,n)]
+    d = l[randrange(0,n)]
+    
+    key = newID()
+    value = newID()
+    if not quiet: print "inserting value..."
+    a.storeValueForKey(key, value)
+    time.sleep(3)
+    if not quiet:
+        print "finding..."
+    
+    class cb:
+        def __init__(self, flag, value=value):
+            self.flag = flag
+            self.val = value
+            self.found = 0
+        def callback(self, values):
+            try:
+                if(len(values) == 0):
+                    if not self.found:
+                        print "find                NOT FOUND"
+                    else:
+                        print "find                FOUND"
+                else:
+                    if self.val in values:
+                        self.found = 1
+            finally:
+                self.flag.set()
+    
+    b.valueForKey(key, cb(fa).callback)
+    fa.wait()
+    c.valueForKey(key, cb(fb).callback)
+    fb.wait()
+    d.valueForKey(key, cb(fc).callback)    
+    fc.wait()
     
 def test_one(host, port, db='/tmp/test'):
-       import thread
-       k = Khashmir(host, port, db)
-       thread.start_new_thread(k.app.run, ())
-       return k
+    import thread
+    k = Khashmir(host, port, db)
+    thread.start_new_thread(reactor.run, ())
+    return k
     
 if __name__ == "__main__":
     import sys
@@ -479,7 +494,7 @@ if __name__ == "__main__":
     time.sleep(3)
     print "finding nodes..."
     for i in range(10):
-               test_find_nodes(l)
+        test_find_nodes(l)
     print "inserting and fetching values..."
     for i in range(10):
-               test_find_value(l)
+        test_find_value(l)