]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
major cleanup, updated for twisted
authorburris <burris>
Mon, 14 Jun 2004 00:21:57 +0000 (00:21 +0000)
committerburris <burris>
Mon, 14 Jun 2004 00:21:57 +0000 (00:21 +0000)
not using airhook, it's buggy   now KRPC over single UDP packets
values can't be too big, around 1400 bytes

khashmir test code is now more sane though it doesn't do much more
than build a couple of tables and fetch some values

12 files changed:
.cvsignore [new file with mode: 0644]
actions.py
const.py
hash.py [deleted file]
khash.py [new file with mode: 0644]
khashmir.py
knode.py
krpc.py
ktable.py
node.py
test_khashmir.py [new file with mode: 0644]
test_krpc.py

diff --git a/.cvsignore b/.cvsignore
new file mode 100644 (file)
index 0000000..0205d62
--- /dev/null
@@ -0,0 +1,2 @@
+*.pyc
+.DS_Store
index 75b01574a5222404276c5590feb7dca11e0b3aaa..9e77196b7138355be0db415194a8e10b05e9f3cd 100644 (file)
@@ -6,7 +6,7 @@ from time import time
 from const import reactor
 import const
 
-from hash import intify
+from khash import intify
 from knode import KNode as Node
 from ktable import KTable, K
 
@@ -49,7 +49,7 @@ class FindNode(ActionBase):
         sender = dict["sender"]
         sender['port'] = _krpc_sender[1]        
         sender = Node().initWithDict(sender)
-        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         if self.finished or self.answered.has_key(sender.id):
             # a day late and a dollar short
@@ -58,7 +58,7 @@ class FindNode(ActionBase):
         self.answered[sender.id] = 1
         for node in l:
             n = Node().initWithDict(node)
-            n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+            n.conn = self.table.udp.connectionForAddr((n.host, n.port))
             if not self.found.has_key(n.id):
                 self.found[n.id] = n
         self.schedule()
@@ -120,7 +120,7 @@ class GetValue(FindNode):
         sender = dict["sender"]
         sender['port'] = _krpc_sender[1]        
         sender = Node().initWithDict(sender)
-        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         if self.finished or self.answered.has_key(sender.id):
             # a day late and a dollar short
@@ -132,7 +132,7 @@ class GetValue(FindNode):
         if dict.has_key('nodes'):
             for node in dict['nodes']:
                 n = Node().initWithDict(node)
-                n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+                n.conn = self.table.udp.connectionForAddr((n.host, n.port))
                 if not self.found.has_key(n.id):
                     self.found[n.id] = n
         elif dict.has_key('values'):
index b91996a7813ce085fe5940321ae0a74aa9db98e2..1a42485421ac7c2fd33ffb5d5535a460281e9990 100644 (file)
--- a/const.py
+++ b/const.py
@@ -1,6 +1,7 @@
 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
 # see LICENSE.txt for license information
 
+"""
 from twisted.internet.default import SelectReactor ## twistedmatrix.com
 
 reactor = SelectReactor()
@@ -14,6 +15,8 @@ try:
     reactor.installResolver(twisted.names.client.theResolver)
 except IOError:
     print "no resolv.conf!"
+"""
+from twisted.internet import reactor
 
 # magic id to use before we know a peer's id
 NULL_ID =  20 * '\0'
