]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
major cleanup, updated for twisted
[quix0rs-apt-p2p.git] / khashmir.py
index 0c0ed52467896e88ba2e77d6dcc26793ea828ad3..e9e53a76ebd60a2f574f38c743e5dd95ced378f0 100644 (file)
@@ -11,22 +11,20 @@ from sha import sha
 from ktable import KTable, K
 from knode import KNode as Node
 
-from hash import newID, newIDInRange
+from khash import newID, newIDInRange
 
 from actions import FindNode, GetValue, KeyExpirer, StoreValue
 import krpc
-import airhook
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol
 from twisted.python import threadable
-from twisted.internet.app import Application
+from twisted.application import service, internet
 from twisted.web import server
 threadable.init()
 import sys
 
 import sqlite  ## find this at http://pysqlite.sourceforge.net/
-import pysqlite_exceptions
 
 class KhashmirDBExcept(Exception):
     pass
@@ -34,7 +32,6 @@ class KhashmirDBExcept(Exception):
 # this is the main class!
 class Khashmir(protocol.Factory):
     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
-    protocol = krpc.KRPC
     def __init__(self, host, port, db='khashmir.db'):
         self.setup(host, port, db)
         
@@ -43,19 +40,24 @@ class Khashmir(protocol.Factory):
         self.port = port
         self.node = self._loadSelfNode(host, port)
         self.table = KTable(self.node)
-        self.app = Application("krpc")
-        self.airhook = airhook.listenAirhookStream(port, self)
+        self.app = service.Application("krpc")
+        self.udp = krpc.hostbroker(self)
+        self.udp.protocol = krpc.KRPC
+        self.listenport = reactor.listenUDP(port, self.udp)
         self.last = time.time()
         self._loadRoutingTable()
         KeyExpirer(store=self.store)
         #self.refreshTable(force=1)
         reactor.callLater(60, self.checkpoint, (1,))
         
+    def __del__(self):
+        self.listenport.stopListening()
+        
     def _loadSelfNode(self, host, port):
         c = self.store.cursor()
         c.execute('select id from self where num = 0;')
         if c.rowcount > 0:
-            id = c.fetchone()[0].decode('hex')
+            id = c.fetchone()[0]
         else:
             id = newID()
         return Node().init(id, host, port)
@@ -64,7 +66,7 @@ class Khashmir(protocol.Factory):
         self.store.autocommit = 0
         c = self.store.cursor()
         c.execute('delete from self where num = 0;')
-        c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
+        c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
         self.store.commit()
         self.store.autocommit = 1
         
@@ -95,13 +97,13 @@ class Khashmir(protocol.Factory):
         self.store = sqlite.connect(db=db)
         self.store.autocommit = 1
         s = """
-            create table kv (key text, value text, time timestamp, primary key (key, value));
+            create table kv (key binary, value binary, time timestamp, primary key (key, value));
             create index kv_key on kv(key);
             create index kv_timestamp on kv(time);
             
-            create table nodes (id text primary key, host text, port number);
+            create table nodes (id binary primary key, host text, port number);
             
-            create table self (num number primary key, id text);
+            create table self (num number primary key, id binary);
             """
         c = self.store.cursor()
         c.execute(s)
@@ -116,7 +118,7 @@ class Khashmir(protocol.Factory):
         for bucket in self.table.buckets:
             for node in bucket.l:
                 d = node.senderDict()
-                c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
+                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port']))
         self.store.commit()
         self.store.autocommit = 1;
         
@@ -128,8 +130,8 @@ class Khashmir(protocol.Factory):
         c = self.store.cursor()
         c.execute("select * from nodes;")
         for rec in c.fetchall():
-            n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
-            n.conn = self.airhook.connectionForAddr((n.host, n.port))
+            n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+            n.conn = self.udp.connectionForAddr((n.host, n.port))
             self.table.insertNode(n, contacted=0)
             
 
