]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
changed from bsddb3 to pysqlite
[quix0rs-apt-p2p.git] / khashmir.py
index 748ce2196b8b93d3e792f1875cb3a4771d1b1275..19bd3d8537af3cb2d884a487ad6f1de82a6558c0 100644 (file)
@@ -1,41 +1,72 @@
 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
 
 from const import reactor
+import const
+
 import time
+from bencode import bdecode as loads
+from bencode import bencode as dumps
+
+from sha import sha
 
 from ktable import KTable, K
 from knode import KNode as Node
 
-from hash import newID
+from hash import newID, newIDInRange
 
-from actions import FindNode, GetValue
+from actions import FindNode, GetValue, KeyExpirer
 from twisted.web import xmlrpc
 from twisted.internet.defer import Deferred
 from twisted.python import threadable
+from twisted.internet.app import Application
+from twisted.web import server
 threadable.init()
 
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
+import sqlite  ## find this at http://pysqlite.sourceforge.net/
 
 
+KhashmirDBExcept = "KhashmirDBExcept"
 
 # this is the main class!
 class Khashmir(xmlrpc.XMLRPC):
-    __slots__ = ['listener', 'node', 'table', 'store', 'app']
-    def __init__(self, host, port):
-       self.node = Node(newID(), host, port)
+    __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+    def __init__(self, host, port, db='khashmir.db'):
+       self.node = Node().init(newID(), host, port)
        self.table = KTable(self.node)
-       from twisted.internet.app import Application
-       from twisted.web import server
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))
-       self.store = db.DB()
-       self.store.open(None, None, db.DB_BTREE)
-       
+       self.findDB(db)
+       self.last = time.time()
+       KeyExpirer(store=self.store)
 
+    def findDB(self, db):
+       import os
+       try:
+           os.stat(db)
+       except OSError:
+           self.createNewDB(db)
+       else:
+           self.loadDB(db)
+           
+    def loadDB(self, db):
+       try:
+           self.store = sqlite.connect(db=db)
+       except:
+           import traceback
+           raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+           
+    def createNewDB(self, db):
+       self.store = sqlite.connect(db=db)
+       s = """
+           create table kv (hkv text primary key, key text, value text, time timestamp);
+           create index kv_key on kv(key);
+           
+           create table nodes (id text primary key, host text, port number);
+           """
+       c = self.store.cursor()
+       c.execute(s)
+       self.store.commit()
+               
     def render(self, request):
        """
            Override the built in render so we can have access to the request object!
@@ -51,7 +82,7 @@ class Khashmir(xmlrpc.XMLRPC):
        """
         ping this node and add the contact info to the table on pong!
        """
-       n =Node(" "*20, host, port)  # note, we 
+       n =Node().init(const.NULL_ID, host, port)  # note, we 
        self.sendPing(n)
 
 
@@ -61,7 +92,10 @@ class Khashmir(xmlrpc.XMLRPC):
        # get K nodes out of local table/cache, or the node we want
        nodes = self.table.findNodes(id)
        d = Deferred()
-       d.addCallbacks(callback, errback)
+       if errback:
+           d.addCallbacks(callback, errback)
+       else:
+           d.addCallback(callback)
        if len(nodes) == 1 and nodes[0].id == id :
            d.callback(nodes)
        else:
@@ -72,30 +106,50 @@ class Khashmir(xmlrpc.XMLRPC):
     
     ## also async
     def valueForKey(self, key, callback):
-       """ returns the values found for key in global table """
+       """ returns the values found for key in global table
+           callback will be called with a list of values for each peer that returns unique values
+           final callback will be an empty list - probably should change to 'more coming' arg
+       """
        nodes = self.table.findNodes(key)
+
+       # get locals
+       l = self.retrieveValues(key)
+       if len(l) > 0:
+           reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
+
        # create our search state
        state = GetValue(self, key, callback)
-       reactor.callFromThread(state.goWithNodes, nodes)
+       reactor.callFromThread(state.goWithNodes, nodes, l)
+       
 
 
-    ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
-    def storeValueForKey(self, key, value):
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+    def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status 
            in this implementation, peers respond but don't indicate status to storing values
-           values are stored in peers on a first-come first-served basis
-           this will probably change so more than one value can be stored under a key
+           a key can have many values
        """
-       def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
-           for node in nodes:
+       def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+           if not callback:
+               # default callback
+               def _storedValueHandler(sender):
+                   pass
+               response=_storedValueHandler
+       
+           for node in nodes[:const.STORE_REDUNDANCY]:
+               def cb(t, table = table, node=node, resp=response):
+                   self.table.insertNode(node)
+                   response(t)
                if node.id != self.node.id:
+                   def default(err, node=node, table=table):
+                       table.nodeFailed(node)
                    df = node.storeValue(key, value, self.node.senderDict())
-                   df.addCallbacks(response, default)
+                   df.addCallbacks(cb, lambda x: None)
        # this call is asynch
        self.findNode(key, _storeValueForKey)
        
        
