]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
return K nodes and not K-1
[quix0rs-apt-p2p.git] / khashmir.py
index 20cc062f0b9a2128ba1d358634666510329e7295..ca6c3d0d1d5cd7c15e0b56f84b127615503d308b 100644 (file)
@@ -1,6 +1,8 @@
 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
 
 from const import reactor
+import const
+
 import time
 from pickle import loads, dumps
 from sha import sha
@@ -21,10 +23,7 @@ threadable.init()
 from bsddb3 import db ## find this at http://pybsddb.sf.net/
 from bsddb3._db import DBNotFoundError
 
-from base64 import decodestring as decode
-
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
+from xmlrpclib import Binary
 
 
 
@@ -92,14 +91,16 @@ class Khashmir(xmlrpc.XMLRPC):
     def valueForKey(self, key, callback):
        """ returns the values found for key in global table """
        nodes = self.table.findNodes(key)
-       # decode values, they will be base64 encoded
-       def cbwrap(values, cb=callback):
-           values = map(lambda x: decode(x), values)
-           callback(values)
-       # create our search state
-       state = GetValue(self, key, cbwrap)
-       reactor.callFromThread(state.goWithNodes, nodes)
 
+       # get locals
+       l = self.retrieveValues(key)
+       if len(l) > 0:
+           reactor.callFromThread(callback, l)
+
+       # create our search state
+       state = GetValue(self, key, callback)
+       reactor.callFromThread(state.goWithNodes, nodes, l)
+       
 
 
     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
@@ -109,21 +110,27 @@ class Khashmir(xmlrpc.XMLRPC):
            values are stored in peers on a first-come first-served basis
            this will probably change so more than one value can be stored under a key
        """
-       def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+       def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
            if not callback:
                # default callback - this will get called for each successful store value
                def _storedValueHandler(sender):
                    pass
                response=_storedValueHandler
+       
            for node in nodes:
+               def cb(t, table = table, node=node, resp=response):
+                   self.table.insertNode(node)
+                   response(t)
                if node.id != self.node.id:
+                   def default(err, node=node, table=table):
+                       table.nodeFailed(node)
                    df = node.storeValue(key, value, self.node.senderDict())
-                   df.addCallbacks(response, default)
+                   df.addCallback(cb)
        # this call is asynch
        self.findNode(key, _storeValueForKey)
        
        
-    def insertNode(self, n):
+    def insertNode(self, n, contacted=1):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary
        
@@ -132,8 +139,8 @@ class Khashmir(xmlrpc.XMLRPC):
        a node into the table without it's peer-ID.  That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
        """
-       old = self.table.insertNode(n)
-       if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
+       old = self.table.insertNode(n, contacted=contacted)
+       if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
            # the bucket is full, check to see if old node is still around and if so, replace it
            
            ## these are the callbacks used when we ping the oldest node in a bucket
@@ -142,9 +149,12 @@ class Khashmir(xmlrpc.XMLRPC):
                self.table.replaceStaleNode(old, newnode)
        
            def _notStaleNodeHandler(sender, old=old):
-               """ called when we get a ping from the remote node """
-               if sender['id'] == old.id:
-                   self.table.insertNode(old)
+               """ called when we get a pong from the old node """
+               sender, conn = sender
+               sender['host'] = conn['host']
+               sender = Node().initWithDict(sender)
+               if sender.id == old.id:
+                   self.table.justSeenNode(old)
 
            df = old.ping(self.node.senderDict())
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
@@ -157,19 +167,18 @@ class Khashmir(xmlrpc.XMLRPC):
        df = node.ping(self.node.senderDict())
        ## these are the callbacks we use when we issue a PING
        def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
-           if id != 20 * ' ' and id != sender['id']:
+           sender = sender[0]
+           if id != 20 * ' ' and id != sender['id'].data:
                # whoah, got response from different peer than we were expecting
                pass
            else:
-               #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
                sender['host'] = host
                sender['port'] = port
                n = Node().initWithDict(sender)
                table.insertNode(n)
            return
-       def _defaultPong(err):
-           # this should probably increment a failed message counter and dump the node if it gets over a threshold
-           return      
+       def _defaultPong(err, node=node, table=self.table):
+               table.nodeFailed(node)
 
        df.addCallbacks(_pongHandler,_defaultPong)
 
@@ -198,6 +207,19 @@ class Khashmir(xmlrpc.XMLRPC):
                self.findNode(id, callback)
        
  
+    def retrieveValues(self, key):
+       if self.kw.has_key(key):
+           c = self.kw.cursor()
+           tup = c.set(key)
+           l = []
+           while(tup and tup[0] == key):
+               h1 = tup[1]
+               v = loads(self.store[h1])[1]
+               l.append(v)
+               tup = c.next()
+           return l
+       return []
+       
     #####
     ##### INCOMING MESSAGE HANDLERS
     
@@ -209,24 +231,24 @@ class Khashmir(xmlrpc.XMLRPC):
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
        return self.node.senderDict()
                
     def xmlrpc_find_node(self, target, sender):
-       nodes = self.table.findNodes(target)
+       nodes = self.table.findNodes(target.data)
        nodes = map(lambda node: node.senderDict(), nodes)
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
        return nodes, self.node.senderDict()
     
     def xmlrpc_store_value(self, key, value, sender):
-       key = decode(key)
-       h1 = sha(key+value).digest()
+       key = key.data
+       h1 = sha(key+value.data).digest()
        t = `time.time()`
        if not self.store.has_key(h1):
-           v = dumps((key, value, t))
+           v = dumps((key, value.data, t))
            self.store.put(h1, v)
            self.itime.put(t, h1)
            self.kw.put(key, h1)
@@ -239,25 +261,19 @@ class Khashmir(xmlrpc.XMLRPC):
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
        return self.node.senderDict()
        
     def xmlrpc_find_value(self, key, sender):
        ip = self.crequest.getClientIP()
-       key = decode(key)
+       key = key.data
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
 
-       if self.kw.has_key(key):
-           c = self.kw.cursor()
-           tup = c.set(key)
-           l = []
-           while(tup):
-               h1 = tup[1]
-               v = loads(self.store[h1])[1]
-               l.append(v)
-               tup = c.next()
+       l = self.retrieveValues(key)
+       if len(l) > 0:
+           l = map(lambda v: Binary(v), l)
            return {'values' : l}, self.node.senderDict()
        else:
            nodes = self.table.findNodes(key)
@@ -369,7 +385,7 @@ def test_find_value(l, quiet=0):
            try:
                if(len(values) == 0):
                    if not self.found:
-                       print "find                FAILED"
+                       print "find                NOT FOUND"
                    else:
                        print "find                FOUND"
                    sys.stdout.flush()
@@ -387,14 +403,18 @@ def test_find_value(l, quiet=0):
     d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
-def test_one(port):
+def test_one(host, port):
     import thread
-    k = Khashmir('localhost', port)
+    k = Khashmir(host, port)
     thread.start_new_thread(k.app.run, ())
     return k
     
 if __name__ == "__main__":
-    l = test_build_net()
+    import sys
+    n = 8
+    if len(sys.argv) > 1:
+       n = int(sys.argv[1])
+    l = test_build_net(peers=n)
     time.sleep(3)
     print "finding nodes..."
     for i in range(10):