More strict use of errbacks when using deferreds.
authorCameron Dale <camrdale@gmail.com>
Mon, 14 Apr 2008 21:35:12 +0000 (14:35 -0700)
committerCameron Dale <camrdale@gmail.com>
Mon, 14 Apr 2008 21:35:12 +0000 (14:35 -0700)
Now almost all calls to addCallback include adding an errback as well.
HTTPDownloader will remove the next request on a connection error.
HTTPServer will download the whole file after a rendering error.
A failed check of the freshness of a file will cause the whole file to be downloaded.
A resolving error in a DHT join behaves as if the bootstrap node was unreachable.
Removed the errback args to khashmir's find routines as they were never used.

apt_p2p/HTTPDownloader.py
apt_p2p/HTTPServer.py
apt_p2p/apt_p2p.py
apt_p2p_Khashmir/DHT.py
apt_p2p_Khashmir/khashmir.py

index 057b5a2befdc6783d1c2e295d8c6b9024c782231..13cb237320d24b9ce2c87b6e8940556592701af2 100644 (file)
@@ -56,7 +56,7 @@ class Peer(ClientFactory):
         assert self.closed and not self.connecting
         self.connecting = True
         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
-        d.addCallback(self.connected)
+        d.addCallbacks(self.connected, self.connectionError)
 
     def connected(self, proto):
         """Begin processing the queued requests."""
@@ -65,6 +65,23 @@ class Peer(ClientFactory):
         self.proto = proto
         self.processQueue()
         
+    def connectionError(self, err):
+        """Cancel the requests."""
+        log.msg('Failed to connect to the peer by HTTP.')
+        log.err(err)
+
+        # Remove one request so that we don't loop indefinitely
+        if self.request_queue:
+            req = self.request_queue.pop(0)
+            req.deferRequest.errback(err)
+            
+        self._completed += 1
+        self._errors += 1
+        self.rerank()
+        if self.connecting:
+            self.connecting = False
+            self.clientGone(None)
+        
     def close(self):
         """Close the connection to the peer."""
         if not self.closed:
index d6c12bf3ce402a949700b53887a4fd94c5a06d2a..ee314ac66710e54424069e207fa41b98f2a088e9 100644 (file)
@@ -34,7 +34,8 @@ class FileDownloader(static.File):
         log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
         resp = super(FileDownloader, self).renderHTTP(req)
         if isinstance(resp, defer.Deferred):
-            resp.addCallback(self._renderHTTP_done, req)
+            resp.addCallbacks(self._renderHTTP_done, self._renderHTTP_error,
+                              callbackArgs = (req, ), errbackArgs = (req, ))
         else:
             resp = self._renderHTTP_done(resp, req)
         return resp
@@ -52,6 +53,16 @@ class FileDownloader(static.File):
         
         return resp
 
+    def _renderHTTP_error(self, err, req):
+        log.msg('Failed to render %s: %r' % (req.uri, err))
+        log.err(err)
+        
+        if self.manager:
+            path = 'http:/' + req.uri
+            return self.manager.get_resp(req, path)
+        
+        return err
+
     def createSimilarFile(self, path):
         return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                               self.processors, self.indexNames[:])
index 671e2269aea9d5713c020a4dc3a514aa20ddd894..489da5ec661999d447d2f1b4552fc3f52256e52a 100644 (file)
@@ -157,7 +157,8 @@ class AptP2P:
         """
         log.msg('Checking if %s is still fresh' % url)
         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
-        d.addCallback(self.check_freshness_done, req, url, resp)
+        d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
+                       callbackArgs = (req, url, resp), errbackArgs = (req, url))
         return d
     
     def check_freshness_done(self, resp, req, url, orig_resp):
@@ -178,6 +179,17 @@ class AptP2P:
             log.msg('Stale, need to redownload: %s' % url)
             return self.get_resp(req, url)
     
+    def check_freshness_error(self, err, req, url):
+        """Mirror request failed, continue with download.
+        
+        @param err: the response from the mirror to the HEAD request
+        @type req: L{twisted.web2.http.Request}
+        @param req: the initial request sent to the HTTP server by apt
+        @param url: the URI of the actual mirror request
+        """
+        log.err(err)
+        return self.get_resp(req, url)
+    
     def get_resp(self, req, url):
         """Lookup a hash for the file in the local mirror info.
         
index f11eefc8305f87f12c10a85b5703d942ee3ed08e..f773482695428b03e044153e3b2e2b3e8a6be8e1 100644 (file)
@@ -146,23 +146,35 @@ class DHT:
         if not self.khashmir:
             self.khashmir = Khashmir(self.config, self.cache_dir)
 
+        self.outstandingJoins = 0
         for node in self.bootstrap:
             host, port = node.rsplit(':', 1)
             port = int(port)
+            self.outstandingJoins += 1
             
             # Translate host names into IP addresses
             if isIPAddress(host):
                 self._join_gotIP(host, port)
             else:
-                reactor.resolve(host).addCallback(self._join_gotIP, port)
+                reactor.resolve(host).addCallbacks(self._join_gotIP,
+                                                   self._join_resolveFailed,
+                                                   callbackArgs = (port, ),
+                                                   errbackArgs = (host, port))
         
         return self.joining
 
     def _join_gotIP(self, ip, port):
         """Join the DHT using a single bootstrap nodes IP address."""
-        self.outstandingJoins += 1
         self.khashmir.addContact(ip, port, self._join_single, self._join_error)
     
+    def _join_resolveFailed(self, err, host, port):
+        """Failed to lookup the IP address of the bootstrap node."""
+        log.msg('Failed to find an IP address for host: (%r, %r)' % (host, port))
+        log.err(err)
+        self.outstandingJoins -= 1
+        if self.outstandingJoins <= 0:
+            self.khashmir.findCloseNodes(self._join_complete)
+    
     def _join_single(self, addr):
         """Process the response from the bootstrap node.
         
@@ -172,7 +184,7 @@ class DHT:
         if addr:
             self.foundAddrs.append(addr)
         if addr or self.outstandingJoins <= 0:
-            self.khashmir.findCloseNodes(self._join_complete, self._join_complete)
+            self.khashmir.findCloseNodes(self._join_complete)
         log.msg('Got back from bootstrap node: %r' % (addr,))
     
     def _join_error(self, failure = None):
@@ -184,11 +196,11 @@ class DHT:
         self.outstandingJoins -= 1
         log.msg("bootstrap node could not be reached")
         if self.outstandingJoins <= 0:
-            self.khashmir.findCloseNodes(self._join_complete, self._join_complete)
+            self.khashmir.findCloseNodes(self._join_complete)
 
     def _join_complete(self, result):
         """End the joining process and return the addresses found for this node."""
-        if not self.joined and len(result) > 1:
+        if not self.joined and isinstance(result, list) and len(result) > 1:
             self.joined = True
         if self.joining and self.outstandingJoins <= 0:
             df = self.joining
index 7946523da0f266d3efe18aa5b55edc52474e5cc2..8860c6719491e67c6267a1e483267587bb5dc3d2 100644 (file)
@@ -162,7 +162,7 @@ class KhashmirBase(protocol.Factory):
         n = self.Node(NULL_ID, host, port)
         self.sendJoin(n, callback=callback, errback=errback)
 
-    def findNode(self, id, callback, errback=None):
+    def findNode(self, id, callback):
         """Find the contact info for the K closest nodes in the global table.
         
         @type id: C{string}
@@ -170,21 +170,12 @@ class KhashmirBase(protocol.Factory):
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
         # Start with our node
         nodes = [copy(self.node)]
 
-        d = Deferred()
-        if errback:
-            d.addCallbacks(callback, errback)
-        else:
-            d.addCallback(callback)
-
         # Start the finding nodes action
-        state = FindNode(self, id, d.callback, self.config, self.stats)
+        state = FindNode(self, id, callback, self.config, self.stats)
         reactor.callLater(0, state.goWithNodes, nodes)
     
     def insertNode(self, node, contacted = True):
@@ -258,7 +249,7 @@ class KhashmirBase(protocol.Factory):
         df = node.join(self.node.id)
         df.addCallbacks(_pongHandler, _defaultPong)
 
-    def findCloseNodes(self, callback=lambda a: None, errback = None):
+    def findCloseNodes(self, callback=lambda a: None):
         """Perform a findNode on the ID one away from our own.
 
         This will allow us to populate our table with nodes on our network
@@ -269,12 +260,9 @@ class KhashmirBase(protocol.Factory):
         @param callback: the method to call with the results, it must take 1
             parameter, the list of K closest nodes
             (optional, defaults to doing nothing with the results)
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-        self.findNode(id, callback, errback)
+        self.findNode(id, callback)
 
     def refreshTable(self, force = False):
         """Check all the buckets for those that need refreshing.
@@ -364,7 +352,7 @@ class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
 
     #{ Local interface
-    def findValue(self, key, callback, errback=None):
+    def findValue(self, key, callback):
         """Get the nodes that have values for the key from the global table.
         
         @type key: C{string}
@@ -372,21 +360,12 @@ class KhashmirRead(KhashmirBase):
         @type callback: C{method}
         @param callback: the method to call with the results, it must take 1
             parameter, the list of nodes with values
-        @type errback: C{method}
-        @param errback: the method to call if an error occurs
-            (optional, defaults to doing nothing when an error occurs)
         """
         # Start with ourself
         nodes = [copy(self.node)]
         
-        d = Deferred()
-        if errback:
-            d.addCallbacks(callback, errback)
-        else:
-            d.addCallback(callback)
-
         # Search for others starting with the locally found ones
-        state = FindValue(self, key, d.callback, self.config, self.stats)
+        state = FindValue(self, key, callback, self.config, self.stats)
         reactor.callLater(0, state.goWithNodes, nodes)
 
     def valueForKey(self, key, callback, searchlocal = True):