]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
oops, another buglet
[quix0rs-apt-p2p.git] / khashmir.py
index ece28516d04139fad277c537860b0786a68fbe27..36ff79f8abf02894802285f8c980af4464f8c373 100644 (file)
@@ -1,14 +1,16 @@
 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
 
 from const import reactor
+import const
+
 import time
-from pickle import loads, dumps
+
 from sha import sha
 
 from ktable import KTable, K
 from knode import KNode as Node
 
-from hash import newID
+from hash import newID, newIDInRange
 
 from actions import FindNode, GetValue, KeyExpirer
 from twisted.web import xmlrpc
@@ -18,42 +20,53 @@ from twisted.internet.app import Application
 from twisted.web import server
 threadable.init()
 
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-from xmlrpclib import Binary
-
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
-
+import sqlite  ## find this at http://pysqlite.sourceforge.net/
+import pysqlite_exceptions
 
+KhashmirDBExcept = "KhashmirDBExcept"
 
 # this is the main class!
 class Khashmir(xmlrpc.XMLRPC):
-    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
-    def __init__(self, host, port):
+    __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+    def __init__(self, host, port, db='khashmir.db'):
        self.node = Node().init(newID(), host, port)
        self.table = KTable(self.node)
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))
-       
-       ## these databases may be more suited to on-disk rather than in-memory
-       # h((key, value)) -> (key, value, time) mappings
-       self.store = db.DB()
-       self.store.open(None, None, db.DB_BTREE)
-       
-       # <insert time> -> h((key, value))
-       self.itime = db.DB()
-       self.itime.set_flags(db.DB_DUP)
-       self.itime.open(None, None, db.DB_BTREE)
-
-       # key -> h((key, value))
-       self.kw = db.DB()
-       self.kw.set_flags(db.DB_DUP)
-       self.kw.open(None, None, db.DB_BTREE)
-
-       KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
-       
+       self.findDB(db)
+       self.last = time.time()
+       KeyExpirer(store=self.store)
+
+    def findDB(self, db):
+       import os
+       try:
+           os.stat(db)
+       except OSError:
+           self.createNewDB(db)
+       else:
+           self.loadDB(db)
+           
+    def loadDB(self, db):
+       try:
+           self.store = sqlite.connect(db=db)
+            self.store.autocommit = 1
+       except:
+           import traceback
+           raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+           
+    def createNewDB(self, db):
+       self.store = sqlite.connect(db=db)
+        self.store.autocommit = 1
+       s = """
+           create table kv (key text, value text, time timestamp, primary key (key, value));
+           create index kv_key on kv(key);
+           create index kv_timestamp on kv(time);
+           
+           create table nodes (id text primary key, host text, port number);
+           """
+       c = self.store.cursor()
+       c.execute(s)
+               
     def render(self, request):
        """
            Override the built in render so we can have access to the request object!
@@ -69,7 +82,7 @@ class Khashmir(xmlrpc.XMLRPC):
        """
         ping this node and add the contact info to the table on pong!
        """
-       n =Node().init(" "*20, host, port)  # note, we 
+       n =Node().init(const.NULL_ID, host, port)  # note, we 
        self.sendPing(n)
 
 
@@ -79,7 +92,10 @@ class Khashmir(xmlrpc.XMLRPC):
        # get K nodes out of local table/cache, or the node we want
        nodes = self.table.findNodes(id)
        d = Deferred()
-       d.addCallbacks(callback, errback)
+       if errback:
+           d.addCallbacks(callback, errback)
+       else:
+           d.addCallback(callback)
        if len(nodes) == 1 and nodes[0].id == id :
            d.callback(nodes)
        else:
@@ -90,41 +106,50 @@ class Khashmir(xmlrpc.XMLRPC):
     
     ## also async
     def valueForKey(self, key, callback):
-       """ returns the values found for key in global table """
+       """ returns the values found for key in global table
+           callback will be called with a list of values for each peer that returns unique values
+           final callback will be an empty list - probably should change to 'more coming' arg
+       """
        nodes = self.table.findNodes(key)
-       # create our search state
-       state = GetValue(self, key, callback)
-       reactor.callFromThread(state.goWithNodes, nodes)
-       
+
        # get locals
        l = self.retrieveValues(key)
        if len(l) > 0:
-           callback(l)
+           reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
 
+       # create our search state
+       state = GetValue(self, key, callback)
+       reactor.callFromThread(state.goWithNodes, nodes, l)
+       
 
 
-    ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
     def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status 
            in this implementation, peers respond but don't indicate status to storing values
-           values are stored in peers on a first-come first-served basis
-           this will probably change so more than one value can be stored under a key
+           a key can have many values
        """
