]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/actions.py
Return a token in find_node responses, use it in store_value requests.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
index dc743de2e7658bcb2e598bc3ccb503964b702c6c..6766cd9c6169dd8301cd6ff6c052d663175b0541 100644 (file)
@@ -1,16 +1,16 @@
 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
 # see LICENSE.txt for license information
 
-from time import time
-
 from twisted.internet import reactor
+from twisted.python import log
 
 from khash import intify
+from util import uncompact
 
 class ActionBase:
     """ base class for some long running asynchronous proccesses like finding nodes or values """
-    def __init__(self, table, target, callback, config):
-        self.table = table
+    def __init__(self, caller, target, callback, config):
+        self.caller = caller
         self.target = target
         self.config = config
         self.num = intify(target)
@@ -43,21 +43,19 @@ class FindNode(ActionBase):
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
+        n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
+        self.caller.insertNode(n)
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateToken(dict.get('token', ''))
         l = dict["nodes"]
-        sender = {'id' : dict["id"]}
-        sender['port'] = _krpc_sender[1]        
-        sender['host'] = _krpc_sender[0]        
-        sender = self.table.Node().initWithDict(sender)
-        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
-        self.table.table.insertNode(sender)
-        if self.finished or self.answered.has_key(sender.id):
+        if self.finished or self.answered.has_key(dict["id"]):
             # a day late and a dollar short
             return
         self.outstanding = self.outstanding - 1
-        self.answered[sender.id] = 1
-        for node in l:
-            n = self.table.Node().initWithDict(node)
-            n.conn = self.table.udp.connectionForAddr((n.host, n.port))
+        self.answered[dict["id"]] = 1
+        for compact_node in l:
+            node = uncompact(compact_node)
+            n = self.caller.Node(node)
             if not self.found.has_key(n.id):
                 self.found[n.id] = n
         self.schedule()
@@ -74,15 +72,15 @@ class FindNode(ActionBase):
             if node.id == self.target:
                 self.finished=1
                 return self.callback([node])
-            if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                df = node.findNode(self.target, self.table.node.id)
+                df = node.findNode(self.target, self.caller.node.id)
                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
                 self.queried[node.id] = 1
             if self.outstanding >= self.config['CONCURRENT_REQS']:
                 break
-        assert(self.outstanding) >=0
+        assert self.outstanding >=0
         if self.outstanding == 0:
             ## all done!!
             self.finished=1
@@ -90,8 +88,9 @@ class FindNode(ActionBase):
     
     def makeMsgFailed(self, node):
         def defaultGotNodes(err, self=self, node=node):
