]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
oops, another buglet
[quix0rs-apt-p2p.git] / khashmir.py
index 390d8a9dcb0a458c47e40a66af7c4c286572a3fb..36ff79f8abf02894802285f8c980af4464f8c373 100644 (file)
@@ -4,8 +4,6 @@ from const import reactor
 import const
 
 import time
-from bencode import bdecode as loads
-from bencode import bencode as dumps
 
 from sha import sha
 
@@ -22,39 +20,53 @@ from twisted.internet.app import Application
 from twisted.web import server
 threadable.init()
 
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-from xmlrpclib import Binary
-
+import sqlite  ## find this at http://pysqlite.sourceforge.net/
+import pysqlite_exceptions
 
+KhashmirDBExcept = "KhashmirDBExcept"
 
 # this is the main class!
 class Khashmir(xmlrpc.XMLRPC):
-    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
-    def __init__(self, host, port):
+    __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+    def __init__(self, host, port, db='khashmir.db'):
        self.node = Node().init(newID(), host, port)
        self.table = KTable(self.node)
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))
-       
-       ## these databases may be more suited to on-disk rather than in-memory
-       # h((key, value)) -> (key, value, time) mappings
-       self.store = db.DB()
-       self.store.open(None, None, db.DB_BTREE)
-       
-       # <insert time> -> h((key, value))
-       self.itime = db.DB()
-       self.itime.set_flags(db.DB_DUP)
-       self.itime.open(None, None, db.DB_BTREE)
-
-       # key -> h((key, value))
-       self.kw = db.DB()
-       self.kw.set_flags(db.DB_DUP)
-       self.kw.open(None, None, db.DB_BTREE)
-
-       KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
-       
+       self.findDB(db)
+       self.last = time.time()
+       KeyExpirer(store=self.store)
+
+    def findDB(self, db):
+       import os
+       try:
+           os.stat(db)
+       except OSError:
+           self.createNewDB(db)
+       else:
+           self.loadDB(db)
+           
+    def loadDB(self, db):
+       try:
+           self.store = sqlite.connect(db=db)
+            self.store.autocommit = 1
+       except:
+           import traceback
+           raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+           
+    def createNewDB(self, db):
+       self.store = sqlite.connect(db=db)
+        self.store.autocommit = 1
+       s = """
+           create table kv (key text, value text, time timestamp, primary key (key, value));
+           create index kv_key on kv(key);
+           create index kv_timestamp on kv(time);
+           
+           create table nodes (id text primary key, host text, port number);
+           """
+       c = self.store.cursor()
+       c.execute(s)
+               
     def render(self, request):
        """
            Override the built in render so we can have access to the request object!
@@ -70,7 +82,7 @@ class Khashmir(xmlrpc.XMLRPC):
        """
         ping this node and add the contact info to the table on pong!
        """
-       n =Node().init(" "*20, host, port)  # note, we 
+       n =Node().init(const.NULL_ID, host, port)  # note, we 
        self.sendPing(n)
 
 
@@ -94,13 +106,16 @@ class Khashmir(xmlrpc.XMLRPC):
     
     ## also async
     def valueForKey(self, key, callback):
-       """ returns the values found for key in global table """
+       """ returns the values found for key in global table
+           callback will be called with a list of values for each peer that returns unique values
+           final callback will be an empty list - probably should change to 'more coming' arg
+       """
        nodes = self.table.findNodes(key)
 
        # get locals
        l = self.retrieveValues(key)
        if len(l) > 0:
-           reactor.callFromThread(callback, l)
+           reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
 
        # create our search state
        state = GetValue(self, key, callback)
@@ -108,21 +123,20 @@ class Khashmir(xmlrpc.XMLRPC):
        
 
 
-    ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
     def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status 
            in this implementation, peers respond but don't indicate status to storing values
-           values are stored in peers on a first-come first-served basis
-           this will probably change so more than one value can be stored under a key
+           a key can have many values
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
            if not callback:
-               # default callback - this will get called for each successful store value
+               # default callback
                def _storedValueHandler(sender):
                    pass
                response=_storedValueHandler
        
-           for node in nodes:
+           for node in nodes[:const.STORE_REDUNDANCY]:
                def cb(t, table = table, node=node, resp=response):
                    self.table.insertNode(node)
                    response(t)
@@ -169,8 +183,9 @@ class Khashmir(xmlrpc.XMLRPC):
        """
        df = node.ping(self.node.senderDict())
        ## these are the callbacks we use when we issue a PING
-       def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
-           if id != 20 * ' ' and id != sender['id'].data:
+       def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+           l, sender = args
+           if id != const.NULL_ID and id != sender['id'].decode('base64'):
                # whoah, got response from different peer than we were expecting
                pass
            else:
