]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
call back from GetValues each time we get some new values
[quix0rs-apt-p2p.git] / khashmir.py
index 748ce2196b8b93d3e792f1875cb3a4771d1b1275..2b716e62937d7bf65a68ba0bea90ddba6fae95b0 100644 (file)
@@ -2,6 +2,8 @@
 
 from const import reactor
 import time
+from pickle import loads, dumps
+from sha import sha
 
 from ktable import KTable, K
 from knode import KNode as Node
@@ -12,6 +14,8 @@ from actions import FindNode, GetValue
 from twisted.web import xmlrpc
 from twisted.internet.defer import Deferred
 from twisted.python import threadable
+from twisted.internet.app import Application
+from twisted.web import server
 threadable.init()
 
 from bsddb3 import db ## find this at http://pybsddb.sf.net/
@@ -24,17 +28,28 @@ MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
 
 # this is the main class!
 class Khashmir(xmlrpc.XMLRPC):
-    __slots__ = ['listener', 'node', 'table', 'store', 'app']
+    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
     def __init__(self, host, port):
        self.node = Node(newID(), host, port)
        self.table = KTable(self.node)
-       from twisted.internet.app import Application
-       from twisted.web import server
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))
+       
+       ## these databases may be more suited to on-disk rather than in-memory
+       # h((key, value)) -> (key, value, time) mappings
        self.store = db.DB()
        self.store.open(None, None, db.DB_BTREE)
        
