]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
finding values is now fixed
[quix0rs-apt-p2p.git] / khashmir.py
index 32dab3e6522211386cc0bcbd26dd608b205e49ee..aa1902f2bbe96ec1dde250a4a5e7a029e353bc8a 100644 (file)
@@ -6,11 +6,14 @@ import time
 from ktable import KTable, K
 from knode import KNode as Node
 
-from hash import newID, intify
+from hash import newID
 
+from actions import FindNode, GetValue
 from twisted.web import xmlrpc
 from twisted.internet.defer import Deferred
 from twisted.python import threadable
+from twisted.internet.app import Application
+from twisted.web import server
 threadable.init()
 
 from bsddb3 import db ## find this at http://pybsddb.sf.net/
@@ -19,9 +22,6 @@ from bsddb3._db import DBNotFoundError
 # don't ping unless it's been at least this many seconds since we've heard from a peer
 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
 
-# concurrent FIND_NODE/VALUE requests!
-N = 3
-
 
 
 # this is the main class!
@@ -30,8 +30,6 @@ class Khashmir(xmlrpc.XMLRPC):
     def __init__(self, host, port):
        self.node = Node(newID(), host, port)
        self.table = KTable(self.node)
-       from twisted.internet.app import Application
-       from twisted.web import server
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))
        self.store = db.DB()
@@ -206,7 +204,7 @@ class Khashmir(xmlrpc.XMLRPC):
        if self.store.has_key(key):
            return {'values' : self.store[key]}, self.node.senderDict()
        else:
-           nodes = self.table.findNodes(msg['key'])
+           nodes = self.table.findNodes(key)
            nodes = map(lambda node: node.senderDict(), nodes)
            return {'nodes' : nodes}, self.node.senderDict()
 
@@ -216,172 +214,12 @@ class Khashmir(xmlrpc.XMLRPC):
     def _storedValueHandler(self, sender):
        pass
 
-       
-    
-    
-
-class ActionBase:
-    """ base class for some long running asynchronous proccesses like finding nodes or values """
-    def __init__(self, table, target, callback):
-       self.table = table
-       self.target = target
-       self.int = intify(target)
-       self.found = {}
-       self.queried = {}
-       self.answered = {}
-       self.callback = callback
-       self.outstanding = 0
-       self.finished = 0
-       
-       def sort(a, b, int=self.int):
-           """ this function is for sorting nodes relative to the ID we are looking for """
-           x, y = int ^ a.int, int ^ b.int
-           if x > y:
-               return 1
-           elif x < y:
-               return -1
-           return 0
-       self.sort = sort
-    
-    def goWithNodes(self, t):
-       pass
-       
-       
 
-FIND_NODE_TIMEOUT = 15
 
-class FindNode(ActionBase):
-    """ find node action merits it's own class as it is a long running stateful process """
-    def handleGotNodes(self, args):
-       l, sender = args
-       if self.finished or self.answered.has_key(sender['id']):
-           # a day late and a dollar short
-           return
-       self.outstanding = self.outstanding - 1
-       self.answered[sender['id']] = 1
-       for node in l:
-           if not self.found.has_key(node['id']):
-               n = Node(node['id'], node['host'], node['port'])
-               self.found[n.id] = n
-               self.table.insertNode(n)
-       self.schedule()
-               
-    def schedule(self):
-       """
-           send messages to new peers, if necessary
-       """
-       if self.finished:
-           return
-       l = self.found.values()
-       l.sort(self.sort)
-
-       for node in l[:K]:
-           if node.id == self.target:
-               self.finished=1
-               return self.callback([node])
-           if not self.queried.has_key(node.id) and node.id != self.table.node.id:
-               #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-               df = node.findNode(self.target, self.table.node.senderDict())
-               df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-               self.outstanding = self.outstanding + 1
-               self.queried[node.id] = 1
-           if self.outstanding >= N:
-               break
-       assert(self.outstanding) >=0
-       if self.outstanding == 0:
-           ## all done!!
-           self.finished=1
-           reactor.callFromThread(self.callback, l[:K])
-       
-    def defaultGotNodes(self, t):
-       if self.finished:
-           return
-       self.outstanding = self.outstanding - 1
-       self.schedule()
-       
-       
-    def goWithNodes(self, nodes):
-       """
-           this starts the process, our argument is a transaction with t.extras being our list of nodes
-           it's a transaction since we got called from the dispatcher
-       """
-       for node in nodes:
-           if node.id == self.table.node.id:
-               continue
-           self.found[node.id] = node
-           #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findNode(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           self.callback(nodes)
-
-
-GET_VALUE_TIMEOUT = 15
-class GetValue(FindNode):
-    """ get value task """
-    def handleGotNodes(self, args):
-       l, sender = args
-       l = l[0]
-       if self.finished or self.answered.has_key(sender['id']):
-           # a day late and a dollar short
-           return
-       self.outstanding = self.outstanding - 1
-       self.answered[sender['id']] = 1
-       # go through nodes
-       # if we have any closer than what we already got, query them
-       if l.has_key('nodes'):
-           for node in l['nodes']:
-               if not self.found.has_key(node['id']):
-                   n = Node(node['id'], node['host'], node['port'])
-                   self.found[n.id] = n
-                   self.table.insertNode(n)
-       elif l.has_key('values'):
-           ## done
-           self.finished = 1
-           return self.callback(l['values'])
-       self.schedule()
-               
-    ## get value
-    def schedule(self):
-       if self.finished:
-           return
-       l = self.found.values()
-       l.sort(self.sort)
-
-       for node in l[:K]:
-           if not self.queried.has_key(node.id) and node.id != self.table.node.id:
-               #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-               df = node.getValue(node, self.target)
-               df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-               self.outstanding = self.outstanding + 1
-               self.queried[node.id] = 1
-           if self.outstanding >= N:
-               break
-       assert(self.outstanding) >=0
-       if self.outstanding == 0:
-           ## all done, didn't find it!!
-           self.finished=1
-           reactor.callFromThread(self.callback,[])
-    
-    ## get value
-    def goWithNodes(self, nodes):
-       for node in nodes:
-           if node.id == self.table.node.id:
-               continue
-           self.found[node.id] = node
-           #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findNode(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           reactor.callFromThread(self.callback, [])
 
 
 
-#------
+#------ testing
 
 def test_build_net(quiet=0):
     from whrandom import randrange
@@ -475,7 +313,7 @@ def test_find_value(l, quiet=0):
                if(len(values) == 0):
                    print "find                FAILED"
                else:
-                   if values[0]['value'] != val:
+                   if values != val:
                        print "find                FAILED"
                    else:
                        print "find                FOUND"
@@ -483,11 +321,10 @@ def test_find_value(l, quiet=0):
                f.set()
        return callback
     b.valueForKey(key, mc(fa))
-    c.valueForKey(key, mc(fb))
-    d.valueForKey(key, mc(fc))
-    
     fa.wait()
+    c.valueForKey(key, mc(fb))
     fb.wait()
+    d.valueForKey(key, mc(fc))    
     fc.wait()
     
 if __name__ == "__main__":