]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - actions.py
Add tracking of index file hashes from Release files.
[quix0rs-apt-p2p.git] / actions.py
index f5d3a9fca6d1b7c12ed01ffa85c945791175d3aa..013a9a7544dcd79189efb010a0afe3cab9c430d8 100644 (file)
@@ -1,11 +1,12 @@
+## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
 from time import time
 
-from const import reactor
-import const
+from twisted.internet import reactor
 
-from hash import intify
-from knode import KNode as Node
-from ktable import KTable, K
+import const
+from khash import intify
 
 class ActionBase:
     """ base class for some long running asynchronous proccesses like finding nodes or values """
@@ -43,10 +44,11 @@ class FindNode(ActionBase):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
         l = dict["nodes"]
-        sender = dict["sender"]
+        sender = {'id' : dict["id"]}
         sender['port'] = _krpc_sender[1]        
-        sender = Node().initWithDict(sender)
-        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+        sender['host'] = _krpc_sender[0]        
+        sender = self.table.Node().initWithDict(sender)
+        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         if self.finished or self.answered.has_key(sender.id):
             # a day late and a dollar short
@@ -54,8 +56,8 @@ class FindNode(ActionBase):
         self.outstanding = self.outstanding - 1
         self.answered[sender.id] = 1
         for node in l:
-            n = Node().initWithDict(node)
-            n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+            n = self.table.Node().initWithDict(node)
+            n.conn = self.table.udp.connectionForAddr((n.host, n.port))
             if not self.found.has_key(n.id):
                 self.found[n.id] = n
         self.schedule()
@@ -68,13 +70,13 @@ class FindNode(ActionBase):
             return
         l = self.found.values()
         l.sort(self.sort)
-        for node in l[:K]:
+        for node in l[:const.K]:
             if node.id == self.target:
                 self.finished=1
                 return self.callback([node])
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                df = node.findNode(self.target, self.table.node.senderDict())
+                df = node.findNode(self.target, self.table.node.id)
                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                 self.outstanding = self.outstanding + 1
                 self.queried[node.id] = 1
@@ -84,11 +86,11 @@ class FindNode(ActionBase):
         if self.outstanding == 0:
             ## all done!!
             self.finished=1
-            reactor.callFromThread(self.callback, l[:K])
+            reactor.callLater(0, self.callback, l[:const.K])
     
     def makeMsgFailed(self, node):
         def defaultGotNodes(err, self=self, node=node):
-            print ">>> find failed %s/%s" % (node.host, node.port)
+            print ">>> find failed %s/%s" % (node.host, node.port), err
             self.table.table.nodeFailed(node)
             self.outstanding = self.outstanding - 1
             self.schedule()
@@ -108,16 +110,21 @@ class FindNode(ActionBase):
         self.schedule()
     
 
-GET_VALUE_TIMEOUT = 15
+get_value_timeout = 15
 class GetValue(FindNode):
+    def __init__(self, table, target, callback, find="findValue"):
+        FindNode.__init__(self, table, target, callback)
+        self.findValue = find
+            
     """ get value task """
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
-        sender = dict["sender"]
-        sender['port'] = _krpc_sender[1]        
-        sender = Node().initWithDict(sender)
-        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+        sender = {'id' : dict["id"]}
+        sender['port'] = _krpc_sender[1]
+        sender['host'] = _krpc_sender[0]                
+        sender = self.table.Node().initWithDict(sender)
+        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         if self.finished or self.answered.has_key(sender.id):
             # a day late and a dollar short
@@ -128,8 +135,8 @@ class GetValue(FindNode):
         # if we have any closer than what we already got, query them
         if dict.has_key('nodes'):
             for node in dict['nodes']:
-                n = Node().initWithDict(node)
-                n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+                n = self.table.Node().initWithDict(node)
+                n.conn = self.table.udp.connectionForAddr((n.host, n.port))
                 if not self.found.has_key(n.id):
                     self.found[n.id] = n
         elif dict.has_key('values'):
@@ -142,7 +149,7 @@ class GetValue(FindNode):
             z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
             if(len(v)):
-                reactor.callFromThread(self.callback, v)
+                reactor.callLater(0, self.callback, v)
         self.schedule()
         
     ## get value
@@ -152,21 +159,26 @@ class GetValue(FindNode):
         l = self.found.values()
         l.sort(self.sort)
         
-        for node in l[:K]:
+        for node in l[:const.K]:
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-                df = node.findValue(self.target, self.table.node.senderDict())
-                df.addCallback(self.handleGotNodes)
-                df.addErrback(self.makeMsgFailed(node))
-                self.outstanding = self.outstanding + 1
-                self.queried[node.id] = 1
+                try:
+                    f = getattr(node, self.findValue)
+                except AttributeError:
+                    print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
+                else:
+                    df = f(self.target, self.table.node.id)
+                    df.addCallback(self.handleGotNodes)
+                    df.addErrback(self.makeMsgFailed(node))
+                    self.outstanding = self.outstanding + 1
+                    self.queried[node.id] = 1
             if self.outstanding >= const.CONCURRENT_REQS:
                 break
         assert(self.outstanding) >=0
         if self.outstanding == 0:
             ## all done, didn't find it!!
             self.finished=1
-            reactor.callFromThread(self.callback,[])
+            reactor.callLater(0, self.callback,[])
 
     ## get value
     def goWithNodes(self, nodes, found=None):
@@ -184,11 +196,12 @@ class GetValue(FindNode):
 
 
 class StoreValue(ActionBase):
-    def __init__(self, table, target, value, callback):
+    def __init__(self, table, target, value, callback, store="storeValue"):
         ActionBase.__init__(self, table, target, callback)
         self.value = value
         self.stored = []
-    
+        self.store = store
+        
     def storedValue(self, t, node):
         self.outstanding -= 1
         self.table.insertNode(node)
@@ -201,15 +214,17 @@ class StoreValue(ActionBase):
         else:
             if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
                 self.schedule()
-            
+        return t
+    
     def storeFailed(self, t, node):
         print ">>> store failed %s/%s" % (node.host, node.port)
         self.table.nodeFailed(node)
         self.outstanding -= 1
         if self.finished:
-            return
+            return t
         self.schedule()
-        
+        return t
+    
     def schedule(self):
         if self.finished:
             return
@@ -226,9 +241,14 @@ class StoreValue(ActionBase):
             else:
                 if not node.id == self.table.node.id:
                     self.outstanding += 1
-                    df = node.storeValue(self.target, self.value, self.table.node.senderDict())
-                    df.addCallback(self.storedValue, node=node)
-                    df.addErrback(self.storeFailed, node=node)
+                    try:
+                        f = getattr(node, self.store)
+                    except AttributeError:
+                        print ">>> %s doesn't have a %s method!" % (node, self.store)
+                    else:
+                        df = f(self.target, self.value, self.table.node.id)
+                        df.addCallback(self.storedValue, node=node)
+                        df.addErrback(self.storeFailed, node=node)
                     
     def goWithNodes(self, nodes):
         self.nodes = nodes