+       # <insert time> -> h((key, value))
+       self.itime = db.DB()
+       self.itime.set_flags(db.DB_DUP)
+       self.itime.open(None, None, db.DB_BTREE)
+
+       # key -> h((key, value))
+       self.kw = db.DB()
+       self.kw.set_flags(db.DB_DUP)
+       self.kw.open(None, None, db.DB_BTREE)
+
 
     def render(self, request):
        """
@@ -118,7 +133,7 @@ class Khashmir(xmlrpc.XMLRPC):
                if sender['id'] == old.id:
                    self.table.insertNode(old)
 
-           df = old.ping()
+           df = old.ping(self.node.senderDict())
            df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
 
 
@@ -190,8 +205,19 @@ class Khashmir(xmlrpc.XMLRPC):
        return nodes, self.node.senderDict()
     
     def xmlrpc_store_value(self, key, value, sender):
-       if not self.store.has_key(key):
-           self.store.put(key, value)
+       h1 = sha(key+value).digest()
+       t = `time.time()`
+       if not self.store.has_key(h1):
+           v = dumps((key, value, t))
+           self.store.put(h1, v)
+           self.itime.put(t, h1)
+           self.kw.put(key, h1)
+       else:
+           # update last insert time
+           tup = loads(self.store[h1])
+           self.store[h1] = dumps((tup[0], tup[1], t))
+           self.itime.put(t, h1)
+
        ip = self.crequest.getClientIP()
        n = Node(sender['id'], ip, sender['port'])
        self.insertNode(n)
@@ -201,10 +227,18 @@ class Khashmir(xmlrpc.XMLRPC):
        ip = self.crequest.getClientIP()
        n = Node(sender['id'], ip, sender['port'])
        self.insertNode(n)
-       if self.store.has_key(key):
-           return {'values' : self.store[key]}, self.node.senderDict()
+       if self.kw.has_key(key):
+           c = self.kw.cursor()
+           tup = c.set_range(key)
+           l = []
+           while(tup):
+               h1 = tup[1]
+               v = loads(self.store[h1])[1]
+               l.append(v)
+               tup = c.next()
+           return {'values' : l}, self.node.senderDict()
        else:
-           nodes = self.table.findNodes(msg['key'])
+           nodes = self.table.findNodes(key)
            nodes = map(lambda node: node.senderDict(), nodes)
            return {'nodes' : nodes}, self.node.senderDict()
 
@@ -215,15 +249,18 @@ class Khashmir(xmlrpc.XMLRPC):
        pass
 
 
-#------
 
-def test_build_net(quiet=0):
+
+
+
+#------ testing
+
+def test_build_net(quiet=0, peers=64, pause=1):
     from whrandom import randrange
     import thread
     port = 2001
     l = []
-    peers = 16
-    
+        
     if not quiet:
        print "Building %s peer table." % peers
        
@@ -231,18 +268,15 @@ def test_build_net(quiet=0):
        a = Khashmir('localhost', port + i)
        l.append(a)
     
-    def run(l=l):
-       while(1):
-               events = 0
-               for peer in l:
-                       events = events + peer.dispatcher.runOnce()
-               if events == 0:
-                       time.sleep(.25)
 
     thread.start_new_thread(l[0].app.run, ())
+    time.sleep(1)
     for peer in l[1:]:
        peer.app.run()
-       
+       #time.sleep(.25)
+
+    print "adding contacts...."
+
     for peer in l[1:]:
        n = l[randrange(0, len(l))].node
        peer.addContact(n.host, n.port)
@@ -250,14 +284,20 @@ def test_build_net(quiet=0):
        peer.addContact(n.host, n.port)
        n = l[randrange(0, len(l))].node
        peer.addContact(n.host, n.port)
-       
-    time.sleep(5)
+       if pause:
+           time.sleep(.30)
+           
+    time.sleep(1)
+    print "finding close nodes...."
 
     for peer in l:
        peer.findCloseNodes()
-    time.sleep(5)
-    for peer in l:
-       peer.refreshTable()
+       if pause:
+           time.sleep(.5)
+    if pause:
+           time.sleep(10)
+#    for peer in l:
+#      peer.refreshTable()
     return l
         
 def test_find_nodes(l, quiet=0):
@@ -270,7 +310,7 @@ def test_find_nodes(l, quiet=0):
     a = l[randrange(0,n)]
     b = l[randrange(0,n)]
     
-    def callback(nodes, l=l, flag=flag):
+    def callback(nodes, flag=flag):
        if (len(nodes) >0) and (nodes[0].id == b.node.id):
            print "test_find_nodes      PASSED"
        else:
@@ -297,44 +337,52 @@ def test_find_value(l, quiet=0):
     key = sha(`randrange(0,100000)`).digest()
     value = sha(`randrange(0,100000)`).digest()
     if not quiet:
-       print "inserting value...",
+       print "inserting value..."
        sys.stdout.flush()
     a.storeValueForKey(key, value)
     time.sleep(3)
     print "finding..."
+    sys.stdout.flush()
     
-    def mc(flag, value=value):
-       def callback(values, f=flag, val=value):
+    class cb:
+       def __init__(self, flag, value=value):
+           self.flag = flag
+           self.val = value
+           self.found = 0
+       def callback(self, values):
            try:
                if(len(values) == 0):
-                   print "find                FAILED"
-               else:
-                   if values[0]['value'] != val:
+                   if not self.found:
                        print "find                FAILED"
                    else:
                        print "find                FOUND"
+                   sys.stdout.flush()
+
+               else:
+                   if self.val in values:
+                       self.found = 1
            finally:
-               f.set()
-       return callback
-    b.valueForKey(key, mc(fa))
-    c.valueForKey(key, mc(fb))
-    d.valueForKey(key, mc(fc))
-    
+               self.flag.set()
+
+    b.valueForKey(key, cb(fa).callback)
     fa.wait()
+    c.valueForKey(key, cb(fb).callback)
     fb.wait()
+    d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
+def test_one(port):
+    import thread
+    k = Khashmir('localhost', port)
+    thread.start_new_thread(k.app.run, ())
+    return k
+    
 if __name__ == "__main__":
     l = test_build_net()
     time.sleep(3)
     print "finding nodes..."
-    test_find_nodes(l)
-    test_find_nodes(l)
-    test_find_nodes(l)
+    for i in range(10):
+       test_find_nodes(l)
     print "inserting and fetching values..."
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
+    for i in range(10):
+       test_find_value(l)