]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - actions.py
only send the ID along in khashmir messages, don't send the host and
[quix0rs-apt-p2p.git] / actions.py
index f367c533f27e44463f8fe8aec52afc9f43d91e56..1afdeb146e0ef56f637ec77721cabac62b5c20c8 100644 (file)
@@ -1,9 +1,12 @@
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
 from time import time
 
 from const import reactor
 import const
 
-from hash import intify
+from khash import intify
 from knode import KNode as Node
 from ktable import KTable, K
 
@@ -40,10 +43,14 @@ FIND_NODE_TIMEOUT = 15
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
     def handleGotNodes(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        dict = dict['rsp']
         l = dict["nodes"]
-        sender = dict["sender"]
+        sender = {'id' : dict["id"]}
+        sender['port'] = _krpc_sender[1]        
+        sender['host'] = _krpc_sender[0]        
         sender = Node().initWithDict(sender)
-        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         if self.finished or self.answered.has_key(sender.id):
             # a day late and a dollar short
@@ -52,7 +59,7 @@ class FindNode(ActionBase):
         self.answered[sender.id] = 1
         for node in l:
             n = Node().initWithDict(node)
-            n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+            n.conn = self.table.udp.connectionForAddr((n.host, n.port))
             if not self.found.has_key(n.id):
                 self.found[n.id] = n
         self.schedule()
@@ -71,7 +78,7 @@ class FindNode(ActionBase):
                 return self.callback([node])
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                df = node.findNode(self.target, self.table.node.senderDict())
+                df = node.findNode(self.target, self.table.node.id)
                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
                 self.queried[node.id] = 1
@@ -85,6 +92,7 @@ class FindNode(ActionBase):
     
     def makeMsgFailed(self, node):
         def defaultGotNodes(err, self=self, node=node):
+            print ">>> find failed %s/%s" % (node.host, node.port)
             self.table.table.nodeFailed(node)
             self.outstanding = self.outstanding - 1
             self.schedule()
@@ -108,9 +116,13 @@ GET_VALUE_TIMEOUT = 15
 class GetValue(FindNode):
     """ get value task """
     def handleGotNodes(self, dict):
-        sender = dict["sender"]
+        _krpc_sender = dict['_krpc_sender']
+        dict = dict['rsp']
+        sender = {'id' : dict["id"]}
+        sender['port'] = _krpc_sender[1]
+        sender['host'] = _krpc_sender[0]                
         sender = Node().initWithDict(sender)
-        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         if self.finished or self.answered.has_key(sender.id):
             # a day late and a dollar short
@@ -122,7 +134,7 @@ class GetValue(FindNode):
         if dict.has_key('nodes'):
             for node in dict['nodes']:
                 n = Node().initWithDict(node)
-                n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+                n.conn = self.table.udp.connectionForAddr((n.host, n.port))
                 if not self.found.has_key(n.id):
                     self.found[n.id] = n
         elif dict.has_key('values'):
@@ -132,6 +144,7 @@ class GetValue(FindNode):
                     return y
                 else:
                     return None
+            z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
             if(len(v)):
                 reactor.callFromThread(self.callback, v)
@@ -147,7 +160,7 @@ class GetValue(FindNode):
         for node in l[:K]:
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-                df = node.findValue(self.target, self.table.node.senderDict())
+                df = node.findValue(self.target, self.table.node.id)
                 df.addCallback(self.handleGotNodes)
                 df.addErrback(self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
@@ -195,6 +208,7 @@ class StoreValue(ActionBase):
                 self.schedule()
             
     def storeFailed(self, t, node):
+        print ">>> store failed %s/%s" % (node.host, node.port)
         self.table.nodeFailed(node)
         self.outstanding -= 1
         if self.finished:
@@ -217,7 +231,7 @@ class StoreValue(ActionBase):
             else:
                 if not node.id == self.table.node.id:
                     self.outstanding += 1
-                    df = node.storeValue(self.target, self.value, self.table.node.senderDict())
+                    df = node.storeValue(self.target, self.value, self.table.node.id)
                     df.addCallback(self.storedValue, node=node)
                     df.addErrback(self.storeFailed, node=node)
                     
@@ -241,4 +255,3 @@ class KeyExpirer:
         s = "delete from kv where time < '%s';" % self.cut
         c.execute(s)
         reactor.callLater(const.KE_DELAY, self.doExpire)
-    
\ No newline at end of file