]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
Remove all the airhook stuff.
[quix0rs-apt-p2p.git] / khashmir.py
index a738a97fad8048e2296aeee876a9d8a20bddfe24..46946ebafab4863aa2c83fc0424a94efcf809be4 100644 (file)
@@ -1,4 +1,5 @@
-## Copyright 2002 Andrew Loewenstern, All Rights Reserved
+## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
 
 from const import reactor
 import const
@@ -8,32 +9,32 @@ import time
 from sha import sha
 
 from ktable import KTable, K
-from knode import KNode as Node
+from knode import *
 
-from hash import newID, newIDInRange
+from khash import newID, newIDInRange
 
 from actions import FindNode, GetValue, KeyExpirer, StoreValue
 import krpc
-import airhook
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol
 from twisted.python import threadable
-from twisted.internet.app import Application
+from twisted.application import service, internet
 from twisted.web import server
 threadable.init()
 import sys
 
+from random import randrange
+
 import sqlite  ## find this at http://pysqlite.sourceforge.net/
-import pysqlite_exceptions
 
 class KhashmirDBExcept(Exception):
     pass
 
-# this is the main class!
-class Khashmir(protocol.Factory):
+# this is the base class, has base functionality and find node, no key-value mappings
+class KhashmirBase(protocol.Factory):
     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
-    protocol = krpc.KRPC
+    _Node = KNodeBase
     def __init__(self, host, port, db='khashmir.db'):
         self.setup(host, port, db)
         
@@ -42,36 +43,45 @@ class Khashmir(protocol.Factory):
         self.port = port
         self.node = self._loadSelfNode(host, port)
         self.table = KTable(self.node)
-        self.app = Application("krpc")
-        self.airhook = airhook.listenAirhookStream(port, self)
+        #self.app = service.Application("krpc")
+        self.udp = krpc.hostbroker(self)
+        self.udp.protocol = krpc.KRPC
+        self.listenport = reactor.listenUDP(port, self.udp)
         self.last = time.time()
         self._loadRoutingTable()
         KeyExpirer(store=self.store)
-        #self.refreshTable(force=1)
+        self.refreshTable(force=1)
         reactor.callLater(60, self.checkpoint, (1,))
+
+    def Node(self):
+        n = self._Node()
+        n.table = self.table
+        return n
+    
+    def __del__(self):
+        self.listenport.stopListening()
         
     def _loadSelfNode(self, host, port):
         c = self.store.cursor()
         c.execute('select id from self where num = 0;')
         if c.rowcount > 0:
-            id = c.fetchone()[0].decode('hex')
+            id = c.fetchone()[0]
         else:
             id = newID()
-        return Node().init(id, host, port)
+        return self._Node().init(id, host, port)
         
     def _saveSelfNode(self):
-        self.store.autocommit = 0
         c = self.store.cursor()
         c.execute('delete from self where num = 0;')
-        c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
+        c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
         self.store.commit()
-        self.store.autocommit = 1
         
     def checkpoint(self, auto=0):
         self._saveSelfNode()
         self._dumpRoutingTable()
+        self.refreshTable()
         if auto:
-            reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+            reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
         
     def _findDB(self, db):
         import os
@@ -85,39 +95,36 @@ class Khashmir(protocol.Factory):
     def _loadDB(self, db):
         try:
             self.store = sqlite.connect(db=db)
-            self.store.autocommit = 1
+            #self.store.autocommit = 0
         except:
             import traceback
             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
         
     def _createNewDB(self, db):
         self.store = sqlite.connect(db=db)
-        self.store.autocommit = 1
         s = """
-            create table kv (key text, value text, time timestamp, primary key (key, value));
+            create table kv (key binary, value binary, time timestamp, primary key (key, value));
             create index kv_key on kv(key);
             create index kv_timestamp on kv(time);
             
-            create table nodes (id text primary key, host text, port number);
+            create table nodes (id binary primary key, host text, port number);
             
-            create table self (num number primary key, id text);
+            create table self (num number primary key, id binary);
             """
         c = self.store.cursor()
         c.execute(s)
+        self.store.commit()
 
     def _dumpRoutingTable(self):
         """
             save routing table nodes to the database
         """
-        self.store.autocommit = 0;
         c = self.store.cursor()
         c.execute("delete from nodes where id not NULL;")
         for bucket in self.table.buckets:
             for node in bucket.l:
-                d = node.senderDict()
-                c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
+                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
         self.store.commit()