diff --git a/hash.py b/hash.py
deleted file mode 100644 (file)
index 441c58a..0000000
--- a/hash.py
+++ /dev/null
@@ -1,103 +0,0 @@
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-from sha import sha
-import whrandom
-
-
-def intify(hstr):
-    """20 bit hash, big-endian -> long python integer"""
-    assert len(hstr) == 20
-    return long(hstr.encode('hex'), 16)
-
-def stringify(num):
-    """long int -> 20-character string"""
-    str = hex(num)[2:]
-    if str[-1] == 'L':
-        str = str[:-1]
-    if len(str) % 2 != 0:
-        str = '0' + str
-    str = str.decode('hex')
-    return (20 - len(str)) *'\x00' + str
-    
-def distance(a, b):
-    """distance between two 160-bit hashes expressed as 20-character strings"""
-    return intify(a) ^ intify(b)
-
-
-def newID():
-    """returns a new pseudorandom globally unique ID string"""
-    h = sha()
-    for i in range(20):
-        h.update(chr(whrandom.randint(0,255)))
-    return h.digest()
-
-def newIDInRange(min, max):
-    return stringify(randRange(min,max))
-    
-def randRange(min, max):
-    return min + intify(newID()) % (max - min)
-    
-
-### Test Cases ###
-import unittest
-
-class NewID(unittest.TestCase):
-    def testLength(self):
-        self.assertEqual(len(newID()), 20)
-    def testHundreds(self):
-        for x in xrange(100):
-            self.testLength
-
-class Intify(unittest.TestCase):
-    known = [('\0' * 20, 0),
-            ('\xff' * 20, 2L**160 - 1),
-            ]
-    def testKnown(self):
-        for str, value in self.known: 
-            self.assertEqual(intify(str),  value)
-    def testEndianessOnce(self):
-        h = newID()
-        while h[-1] == '\xff':
-            h = newID()
-        k = h[:-1] + chr(ord(h[-1]) + 1)
-        self.assertEqual(intify(k) - intify(h), 1)
-    def testEndianessLots(self):
-        for x in xrange(100):
-            self.testEndianessOnce()
-
-class Disantance(unittest.TestCase):
-    known = [
-            (("\0" * 20, "\xff" * 20), 2**160L -1),
-            ((sha("foo").digest(), sha("foo").digest()), 0),
-            ((sha("bar").digest(), sha("bar").digest()), 0)
-            ]
-    def testKnown(self):
-        for pair, dist in self.known:
-            self.assertEqual(distance(pair[0], pair[1]), dist)
-    def testCommutitive(self):
-        for i in xrange(100):
-            x, y, z = newID(), newID(), newID()
-            self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z))
-        
-class RandRange(unittest.TestCase):
-    def testOnce(self):
-        a = intify(newID())
-        b = intify(newID())
-        if a < b:
-            c = randRange(a, b)
-            self.assertEqual(a <= c < b, 1, "output out of range %d  %d  %d" % (b, c, a))
-        else:
-            c = randRange(b, a)
-            assert b <= c < a, "output out of range %d  %d  %d" % (b, c, a)
-
-    def testOneHundredTimes(self):
-        for i in xrange(100):
-            self.testOnce()
-
-
-
-if __name__ == '__main__':
-    unittest.main()   
-
-    
diff --git a/khash.py b/khash.py
new file mode 100644 (file)
index 0000000..deb8d35
--- /dev/null
+++ b/khash.py
@@ -0,0 +1,113 @@
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
+from sha import sha
+import whrandom
+
+#this is ugly, hopefully os.entropy will be in 2.4
+try:
+    from entropy import entropy
+except ImportError:
+    def entropy(n):
+        s = ''
+        for i in range(n):
+            s += chr(whrandom.randint(0,255))
+        return s        
+
+def intify(hstr):
+    """20 bit hash, big-endian -> long python integer"""
+    assert len(hstr) == 20
+    return long(hstr.encode('hex'), 16)
+
+def stringify(num):
+    """long int -> 20-character string"""
+    str = hex(num)[2:]
+    if str[-1] == 'L':
+        str = str[:-1]
+    if len(str) % 2 != 0:
+        str = '0' + str
+    str = str.decode('hex')
+    return (20 - len(str)) *'\x00' + str
+    
+def distance(a, b):
+    """distance between two 160-bit hashes expressed as 20-character strings"""
+    return intify(a) ^ intify(b)
+
+
+def newID():
+    """returns a new pseudorandom globally unique ID string"""
+    h = sha()
+    h.update(entropy(20))
+    return h.digest()
+
+def newIDInRange(min, max):
+    return stringify(randRange(min,max))
+    
+def randRange(min, max):
+    return min + intify(newID()) % (max - min)
+    
+def newTID():
+    return randRange(-2**30, 2**30)
+
+### Test Cases ###
+import unittest
+
+class NewID(unittest.TestCase):
+    def testLength(self):
+        self.assertEqual(len(newID()), 20)
+    def testHundreds(self):
+        for x in xrange(100):
+            self.testLength
+
+class Intify(unittest.TestCase):
+    known = [('\0' * 20, 0),
+            ('\xff' * 20, 2L**160 - 1),
+            ]
+    def testKnown(self):
+        for str, value in self.known: 
+            self.assertEqual(intify(str),  value)
+    def testEndianessOnce(self):
+        h = newID()
+        while h[-1] == '\xff':
+            h = newID()
+        k = h[:-1] + chr(ord(h[-1]) + 1)
+        self.assertEqual(intify(k) - intify(h), 1)
+    def testEndianessLots(self):
+        for x in xrange(100):
+            self.testEndianessOnce()
+
+class Disantance(unittest.TestCase):
+    known = [
+            (("\0" * 20, "\xff" * 20), 2**160L -1),
+            ((sha("foo").digest(), sha("foo").digest()), 0),
+            ((sha("bar").digest(), sha("bar").digest()), 0)
+            ]
+    def testKnown(self):
+        for pair, dist in self.known:
+            self.assertEqual(distance(pair[0], pair[1]), dist)
+    def testCommutitive(self):
+        for i in xrange(100):
+            x, y, z = newID(), newID(), newID()
+            self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z))
+        
+class RandRange(unittest.TestCase):
+    def testOnce(self):
+        a = intify(newID())
+        b = intify(newID())
+        if a < b:
+            c = randRange(a, b)
+            self.assertEqual(a <= c < b, 1, "output out of range %d  %d  %d" % (b, c, a))
+        else:
+            c = randRange(b, a)
+            assert b <= c < a, "output out of range %d  %d  %d" % (b, c, a)
+
+    def testOneHundredTimes(self):
+        for i in xrange(100):
+            self.testOnce()
+
+
+
+if __name__ == '__main__':
+    unittest.main()   
+
+    
index 0c0ed52467896e88ba2e77d6dcc26793ea828ad3..e9e53a76ebd60a2f574f38c743e5dd95ced378f0 100644 (file)
@@ -11,22 +11,20 @@ from sha import sha
 from ktable import KTable, K
 from knode import KNode as Node
 