-       def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+       def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
            if not callback:
-               # default callback - this will get called for each successful store value
+               # default callback
                def _storedValueHandler(sender):
                    pass
                response=_storedValueHandler
-           for node in nodes:
+       
+           for node in nodes[:const.STORE_REDUNDANCY]:
+               def cb(t, table = table, node=node, resp=response):
+                   self.table.insertNode(node)
+                   response(t)
                if node.id != self.node.id:
+                   def default(err, node=node, table=table):
+                       table.nodeFailed(node)
                    df = node.storeValue(key, value, self.node.senderDict())
-                   df.addCallbacks(response, default)
+                   df.addCallbacks(cb, lambda x: None)
        # this call is asynch
        self.findNode(key, _storeValueForKey)
        
        
-    def insertNode(self, n):
+    def insertNode(self, n, contacted=1):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary
        
@@ -133,8 +158,8 @@ class Khashmir(xmlrpc.XMLRPC):
        a node into the table without it's peer-ID.  That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
        """
-       old = self.table.insertNode(n)
-       if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
+       old = self.table.insertNode(n, contacted=contacted)
+       if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
            # the bucket is full, check to see if old node is still around and if so, replace it
            
            ## these are the callbacks used when we ping the oldest node in a bucket
@@ -143,10 +168,10 @@ class Khashmir(xmlrpc.XMLRPC):
                self.table.replaceStaleNode(old, newnode)
        
            def _notStaleNodeHandler(sender, old=old):
-               """ called when we get a ping from the remote node """
+               """ called when we get a pong from the old node """
                sender = Node().initWithDict(sender)
                if sender.id == old.id:
-                   self.table.insertNode(old)
+                   self.table.justSeenNode(old)
 
            df = old.ping(self.node.senderDict())
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
@@ -158,33 +183,30 @@ class Khashmir(xmlrpc.XMLRPC):
        """
        df = node.ping(self.node.senderDict())
        ## these are the callbacks we use when we issue a PING
-       def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
-           if id != 20 * ' ' and id != sender['id'].data:
+       def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+           l, sender = args
+           if id != const.NULL_ID and id != sender['id'].decode('base64'):
                # whoah, got response from different peer than we were expecting
                pass
            else:
-               #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
                sender['host'] = host
                sender['port'] = port
                n = Node().initWithDict(sender)
                table.insertNode(n)
            return
-       def _defaultPong(err):
-           # this should probably increment a failed message counter and dump the node if it gets over a threshold
-           return      
+       def _defaultPong(err, node=node, table=self.table):
+               table.nodeFailed(node)
 
        df.addCallbacks(_pongHandler,_defaultPong)
 
 
-    def findCloseNodes(self):
+    def findCloseNodes(self, callback=lambda a: None):
        """
            This does a findNode on the ID one away from our own.  
            This will allow us to populate our table with nodes on our network closest to our own.
            This is called as soon as we start up with an empty table
        """
        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-       def callback(nodes):
-           pass
        self.findNode(id, callback)
 
     def refreshTable(self):
@@ -195,24 +217,21 @@ class Khashmir(xmlrpc.XMLRPC):
            pass
 
        for bucket in self.table.buckets:
-           if time.time() - bucket.lastAccessed >= 60 * 60:
-               id = randRange(bucket.min, bucket.max)
+           if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
+               id = newIDInRange(bucket.min, bucket.max)
                self.findNode(id, callback)
        
  
     def retrieveValues(self, key):
-       if self.kw.has_key(key):
-           c = self.kw.cursor()
-           tup = c.set(key)
-           l = []
-           while(tup and tup[0] == key):
-               h1 = tup[1]
-               v = loads(self.store[h1])[1]
-               l.append(v)
-               tup = c.next()
-           l = map(lambda v: Binary(v), l)
-           return l
-       return []
+       s = "select value from kv where key = '%s';" % key.encode('base64')
+       c = self.store.cursor()
+       c.execute(s)
+       t = c.fetchone()
+       l = []
+       while t:
+           l.append(t['value'])
+           t = c.fetchone()
+       return l
        
     #####
     ##### INCOMING MESSAGE HANDLERS