-        self.store.autocommit = 1;
         
     def _loadRoutingTable(self):
         """
@@ -127,8 +134,8 @@ class Khashmir(protocol.Factory):
         c = self.store.cursor()
         c.execute("select * from nodes;")
         for rec in c.fetchall():
-            n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
-            n.conn = self.airhook.connectionForAddr((n.host, n.port))
+            n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+            n.conn = self.udp.connectionForAddr((n.host, n.port))
             self.table.insertNode(n, contacted=0)
             
 
@@ -138,8 +145,8 @@ class Khashmir(protocol.Factory):
         """
             ping this node and add the contact info to the table on pong!
         """
-        n =Node().init(const.NULL_ID, host, port) 
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n =self.Node().init(const.NULL_ID, host, port) 
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.sendPing(n, callback=callback)
 
     ## this call is async!
@@ -159,46 +166,6 @@ class Khashmir(protocol.Factory):
             state = FindNode(self, id, d.callback)
             reactor.callFromThread(state.goWithNodes, nodes)
     
-    
-    ## also async
-    def valueForKey(self, key, callback, searchlocal = 1):
-        """ returns the values found for key in global table
-            callback will be called with a list of values for each peer that returns unique values
-            final callback will be an empty list - probably should change to 'more coming' arg
-        """
-        nodes = self.table.findNodes(key)
-        
-        # get locals
-        if searchlocal:
-            l = self.retrieveValues(key)
-            if len(l) > 0:
-                reactor.callLater(0, callback, (l))
-        else:
-            l = []
-        
-        # create our search state
-        state = GetValue(self, key, callback)
-        reactor.callFromThread(state.goWithNodes, nodes, l)
-
-    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-    def storeValueForKey(self, key, value, callback=None):
-        """ stores the value for key in the global table, returns immediately, no status 
-            in this implementation, peers respond but don't indicate status to storing values
-            a key can have many values
-        """
-        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
-            if not response:
-                # default callback
-                def _storedValueHandler(sender):
-                    pass
-                response=_storedValueHandler
-            action = StoreValue(self.table, key, value, response)
-            reactor.callFromThread(action.goWithNodes, nodes)
-            
-        # this call is asynch
-        self.findNode(key, _storeValueForKey)
-        
-    
     def insertNode(self, n, contacted=1):
         """
         insert a node in our local table, pinging oldest contact in bucket, if necessary
@@ -219,36 +186,30 @@ class Khashmir(protocol.Factory):
             
             def _notStaleNodeHandler(dict, old=old):
                 """ called when we get a pong from the old node """
-                _krpc_sender = dict['_krpc_sender']
                 dict = dict['rsp']
-                sender = dict['sender']
-                if sender['id'] == old.id:
+                if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
-            df = old.ping(self.node.senderDict())
+            df = old.ping(self.node.id)
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
     def sendPing(self, node, callback=None):
         """
             ping a node
         """
-        df = node.ping(self.node.senderDict())
+        df = node.ping(self.node.id)
         ## these are the callbacks we use when we issue a PING
         def _pongHandler(dict, node=node, table=self.table, callback=callback):
             _krpc_sender = dict['_krpc_sender']
             dict = dict['rsp']
-            sender = dict['sender']
-            if node.id != const.NULL_ID and node.id != sender['id']:
-                # whoah, got response from different peer than we were expecting
-                self.table.invalidateNode(node)
-            else:
-                sender['host'] = node.host
-                sender['port'] = node.port
-                n = Node().initWithDict(sender)
-                n.conn = self.airhook.connectionForAddr((n.host, n.port))
-                table.insertNode(n)
-                if callback:
-                    callback()
+            sender = {'id' : dict['id']}
+            sender['host'] = _krpc_sender[0]
+            sender['port'] = _krpc_sender[1]
+            n = self.Node().initWithDict(sender)
+            n.conn = self.udp.connectionForAddr((n.host, n.port))
+            table.insertNode(n)
+            if callback:
+                callback()
         def _defaultPong(err, node=node, table=self.table, callback=callback):
             table.nodeFailed(node)
             if callback:
@@ -277,242 +238,125 @@ class Khashmir(protocol.Factory):
                 id = newIDInRange(bucket.min, bucket.max)
                 self.findNode(id, callback)
 
