]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
fix bug in pong handler
[quix0rs-apt-p2p.git] / khashmir.py
index aa1902f2bbe96ec1dde250a4a5e7a029e353bc8a..8321cf7ce08d3d83a22c92c2e343871e2176ec6b 100644 (file)
@@ -2,13 +2,15 @@
 
 from const import reactor
 import time
+from pickle import loads, dumps
+from sha import sha
 
 from ktable import KTable, K
 from knode import KNode as Node
 
 from hash import newID
 
-from actions import FindNode, GetValue
+from actions import FindNode, GetValue, KeyExpirer
 from twisted.web import xmlrpc
 from twisted.internet.defer import Deferred
 from twisted.python import threadable
@@ -19,6 +21,8 @@ threadable.init()
 from bsddb3 import db ## find this at http://pybsddb.sf.net/
 from bsddb3._db import DBNotFoundError
 
+from xmlrpclib import Binary
+
 # don't ping unless it's been at least this many seconds since we've heard from a peer
 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
 
@@ -26,16 +30,30 @@ MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
 
 # this is the main class!
 class Khashmir(xmlrpc.XMLRPC):
-    __slots__ = ['listener', 'node', 'table', 'store', 'app']
+    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
     def __init__(self, host, port):
-       self.node = Node(newID(), host, port)
+       self.node = Node().init(newID(), host, port)
        self.table = KTable(self.node)
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))
+       
+       ## these databases may be more suited to on-disk rather than in-memory
+       # h((key, value)) -> (key, value, time) mappings
        self.store = db.DB()
        self.store.open(None, None, db.DB_BTREE)
        
+       # <insert time> -> h((key, value))
+       self.itime = db.DB()
+       self.itime.set_flags(db.DB_DUP)
+       self.itime.open(None, None, db.DB_BTREE)
+
+       # key -> h((key, value))
+       self.kw = db.DB()
+       self.kw.set_flags(db.DB_DUP)
+       self.kw.open(None, None, db.DB_BTREE)
 
+       KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
+       
     def render(self, request):
        """
            Override the built in render so we can have access to the request object!
@@ -51,7 +69,7 @@ class Khashmir(xmlrpc.XMLRPC):
        """
         ping this node and add the contact info to the table on pong!
        """
-       n =Node(" "*20, host, port)  # note, we 
+       n =Node().init(" "*20, host, port)  # note, we 
        self.sendPing(n)
 
 
@@ -74,19 +92,26 @@ class Khashmir(xmlrpc.XMLRPC):
     def valueForKey(self, key, callback):
        """ returns the values found for key in global table """
        nodes = self.table.findNodes(key)
+       # decode values, they will be base64 encoded
        # create our search state
        state = GetValue(self, key, callback)
        reactor.callFromThread(state.goWithNodes, nodes)
 
 
+
     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
-    def storeValueForKey(self, key, value):
+    def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status 
            in this implementation, peers respond but don't indicate status to storing values
            values are stored in peers on a first-come first-served basis
            this will probably change so more than one value can be stored under a key
        """
-       def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
+       def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+           if not callback:
+               # default callback - this will get called for each successful store value
+               def _storedValueHandler(sender):
+                   pass
+               response=_storedValueHandler
            for node in nodes:
                if node.id != self.node.id:
                    df = node.storeValue(key, value, self.node.senderDict())
@@ -115,11 +140,12 @@ class Khashmir(xmlrpc.XMLRPC):
        
            def _notStaleNodeHandler(sender, old=old):
                """ called when we get a ping from the remote node """
-               if sender['id'] == old.id:
+               sender = Node().initWithSenderDict(sender)
+               if sender.id == old.id:
                    self.table.insertNode(old)
 
-           df = old.ping()
-           df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
+           df = old.ping(self.node.senderDict())
+           df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
 
     def sendPing(self, node):
@@ -129,12 +155,15 @@ class Khashmir(xmlrpc.XMLRPC):
        df = node.ping(self.node.senderDict())
        ## these are the callbacks we use when we issue a PING
        def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
-           if id != 20 * ' ' and id != sender['id']:
+           sender = Node().initWithSenderDict(sender)
+           if id != 20 * ' ' and id != sender.id:
                # whoah, got response from different peer than we were expecting
                pass
            else:
                #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
-               n = Node(sender['id'], host, port)
+               sender['host'] = host
+               sender['port'] = port
+               n = Node().initWithDict(sender)
                table.insertNode(n)
            return
        def _defaultPong(err):