@@ -140,7 +142,7 @@ class Khashmir(protocol.Factory):
             ping this node and add the contact info to the table on pong!
         """
         n =Node().init(const.NULL_ID, host, port) 
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.sendPing(n, callback=callback)
 
     ## this call is async!
@@ -246,7 +248,7 @@ class Khashmir(protocol.Factory):
                 sender['host'] = node.host
                 sender['port'] = node.port
                 n = Node().initWithDict(sender)
-                n.conn = self.airhook.connectionForAddr((n.host, n.port))
+                n.conn = self.udp.connectionForAddr((n.host, n.port))
                 table.insertNode(n)
                 if callback:
                     callback()
@@ -280,13 +282,12 @@ class Khashmir(protocol.Factory):
 
 
     def retrieveValues(self, key):
-        s = "select value from kv where key = '%s';" % key.encode('hex')
         c = self.store.cursor()
-        c.execute(s)
+        c.execute("select value from kv where key = %s;", sqlite.encode(key))
         t = c.fetchone()
         l = []
         while t:
-            l.append(t['value'].decode('base64'))
+            l.append(t['value'])
             t = c.fetchone()
         return l
     
@@ -301,7 +302,7 @@ class Khashmir(protocol.Factory):
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         return {"sender" : self.node.senderDict()}
         
@@ -311,24 +312,22 @@ class Khashmir(protocol.Factory):
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         return {"nodes" : nodes, "sender" : self.node.senderDict()}
             
     def krpc_store_value(self, key, value, sender, _krpc_sender):
         t = "%0.6f" % time.time()
-        s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
         c = self.store.cursor()
         try:
-            c.execute(s)
-        except pysqlite_exceptions.IntegrityError, reason:
+            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+        except sqlite.IntegrityError, reason:
             # update last insert time
-            s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
-            c.execute(s)
+            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         return {"sender" : self.node.senderDict()}
     
@@ -336,7 +335,7 @@ class Khashmir(protocol.Factory):
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
     
         l = self.retrieveValues(key)
@@ -347,171 +346,3 @@ class Khashmir(protocol.Factory):
             nodes = map(lambda node: node.senderDict(), nodes)
             return {'nodes' : nodes, "sender": self.node.senderDict()}
 
-### TESTING ###
-from random import randrange
-import threading, thread, sys, time
-from sha import sha
-from hash import newID
-
-
-def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'):
-    import thread
-    l = []
-    for i in xrange(peers):
-        a = Khashmir(host, startport + i, db = dbprefix+`i`)
-        l.append(a)
-    thread.start_new_thread(l[0].app.run, ())
-    for peer in l[1:]:
-        peer.app.run(installSignalHandlers=0)  
-    return l
-    
-def test_build_net(quiet=0, peers=24, host='127.0.0.1',  pause=0, startport=2001, dbprefix='/tmp/test'):
-    from whrandom import randrange
-    import threading
-    import thread
-    import sys
-    port = startport
-    l = []
-    if not quiet:
-        print "Building %s peer table." % peers
-    
-    for i in xrange(peers):
-        a = Khashmir(host, port + i, db = dbprefix +`i`)
-        l.append(a)
-    
-    
-    thread.start_new_thread(l[0].app.run, ())
-    time.sleep(1)
-    for peer in l[1:]:
-        peer.app.run(installSignalHandlers=0)
-    #time.sleep(3)
-    
-    def spewer(frame, s, ignored):
-        from twisted.python import reflect
-        if frame.f_locals.has_key('self'):
-            se = frame.f_locals['self']
-            print 'method %s of %s at %s' % (
-                frame.f_code.co_name, reflect.qual(se.__class__), id(se)
-                )
-    #sys.settrace(spewer)
-
-    print "adding contacts...."
-    def makecb(flag):
-        def cb(f=flag):
-            f.set()
-        return cb
-
-    for peer in l:
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-    
-    print "finding close nodes...."
-    
-    for peer in l:
-        flag = threading.Event()
-        def cb(nodes, f=flag):
-            f.set()
-        peer.findCloseNodes(cb)
-        flag.wait()
-    #    for peer in l:
-    #  peer.refreshTable()
-    return l
-        
-def test_find_nodes(l, quiet=0):
-    flag = threading.Event()
-    
-    n = len(l)
-    
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    
-    def callback(nodes, flag=flag, id = b.node.id):
-        if (len(nodes) >0) and (nodes[0].id == id):
-            print "test_find_nodes     PASSED"
-        else:
-            print "test_find_nodes     FAILED"
-        flag.set()
-    a.findNode(b.node.id, callback)
-    flag.wait()
-    
-def test_find_value(l, quiet=0):
-    ff = threading.Event()
-    fa = threading.Event()
-    fb = threading.Event()
-    fc = threading.Event()
-    
-    n = len(l)
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    c = l[randrange(0,n)]
-    d = l[randrange(0,n)]
-    
-    key = newID()
-    value = newID()
-    if not quiet: print "inserting value..."
-    def acb(p, f=ff):
-        f.set()
-    a.storeValueForKey(key, value, acb)
-    ff.wait()
-    
-    if not quiet:
-        print "finding..."
-    
-    class cb:
-        def __init__(self, flag, value=value, port=None):
-            self.flag = flag
-            self.val = value
-            self.found = 0
-            self.port = port
-        def callback(self, values):
-                if(len(values) == 0):
-                    if not self.found:
-                        print "find   %s           NOT FOUND" % self.port
-                    else:
-                        print "find   %s           FOUND" % self.port
-                    self.flag.set()
-                else:
-                    if self.val in values:
-                        self.found = 1
-    
-    b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
-    fa.wait()
-    c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
-    fb.wait()
-    d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)    
-    fc.wait()
-    
-def test_one(host, port, db='/tmp/test'):
-    import thread
-    k = Khashmir(host, port, db)
-    thread.start_new_thread(reactor.run, ())
-    return k
-    
-if __name__ == "__main__":
-    import sys
-    n = 8
-    if len(sys.argv) > 1: n = int(sys.argv[1])
-    l = test_build_net(peers=n)
-    time.sleep(3)
-    print "finding nodes..."
-    for i in range(n):
-        test_find_nodes(l)
-    print "inserting and fetching values..."
-    for i in range(10):
-        test_find_value(l)