@@ -225,48 +244,43 @@ class Khashmir(xmlrpc.XMLRPC):
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
-       return self.node.senderDict()
+       self.insertNode(n, contacted=0)
+       return (), self.node.senderDict()
                
     def xmlrpc_find_node(self, target, sender):
-       nodes = self.table.findNodes(target.data)
+       nodes = self.table.findNodes(target.decode('base64'))
        nodes = map(lambda node: node.senderDict(), nodes)
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
        return nodes, self.node.senderDict()
-    
+           
     def xmlrpc_store_value(self, key, value, sender):
-       key = key.data
-       h1 = sha(key+value.data).digest()
-       t = `time.time()`
-       if not self.store.has_key(h1):
-           v = dumps((key, value.data, t))
-           self.store.put(h1, v)
-           self.itime.put(t, h1)
-           self.kw.put(key, h1)
-       else:
+       t = "%0.6f" % time.time()
+       s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
+       c = self.store.cursor()
+       try:
+           c.execute(s)
+       except pysqlite_exceptions.IntegrityError, reason:
            # update last insert time
-           tup = loads(self.store[h1])
-           self.store[h1] = dumps((tup[0], tup[1], t))
-           self.itime.put(t, h1)
-
+           s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
+           c.execute(s)
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
-       return self.node.senderDict()
+       self.insertNode(n, contacted=0)
+       return (), self.node.senderDict()
        
     def xmlrpc_find_value(self, key, sender):
        ip = self.crequest.getClientIP()
-       key = key.data
+       key = key.decode('base64')
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
 
        l = self.retrieveValues(key)
-       if len(l):
+       if len(l) > 0:
            return {'values' : l}, self.node.senderDict()
        else:
            nodes = self.table.findNodes(key)
@@ -281,6 +295,7 @@ class Khashmir(xmlrpc.XMLRPC):
 
 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
     from whrandom import randrange
+    import threading
     import thread
     port = 2001
     l = []
@@ -289,7 +304,7 @@ def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
        print "Building %s peer table." % peers
        
     for i in xrange(peers):
-       a = Khashmir(host, port + i)
+       a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
        l.append(a)
     
 
@@ -297,7 +312,7 @@ def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
     time.sleep(1)
     for peer in l[1:]:
        peer.app.run()
-       #time.sleep(.25)
+    time.sleep(10)
 
     print "adding contacts...."
 
@@ -309,17 +324,18 @@ def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
        n = l[randrange(0, len(l))].node
        peer.addContact(host, n.port)
        if pause:
-           time.sleep(.30)
-           
-    time.sleep(1)
+           time.sleep(.33)
+       
+    time.sleep(10)
     print "finding close nodes...."
 
     for peer in l:
-       peer.findCloseNodes()
-       if pause:
-           time.sleep(.5)
-    if pause:
-           time.sleep(2)
+       flag = threading.Event()
+       def cb(nodes, f=flag):
+           f.set()
+       peer.findCloseNodes(cb)
+       flag.wait()
+
 #    for peer in l:
 #      peer.refreshTable()
     return l
@@ -378,7 +394,7 @@ def test_find_value(l, quiet=0):
            try:
                if(len(values) == 0):
                    if not self.found:
-                       print "find                FAILED"
+                       print "find                NOT FOUND"
                    else:
                        print "find                FOUND"
                    sys.stdout.flush()
@@ -396,14 +412,18 @@ def test_find_value(l, quiet=0):
     d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
-def test_one(port):
+def test_one(host, port, db='/tmp/test'):
     import thread
-    k = Khashmir('localhost', port)
+    k = Khashmir(host, port, db)
     thread.start_new_thread(k.app.run, ())
     return k
     
 if __name__ == "__main__":
-    l = test_build_net()
+    import sys
+    n = 8
+    if len(sys.argv) > 1:
+       n = int(sys.argv[1])
+    l = test_build_net(peers=n)
     time.sleep(3)
     print "finding nodes..."
     for i in range(10):