-    def insertNode(self, n):
+    def insertNode(self, n, contacted=1):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary
        
@@ -104,8 +158,8 @@ class Khashmir(xmlrpc.XMLRPC):
        a node into the table without it's peer-ID.  That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
        """
-       old = self.table.insertNode(n)
-       if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
+       old = self.table.insertNode(n, contacted=contacted)
+       if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
            # the bucket is full, check to see if old node is still around and if so, replace it
            
            ## these are the callbacks used when we ping the oldest node in a bucket
@@ -114,12 +168,13 @@ class Khashmir(xmlrpc.XMLRPC):
                self.table.replaceStaleNode(old, newnode)
        
            def _notStaleNodeHandler(sender, old=old):
-               """ called when we get a ping from the remote node """
-               if sender['id'] == old.id:
-                   self.table.insertNode(old)
+               """ called when we get a pong from the old node """
+               sender = Node().initWithDict(sender)
+               if sender.id == old.id:
+                   self.table.justSeenNode(old)
 
-           df = old.ping()
-           df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
+           df = old.ping(self.node.senderDict())
+           df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
 
     def sendPing(self, node):
@@ -128,18 +183,19 @@ class Khashmir(xmlrpc.XMLRPC):
        """
        df = node.ping(self.node.senderDict())
        ## these are the callbacks we use when we issue a PING
-       def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
-           if id != 20 * ' ' and id != sender['id']:
+       def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+           l, sender = args
+           if id != const.NULL_ID and id != sender['id'].decode('base64'):
                # whoah, got response from different peer than we were expecting
                pass
            else:
-               #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
-               n = Node(sender['id'], host, port)
+               sender['host'] = host
+               sender['port'] = port
+               n = Node().initWithDict(sender)
                table.insertNode(n)
            return
-       def _defaultPong(err):
-           # this should probably increment a failed message counter and dump the node if it gets over a threshold
-           return      
+       def _defaultPong(err, node=node, table=self.table):
+               table.nodeFailed(node)
 
        df.addCallbacks(_pongHandler,_defaultPong)
 
@@ -163,11 +219,22 @@ class Khashmir(xmlrpc.XMLRPC):
            pass
 
        for bucket in self.table.buckets:
-           if time.time() - bucket.lastAccessed >= 60 * 60:
-               id = randRange(bucket.min, bucket.max)
+           if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
+               id = newIDInRange(bucket.min, bucket.max)
                self.findNode(id, callback)
        
  
+    def retrieveValues(self, key):
+       s = "select value from kv where key = '%s';" % key.encode('base64')
+       c = self.store.cursor()
+       c.execute(s)
+       t = c.fetchone()
+       l = []
+       while t:
+           l.append(t['value'])
+           t = c.fetchone()
+       return l
+       
     #####
     ##### INCOMING MESSAGE HANDLERS
     
