X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=f1488e540481a51a75b445a2bf2fa48ca465ec4c;hb=09472169824fc47c359cdee65652a12a7ba46806;hp=2a53897ee64c0b8ca4a03bf19eb6d40773e25f32;hpb=eaecf245ed359e161b761e9c2d9ace4598ca5a20;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 2a53897..f1488e5 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -17,6 +17,24 @@ from zope.interface import implements from apt_p2p_conf import version +class LoggingHTTPClientProtocol(HTTPClientProtocol): + """A modified client protocol that logs the number of bytes received.""" + + def __init__(self, factory, stats = None, mirror = False): + HTTPClientProtocol.__init__(self, factory) + self.stats = stats + self.mirror = mirror + + def lineReceived(self, line): + if self.stats: + self.stats.receivedBytes(len(line) + 2, self.mirror) + HTTPClientProtocol.lineReceived(self, line) + + def rawDataReceived(self, data): + if self.stats: + self.stats.receivedBytes(len(data), self.mirror) + HTTPClientProtocol.rawDataReceived(self, data) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -28,9 +46,11 @@ class Peer(ClientFactory): implements(IHTTPClientManager) - def __init__(self, host, port=80): + def __init__(self, host, port = 80, stats = None): self.host = host self.port = port + self.stats = stats + self.mirror = False self.rank = 0.5 self.busy = False self.pipeline = False @@ -45,14 +65,18 @@ class Peer(ClientFactory): self._downloadSpeeds = [] self._lastResponse = None self._responseTimes = [] + + def __repr__(self): + return "(%r, %r, %r)" % (self.host, self.port, self.rank) #{ Manage the request queue def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting self.connecting = True - d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) - d.addCallback(self.connected) + d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, + stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) + d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): """Begin processing the queued requests.""" @@ -61,6 +85,23 @@ class Peer(ClientFactory): self.proto = proto self.processQueue() + def connectionError(self, err): + """Cancel the requests.""" + log.msg('Failed to connect to the peer by HTTP.') + log.err(err) + + # Remove one request so that we don't loop indefinitely + if self.request_queue: + req = self.request_queue.pop(0) + req.deferRequest.errback(err) + + self._completed += 1 + self._errors += 1 + self.rerank() + if self.connecting: + self.connecting = False + self.clientGone(None) + def close(self): """Close the connection to the peer.""" if not self.closed: @@ -105,8 +146,6 @@ class Peer(ClientFactory): req = self.response_queue.pop(0) log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) self._completed += 1 - if resp.code >= 400: - self._errors += 1 now = datetime.now() self._responseTimes.append((now, now - req.submissionTime)) self._lastResponse = (now, resp.stream.length)