Now almost all calls to addCallback include adding an errback as well.
HTTPDownloader will remove the next request on a connection error.
HTTPServer will download the whole file after a rendering error.
A failed check of the freshness of a file will cause the whole file to be downloaded.
A resolving error in a DHT join behaves as if the bootstrap node was unreachable.
Removed the errback args to khashmir's find routines as they were never used.
assert self.closed and not self.connecting
self.connecting = True
d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
- d.addCallback(self.connected)
+ d.addCallbacks(self.connected, self.connectionError)
def connected(self, proto):
"""Begin processing the queued requests."""
self.proto = proto
self.processQueue()
+ def connectionError(self, err):
+ """Cancel the requests."""
+ log.msg('Failed to connect to the peer by HTTP.')
+ log.err(err)
+
+ # Remove one request so that we don't loop indefinitely
+ if self.request_queue:
+ req = self.request_queue.pop(0)
+ req.deferRequest.errback(err)
+
+ self._completed += 1
+ self._errors += 1
+ self.rerank()
+ if self.connecting:
+ self.connecting = False
+ self.clientGone(None)
+
def close(self):
"""Close the connection to the peer."""
if not self.closed:
log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
resp = super(FileDownloader, self).renderHTTP(req)
if isinstance(resp, defer.Deferred):
- resp.addCallback(self._renderHTTP_done, req)
+ resp.addCallbacks(self._renderHTTP_done, self._renderHTTP_error,
+ callbackArgs = (req, ), errbackArgs = (req, ))
else:
resp = self._renderHTTP_done(resp, req)
return resp
return resp
+ def _renderHTTP_error(self, err, req):
+ log.msg('Failed to render %s: %r' % (req.uri, err))
+ log.err(err)
+
+ if self.manager:
+ path = 'http:/' + req.uri
+ return self.manager.get_resp(req, path)
+
+ return err
+
def createSimilarFile(self, path):
return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
self.processors, self.indexNames[:])
"""
log.msg('Checking if %s is still fresh' % url)
d = self.peers.get('', url, method = "HEAD", modtime = modtime)
- d.addCallback(self.check_freshness_done, req, url, resp)
+ d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
+ callbackArgs = (req, url, resp), errbackArgs = (req, url))
return d
def check_freshness_done(self, resp, req, url, orig_resp):
log.msg('Stale, need to redownload: %s' % url)
return self.get_resp(req, url)
+ def check_freshness_error(self, err, req, url):
+ """Mirror request failed, continue with download.
+
+ @param err: the response from the mirror to the HEAD request
+ @type req: L{twisted.web2.http.Request}
+ @param req: the initial request sent to the HTTP server by apt
+ @param url: the URI of the actual mirror request
+ """
+ log.err(err)
+ return self.get_resp(req, url)
+
def get_resp(self, req, url):
"""Lookup a hash for the file in the local mirror info.
if not self.khashmir:
self.khashmir = Khashmir(self.config, self.cache_dir)
+ self.outstandingJoins = 0
for node in self.bootstrap:
host, port = node.rsplit(':', 1)
port = int(port)
+ self.outstandingJoins += 1
# Translate host names into IP addresses
if isIPAddress(host):
self._join_gotIP(host, port)
else:
- reactor.resolve(host).addCallback(self._join_gotIP, port)
+ reactor.resolve(host).addCallbacks(self._join_gotIP,
+ self._join_resolveFailed,
+ callbackArgs = (port, ),
+ errbackArgs = (host, port))
return self.joining
def _join_gotIP(self, ip, port):
"""Join the DHT using a single bootstrap nodes IP address."""
- self.outstandingJoins += 1
self.khashmir.addContact(ip, port, self._join_single, self._join_error)
+ def _join_resolveFailed(self, err, host, port):
+ """Failed to lookup the IP address of the bootstrap node."""
+ log.msg('Failed to find an IP address for host: (%r, %r)' % (host, port))
+ log.err(err)
+ self.outstandingJoins -= 1
+ if self.outstandingJoins <= 0:
+ self.khashmir.findCloseNodes(self._join_complete)
+
def _join_single(self, addr):
"""Process the response from the bootstrap node.
if addr:
self.foundAddrs.append(addr)
if addr or self.outstandingJoins <= 0:
- self.khashmir.findCloseNodes(self._join_complete, self._join_complete)
+ self.khashmir.findCloseNodes(self._join_complete)
log.msg('Got back from bootstrap node: %r' % (addr,))
def _join_error(self, failure = None):
self.outstandingJoins -= 1
log.msg("bootstrap node could not be reached")
if self.outstandingJoins <= 0:
- self.khashmir.findCloseNodes(self._join_complete, self._join_complete)
+ self.khashmir.findCloseNodes(self._join_complete)
def _join_complete(self, result):
"""End the joining process and return the addresses found for this node."""
- if not self.joined and len(result) > 1:
+ if not self.joined and isinstance(result, list) and len(result) > 1:
self.joined = True
if self.joining and self.outstandingJoins <= 0:
df = self.joining
n = self.Node(NULL_ID, host, port)
self.sendJoin(n, callback=callback, errback=errback)
- def findNode(self, id, callback, errback=None):
+ def findNode(self, id, callback):
"""Find the contact info for the K closest nodes in the global table.
@type id: C{string}
@type callback: C{method}
@param callback: the method to call with the results, it must take 1
parameter, the list of K closest nodes
- @type errback: C{method}
- @param errback: the method to call if an error occurs
- (optional, defaults to doing nothing when an error occurs)
"""
# Start with our node
nodes = [copy(self.node)]
- d = Deferred()
- if errback:
- d.addCallbacks(callback, errback)
- else:
- d.addCallback(callback)
-
# Start the finding nodes action
- state = FindNode(self, id, d.callback, self.config, self.stats)
+ state = FindNode(self, id, callback, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
def insertNode(self, node, contacted = True):
df = node.join(self.node.id)
df.addCallbacks(_pongHandler, _defaultPong)
- def findCloseNodes(self, callback=lambda a: None, errback = None):
+ def findCloseNodes(self, callback=lambda a: None):
"""Perform a findNode on the ID one away from our own.
This will allow us to populate our table with nodes on our network
@param callback: the method to call with the results, it must take 1
parameter, the list of K closest nodes
(optional, defaults to doing nothing with the results)
- @type errback: C{method}
- @param errback: the method to call if an error occurs
- (optional, defaults to doing nothing when an error occurs)
"""
id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
- self.findNode(id, callback, errback)
+ self.findNode(id, callback)
def refreshTable(self, force = False):
"""Check all the buckets for those that need refreshing.
_Node = KNodeRead
#{ Local interface
- def findValue(self, key, callback, errback=None):
+ def findValue(self, key, callback):
"""Get the nodes that have values for the key from the global table.
@type key: C{string}
@type callback: C{method}
@param callback: the method to call with the results, it must take 1
parameter, the list of nodes with values
- @type errback: C{method}
- @param errback: the method to call if an error occurs
- (optional, defaults to doing nothing when an error occurs)
"""
# Start with ourself
nodes = [copy(self.node)]
- d = Deferred()
- if errback:
- d.addCallbacks(callback, errback)
- else:
- d.addCallback(callback)
-
# Search for others starting with the locally found ones
- state = FindValue(self, key, d.callback, self.config, self.stats)
+ state = FindValue(self, key, callback, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
def valueForKey(self, key, callback, searchlocal = True):