From: Cameron Dale Date: Mon, 14 Apr 2008 21:35:12 +0000 (-0700) Subject: More strict use of errbacks when using deferreds. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=7a84d9fb17076695aba3c0f5a32c6487bdd3f059 More strict use of errbacks when using deferreds. Now almost all calls to addCallback include adding an errback as well. HTTPDownloader will remove the next request on a connection error. HTTPServer will download the whole file after a rendering error. A failed check of the freshness of a file will cause the whole file to be downloaded. A resolving error in a DHT join behaves as if the bootstrap node was unreachable. Removed the errback args to khashmir's find routines as they were never used. --- diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 057b5a2..13cb237 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -56,7 +56,7 @@ class Peer(ClientFactory): assert self.closed and not self.connecting self.connecting = True d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) - d.addCallback(self.connected) + d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): """Begin processing the queued requests.""" @@ -65,6 +65,23 @@ class Peer(ClientFactory): self.proto = proto self.processQueue() + def connectionError(self, err): + """Cancel the requests.""" + log.msg('Failed to connect to the peer by HTTP.') + log.err(err) + + # Remove one request so that we don't loop indefinitely + if self.request_queue: + req = self.request_queue.pop(0) + req.deferRequest.errback(err) + + self._completed += 1 + self._errors += 1 + self.rerank() + if self.connecting: + self.connecting = False + self.clientGone(None) + def close(self): """Close the connection to the peer.""" if not self.closed: diff --git a/apt_p2p/HTTPServer.py b/apt_p2p/HTTPServer.py index d6c12bf..ee314ac 100644 --- a/apt_p2p/HTTPServer.py +++ b/apt_p2p/HTTPServer.py @@ -34,7 +34,8 @@ class FileDownloader(static.File): log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr)) resp = super(FileDownloader, self).renderHTTP(req) if isinstance(resp, defer.Deferred): - resp.addCallback(self._renderHTTP_done, req) + resp.addCallbacks(self._renderHTTP_done, self._renderHTTP_error, + callbackArgs = (req, ), errbackArgs = (req, )) else: resp = self._renderHTTP_done(resp, req) return resp @@ -52,6 +53,16 @@ class FileDownloader(static.File): return resp + def _renderHTTP_error(self, err, req): + log.msg('Failed to render %s: %r' % (req.uri, err)) + log.err(err) + + if self.manager: + path = 'http:/' + req.uri + return self.manager.get_resp(req, path) + + return err + def createSimilarFile(self, path): return self.__class__(path, self.manager, self.defaultType, self.ignoredExts, self.processors, self.indexNames[:]) diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index 671e226..489da5e 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -157,7 +157,8 @@ class AptP2P: """ log.msg('Checking if %s is still fresh' % url) d = self.peers.get('', url, method = "HEAD", modtime = modtime) - d.addCallback(self.check_freshness_done, req, url, resp) + d.addCallbacks(self.check_freshness_done, self.check_freshness_error, + callbackArgs = (req, url, resp), errbackArgs = (req, url)) return d def check_freshness_done(self, resp, req, url, orig_resp): @@ -178,6 +179,17 @@ class AptP2P: log.msg('Stale, need to redownload: %s' % url) return self.get_resp(req, url) + def check_freshness_error(self, err, req, url): + """Mirror request failed, continue with download. + + @param err: the response from the mirror to the HEAD request + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + """ + log.err(err) + return self.get_resp(req, url) + def get_resp(self, req, url): """Lookup a hash for the file in the local mirror info. diff --git a/apt_p2p_Khashmir/DHT.py b/apt_p2p_Khashmir/DHT.py index f11eefc..f773482 100644 --- a/apt_p2p_Khashmir/DHT.py +++ b/apt_p2p_Khashmir/DHT.py @@ -146,23 +146,35 @@ class DHT: if not self.khashmir: self.khashmir = Khashmir(self.config, self.cache_dir) + self.outstandingJoins = 0 for node in self.bootstrap: host, port = node.rsplit(':', 1) port = int(port) + self.outstandingJoins += 1 # Translate host names into IP addresses if isIPAddress(host): self._join_gotIP(host, port) else: - reactor.resolve(host).addCallback(self._join_gotIP, port) + reactor.resolve(host).addCallbacks(self._join_gotIP, + self._join_resolveFailed, + callbackArgs = (port, ), + errbackArgs = (host, port)) return self.joining def _join_gotIP(self, ip, port): """Join the DHT using a single bootstrap nodes IP address.""" - self.outstandingJoins += 1 self.khashmir.addContact(ip, port, self._join_single, self._join_error) + def _join_resolveFailed(self, err, host, port): + """Failed to lookup the IP address of the bootstrap node.""" + log.msg('Failed to find an IP address for host: (%r, %r)' % (host, port)) + log.err(err) + self.outstandingJoins -= 1 + if self.outstandingJoins <= 0: + self.khashmir.findCloseNodes(self._join_complete) + def _join_single(self, addr): """Process the response from the bootstrap node. @@ -172,7 +184,7 @@ class DHT: if addr: self.foundAddrs.append(addr) if addr or self.outstandingJoins <= 0: - self.khashmir.findCloseNodes(self._join_complete, self._join_complete) + self.khashmir.findCloseNodes(self._join_complete) log.msg('Got back from bootstrap node: %r' % (addr,)) def _join_error(self, failure = None): @@ -184,11 +196,11 @@ class DHT: self.outstandingJoins -= 1 log.msg("bootstrap node could not be reached") if self.outstandingJoins <= 0: - self.khashmir.findCloseNodes(self._join_complete, self._join_complete) + self.khashmir.findCloseNodes(self._join_complete) def _join_complete(self, result): """End the joining process and return the addresses found for this node.""" - if not self.joined and len(result) > 1: + if not self.joined and isinstance(result, list) and len(result) > 1: self.joined = True if self.joining and self.outstandingJoins <= 0: df = self.joining diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 7946523..8860c67 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -162,7 +162,7 @@ class KhashmirBase(protocol.Factory): n = self.Node(NULL_ID, host, port) self.sendJoin(n, callback=callback, errback=errback) - def findNode(self, id, callback, errback=None): + def findNode(self, id, callback): """Find the contact info for the K closest nodes in the global table. @type id: C{string} @@ -170,21 +170,12 @@ class KhashmirBase(protocol.Factory): @type callback: C{method} @param callback: the method to call with the results, it must take 1 parameter, the list of K closest nodes - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ # Start with our node nodes = [copy(self.node)] - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) - # Start the finding nodes action - state = FindNode(self, id, d.callback, self.config, self.stats) + state = FindNode(self, id, callback, self.config, self.stats) reactor.callLater(0, state.goWithNodes, nodes) def insertNode(self, node, contacted = True): @@ -258,7 +249,7 @@ class KhashmirBase(protocol.Factory): df = node.join(self.node.id) df.addCallbacks(_pongHandler, _defaultPong) - def findCloseNodes(self, callback=lambda a: None, errback = None): + def findCloseNodes(self, callback=lambda a: None): """Perform a findNode on the ID one away from our own. This will allow us to populate our table with nodes on our network @@ -269,12 +260,9 @@ class KhashmirBase(protocol.Factory): @param callback: the method to call with the results, it must take 1 parameter, the list of K closest nodes (optional, defaults to doing nothing with the results) - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - self.findNode(id, callback, errback) + self.findNode(id, callback) def refreshTable(self, force = False): """Check all the buckets for those that need refreshing. @@ -364,7 +352,7 @@ class KhashmirRead(KhashmirBase): _Node = KNodeRead #{ Local interface - def findValue(self, key, callback, errback=None): + def findValue(self, key, callback): """Get the nodes that have values for the key from the global table. @type key: C{string} @@ -372,21 +360,12 @@ class KhashmirRead(KhashmirBase): @type callback: C{method} @param callback: the method to call with the results, it must take 1 parameter, the list of nodes with values - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ # Start with ourself nodes = [copy(self.node)] - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) - # Search for others starting with the locally found ones - state = FindValue(self, key, d.callback, self.config, self.stats) + state = FindValue(self, key, callback, self.config, self.stats) reactor.callLater(0, state.goWithNodes, nodes) def valueForKey(self, key, callback, searchlocal = True):