-
-    def retrieveValues(self, key):
-        s = "select value from kv where key = '%s';" % key.encode('hex')
-        c = self.store.cursor()
-        c.execute(s)
-        t = c.fetchone()
-        l = []
-        while t:
-            l.append(t['value'].decode('base64'))
-            t = c.fetchone()
-        return l
-    
-    #####
-    ##### INCOMING MESSAGE HANDLERS
-    
-    def krpc_ping(self, sender, _krpc_sender):
+    def stats(self):
         """
-            takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
-            returns sender dict
+        Returns (num_contacts, num_nodes)
+        num_contacts: number contacts in our routing table
+        num_nodes: number of nodes estimated in the entire dht
         """
+        num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
+        num_nodes = const.K * (2**(len(self.table.buckets) - 1))
+        return (num_contacts, num_nodes)
+
+    def krpc_ping(self, id, _krpc_sender):
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
-        return {"sender" : self.node.senderDict()}
+        return {"id" : self.node.id}
         
-    def krpc_find_node(self, target, sender, _krpc_sender):
+    def krpc_find_node(self, target, id, _krpc_sender):
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.senderDict(), nodes)
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
-        return {"nodes" : nodes, "sender" : self.node.senderDict()}
-            
-    def krpc_store_value(self, key, value, sender, _krpc_sender):
-        t = "%0.6f" % time.time()
-        s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
+        return {"nodes" : nodes, "id" : self.node.id}
+
+
+## This class provides read-only access to the DHT, valueForKey
+## you probably want to use this mixin and provide your own write methods
+class KhashmirRead(KhashmirBase):
+    _Node = KNodeRead
+    def retrieveValues(self, key):
         c = self.store.cursor()
-        try:
-            c.execute(s)
-        except pysqlite_exceptions.IntegrityError, reason:
-            # update last insert time
-            s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
-            c.execute(s)
-        sender['host'] = _krpc_sender[0]
-        sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
-        self.insertNode(n, contacted=0)
-        return {"sender" : self.node.senderDict()}
-    
-    def krpc_find_value(self, key, sender, _krpc_sender):
+        c.execute("select value from kv where key = %s;", sqlite.encode(key))
+        t = c.fetchone()
+        l = []
+        while t:
+            l.append(t['value'])
+            t = c.fetchone()
+        return l
+    ## also async
+    def valueForKey(self, key, callback, searchlocal = 1):
+        """ returns the values found for key in global table
+            callback will be called with a list of values for each peer that returns unique values
+            final callback will be an empty list - probably should change to 'more coming' arg
+        """
+        nodes = self.table.findNodes(key)
+        
+        # get locals
+        if searchlocal:
+            l = self.retrieveValues(key)
+            if len(l) > 0:
+                reactor.callLater(0, callback, (l))
+        else:
+            l = []
+        
+        # create our search state
+        state = GetValue(self, key, callback)
+        reactor.callFromThread(state.goWithNodes, nodes, l)
+
+    def krpc_find_value(self, key, id, _krpc_sender):
+        sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
-        n.conn = self.airhook.connectionForAddr((n.host, n.port))
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
     
         l = self.retrieveValues(key)
         if len(l) > 0:
-            return {'values' : l, "sender": self.node.senderDict()}
+            return {'values' : l, "id": self.node.id}
         else:
             nodes = self.table.findNodes(key)
             nodes = map(lambda node: node.senderDict(), nodes)