-from hash import newID, newIDInRange
+from khash import newID, newIDInRange
 
 from actions import FindNode, GetValue, KeyExpirer, StoreValue
 import krpc
-import airhook
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol
 from twisted.python import threadable
-from twisted.internet.app import Application
+from twisted.application import service, internet
 from twisted.web import server
 threadable.init()
 import sys
 
 import sqlite  ## find this at http://pysqlite.sourceforge.net/
-import pysqlite_exceptions
 
 class KhashmirDBExcept(Exception):
     pass
@@ -34,7 +32,6 @@ class KhashmirDBExcept(Exception):
 # this is the main class!
 class Khashmir(protocol.Factory):
     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
-    protocol = krpc.KRPC
     def __init__(self, host, port, db='khashmir.db'):
         self.setup(host, port, db)
         
@@ -43,19 +40,24 @@ class Khashmir(protocol.Factory):
         self.port = port
         self.node = self._loadSelfNode(host, port)
         self.table = KTable(self.node)
-        self.app = Application("krpc")
-        self.airhook = airhook.listenAirhookStream(port, self)
+        self.app = service.Application("krpc")
+        self.udp = krpc.hostbroker(self)
+        self.udp.protocol = krpc.KRPC
+        self.listenport = reactor.listenUDP(port, self.udp)
         self.last = time.time()
         self._loadRoutingTable()
         KeyExpirer(store=self.store)
         #self.refreshTable(force=1)
         reactor.callLater(60, self.checkpoint, (1,))
         
+    def __del__(self):
+        self.listenport.stopListening()
+        
     def _loadSelfNode(self, host, port):
         c = self.store.cursor()
         c.execute('select id from self where num = 0;')
         if c.rowcount > 0:
-            id = c.fetchone()[0].decode('hex')
+            id = c.fetchone()[0]
         else:
             id = newID()
         return Node().init(id, host, port)
@@ -64,7 +66,7 @@ class Khashmir(protocol.Factory):
         self.store.autocommit = 0
         c = self.store.cursor()
         c.execute('delete from self where num = 0;')
-        c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
+        c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
         self.store.commit()
         self.store.autocommit = 1
         
@@ -95,13 +97,13 @@ class Khashmir(protocol.Factory):
         self.store = sqlite.connect(db=db)
         self.store.autocommit = 1
         s = """
-            create table kv (key text, value text, time timestamp, primary key (key, value));
+            create table kv (key binary, value binary, time timestamp, primary key (key, value));
             create index kv_key on kv(key);
             create index kv_timestamp on kv(time);
             
-            create table nodes (id text primary key, host text, port number);
+            create table nodes (id binary primary key, host text, port number);
             
-            create table self (num number primary key, id text);
+            create table self (num number primary key, id binary);
             """
         c = self.store.cursor()
         c.execute(s)
@@ -116,7 +118,7 @@ class Khashmir(protocol.Factory):
         for bucket in self.table.buckets:
             for node in bucket.l:
                 d = node.senderDict()
-                c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
+                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port']))
         self.store.commit()
         self.store.autocommit = 1;
         
@@ -128,8 +130,8 @@ class Khashmir(protocol.Factory):
         c = self.store.cursor()
         c.execute("select * from nodes;")
         for rec in c.fetchall():
-            n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
-            n.conn = self.airhook.connectionForAddr((n.host, n.port))
+            n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+            n.conn = self.udp.connectionForAddr((n.host, n.port))
             self.table.insertNode(n, contacted=0)
             
 
