]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - actions.py
AptPackages only takes a single cache directory.
[quix0rs-apt-p2p.git] / actions.py
index 945832970cf3a5467a50b3d1d4211cea3e7648c1..013a9a7544dcd79189efb010a0afe3cab9c430d8 100644 (file)
-from time import time
-from pickle import loads, dumps
-
-from bsddb3 import db
+## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
 
-from const import reactor
+from time import time
 
-from hash import intify
-from knode import KNode as Node
-from ktable import KTable, K
+from twisted.internet import reactor
 
-# concurrent FIND_NODE/VALUE requests!
-N = 3
+import const
+from khash import intify
 
 class ActionBase:
     """ base class for some long running asynchronous proccesses like finding nodes or values """
     def __init__(self, table, target, callback):
-       self.table = table
-       self.target = target
-       self.int = intify(target)
-       self.found = {}
-       self.queried = {}
-       self.answered = {}
-       self.callback = callback
-       self.outstanding = 0
-       self.finished = 0
-       
-       def sort(a, b, int=self.int):
-           """ this function is for sorting nodes relative to the ID we are looking for """
-           x, y = int ^ a.int, int ^ b.int
-           if x > y:
-               return 1
-           elif x < y:
-               return -1
-           return 0
-       self.sort = sort
+        self.table = table
+        self.target = target
+        self.num = intify(target)
+        self.found = {}
+        self.queried = {}
+        self.answered = {}
+        self.callback = callback
+        self.outstanding = 0
+        self.finished = 0
     
+        def sort(a, b, num=self.num):
+            """ this function is for sorting nodes relative to the ID we are looking for """
+            x, y = num ^ a.num, num ^ b.num
+            if x > y:
+                return 1
+            elif x < y:
+                return -1
+            return 0
+        self.sort = sort
+        
     def goWithNodes(self, t):