-            return {'nodes' : nodes, "sender": self.node.senderDict()}
-
-### TESTING ###
-from random import randrange
-import threading, thread, sys, time
-from sha import sha
-from hash import newID
-
-
-def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
-    import thread
-    l = []
-    for i in xrange(peers):
-        a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`)
-        l.append(a)
-    thread.start_new_thread(l[0].app.run, ())
-    for peer in l[1:]:
-        peer.app.run() 
-    return l
-    
-def test_build_net(quiet=0, peers=24, host='127.0.0.1',  pause=0, startport=2001, dbprefix='/tmp/test'):
-    from whrandom import randrange
-    import threading
-    import thread
-    import sys
-    port = startport
-    l = []
-    if not quiet:
-        print "Building %s peer table." % peers
-    
-    for i in xrange(peers):
-        a = Khashmir(host, port + i, db = dbprefix +`i`)
-        l.append(a)
-    
-    
-    thread.start_new_thread(l[0].app.run, ())
-    time.sleep(1)
-    for peer in l[1:]:
-        peer.app.run()
-    #time.sleep(3)
-    
-    def spewer(frame, s, ignored):
-        from twisted.python import reflect
-        if frame.f_locals.has_key('self'):
-            se = frame.f_locals['self']
-            print 'method %s of %s at %s' % (
-                frame.f_code.co_name, reflect.qual(se.__class__), id(se)
-                )
-    #sys.settrace(spewer)
+            return {'nodes' : nodes, "id": self.node.id}
 
-    print "adding contacts...."
-    def makecb(flag):
-        def cb(f=flag):
-            f.set()
-        return cb
+###  provides a generic write method, you probably don't want to deploy something that allows
+###  arbitrary value storage
+class KhashmirWrite(KhashmirRead):
+    _Node = KNodeWrite
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+    def storeValueForKey(self, key, value, callback=None):
+        """ stores the value for key in the global table, returns immediately, no status 
+            in this implementation, peers respond but don't indicate status to storing values
+            a key can have many values
+        """
+        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+            if not response:
+                # default callback
+                def _storedValueHandler(sender):
+                    pass
+                response=_storedValueHandler
+            action = StoreValue(self.table, key, value, response)
+            reactor.callFromThread(action.goWithNodes, nodes)
+            
+        # this call is asynch
+        self.findNode(key, _storeValueForKey)
+                    
+    def krpc_store_value(self, key, value, id, _krpc_sender):
+        t = "%0.6f" % time.time()
+        c = self.store.cursor()
+        try:
+            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+        except sqlite.IntegrityError, reason:
+            # update last insert time
+            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+        self.store.commit()
+        sender = {'id' : id}
+        sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"id" : self.node.id}
 
-    for peer in l:
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-        p = l[randrange(0, len(l))]
-        if p != peer:
-            n = p.node
-            flag = threading.Event()
-            peer.addContact(host, n.port, makecb(flag))
-            flag.wait()
-    
-    print "finding close nodes...."
-    
-    for peer in l:
-        flag = threading.Event()
-        def cb(nodes, f=flag):
-            f.set()
-        peer.findCloseNodes(cb)
-        flag.wait()
-    #    for peer in l:
-    #  peer.refreshTable()
-    return l
-        
-def test_find_nodes(l, quiet=0):
-    flag = threading.Event()
-    
-    n = len(l)
-    
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    
-    def callback(nodes, flag=flag, id = b.node.id):
-        if (len(nodes) >0) and (nodes[0].id == id):
-            print "test_find_nodes     PASSED"
-        else:
-            print "test_find_nodes     FAILED"
-        flag.set()
-    a.findNode(b.node.id, callback)
-    flag.wait()
-    
-def test_find_value(l, quiet=0):
-    ff = threading.Event()
-    fa = threading.Event()
-    fb = threading.Event()
-    fc = threading.Event()
-    
-    n = len(l)
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    c = l[randrange(0,n)]
-    d = l[randrange(0,n)]
-    
-    key = newID()
-    value = newID()
-    if not quiet: print "inserting value..."
-    def acb(p, f=ff):
-        f.set()
-    a.storeValueForKey(key, value, acb)
-    ff.wait()
-    
-    if not quiet:
-        print "finding..."
-    
-    class cb:
-        def __init__(self, flag, value=value, port=None):
-            self.flag = flag
-            self.val = value
-            self.found = 0
-            self.port = port
-        def callback(self, values):
-            try:
-                if(len(values) == 0):
-                    if not self.found:
-                        print "find   %s             NOT FOUND" % self.port
-                    else:
-                        print "find   %s           FOUND" % self.port
-                else:
-                    if self.val in values:
-                        self.found = 1
-            finally:
-                self.flag.set()
-    
-    b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
-    fa.wait()
-    c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
-    fb.wait()
-    d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)    
-    fc.wait()
-    
-def test_one(host, port, db='/tmp/test'):
-    import thread
-    k = Khashmir(host, port, db)
-    thread.start_new_thread(reactor.run, ())
-    return k
-    
-if __name__ == "__main__":
-    import sys
-    n = 8
-    if len(sys.argv) > 1: n = int(sys.argv[1])
-    l = test_build_net(peers=n)
-    time.sleep(3)
-    print "finding nodes..."
-    for i in range(n):
-        test_find_nodes(l)
-    print "inserting and fetching values..."
-    for i in range(10):
-        test_find_value(l)
+# the whole shebang, for testing
+class Khashmir(KhashmirWrite):
+    _Node = KNodeWrite