]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
call back from GetValues each time we get some new values
[quix0rs-apt-p2p.git] / khashmir.py
index 6e9b6273eadbc64fe83cda3d77f1e97d2bfd843f..2b716e62937d7bf65a68ba0bea90ddba6fae95b0 100644 (file)
@@ -2,6 +2,8 @@
 
 from const import reactor
 import time
+from pickle import loads, dumps
+from sha import sha
 
 from ktable import KTable, K
 from knode import KNode as Node
@@ -26,15 +28,28 @@ MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
 
 # this is the main class!
 class Khashmir(xmlrpc.XMLRPC):
-    __slots__ = ['listener', 'node', 'table', 'store', 'app']
+    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
     def __init__(self, host, port):
        self.node = Node(newID(), host, port)
        self.table = KTable(self.node)
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))
+       
+       ## these databases may be more suited to on-disk rather than in-memory
+       # h((key, value)) -> (key, value, time) mappings
        self.store = db.DB()
        self.store.open(None, None, db.DB_BTREE)
        
+       # <insert time> -> h((key, value))
+       self.itime = db.DB()
+       self.itime.set_flags(db.DB_DUP)
+       self.itime.open(None, None, db.DB_BTREE)
+
+       # key -> h((key, value))
+       self.kw = db.DB()
+       self.kw.set_flags(db.DB_DUP)
+       self.kw.open(None, None, db.DB_BTREE)
+
 
     def render(self, request):
        """
@@ -118,7 +133,7 @@ class Khashmir(xmlrpc.XMLRPC):
                if sender['id'] == old.id:
                    self.table.insertNode(old)
 
-           df = old.ping()
+           df = old.ping(self.node.senderDict())
            df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
 
 
@@ -190,8 +205,19 @@ class Khashmir(xmlrpc.XMLRPC):
        return nodes, self.node.senderDict()
     
     def xmlrpc_store_value(self, key, value, sender):
-       if not self.store.has_key(key):
-           self.store.put(key, value)
+       h1 = sha(key+value).digest()
+       t = `time.time()`
+       if not self.store.has_key(h1):
+           v = dumps((key, value, t))
+           self.store.put(h1, v)
+           self.itime.put(t, h1)
+           self.kw.put(key, h1)
+       else:
+           # update last insert time
+           tup = loads(self.store[h1])
+           self.store[h1] = dumps((tup[0], tup[1], t))
+           self.itime.put(t, h1)
+
        ip = self.crequest.getClientIP()
        n = Node(sender['id'], ip, sender['port'])
        self.insertNode(n)
@@ -201,8 +227,16 @@ class Khashmir(xmlrpc.XMLRPC):
        ip = self.crequest.getClientIP()
        n = Node(sender['id'], ip, sender['port'])
        self.insertNode(n)
-       if self.store.has_key(key):
-           return {'values' : self.store[key]}, self.node.senderDict()
+       if self.kw.has_key(key):
+           c = self.kw.cursor()
+           tup = c.set_range(key)
+           l = []
+           while(tup):
+               h1 = tup[1]
+               v = loads(self.store[h1])[1]
+               l.append(v)
+               tup = c.next()
+           return {'values' : l}, self.node.senderDict()
        else:
            nodes = self.table.findNodes(key)
            nodes = map(lambda node: node.senderDict(), nodes)
@@ -221,13 +255,12 @@ class Khashmir(xmlrpc.XMLRPC):
 
 #------ testing
 
-def test_build_net(quiet=0):
+def test_build_net(quiet=0, peers=64, pause=1):
     from whrandom import randrange
     import thread
     port = 2001
     l = []
-    peers = 128
-    
+        
     if not quiet:
        print "Building %s peer table." % peers
        
@@ -251,14 +284,18 @@ def test_build_net(quiet=0):
        peer.addContact(n.host, n.port)
        n = l[randrange(0, len(l))].node
        peer.addContact(n.host, n.port)
-       time.sleep(.30)
-
-    time.sleep(2)
+       if pause:
+           time.sleep(.30)
+           
+    time.sleep(1)
     print "finding close nodes...."
 
     for peer in l:
        peer.findCloseNodes()
-       time.sleep(1)
+       if pause:
+           time.sleep(.5)
+    if pause:
+           time.sleep(10)
 #    for peer in l:
 #      peer.refreshTable()
     return l
@@ -273,7 +310,7 @@ def test_find_nodes(l, quiet=0):
     a = l[randrange(0,n)]
     b = l[randrange(0,n)]
     
-    def callback(nodes, l=l, flag=flag):
+    def callback(nodes, flag=flag):
        if (len(nodes) >0) and (nodes[0].id == b.node.id):
            print "test_find_nodes      PASSED"
        else:
@@ -300,43 +337,52 @@ def test_find_value(l, quiet=0):
     key = sha(`randrange(0,100000)`).digest()
     value = sha(`randrange(0,100000)`).digest()
     if not quiet:
-       print "inserting value...",
+       print "inserting value..."
        sys.stdout.flush()
     a.storeValueForKey(key, value)
     time.sleep(3)
     print "finding..."
+    sys.stdout.flush()
     
-    def mc(flag, value=value):
-       def callback(values, f=flag, val=value):
+    class cb:
+       def __init__(self, flag, value=value):
+           self.flag = flag
+           self.val = value
+           self.found = 0
+       def callback(self, values):
            try:
                if(len(values) == 0):
-                   print "find                FAILED"
-               else:
-                   if values != val:
+                   if not self.found:
                        print "find                FAILED"
                    else:
                        print "find                FOUND"
+                   sys.stdout.flush()
+
+               else:
+                   if self.val in values:
+                       self.found = 1
            finally:
-               f.set()
-       return callback
-    b.valueForKey(key, mc(fa))
+               self.flag.set()
+
+    b.valueForKey(key, cb(fa).callback)
     fa.wait()
-    c.valueForKey(key, mc(fb))
+    c.valueForKey(key, cb(fb).callback)
     fb.wait()
-    d.valueForKey(key, mc(fc))    
+    d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
+def test_one(port):
+    import thread
+    k = Khashmir('localhost', port)
+    thread.start_new_thread(k.app.run, ())
+    return k
+    
 if __name__ == "__main__":
     l = test_build_net()
     time.sleep(3)
     print "finding nodes..."
-    test_find_nodes(l)
-    test_find_nodes(l)
-    test_find_nodes(l)
+    for i in range(10):
+       test_find_nodes(l)
     print "inserting and fetching values..."
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
+    for i in range(10):
+       test_find_value(l)