-            print ">>> find failed %s/%s" % (node.host, node.port), err
-            self.table.table.nodeFailed(node)
+            log.msg("find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
+            log.err(err)
+            self.caller.table.nodeFailed(node)
             self.outstanding = self.outstanding - 1
             self.schedule()
         return defaultGotNodes
@@ -102,7 +101,7 @@ class FindNode(ActionBase):
             it's a transaction since we got called from the dispatcher
         """
         for node in nodes:
-            if node.id == self.table.node.id:
+            if node.id == self.caller.node.id:
                 continue
             else:
                 self.found[node.id] = node
@@ -112,31 +111,27 @@ class FindNode(ActionBase):
 
 get_value_timeout = 15
 class GetValue(FindNode):
-    def __init__(self, table, target, callback, config, find="findValue"):
-        FindNode.__init__(self, table, target, callback, config)
+    def __init__(self, caller, target, callback, config, find="findValue"):
+        FindNode.__init__(self, caller, target, callback, config)
         self.findValue = find
             
     """ get value task """
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
-        sender = {'id' : dict["id"]}
-        sender['port'] = _krpc_sender[1]
-        sender['host'] = _krpc_sender[0]                
-        sender = self.table.Node().initWithDict(sender)
-        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
-        self.table.table.insertNode(sender)
-        if self.finished or self.answered.has_key(sender.id):
+        n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
+        self.caller.insertNode(n)
+        if self.finished or self.answered.has_key(dict["id"]):
             # a day late and a dollar short
             return
         self.outstanding = self.outstanding - 1
-        self.answered[sender.id] = 1
+        self.answered[dict["id"]] = 1
         # go through nodes
         # if we have any closer than what we already got, query them
         if dict.has_key('nodes'):
-            for node in dict['nodes']:
-                n = self.table.Node().initWithDict(node)
-                n.conn = self.table.udp.connectionForAddr((n.host, n.port))
+            for compact_node in dict['nodes']:
+                node = uncompact(compact_node)
+                n = self.caller.Node(node)
                 if not self.found.has_key(n.id):
                     self.found[n.id] = n
         elif dict.has_key('values'):
@@ -149,7 +144,7 @@ class GetValue(FindNode):
             z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
             if(len(v)):
-                reactor.callLater(0, self.callback, v)
+                reactor.callLater(0, self.callback, self.target, v)
         self.schedule()
         
     ## get value
@@ -160,25 +155,25 @@ class GetValue(FindNode):
         l.sort(self.sort)
         
         for node in l[:self.config['K']]:
-            if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                 try:
                     f = getattr(node, self.findValue)
                 except AttributeError:
-                    print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
+                    log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
                 else:
-                    df = f(self.target, self.table.node.id)
+                    df = f(self.target, self.caller.node.id)
                     df.addCallback(self.handleGotNodes)
                     df.addErrback(self.makeMsgFailed(node))
                     self.outstanding = self.outstanding + 1
                     self.queried[node.id] = 1
             if self.outstanding >= self.config['CONCURRENT_REQS']:
                 break
-        assert(self.outstanding) >=0
+        assert self.outstanding >=0
         if self.outstanding == 0:
             ## all done, didn't find it!!
             self.finished=1
-            reactor.callLater(0, self.callback,[])
+            reactor.callLater(0, self.callback, self.target, [])
 
     ## get value
     def goWithNodes(self, nodes, found=None):
@@ -187,7 +182,7 @@ class GetValue(FindNode):
             for n in found:
                 self.results[n] = 1
         for node in nodes:
-            if node.id == self.table.node.id:
+            if node.id == self.caller.node.id:
                 continue
             else:
                 self.found[node.id] = node
@@ -196,29 +191,29 @@ class GetValue(FindNode):
 
 
 class StoreValue(ActionBase):
-    def __init__(self, table, target, value, callback, config, store="storeValue"):
-        ActionBase.__init__(self, table, target, callback, config)
+    def __init__(self, caller, target, value, callback, config, store="storeValue"):
+        ActionBase.__init__(self, caller, target, callback, config)
         self.value = value
         self.stored = []
         self.store = store
         
     def storedValue(self, t, node):
         self.outstanding -= 1
-        self.table.insertNode(node)
+        self.caller.insertNode(node)
         if self.finished:
             return
         self.stored.append(t)
         if len(self.stored) >= self.config['STORE_REDUNDANCY']:
             self.finished=1
-            self.callback(self.stored)
+            self.callback(self.target, self.value, self.stored)
         else:
             if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
                 self.schedule()
         return t
     
     def storeFailed(self, t, node):
-        print ">>> store failed %s/%s" % (node.host, node.port)
-        self.table.nodeFailed(node)
+        log.msg("store failed %s/%s" % (node.host, node.port))
+        self.caller.nodeFailed(node)
         self.outstanding -= 1
         if self.finished:
             return t
@@ -237,16 +232,16 @@ class StoreValue(ActionBase):
             except IndexError:
                 if self.outstanding == 0:
                     self.finished = 1
-                    self.callback(self.stored)
+                    self.callback(self.target, self.value, self.stored)
             else:
-                if not node.id == self.table.node.id:
+                if not node.id == self.caller.node.id:
                     self.outstanding += 1
                     try:
                         f = getattr(node, self.store)
                     except AttributeError:
-                        print ">>> %s doesn't have a %s method!" % (node, self.store)
+                        log.msg("%s doesn't have a %s method!" % (node, self.store))
                     else:
-                        df = f(self.target, self.value, self.table.node.id)
+                        df = f(self.target, self.value, node.token, self.caller.node.id)
                         df.addCallback(self.storedValue, node=node)
                         df.addErrback(self.storeFailed, node=node)
                     
@@ -260,14 +255,14 @@ class KeyExpirer:
     def __init__(self, store, config):
         self.store = store
         self.config = config
-        reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
+        self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
     
     def doExpire(self):
-        self.cut = "%0.6f" % (time() - self.config['KE_AGE'])
-        self._expire()
-    
-    def _expire(self):
-        c = self.store.cursor()
-        s = "delete from kv where time < '%s';" % self.cut
-        c.execute(s)
-        reactor.callLater(self.config['KE_DELAY'], self.doExpire)
+        self.store.expireValues(self.config['KE_AGE'])
+        self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
+        
+    def shutdown(self):
+        try:
+            self.next_expire.cancel()
+        except:
+            pass