-       pass
-       
-       
+        pass
+    
+    
 
 FIND_NODE_TIMEOUT = 15
 
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
-    def handleGotNodes(self, args):
-       l, sender = args
-       sender = Node().initWithDict(sender)
-       if self.finished or self.answered.has_key(sender.id):
-           # a day late and a dollar short
-           return
-       self.outstanding = self.outstanding - 1
-       self.answered[sender.id] = 1
-       for node in l:
-           n = Node().initWithDict(node)
-           if not self.found.has_key(n.id):
-               self.found[n.id] = n
-               self.table.insertNode(n)
-       self.schedule()
-               
+    def handleGotNodes(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        dict = dict['rsp']
+        l = dict["nodes"]
+        sender = {'id' : dict["id"]}
+        sender['port'] = _krpc_sender[1]        
+        sender['host'] = _krpc_sender[0]        
+        sender = self.table.Node().initWithDict(sender)
+        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
+        self.table.table.insertNode(sender)
+        if self.finished or self.answered.has_key(sender.id):
+            # a day late and a dollar short
+            return
+        self.outstanding = self.outstanding - 1
+        self.answered[sender.id] = 1
+        for node in l:
+            n = self.table.Node().initWithDict(node)
+            n.conn = self.table.udp.connectionForAddr((n.host, n.port))
+            if not self.found.has_key(n.id):
+                self.found[n.id] = n
+        self.schedule()
+        
     def schedule(self):
-       """
-           send messages to new peers, if necessary
-       """
-       if self.finished:
-           return
-       l = self.found.values()
-       l.sort(self.sort)
-
-       for node in l[:K]:
-           if node.id == self.target:
-               self.finished=1
-               return self.callback([node])
-           if not self.queried.has_key(node.id) and node.id != self.table.node.id:
-               #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-               df = node.findNode(self.target, self.table.node.senderDict())
-               df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-               self.outstanding = self.outstanding + 1
-               self.queried[node.id] = 1
-           if self.outstanding >= N:
-               break
-       assert(self.outstanding) >=0
-       if self.outstanding == 0:
-           ## all done!!
-           self.finished=1
-           reactor.callFromThread(self.callback, l[:K])
-       
-    def defaultGotNodes(self, t):
-       if self.finished:
-           return
-       self.outstanding = self.outstanding - 1
-       self.schedule()
-       
-       
+        """
+            send messages to new peers, if necessary
+        """
+        if self.finished:
+            return
+        l = self.found.values()
+        l.sort(self.sort)
+        for node in l[:const.K]:
+            if node.id == self.target:
+                self.finished=1
+                return self.callback([node])
+            if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
+                df = node.findNode(self.target, self.table.node.id)
+                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
+                self.outstanding = self.outstanding + 1
+                self.queried[node.id] = 1
+            if self.outstanding >= const.CONCURRENT_REQS:
+                break
+        assert(self.outstanding) >=0
+        if self.outstanding == 0:
+            ## all done!!
+            self.finished=1
+            reactor.callLater(0, self.callback, l[:const.K])
+    
+    def makeMsgFailed(self, node):
+        def defaultGotNodes(err, self=self, node=node):
+            print ">>> find failed %s/%s" % (node.host, node.port), err
+            self.table.table.nodeFailed(node)
+            self.outstanding = self.outstanding - 1
+            self.schedule()
+        return defaultGotNodes
+    
     def goWithNodes(self, nodes):
-       """
-           this starts the process, our argument is a transaction with t.extras being our list of nodes
-           it's a transaction since we got called from the dispatcher
-       """
-       for node in nodes:
-           if node.id == self.table.node.id:
-               continue
-           self.found[node.id] = node
-           #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findNode(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           self.callback(nodes)
-
+        """
+            this starts the process, our argument is a transaction with t.extras being our list of nodes
+            it's a transaction since we got called from the dispatcher
+        """
+        for node in nodes:
+            if node.id == self.table.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+        
+        self.schedule()
+    
 
-GET_VALUE_TIMEOUT = 15
+get_value_timeout = 15
 class GetValue(FindNode):
+    def __init__(self, table, target, callback, find="findValue"):
+        FindNode.__init__(self, table, target, callback)
+        self.findValue = find
+            
     """ get value task """
-    def handleGotNodes(self, args):
-       l, sender = args
-       sender = Node().initWithDict(sender)
-       if self.finished or self.answered.has_key(sender.id):
-           # a day late and a dollar short
-           return
-       self.outstanding = self.outstanding - 1
-       self.answered[sender.id] = 1
-       # go through nodes
-       # if we have any closer than what we already got, query them
-       if l.has_key('nodes'):
-           for node in l['nodes']:
-               n = Node().initWithDict(node)
-               if not self.found.has_key(n.id):
-                   self.found[n.id] = n
-                   self.table.insertNode(n)
-       elif l.has_key('values'):
-           def x(y, z=self.results):
-               y = y.data
-               if not z.has_key(y):
-                   z[y] = 1
-                   return y
-           v = filter(None, map(x, l['values']))
-           if(len(v)):
-               reactor.callFromThread(self.callback, v)
-       self.schedule()
-               
+    def handleGotNodes(self, dict):
+        _krpc_sender = dict['_krpc_sender']
+        dict = dict['rsp']
+        sender = {'id' : dict["id"]}
+        sender['port'] = _krpc_sender[1]
+        sender['host'] = _krpc_sender[0]                
+        sender = self.table.Node().initWithDict(sender)
+        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
+        self.table.table.insertNode(sender)
+        if self.finished or self.answered.has_key(sender.id):
+            # a day late and a dollar short
+            return
+        self.outstanding = self.outstanding - 1
+        self.answered[sender.id] = 1
+        # go through nodes
+        # if we have any closer than what we already got, query them
+        if dict.has_key('nodes'):
+            for node in dict['nodes']:
+                n = self.table.Node().initWithDict(node)
+                n.conn = self.table.udp.connectionForAddr((n.host, n.port))
+                if not self.found.has_key(n.id):
+                    self.found[n.id] = n
+        elif dict.has_key('values'):
+            def x(y, z=self.results):
+                if not z.has_key(y):
+                    z[y] = 1
+                    return y
+                else:
+                    return None
+            z = len(dict['values'])
+            v = filter(None, map(x, dict['values']))
+            if(len(v)):
+                reactor.callLater(0, self.callback, v)
+        self.schedule()
+        
     ## get value
     def schedule(self):
-       if self.finished:
-           return
-       l = self.found.values()
-       l.sort(self.sort)
+        if self.finished:
+            return
+        l = self.found.values()
+        l.sort(self.sort)
+        
+        for node in l[:const.K]:
+            if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
+                try:
+                    f = getattr(node, self.findValue)
+                except AttributeError:
+                    print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
+                else:
+                    df = f(self.target, self.table.node.id)
+                    df.addCallback(self.handleGotNodes)
+                    df.addErrback(self.makeMsgFailed(node))
+                    self.outstanding = self.outstanding + 1
+                    self.queried[node.id] = 1
+            if self.outstanding >= const.CONCURRENT_REQS:
+                break
+        assert(self.outstanding) >=0
+        if self.outstanding == 0:
+            ## all done, didn't find it!!
+            self.finished=1
+            reactor.callLater(0, self.callback,[])
 
-       for node in l[:K]:
-           if not self.queried.has_key(node.id) and node.id != self.table.node.id:
-               #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-               df = node.findValue(self.target, self.table.node.senderDict())
-               df.addCallback(self.handleGotNodes)
-               df.addErrback(self.defaultGotNodes)
-               self.outstanding = self.outstanding + 1
-               self.queried[node.id] = 1
-           if self.outstanding >= N:
-               break
-       assert(self.outstanding) >=0
-       if self.outstanding == 0:
-           ## all done, didn't find it!!
-           self.finished=1
-           reactor.callFromThread(self.callback,[])
-    
     ## get value
     def goWithNodes(self, nodes, found=None):
-       self.results = {}
-       if found:
-           for n in found:
-               self.results[n] = 1
-       for node in nodes:
-           if node.id == self.table.node.id:
-               continue
-           self.found[node.id] = node
-           #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-           df = node.findValue(self.target, self.table.node.senderDict())
-           df.addCallback(self.handleGotNodes)
-           df.addErrback(self.defaultGotNodes)
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           reactor.callFromThread(self.callback, [])
+        self.results = {}
+        if found:
+            for n in found:
+                self.results[n] = 1
+        for node in nodes:
+            if node.id == self.table.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+            
+        self.schedule()
 
 
-KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
-KE_DELAY = 60 * 60 # 1 hour
-KE_AGE = KEINITIAL_DELAY
+class StoreValue(ActionBase):
+    def __init__(self, table, target, value, callback, store="storeValue"):
+        ActionBase.__init__(self, table, target, callback)
+        self.value = value
+        self.stored = []
+        self.store = store
+        
+    def storedValue(self, t, node):
+        self.outstanding -= 1
+        self.table.insertNode(node)
+        if self.finished:
+            return
+        self.stored.append(t)
+        if len(self.stored) >= const.STORE_REDUNDANCY:
+            self.finished=1
+            self.callback(self.stored)
+        else:
+            if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
+                self.schedule()
+        return t
+    
+    def storeFailed(self, t, node):
+        print ">>> store failed %s/%s" % (node.host, node.port)
+        self.table.nodeFailed(node)
+        self.outstanding -= 1
+        if self.finished:
+            return t
+        self.schedule()
+        return t
+    
+    def schedule(self):
+        if self.finished:
+            return
+        num = const.CONCURRENT_REQS - self.outstanding
+        if num > const.STORE_REDUNDANCY:
+            num = const.STORE_REDUNDANCY
+        for i in range(num):
+            try:
+                node = self.nodes.pop()
+            except IndexError:
+                if self.outstanding == 0:
+                    self.finished = 1
+                    self.callback(self.stored)
+            else:
+                if not node.id == self.table.node.id:
+                    self.outstanding += 1
+                    try:
+                        f = getattr(node, self.store)
+                    except AttributeError:
+                        print ">>> %s doesn't have a %s method!" % (node, self.store)
+                    else:
+                        df = f(self.target, self.value, self.table.node.id)
+                        df.addCallback(self.storedValue, node=node)
+                        df.addErrback(self.storeFailed, node=node)
+                    
+    def goWithNodes(self, nodes):
+        self.nodes = nodes
+        self.nodes.sort(self.sort)
+        self.schedule()
+
 
 class KeyExpirer:
-    def __init__(self, store, itime, kw):
-       self.store = store
-       self.itime = itime
-       self.kw = kw
-       reactor.callLater(KEINITIAL_DELAY, self.doExpire)
-       
+    def __init__(self, store):
+        self.store = store
+        reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
+    
     def doExpire(self):
-       self.cut = `time() - KE_AGE`
-       self._expire()
-       
+        self.cut = "%0.6f" % (time() - const.KE_AGE)
+        self._expire()
+    
     def _expire(self):
-       ic = self.itime.cursor()
-       sc = self.store.cursor()
-       kc = self.kw.cursor()
-       irec = None
-       try:
-           irec = ic.set_range(self.cut)
-       except db.DBNotFoundError:
-           # everything is expired
-           f = ic.prev
-           irec = f()
-       else:
-           f = ic.next
-       i = 0
-       while irec:
-           it, h = irec
-           try:
-               k, v, lt = loads(self.store[h])
-           except KeyError:
-               ic.delete()
-           else:
-               if lt < self.cut:
-                   try:
-                       kc.set_both(k, h)
-                   except db.DBNotFoundError:
-                       print "Database inconsistency!  No key->value entry when a store entry was found!"
-                   else:
-                       kc.delete()
-                   self.store.delete(h)
-                   ic.delete()
-                   i = i + 1
-               else:
-                   break
-           irec = f()
-           
-       reactor.callLater(KE_DELAY, self.doExpire)
-       if(i > 0):
-           print ">>>KE: done expiring %d" % i
-       
\ No newline at end of file
+        c = self.store.cursor()
+        s = "delete from kv where time < '%s';" % self.cut
+        c.execute(s)
+        reactor.callLater(const.KE_DELAY, self.doExpire)