@@ -185,15 +200,13 @@ class Khashmir(xmlrpc.XMLRPC):
        df.addCallbacks(_pongHandler,_defaultPong)
 
 
-    def findCloseNodes(self):
+    def findCloseNodes(self, callback=lambda a: None):
        """
            This does a findNode on the ID one away from our own.  
            This will allow us to populate our table with nodes on our network closest to our own.
            This is called as soon as we start up with an empty table
        """
        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-       def callback(nodes):
-           pass
        self.findNode(id, callback)
 
     def refreshTable(self):
@@ -210,17 +223,15 @@ class Khashmir(xmlrpc.XMLRPC):
        
  
     def retrieveValues(self, key):
-       if self.kw.has_key(key):
-           c = self.kw.cursor()
-           tup = c.set(key)
-           l = []
-           while(tup and tup[0] == key):
-               h1 = tup[1]
-               v = loads(self.store[h1])[1]
-               l.append(v)
-               tup = c.next()
-           return l
-       return []
+       s = "select value from kv where key = '%s';" % key.encode('base64')
+       c = self.store.cursor()
+       c.execute(s)
+       t = c.fetchone()
+       l = []
+       while t:
+           l.append(t['value'])
+           t = c.fetchone()
+       return l
        
     #####
     ##### INCOMING MESSAGE HANDLERS
@@ -234,48 +245,42 @@ class Khashmir(xmlrpc.XMLRPC):
        sender['host'] = ip
        n = Node().initWithDict(sender)
        self.insertNode(n, contacted=0)
-       return self.node.senderDict()
+       return (), self.node.senderDict()
                
     def xmlrpc_find_node(self, target, sender):
-       nodes = self.table.findNodes(target.data)
+       nodes = self.table.findNodes(target.decode('base64'))
        nodes = map(lambda node: node.senderDict(), nodes)
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
        self.insertNode(n, contacted=0)
        return nodes, self.node.senderDict()
-    
+           
     def xmlrpc_store_value(self, key, value, sender):
-       key = key.data
-       h1 = sha(key+value.data).digest()
-       t = `time.time()`
-       if not self.store.has_key(h1):
-           v = dumps((key, value.data, t))
-           self.store.put(h1, v)
-           self.itime.put(t, h1)
-           self.kw.put(key, h1)
-       else:
+       t = "%0.6f" % time.time()
+       s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
+       c = self.store.cursor()
+       try:
+           c.execute(s)
+       except pysqlite_exceptions.IntegrityError, reason:
            # update last insert time
-           tup = loads(self.store[h1])
-           self.store[h1] = dumps((tup[0], tup[1], t))
-           self.itime.put(t, h1)
-
+           s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
+           c.execute(s)
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
        self.insertNode(n, contacted=0)
-       return self.node.senderDict()
+       return (), self.node.senderDict()
        
     def xmlrpc_find_value(self, key, sender):
        ip = self.crequest.getClientIP()
-       key = key.data
+       key = key.decode('base64')
        sender['host'] = ip
        n = Node().initWithDict(sender)
        self.insertNode(n, contacted=0)
 
        l = self.retrieveValues(key)
        if len(l) > 0:
-           l = map(lambda v: Binary(v), l)
            return {'values' : l}, self.node.senderDict()
        else:
            nodes = self.table.findNodes(key)
@@ -290,6 +295,7 @@ class Khashmir(xmlrpc.XMLRPC):
 
 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
     from whrandom import randrange
+    import threading
     import thread
     port = 2001
     l = []
@@ -298,7 +304,7 @@ def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
        print "Building %s peer table." % peers
        
     for i in xrange(peers):
-       a = Khashmir(host, port + i)
+       a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
        l.append(a)
     
 
@@ -306,7 +312,7 @@ def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
     time.sleep(1)
     for peer in l[1:]:
        peer.app.run()
-       #time.sleep(.25)
+    time.sleep(10)
 
     print "adding contacts...."
 
@@ -318,17 +324,18 @@ def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
        n = l[randrange(0, len(l))].node
        peer.addContact(host, n.port)
        if pause:
-           time.sleep(.5)
-           
+           time.sleep(.33)
+       
     time.sleep(10)
     print "finding close nodes...."
 
     for peer in l:
-       peer.findCloseNodes()
-       if pause:
-           time.sleep(2)
-    if pause:
-           time.sleep(10)
+       flag = threading.Event()
+       def cb(nodes, f=flag):
+           f.set()
+       peer.findCloseNodes(cb)
+       flag.wait()
+
 #    for peer in l:
 #      peer.refreshTable()
     return l
@@ -405,9 +412,9 @@ def test_find_value(l, quiet=0):
     d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
-def test_one(host, port):
+def test_one(host, port, db='/tmp/test'):
     import thread
-    k = Khashmir(host, port)
+    k = Khashmir(host, port, db)
     thread.start_new_thread(k.app.run, ())
     return k