X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=1831ceabd19f51363cb1c32fb8a6abc11db0ead8;hb=8e5874b77f6b3d6e008c60001b5f49420e0b404b;hp=2a53897ee64c0b8ca4a03bf19eb6d40773e25f32;hpb=eaecf245ed359e161b761e9c2d9ace4598ca5a20;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 2a53897..1831cea 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -17,6 +17,24 @@ from zope.interface import implements from apt_p2p_conf import version +class LoggingHTTPClientProtocol(HTTPClientProtocol): + """A modified client protocol that logs the number of bytes received.""" + + def __init__(self, factory, stats = None, mirror = False): + HTTPClientProtocol.__init__(self, factory) + self.stats = stats + self.mirror = mirror + + def lineReceived(self, line): + if self.stats: + self.stats.receivedBytes(len(line) + 2, self.mirror) + HTTPClientProtocol.lineReceived(self, line) + + def rawDataReceived(self, data): + if self.stats: + self.stats.receivedBytes(len(data), self.mirror) + HTTPClientProtocol.rawDataReceived(self, data) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -28,10 +46,12 @@ class Peer(ClientFactory): implements(IHTTPClientManager) - def __init__(self, host, port=80): + def __init__(self, host, port = 80, stats = None): self.host = host self.port = port - self.rank = 0.5 + self.stats = stats + self.mirror = False + self.rank = 0.1 self.busy = False self.pipeline = False self.closed = True @@ -45,21 +65,42 @@ class Peer(ClientFactory): self._downloadSpeeds = [] self._lastResponse = None self._responseTimes = [] + + def __repr__(self): + return "(%r, %r, %r)" % (self.host, self.port, self.rank) #{ Manage the request queue def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting self.connecting = True - d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) - d.addCallback(self.connected) + d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, + stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) + d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): """Begin processing the queued requests.""" self.closed = False self.connecting = False self.proto = proto - self.processQueue() + reactor.callLater(0, self.processQueue) + + def connectionError(self, err): + """Cancel the requests.""" + log.msg('Failed to connect to the peer by HTTP.') + log.err(err) + + # Remove one request so that we don't loop indefinitely + if self.request_queue: + req = self.request_queue.pop(0) + req.deferRequest.errback(err) + + self._completed += 1 + self._errors += 1 + self.rerank() + if self.connecting: + self.connecting = False + self.clientGone(None) def close(self): """Close the connection to the peer.""" @@ -76,7 +117,7 @@ class Peer(ClientFactory): request.deferRequest = defer.Deferred() self.request_queue.append(request) self.rerank() - self.processQueue() + reactor.callLater(0, self.processQueue) return request.deferRequest def processQueue(self): @@ -105,8 +146,6 @@ class Peer(ClientFactory): req = self.response_queue.pop(0) log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) self._completed += 1 - if resp.code >= 400: - self._errors += 1 now = datetime.now() self._responseTimes.append((now, now - req.submissionTime)) self._lastResponse = (now, resp.stream.length) @@ -138,19 +177,20 @@ class Peer(ClientFactory): """Try to send a new request.""" self._processLastResponse() self.busy = False - self.processQueue() + reactor.callLater(0, self.processQueue) self.rerank() def clientPipelining(self, proto): """Try to send a new request.""" self.pipeline = True - self.processQueue() + reactor.callLater(0, self.processQueue) def clientGone(self, proto): """Mark sent requests as errors.""" self._processLastResponse() for req in self.response_queue: - req.deferRequest.errback(ProtocolError('lost connection')) + reactor.callLater(0, req.deferRequest.errback, + ProtocolError('lost connection')) self.busy = False self.pipeline = False self.closed = True @@ -159,7 +199,7 @@ class Peer(ClientFactory): self.proto = None self.rerank() if self.request_queue: - self.processQueue() + reactor.callLater(0, self.processQueue) #{ Downloading request interface def setCommonHeaders(self): @@ -275,12 +315,12 @@ class Peer(ClientFactory): rank = 1.0 if self.closed: rank *= 0.9 - rank *= exp(-(len(self.request_queue) - len(self.response_queue))) + rank *= exp(-(len(self.request_queue) + len(self.response_queue))) speed = self.downloadSpeed() if speed > 0.0: rank *= exp(-512.0*1024 / speed) if self._completed: - rank *= exp(-float(self._errors) / self._completed) + rank *= exp(-10.0 * self._errors / self._completed) rank *= exp(-self.responseTime() / 5.0) self.rank = rank @@ -289,16 +329,23 @@ class TestClientManager(unittest.TestCase): client = None pending_calls = [] + length = [] def gotResp(self, resp, num, expect): self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) if expect is not None: self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) - def print_(n): - pass - def printdone(n): - pass - stream_mod.readStream(resp.stream, print_).addCallback(printdone) + while len(self.length) <= num: + self.length.append(0) + self.length[num] = 0 + def addData(data, self = self, num = num): + self.length[num] += len(data) + def checkLength(resp, self = self, num = num, length = resp.stream.length): + self.failUnlessEqual(self.length[num], length) + return resp + df = stream_mod.readStream(resp.stream, addData) + df.addCallback(checkLength) + return df def test_download(self): """Tests a normal download.""" @@ -339,7 +386,7 @@ class TestClientManager(unittest.TestCase): newRequest("/rfc/rfc0801.txt", 3, 40824) # This one will probably be queued - self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070)) + self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070)) # Connection should still be open, but idle self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))