X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;ds=sidebyside;f=apt_p2p%2FHTTPDownloader.py;h=97346db878aa5adfdb06feb84c68b1dad11eb92e;hb=fcfb936185ed7dfa126e443b3b281215eefc4a67;hp=15ee5640696bc777b76b6a1ae4f7c900fcf9de2a;hpb=40ef1ce5ded865bc9c339d15e667e87fc5775a7c;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 15ee564..97346db 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -17,6 +17,24 @@ from zope.interface import implements from apt_p2p_conf import version +class LoggingHTTPClientProtocol(HTTPClientProtocol): + """A modified client protocol that logs the number of bytes received.""" + + def __init__(self, factory, stats = None, mirror = False): + HTTPClientProtocol.__init__(self, factory) + self.stats = stats + self.mirror = mirror + + def lineReceived(self, line): + if self.stats: + self.stats.receivedBytes(len(line) + 2, self.mirror) + HTTPClientProtocol.lineReceived(self, line) + + def rawDataReceived(self, data): + if self.stats: + self.stats.receivedBytes(len(data), self.mirror) + HTTPClientProtocol.rawDataReceived(self, data) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -28,10 +46,12 @@ class Peer(ClientFactory): implements(IHTTPClientManager) - def __init__(self, host, port=80): + def __init__(self, host, port = 80, stats = None): self.host = host self.port = port - self.rank = 0.5 + self.stats = stats + self.mirror = False + self.rank = 0.1 self.busy = False self.pipeline = False self.closed = True @@ -47,15 +67,16 @@ class Peer(ClientFactory): self._responseTimes = [] def __repr__(self): - return "(%s, %d, %0.5f)" % (self.host, self.port, self.rank) + return "(%r, %r, %r)" % (self.host, self.port, self.rank) #{ Manage the request queue def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting self.connecting = True - d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) - d.addCallback(self.connected) + d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, + stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) + d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): """Begin processing the queued requests.""" @@ -64,6 +85,23 @@ class Peer(ClientFactory): self.proto = proto self.processQueue() + def connectionError(self, err): + """Cancel the requests.""" + log.msg('Failed to connect to the peer by HTTP.') + log.err(err) + + # Remove one request so that we don't loop indefinitely + if self.request_queue: + req = self.request_queue.pop(0) + req.deferRequest.errback(err) + + self._completed += 1 + self._errors += 1 + self.rerank() + if self.connecting: + self.connecting = False + self.clientGone(None) + def close(self): """Close the connection to the peer.""" if not self.closed: @@ -151,7 +189,8 @@ class Peer(ClientFactory): """Mark sent requests as errors.""" self._processLastResponse() for req in self.response_queue: - req.deferRequest.errback(ProtocolError('lost connection')) + reactor.callLater(0, req.deferRequest.errback, + ProtocolError('lost connection')) self.busy = False self.pipeline = False self.closed = True @@ -276,12 +315,12 @@ class Peer(ClientFactory): rank = 1.0 if self.closed: rank *= 0.9 - rank *= exp(-(len(self.request_queue) - len(self.response_queue))) + rank *= exp(-(len(self.request_queue) + len(self.response_queue))) speed = self.downloadSpeed() if speed > 0.0: rank *= exp(-512.0*1024 / speed) if self._completed: - rank *= exp(-float(self._errors) / self._completed) + rank *= exp(-10.0 * self._errors / self._completed) rank *= exp(-self.responseTime() / 5.0) self.rank = rank