@@ -177,87 +244,102 @@ class Khashmir(xmlrpc.XMLRPC):
            returns sender dict
        """
        ip = self.crequest.getClientIP()
-       n = Node(sender['id'], ip, sender['port'])
-       self.insertNode(n)
-       return self.node.senderDict()
+       sender['host'] = ip
+       n = Node().initWithDict(sender)
+       self.insertNode(n, contacted=0)
+       return (), self.node.senderDict()
                
     def xmlrpc_find_node(self, target, sender):
-       nodes = self.table.findNodes(target)
+       nodes = self.table.findNodes(target.decode('base64'))
        nodes = map(lambda node: node.senderDict(), nodes)
        ip = self.crequest.getClientIP()
-       n = Node(sender['id'], ip, sender['port'])
-       self.insertNode(n)
+       sender['host'] = ip
+       n = Node().initWithDict(sender)
+       self.insertNode(n, contacted=0)
        return nodes, self.node.senderDict()
-    
+           
     def xmlrpc_store_value(self, key, value, sender):
-       if not self.store.has_key(key):
-           self.store.put(key, value)
+       h1 = sha(key+value).digest().encode('base64')
+       t = `time.time()`
+       s = "insert into kv values ('%s', '%s', '%s', '%s')" % (h1, key, value, t)
+       c = self.store.cursor()
+       try:
+           c.execute(s)
+       except:
+           # update last insert time
+           s = "update kv set time = '%s' where hkv = '%s'" % (t, h1)
+           c.execute(s)
+       self.store.commit()
        ip = self.crequest.getClientIP()
-       n = Node(sender['id'], ip, sender['port'])
-       self.insertNode(n)
-       return self.node.senderDict()
+       sender['host'] = ip
+       n = Node().initWithDict(sender)
+       self.insertNode(n, contacted=0)
+       return (), self.node.senderDict()
        
     def xmlrpc_find_value(self, key, sender):
        ip = self.crequest.getClientIP()
-       n = Node(sender['id'], ip, sender['port'])
-       self.insertNode(n)
-       if self.store.has_key(key):
-           return {'values' : self.store[key]}, self.node.senderDict()
+       key = key.decode('base64')
+       sender['host'] = ip
+       n = Node().initWithDict(sender)
+       self.insertNode(n, contacted=0)
+
+       l = self.retrieveValues(key)
+       if len(l) > 0:
+           return {'values' : l}, self.node.senderDict()
        else:
-           nodes = self.table.findNodes(msg['key'])
+           nodes = self.table.findNodes(key)
            nodes = map(lambda node: node.senderDict(), nodes)
            return {'nodes' : nodes}, self.node.senderDict()
 
-    ###
-    ### message response callbacks
-    # called when we get a response to store value
-    def _storedValueHandler(self, sender):
-       pass
 
 
-#------
 
-def test_build_net(quiet=0):
+
+#------ testing
+
+def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
     from whrandom import randrange
     import thread
     port = 2001
     l = []
-    peers = 16
-    
+        
     if not quiet:
        print "Building %s peer table." % peers
        
     for i in xrange(peers):
-       a = Khashmir('localhost', port + i)
+       a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
        l.append(a)
     
-    def run(l=l):
-       while(1):
-               events = 0
-               for peer in l:
-                       events = events + peer.dispatcher.runOnce()
-               if events == 0:
-                       time.sleep(.25)
 
     thread.start_new_thread(l[0].app.run, ())
+    time.sleep(1)
     for peer in l[1:]:
        peer.app.run()
-       
+       #time.sleep(.25)
+
+    print "adding contacts...."
+
     for peer in l[1:]:
        n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
+       peer.addContact(host, n.port)
        n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
+       peer.addContact(host, n.port)
        n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
-       
-    time.sleep(5)
+       peer.addContact(host, n.port)
+       if pause:
+           time.sleep(.5)
+           
+    time.sleep(1)
+    print "finding close nodes...."
 
     for peer in l:
        peer.findCloseNodes()
-    time.sleep(5)
-    for peer in l:
-       peer.refreshTable()
+       if pause:
+           time.sleep(.5)
+    if pause:
+           time.sleep(1)
+#    for peer in l:
+#      peer.refreshTable()
     return l
         
 def test_find_nodes(l, quiet=0):
@@ -270,8 +352,8 @@ def test_find_nodes(l, quiet=0):
     a = l[randrange(0,n)]
     b = l[randrange(0,n)]
     
-    def callback(nodes, l=l, flag=flag):
-       if (len(nodes) >0) and (nodes[0].id == b.node.id):
+    def callback(nodes, flag=flag, id = b.node.id):
+       if (len(nodes) >0) and (nodes[0].id == id):
            print "test_find_nodes      PASSED"
        else:
            print "test_find_nodes      FAILED"
@@ -282,6 +364,7 @@ def test_find_nodes(l, quiet=0):
 def test_find_value(l, quiet=0):
     from whrandom import randrange
     from sha import sha
+    from hash import newID
     import time, threading, sys
     
     fa = threading.Event()
@@ -294,47 +377,59 @@ def test_find_value(l, quiet=0):
     c = l[randrange(0,n)]
     d = l[randrange(0,n)]
 
-    key = sha(`randrange(0,100000)`).digest()
-    value = sha(`randrange(0,100000)`).digest()
+    key = newID()
+    value = newID()
     if not quiet:
-       print "inserting value...",
+       print "inserting value..."
        sys.stdout.flush()
     a.storeValueForKey(key, value)
     time.sleep(3)
     print "finding..."
+    sys.stdout.flush()
     
-    def mc(flag, value=value):
-       def callback(values, f=flag, val=value):
+    class cb:
+       def __init__(self, flag, value=value):
+           self.flag = flag
+           self.val = value
+           self.found = 0
+       def callback(self, values):
            try:
                if(len(values) == 0):
-                   print "find                FAILED"
-               else:
-                   if values[0]['value'] != val:
-                       print "find                FAILED"
+                   if not self.found:
+                       print "find                NOT FOUND"
                    else:
                        print "find                FOUND"
+                   sys.stdout.flush()
+
+               else:
+                   if self.val in values:
+                       self.found = 1
            finally:
-               f.set()
-       return callback
-    b.valueForKey(key, mc(fa))
-    c.valueForKey(key, mc(fb))
-    d.valueForKey(key, mc(fc))
-    
+               self.flag.set()
+
+    b.valueForKey(key, cb(fa).callback)
     fa.wait()
+    c.valueForKey(key, cb(fb).callback)
     fb.wait()
+    d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
+def test_one(host, port, db='/tmp/test'):
+    import thread
+    k = Khashmir(host, port, db)
+    thread.start_new_thread(k.app.run, ())
+    return k
+    
 if __name__ == "__main__":
-    l = test_build_net()
+    import sys
+    n = 8
+    if len(sys.argv) > 1:
+       n = int(sys.argv[1])
+    l = test_build_net(peers=n)
     time.sleep(3)
     print "finding nodes..."
-    test_find_nodes(l)
-    test_find_nodes(l)
-    test_find_nodes(l)
+    for i in range(10):
+       test_find_nodes(l)
     print "inserting and fetching values..."
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
+    for i in range(10):
+       test_find_value(l)