@@ -177,7 +206,8 @@ class Khashmir(xmlrpc.XMLRPC):
            returns sender dict
        """
        ip = self.crequest.getClientIP()
-       n = Node(sender['id'], ip, sender['port'])
+       sender['host'] = ip
+       n = Node().initWithDict(sender)
        self.insertNode(n)
        return self.node.senderDict()
                
@@ -185,83 +215,104 @@ class Khashmir(xmlrpc.XMLRPC):
        nodes = self.table.findNodes(target)
        nodes = map(lambda node: node.senderDict(), nodes)
        ip = self.crequest.getClientIP()
-       n = Node(sender['id'], ip, sender['port'])
+       sender['host'] = ip
+       n = Node().initWithDict(sender)
        self.insertNode(n)
        return nodes, self.node.senderDict()
     
     def xmlrpc_store_value(self, key, value, sender):
-       if not self.store.has_key(key):
-           self.store.put(key, value)
+       key = key.data
+       h1 = sha(key+value.data).digest()
+       t = `time.time()`
+       if not self.store.has_key(h1):
+           v = dumps((key, value.data, t))
+           self.store.put(h1, v)
+           self.itime.put(t, h1)
+           self.kw.put(key, h1)
+       else:
+           # update last insert time
+           tup = loads(self.store[h1])
+           self.store[h1] = dumps((tup[0], tup[1], t))
+           self.itime.put(t, h1)
+
        ip = self.crequest.getClientIP()
-       n = Node(sender['id'], ip, sender['port'])
+       sender['host'] = ip
+       n = Node().initWithDict(sender)
        self.insertNode(n)
        return self.node.senderDict()
        
     def xmlrpc_find_value(self, key, sender):
        ip = self.crequest.getClientIP()
-       n = Node(sender['id'], ip, sender['port'])
+       key = key.data
+       sender['host'] = ip
+       n = Node().initWithDict(sender)
        self.insertNode(n)
-       if self.store.has_key(key):
-           return {'values' : self.store[key]}, self.node.senderDict()
+
+       if self.kw.has_key(key):
+           c = self.kw.cursor()
+           tup = c.set(key)
+           l = []
+           while(tup):
+               h1 = tup[1]
+               v = loads(self.store[h1])[1]
+               l.append(v)
+               tup = c.next()
+           l = map(lambda v: Binary(v), l)
+           return {'values' : l}, self.node.senderDict()
        else:
            nodes = self.table.findNodes(key)
            nodes = map(lambda node: node.senderDict(), nodes)
            return {'nodes' : nodes}, self.node.senderDict()
 
-    ###
-    ### message response callbacks
-    # called when we get a response to store value
-    def _storedValueHandler(self, sender):
-       pass
-
-
 
 
 
 
 #------ testing
 
-def test_build_net(quiet=0):
+def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
     from whrandom import randrange
     import thread
     port = 2001
     l = []
-    peers = 16
-    
+        
     if not quiet:
        print "Building %s peer table." % peers
        
     for i in xrange(peers):
-       a = Khashmir('localhost', port + i)
+       a = Khashmir(host, port + i)
        l.append(a)
     
-    def run(l=l):
-       while(1):
-               events = 0
-               for peer in l:
-                       events = events + peer.dispatcher.runOnce()
-               if events == 0:
-                       time.sleep(.25)
 
     thread.start_new_thread(l[0].app.run, ())
+    time.sleep(1)
     for peer in l[1:]:
        peer.app.run()
-       
+       #time.sleep(.25)
+
+    print "adding contacts...."
+
     for peer in l[1:]:
        n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
+       peer.addContact(host, n.port)
        n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
+       peer.addContact(host, n.port)
        n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
-       
-    time.sleep(5)
+       peer.addContact(host, n.port)
+       if pause:
+           time.sleep(.30)
+           
+    time.sleep(1)
+    print "finding close nodes...."
 
     for peer in l:
        peer.findCloseNodes()
-    time.sleep(5)
-    for peer in l:
-       peer.refreshTable()
+       if pause:
+           time.sleep(.5)
+    if pause:
+           time.sleep(2)
+#    for peer in l:
+#      peer.refreshTable()
     return l
         
 def test_find_nodes(l, quiet=0):
@@ -274,8 +325,8 @@ def test_find_nodes(l, quiet=0):
     a = l[randrange(0,n)]
     b = l[randrange(0,n)]
     
-    def callback(nodes, l=l, flag=flag):
-       if (len(nodes) >0) and (nodes[0].id == b.node.id):
+    def callback(nodes, flag=flag, id = b.node.id):
+       if (len(nodes) >0) and (nodes[0].id == id):
            print "test_find_nodes      PASSED"
        else:
            print "test_find_nodes      FAILED"
@@ -286,6 +337,7 @@ def test_find_nodes(l, quiet=0):
 def test_find_value(l, quiet=0):
     from whrandom import randrange
     from sha import sha
+    from hash import newID
     import time, threading, sys
     
     fa = threading.Event()
@@ -298,46 +350,55 @@ def test_find_value(l, quiet=0):
     c = l[randrange(0,n)]
     d = l[randrange(0,n)]
 
-    key = sha(`randrange(0,100000)`).digest()
-    value = sha(`randrange(0,100000)`).digest()
+    key = newID()
+    value = newID()
     if not quiet:
-       print "inserting value...",
+       print "inserting value..."
        sys.stdout.flush()
     a.storeValueForKey(key, value)
     time.sleep(3)
     print "finding..."
+    sys.stdout.flush()
     
-    def mc(flag, value=value):
-       def callback(values, f=flag, val=value):
+    class cb:
+       def __init__(self, flag, value=value):
+           self.flag = flag
+           self.val = value
+           self.found = 0
+       def callback(self, values):
            try:
                if(len(values) == 0):
-                   print "find                FAILED"
-               else:
-                   if values != val:
+                   if not self.found:
                        print "find                FAILED"
                    else:
                        print "find                FOUND"
+                   sys.stdout.flush()
+
+               else:
+                   if self.val in values:
+                       self.found = 1
            finally:
-               f.set()
-       return callback
-    b.valueForKey(key, mc(fa))
+               self.flag.set()
+
+    b.valueForKey(key, cb(fa).callback)
     fa.wait()
-    c.valueForKey(key, mc(fb))
+    c.valueForKey(key, cb(fb).callback)
     fb.wait()
-    d.valueForKey(key, mc(fc))    
+    d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
+def test_one(port):
+    import thread
+    k = Khashmir('localhost', port)
+    thread.start_new_thread(k.app.run, ())
+    return k
+    
 if __name__ == "__main__":
     l = test_build_net()
     time.sleep(3)
     print "finding nodes..."
-    test_find_nodes(l)
-    test_find_nodes(l)
-    test_find_nodes(l)
+    for i in range(10):
+       test_find_nodes(l)
     print "inserting and fetching values..."
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
+    for i in range(10):
+       test_find_value(l)