@@ -140,7 +142,7 @@ class Khashmir(protocol.Factory):
             ping this node and add the contact info to the table on pong!
         """
         n =Node().init(const.NULL_ID, host, port) 
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.sendPing(n, callback=callback)
 
     ## this call is async!
@@ -246,7 +248,7 @@ class Khashmir(protocol.Factory):
                 sender['host'] = node.host
                 sender['port'] = node.port
                 n = Node().initWithDict(sender)
-                n.conn = self.airhook.connectionForAddr((n.host, n.port))
+                n.conn = self.udp.connectionForAddr((n.host, n.port))
                 table.insertNode(n)
                 if callback:
                     callback()
@@ -280,13 +282,12 @@ class Khashmir(protocol.Factory):
 
 
     def retrieveValues(self, key):
-        s = "select value from kv where key = '%s';" % key.encode('hex')
         c = self.store.cursor()
-        c.execute(s)
+        c.execute("select value from kv where key = %s;", sqlite.encode(key))
         t = c.fetchone()
         l = []
         while t:
-            l.append(t['value'].decode('base64'))
+            l.append(t['value'])
             t = c.fetchone()
         return l
     
@@ -301,7 +302,7 @@ class Khashmir(protocol.Factory):
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         return {"sender" : self.node.senderDict()}
         
@@ -311,24 +312,22 @@ class Khashmir(protocol.Factory):
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         return {"nodes" : nodes, "sender" : self.node.senderDict()}
             
     def krpc_store_value(self, key, value, sender, _krpc_sender):
         t = "%0.6f" % time.time()
-        s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
         c = self.store.cursor()
         try:
-            c.execute(s)
-        except pysqlite_exceptions.IntegrityError, reason:
+            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+        except sqlite.IntegrityError, reason:
             # update last insert time
-            s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
-            c.execute(s)
+            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         return {"sender" : self.node.senderDict()}
     
@@ -336,7 +335,7 @@ class Khashmir(protocol.Factory):
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
     
         l = self.retrieveValues(key)
@@ -347,171 +346,3 @@ class Khashmir(protocol.Factory):
             nodes = map(lambda node: node.senderDict(), nodes)
             return {'nodes' : nodes, "sender": self.node.senderDict()}
 
-### TESTING ###
-from random import randrange
-import threading, thread, sys, time
-from sha import sha
-from hash import newID
-
-
-def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'):
-    import thread
-    l = []
-    for i in xrange(peers):
-        a = Khashmir(host, startport + i, db = dbprefix+`i`)
-        l.append(a)
-    thread.start_new_thread(l[0].app.run, ())
-    for peer in l[1:]:
-        peer.app.run(installSignalHandlers=0)  
-    return l
-    
-def test_build_net(quiet=0, peers=24, host='127.0.0.1',  pause=0, startport=2001, dbprefix='/tmp/test'):
-    from whrandom import randrange
-    import threading
-    import thread
-    import sys
-    port = startport
-    l = []
-    if not quiet:
-        print "Building %s peer table." % peers
-    
-    for i in xrange(peers):
-        a = Khashmir(host, port + i, db = dbprefix +`i`)
-        l.append(a)
-    
-    
-    thread.start_new_thread(l[0].app.run, ())
-    time.sleep(1)
-    for peer in l[1:]:
-        peer.app.run(installSignalHandlers=0)
-    #time.sleep(3)
-    
-    def spewer(frame, s, ignored):
-        from twisted.python import reflect
-        if frame.f_locals.has_key('self'):
-            se = frame.f_locals['self']
-            print 'method %s of %s at %s' % (
-                frame.f_code.co_name, reflect.qual(se.__class__), id(se)
-                )
-    #sys.settrace(spewer)
-
-    print "adding contacts...."
-    def makecb(flag):
-        def cb(f=flag):
-            f.set()
-        return cb
-
-    for peer in l:
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-    
-    print "finding close nodes...."
-    
-    for peer in l:
-        flag = threading.Event()
-        def cb(nodes, f=flag):
-            f.set()
-        peer.findCloseNodes(cb)
-        flag.wait()
-    #    for peer in l:
-    #  peer.refreshTable()
-    return l
-        
-def test_find_nodes(l, quiet=0):
-    flag = threading.Event()
-    
-    n = len(l)
-    
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    
-    def callback(nodes, flag=flag, id = b.node.id):
-        if (len(nodes) >0) and (nodes[0].id == id):
-            print "test_find_nodes     PASSED"
-        else:
-            print "test_find_nodes     FAILED"
-        flag.set()
-    a.findNode(b.node.id, callback)
-    flag.wait()
-    
-def test_find_value(l, quiet=0):
-    ff = threading.Event()
-    fa = threading.Event()
-    fb = threading.Event()
-    fc = threading.Event()
-    
-    n = len(l)
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    c = l[randrange(0,n)]
-    d = l[randrange(0,n)]
-    
-    key = newID()
-    value = newID()
-    if not quiet: print "inserting value..."
-    def acb(p, f=ff):
-        f.set()
-    a.storeValueForKey(key, value, acb)
-    ff.wait()
-    
-    if not quiet:
-        print "finding..."
-    
-    class cb:
-        def __init__(self, flag, value=value, port=None):
-            self.flag = flag
-            self.val = value
-            self.found = 0
-            self.port = port
-        def callback(self, values):
-                if(len(values) == 0):
-                    if not self.found:
-                        print "find   %s           NOT FOUND" % self.port
-                    else:
-                        print "find   %s           FOUND" % self.port
-                    self.flag.set()
-                else:
-                    if self.val in values:
-                        self.found = 1
-    
-    b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
-    fa.wait()
-    c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
-    fb.wait()
-    d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)    
-    fc.wait()
-    
-def test_one(host, port, db='/tmp/test'):
-    import thread
-    k = Khashmir(host, port, db)
-    thread.start_new_thread(reactor.run, ())
-    return k
-    
-if __name__ == "__main__":
-    import sys
-    n = 8
-    if len(sys.argv) > 1: n = int(sys.argv[1])
-    l = test_build_net(peers=n)
-    time.sleep(3)
-    print "finding nodes..."
-    for i in range(n):
-        test_find_nodes(l)
-    print "inserting and fetching values..."
-    for i in range(10):
-        test_find_value(l)
index 3d9c4f1b3a5beb94638b191dd368f6c117532e1e..70069fae710339686bd978bfca8ecf802afe505a 100644 (file)
--- a/knode.py
+++ b/knode.py
@@ -24,18 +24,18 @@ class KNode(Node):
         return dict
         
     def ping(self, sender):
-        df = self.conn.protocol.sendRequest('ping', {"sender":sender})
+        df = self.conn.sendRequest('ping', {"sender":sender})
         df.addCallback(self.checkSender)
         return df
     def findNode(self, target, sender):
-        df = self.conn.protocol.sendRequest('find_node', {"target" : target, "sender": sender})
+        df = self.conn.sendRequest('find_node', {"target" : target, "sender": sender})
         df.addCallback(self.checkSender)
         return df
     def storeValue(self, key, value, sender):
-        df = self.conn.protocol.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
+        df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
         df.addCallback(self.checkSender)
         return df
     def findValue(self, key, sender):
-        df =  self.conn.protocol.sendRequest('find_value', {"key" : key, "sender" : sender})
+        df =  self.conn.sendRequest('find_value', {"key" : key, "sender" : sender})
         df.addCallback(self.checkSender)
         return df
diff --git a/krpc.py b/krpc.py
index 5f277692e1abf2e73af66c6e3b1665490e030b37..24a0f16c9a06f4c703f401bda85f473f2f4fd318 100644 (file)
--- a/krpc.py
+++ b/krpc.py
@@ -5,10 +5,12 @@ import airhook
 from twisted.internet.defer import Deferred
 from twisted.protocols import basic
 from bencode import bencode, bdecode
+from twisted.internet import protocol
+
 from twisted.internet import reactor
 import time
 
-import hash
+import khash as hash
 
 KRPC_TIMEOUT = 60
 
@@ -17,110 +19,143 @@ KRPC_ERROR_METHOD_UNKNOWN = 2
 KRPC_ERROR_RECEIVED_UNKNOWN = 3
 KRPC_ERROR_TIMEOUT = 4
 
-class KRPC(basic.NetstringReceiver):
-    noisy = 1
-    def __init__(self):
-        self.tids = {}
+# commands
+TID = 'tid'
+REQ = 'req'
+RSP = 'rsp'
+TYP = 'typ'
+ARG = 'arg'
+ERR = 'err'
+
+class hostbroker(protocol.DatagramProtocol):       
+    def __init__(self, server):
+        self.noisy = 0
+        self.server = server
+        # this should be changed to storage that drops old entries
+        self.connections = {}
+        
+    def datagramReceived(self, datagram, addr):
+        #print `addr`, `datagram`
+        #if addr != self.addr:
+        c = self.connectionForAddr(addr)
+        c.datagramReceived(datagram)
+        #if c.idle():
+        #    del self.connections[addr]
 
+    def connectionForAddr(self, addr):
+        if addr == self.addr:
+            raise Exception
+        if not self.connections.has_key(addr):
+            conn = self.protocol(addr, self.server, self.transport)
+            self.connections[addr] = conn
+        else:
+            conn = self.connections[addr]
+        return conn
+
+    def makeConnection(self, transport):
+        protocol.DatagramProtocol.makeConnection(self, transport)
+        tup = transport.getHost()
+        self.addr = (tup.host, tup.port)
 
-    def dataRecieved(self, data):
-        basic.NetstringReceiver(self, data)
-        if self.brokenPeer:
-            self.resetConnection()
-            
-    def resetConnection(self):
-        self.brokenPeer = 0
-        self._readerState = basic.LENGTH
-        self._readerLength = 0
+## connection
+class KRPC:
+    noisy = 1
+    def __init__(self, addr, server, transport):
+        self.transport = transport
+        self.factory = server
+        self.addr = addr
+        self.tids = {}
 
-    def stringReceived(self, str):
+    def datagramReceived(self, str):
         # bdecode
         try:
             msg = bdecode(str)
         except Exception, e:
-            if self.naisy:
+            if self.noisy:
                 print "response decode error: " + `e`
             self.d.errback()
         else:
+            #if self.noisy:
+            #    print msg
             # look at msg type
-            if msg['typ']  == 'req':
+            if msg[TYP]  == REQ:
                 ilen = len(str)
                 # if request
                 #      tell factory to handle
-                f = getattr(self.factory ,"krpc_" + msg['req'], None)
+                f = getattr(self.factory ,"krpc_" + msg[REQ], None)
                 if f and callable(f):
-                    msg['arg']['_krpc_sender'] =  self.transport.addr
+                    msg[ARG]['_krpc_sender'] =  self.addr
                     try:
-                        ret = apply(f, (), msg['arg'])
+                        ret = apply(f, (), msg[ARG])
                     except Exception, e:
                         ## send error
-                        out = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
+                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`})
                         olen = len(out)
-                        self.sendString(out)
+                        self.transport.write(out, self.addr)
                     else:
                         if ret:
                             #  make response
-                            out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+                            out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
                         else:
-                            out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
+                            out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
                         #      send response
                         olen = len(out)
-                        self.sendString(out)
+                        self.transport.write(out, self.addr)
 
                 else:
                     if self.noisy:
-                        print "don't know about method %s" % msg['req']
+                        print "don't know about method %s" % msg[REQ]
                     # unknown method
-                    out = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
+                    out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
                     olen = len(out)
-                    self.sendString(out)
+                    self.transport.write(out, self.addr)
                 if self.noisy:
-                    print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port, 
-                                                    ilen, msg['req'], olen)
-            elif msg['typ'] == 'rsp':
+                    print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.addr, self.factory.node.port, 
+                                                    ilen, msg[REQ], olen)
+            elif msg[TYP] == RSP:
                 # if response
                 #      lookup tid
-                if self.tids.has_key(msg['tid']):
-                    df = self.tids[msg['tid']]
+                if self.tids.has_key(msg[TID]):
+                    df = self.tids[msg[TID]]
                     #  callback
-                    del(self.tids[msg['tid']])
-                    df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
+                    del(self.tids[msg[TID]])
+                    df.callback({RSP : msg[RSP], '_krpc_sender': self.addr})
                 else:
-                    print 'timeout ' + `msg['rsp']['sender']`
+                    print 'timeout ' + `msg[RSP]['sender']`
                     # no tid, this transaction timed out already...
-            elif msg['typ'] == 'err':
+            elif msg[TYP] == ERR:
                 # if error
                 #      lookup tid
-                if self.tids.has_key(msg['tid']):
-                    df = self.tids[msg['tid']]
+                if self.tids.has_key(msg[TID]):
+                    df = self.tids[msg[TID]]
                     #  callback
-                    df.errback(msg['err'])
-                    del(self.tids[msg['tid']])
+                    df.errback(msg[ERR])
+                    del(self.tids[msg[TID]])
                 else:
                     # day late and dollar short
                     pass
             else:
                 print "unknown message type " + `msg`
                 # unknown message type
-                df = self.tids[msg['tid']]
+                df = self.tids[msg[TID]]
                 #      callback
                 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
-                del(self.tids[msg['tid']])
+                del(self.tids[msg[TID]])
                 
     def sendRequest(self, method, args):
         # make message
         # send it
-        msg = {'tid' : hash.newID(), 'typ' : 'req',  'req' : method, 'arg' : args}
+        msg = {TID : hash.newTID(), TYP : REQ,  REQ : method, ARG : args}
         str = bencode(msg)
         d = Deferred()
-        self.tids[msg['tid']] = d
-        def timeOut(tids = self.tids, id = msg['tid']):
+        self.tids[msg[TID]] = d
+        def timeOut(tids = self.tids, id = msg[TID]):
             if tids.has_key(id):
                 df = tids[id]
                 del(tids[id])
                 print ">>>>>> KRPC_ERROR_TIMEOUT"
                 df.errback(KRPC_ERROR_TIMEOUT)
         reactor.callLater(KRPC_TIMEOUT, timeOut)
-        self.sendString(str)
+        self.transport.write(str, self.addr)
         return d
  
index 4d4c5bb2f3d65934680d0ba18acda4b028742a4d..67b23c920ab09da45e2f823f754e03e9b7606ba9 100644 (file)
--- a/ktable.py
+++ b/ktable.py
@@ -5,7 +5,7 @@ import time
 from bisect import *
 from types import *
 
-import hash
+import khash as hash
 import const
 from const import K, HASH_LENGTH, NULL_ID
 from node import Node
diff --git a/node.py b/node.py
index 3d91a896672c1ff64aa8f45f0c80182dfa584b54..f8d2801c1d69e5b9fde13bca5d1d1d95a48726a0 100644 (file)
--- a/node.py
+++ b/node.py
@@ -1,7 +1,7 @@
 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
 # see LICENSE.txt for license information
 
-import hash
+import khash
 import time
 from types import *
 
@@ -14,7 +14,7 @@ class Node:
     
     def init(self, id, host, port):
         self.id = id
-        self.num = hash.intify(id)
+        self.num = khash.intify(id)
         self.host = host
         self.port = port
         self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host}
@@ -23,7 +23,7 @@ class Node:
     def initWithDict(self, dict):
         self._senderDict = dict
         self.id = dict['id']
-        self.num = hash.intify(self.id)
+        self.num = khash.intify(self.id)
         self.port = dict['port']
         self.host = dict['host']
         return self
@@ -73,7 +73,7 @@ import unittest
 
 class TestNode(unittest.TestCase):
     def setUp(self):
-        self.node = Node().init(hash.newID(), 'localhost', 2002)
+        self.node = Node().init(khash.newID(), 'localhost', 2002)
     def testUpdateLastSeen(self):
         t = self.node.lastSeen
         self.node.updateLastSeen()
diff --git a/test_khashmir.py b/test_khashmir.py
new file mode 100644 (file)
index 0000000..7dcc247
--- /dev/null
@@ -0,0 +1,137 @@
+from unittest import *
+from khashmir import *
+import khash
+
+from whrandom import randrange
+
+import os
+
+if __name__ =="__main__":
+    tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
+    result = TextTestRunner().run(tests)
+
+class SimpleTests(TestCase):
+    def setUp(self):
+        self.a = Khashmir('127.0.0.1', 4044, '/tmp/a.test')
+        self.b = Khashmir('127.0.0.1', 4045, '/tmp/b.test')
+        
+    def tearDown(self):
+        self.a.listenport.stopListening()
+        self.b.listenport.stopListening()
+        os.unlink('/tmp/a.test')
+        os.unlink('/tmp/b.test')                
+        reactor.iterate()
+        reactor.iterate()
+
+    def addContacts(self):
+        self.a.addContact('127.0.0.1', 4045)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+
+    def testAddContact(self):
+        self.assertEqual(len(self.a.table.buckets), 1) 
+        self.assertEqual(len(self.a.table.buckets[0].l), 0)
+
+        self.assertEqual(len(self.b.table.buckets), 1) 
+        self.assertEqual(len(self.b.table.buckets[0].l), 0)
+
+        self.addContacts()
+
+        self.assertEqual(len(self.a.table.buckets), 1) 
+        self.assertEqual(len(self.a.table.buckets[0].l), 1)
+        self.assertEqual(len(self.b.table.buckets), 1) 
+        self.assertEqual(len(self.b.table.buckets[0].l), 1)
+
+    def testStoreRetrieve(self):
+        self.addContacts()
+        self.got = 0
+        self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.a.valueForKey(sha('foo').digest(), self._cb)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+
+    def _cb(self, val):
+        if not val:
+            self.assertEqual(self.got, 1)
+        elif 'foobar' in val:
+            self.got = 1
+
+
+class MultiTest(TestCase):
+    num = 30
+    def _done(self, val):
+        self.done = 1
+        
+    def setUp(self):
+        self.l = []
+        self.startport = 4044
+        for i in range(self.num):
+            self.l.append(Khashmir('127.0.0.1', self.startport + i, '/tmp/%s.test' % (self.startport + i)))
+        reactor.iterate()
+        reactor.iterate()
+        
+        for i in self.l:
+            i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+            i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+            i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate() 
+            
+        for i in self.l:
+            self.done = 0
+            i.findCloseNodes(self._done)
+            while not self.done:
+                reactor.iterate()
+        for i in self.l:
+            self.done = 0
+            i.findCloseNodes(self._done)
+            while not self.done:
+                reactor.iterate()
+
+    def tearDown(self):
+        for i in self.l:
+            i.listenport.stopListening()
+        for i in range(self.startport, self.startport+self.num):
+            os.unlink('/tmp/%s.test' % i)
+            
+        reactor.iterate()
+        
+    def testStoreRetrieve(self):
+        for i in range(10):
+            K = khash.newID()
+            V = khash.newID()
+            
+            self.done = 0
+            def _cb(val):
+                self.done = 1
+            self.l[randrange(0, self.num)].storeValueForKey(K, V, _cb)
+            while not self.done:
+                reactor.iterate()
+
+            self.got = 0
+            self.done = 0
+
+            def _cb(val):
+                if not val:
+                    self.done = 1
+                    self.assertEqual(self.got, 1)
+                elif V in val:
+                    self.got = 1
+                                    
+            self.l[randrange(0, self.num)].valueForKey(K, _cb)
+            while not self.done:
+                reactor.iterate()
index d7f975403c8aaca6efb82237eb0497b7bd81307b..a7bef9d54b49dfda756886565ee0f2748e2a20fb 100644 (file)
@@ -3,20 +3,21 @@
 
 from unittest import *
 from krpc import *
-from airhook import *
+#from airhook import *
 
 KRPC.noisy = 0
 
 import sys
 
 if __name__ =="__main__":
-    tests = unittest.defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
-    result = unittest.TextTestRunner().run(tests)
+    tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
+    result = TextTestRunner().run(tests)
 
 
 def connectionForAddr(host, port):
     return host
     
+
 class Receiver(protocol.Factory):
     protocol = KRPC
     def __init__(self):
@@ -26,51 +27,35 @@ class Receiver(protocol.Factory):
     def krpc_echo(self, msg, _krpc_sender):
         return msg
 
-class SimpleTest(TestCase):
+def make(port):
+    af = Receiver()
+    a = hostbroker(af)
+    a.protocol = KRPC
+    p = reactor.listenUDP(port, a)
+    return af, a, p
+    
+class KRPCTests(TestCase):
     def setUp(self):
         self.noisy = 0
-        
-        self.af = Receiver()
-        self.bf = Receiver()        
-        self.a = listenAirhookStream(4040, self.af)
-        self.b = listenAirhookStream(4041, self.bf)
-        
-    def testSimpleMessage(self):
-        self.noisy = 0
-        self.a.connectionForAddr(('127.0.0.1', 4041)).protocol.sendRequest('store', {'msg' : "This is a test."})
-        reactor.iterate()
+        self.af, self.a, self.ap = make(1180)
+        self.bf, self.b, self.bp = make(1181)
+
+    def tearDown(self):
+        self.ap.stopListening()
+        self.bp.stopListening()
         reactor.iterate()
         reactor.iterate()
-        self.assertEqual(self.bf.buf, ["This is a test."])
 
-class SimpleTest(TestCase):
-    def setUp(self):
-        self.noisy = 0
-        
-        self.af = Receiver()
-        self.bf = Receiver()        
-        self.a = listenAirhookStream(4050, self.af)
-        self.b = listenAirhookStream(4051, self.bf)
-        
     def testSimpleMessage(self):
         self.noisy = 0
-        self.a.connectionForAddr(('127.0.0.1', 4051)).protocol.sendRequest('store', {'msg' : "This is a test."})
+        self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         self.assertEqual(self.bf.buf, ["This is a test."])
 
-class BlastTest(TestCase):
-    def setUp(self):
-        self.noisy = 0
-        
-        self.af = Receiver()
-        self.bf = Receiver()        
-        self.a = listenAirhookStream(4060, self.af)
-        self.b = listenAirhookStream(4061, self.bf)
-
     def testMessageBlast(self):
-        self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."})
+        self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
@@ -78,23 +63,13 @@ class BlastTest(TestCase):
         self.bf.buf = []
         
         for i in range(100):
-            self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."})
+            self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
             reactor.iterate()
             #self.bf.buf = []
         self.assertEqual(self.bf.buf, ["This is a test."] * 100)
 
-class EchoTest(TestCase):
-    def setUp(self):
-        self.noisy = 0
-        self.msg = None
-        
-        self.af = Receiver()
-        self.bf = Receiver()        
-        self.a = listenAirhookStream(4042, self.af)
-        self.b = listenAirhookStream(4043, self.bf)
-        
     def testEcho(self):
-        df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
         reactor.iterate()
@@ -104,21 +79,11 @@ class EchoTest(TestCase):
 
     def gotMsg(self, dict):
         _krpc_sender = dict['_krpc_sender']
-        msg = dict['rsp']
+        msg = dict[RSP]
         self.msg = msg
 
-class ManyEchoTest(TestCase):
-    def setUp(self):
-        self.noisy = 0
-        self.msg = None
-        
-        self.af = Receiver()
-        self.bf = Receiver()        
-        self.a = listenAirhookStream(4588, self.af)
-        self.b = listenAirhookStream(4589, self.bf)
-        
     def testManyEcho(self):
-        df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
         reactor.iterate()
@@ -127,32 +92,17 @@ class ManyEchoTest(TestCase):
         self.assertEqual(self.msg, "This is a test.")
         for i in xrange(100):
             self.msg = None
-            df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+            df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
             df.addCallback(self.gotMsg)
             reactor.iterate()
             reactor.iterate()
             reactor.iterate()
             reactor.iterate()
             self.assertEqual(self.msg, "This is a test.")
-            
-    def gotMsg(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        msg = dict['rsp']
-        self.msg = msg
 
-class MultiEchoTest(TestCase):
-    def setUp(self):
-        self.noisy = 0
-        self.msg = None
-        
-        self.af = Receiver()
-        self.bf = Receiver()        
-        self.a = listenAirhookStream(4048, self.af)
-        self.b = listenAirhookStream(4049, self.bf)
-        
     def testMultiEcho(self):
         self.noisy = 1
-        df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
         reactor.iterate()
@@ -160,7 +110,7 @@ class MultiEchoTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is a test.")
 
-        df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is another test."})
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
         reactor.iterate()
@@ -168,7 +118,7 @@ class MultiEchoTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is another test.")
 
-        df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
         reactor.iterate()
@@ -176,24 +126,9 @@ class MultiEchoTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is yet another test.")
 
-    def gotMsg(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        msg = dict['rsp']
-        self.msg = msg
-
-class EchoResetTest(TestCase):
-    def setUp(self):
-        self.noisy = 0
-        self.msg = None
-        
-        self.af = Receiver()
-        self.bf = Receiver()        
-        self.a = listenAirhookStream(4078, self.af)
-        self.b = listenAirhookStream(4079, self.bf)
-        
     def testEchoReset(self):
         self.noisy = 1
-        df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
         reactor.iterate()
@@ -201,7 +136,7 @@ class EchoResetTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is a test.")
 
-        df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is another test."})
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
         reactor.iterate()
@@ -209,8 +144,8 @@ class EchoResetTest(TestCase):
         reactor.iterate()
         self.assertEqual(self.msg, "This is another test.")
 
-        del(self.a.connections[('127.0.0.1', 4079)])
-        df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
+        del(self.a.connections[('127.0.0.1', 1181)])
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
         df.addCallback(self.gotMsg)
         reactor.iterate()
         reactor.iterate()
@@ -221,23 +156,10 @@ class EchoResetTest(TestCase):
     def testLotsofEchoReset(self):
         for i in range(100):
             self.testEchoReset()
-    def gotMsg(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        msg = dict['rsp']
-        self.msg = msg
-
-class UnknownMethErrTest(TestCase):
-    def setUp(self):
-        self.noisy = 0
-        self.err = None
-        self.af = Receiver()
-        self.bf = Receiver()        
-        self.a = listenAirhookStream(4044, self.af)
-        self.b = listenAirhookStream(4045, self.bf)
-        
+            
     def testUnknownMeth(self):
         self.noisy = 1
-        df = self.a.connectionForAddr(('127.0.0.1', 4045)).protocol.sendRequest('blahblah', {'msg' : "This is a test."})
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
         df.addErrback(self.gotErr)
         reactor.iterate()
         reactor.iterate()
@@ -247,4 +169,4 @@ class UnknownMethErrTest(TestCase):
 
     def gotErr(self, err):
         self.err = err.value
-        
+        
\ No newline at end of file