]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
fix bug where we fail to return any values we have stored locally
[quix0rs-apt-p2p.git] / khashmir.py
index f3e410aa3087b1b8fc94772f0b3e0eeefe6a981e..a738a97fad8048e2296aeee876a9d8a20bddfe24 100644 (file)
@@ -39,6 +39,7 @@ class Khashmir(protocol.Factory):
         
     def setup(self, host, port, db='khashmir.db'):
         self._findDB(db)
+        self.port = port
         self.node = self._loadSelfNode(host, port)
         self.table = KTable(self.node)
         self.app = Application("krpc")
@@ -160,7 +161,7 @@ class Khashmir(protocol.Factory):
     
     
     ## also async
-    def valueForKey(self, key, callback):
+    def valueForKey(self, key, callback, searchlocal = 1):
         """ returns the values found for key in global table
             callback will be called with a list of values for each peer that returns unique values
             final callback will be an empty list - probably should change to 'more coming' arg
@@ -168,7 +169,12 @@ class Khashmir(protocol.Factory):
         nodes = self.table.findNodes(key)
         
         # get locals
-        l = self.retrieveValues(key)
+        if searchlocal:
+            l = self.retrieveValues(key)
+            if len(l) > 0:
+                reactor.callLater(0, callback, (l))
+        else:
+            l = []
         
         # create our search state
         state = GetValue(self, key, callback)
@@ -213,6 +219,8 @@ class Khashmir(protocol.Factory):
             
             def _notStaleNodeHandler(dict, old=old):
                 """ called when we get a pong from the old node """
+                _krpc_sender = dict['_krpc_sender']
+                dict = dict['rsp']
                 sender = dict['sender']
                 if sender['id'] == old.id:
                     self.table.justSeenNode(old.id)
@@ -227,6 +235,8 @@ class Khashmir(protocol.Factory):
         df = node.ping(self.node.senderDict())
         ## these are the callbacks we use when we issue a PING
         def _pongHandler(dict, node=node, table=self.table, callback=callback):
+            _krpc_sender = dict['_krpc_sender']
+            dict = dict['rsp']
             sender = dict['sender']
             if node.id != const.NULL_ID and node.id != sender['id']:
                 # whoah, got response from different peer than we were expecting
@@ -288,6 +298,7 @@ class Khashmir(protocol.Factory):
             returns sender dict
         """
         sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.airhook.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
@@ -297,6 +308,7 @@ class Khashmir(protocol.Factory):
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.senderDict(), nodes)
         sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.airhook.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
@@ -313,6 +325,7 @@ class Khashmir(protocol.Factory):
             s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
             c.execute(s)
         sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.airhook.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
@@ -320,6 +333,7 @@ class Khashmir(protocol.Factory):
     
     def krpc_find_value(self, key, sender, _krpc_sender):
         sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
         n = Node().initWithDict(sender)
         n.conn = self.airhook.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
@@ -436,7 +450,7 @@ def test_find_nodes(l, quiet=0):
     flag.wait()
     
 def test_find_value(l, quiet=0):
-    
+    ff = threading.Event()
     fa = threading.Event()
     fb = threading.Event()
     fc = threading.Event()
@@ -450,34 +464,38 @@ def test_find_value(l, quiet=0):
     key = newID()
     value = newID()
     if not quiet: print "inserting value..."
-    a.storeValueForKey(key, value)
-    time.sleep(3)
+    def acb(p, f=ff):
+        f.set()
+    a.storeValueForKey(key, value, acb)
+    ff.wait()
+    
     if not quiet:
         print "finding..."
     
     class cb:
-        def __init__(self, flag, value=value):
+        def __init__(self, flag, value=value, port=None):
             self.flag = flag
             self.val = value
             self.found = 0
+            self.port = port
         def callback(self, values):
             try:
                 if(len(values) == 0):
                     if not self.found:
-                        print "find                NOT FOUND"
+                        print "find   %s             NOT FOUND" % self.port
                     else:
-                        print "find                FOUND"
+                        print "find   %s           FOUND" % self.port
                 else:
                     if self.val in values:
                         self.found = 1
             finally:
                 self.flag.set()
     
-    b.valueForKey(key, cb(fa).callback)
+    b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
     fa.wait()
-    c.valueForKey(key, cb(fb).callback)
+    c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
     fb.wait()
-    d.valueForKey(key, cb(fc).callback)    
+    d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)    
     fc.wait()
     
 def test_one(host, port, db='/tmp/test'):
@@ -493,7 +511,7 @@ if __name__ == "__main__":
     l = test_build_net(peers=n)
     time.sleep(3)
     print "finding nodes..."
-    for i in range(10):
+    for i in range(n):
         test_find_nodes(l)
     print "inserting and fetching values..."
     for i in range(10):