X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=35847f701a433f0f3ea6a505b4ef396d75d3a2ce;hb=05422476cb06c6ccd2def7709a251e618e1eafb3;hp=057b5a2befdc6783d1c2e295d8c6b9024c782231;hpb=c429a67c05afa54e5fe44607e5fe7c09fd35e81a;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 057b5a2..35847f7 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -10,6 +10,7 @@ from twisted import version as twisted_version from twisted.python import log from twisted.web2.client.interfaces import IHTTPClientManager from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol +from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE from twisted.web2 import stream as stream_mod, http_headers from twisted.web2 import version as web2_version from twisted.trial import unittest @@ -17,6 +18,36 @@ from zope.interface import implements from apt_p2p_conf import version +class PipelineError(Exception): + """An error has occurred in pipelining requests.""" + +class LoggingHTTPClientProtocol(HTTPClientProtocol): + """A modified client protocol that logs the number of bytes received.""" + + def __init__(self, factory, stats = None, mirror = False): + HTTPClientProtocol.__init__(self, factory) + self.stats = stats + self.mirror = mirror + + def lineReceived(self, line): + if self.stats: + self.stats.receivedBytes(len(line) + 2, self.mirror) + HTTPClientProtocol.lineReceived(self, line) + + def rawDataReceived(self, data): + if self.stats: + self.stats.receivedBytes(len(data), self.mirror) + HTTPClientProtocol.rawDataReceived(self, data) + + def setReadPersistent(self, persist): + self.readPersistent = persist + if not persist: + # Tell all requests but first to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + request.connectionLost(PipelineError('The pipelined connection was lost')) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -28,17 +59,18 @@ class Peer(ClientFactory): implements(IHTTPClientManager) - def __init__(self, host, port=80): + def __init__(self, host, port = 80, stats = None): self.host = host self.port = port + self.stats = stats self.mirror = False - self.rank = 0.5 + self.rank = 0.1 self.busy = False self.pipeline = False self.closed = True self.connecting = False self.request_queue = [] - self.response_queue = [] + self.outstanding = 0 self.proto = None self.connector = None self._errors = 0 @@ -55,15 +87,33 @@ class Peer(ClientFactory): """Connect to the peer.""" assert self.closed and not self.connecting self.connecting = True - d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) - d.addCallback(self.connected) + d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, + stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) + d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): """Begin processing the queued requests.""" self.closed = False self.connecting = False self.proto = proto - self.processQueue() + reactor.callLater(0, self.processQueue) + + def connectionError(self, err): + """Cancel the requests.""" + log.msg('Failed to connect to the peer by HTTP.') + log.err(err) + + # Remove one request so that we don't loop indefinitely + if self.request_queue: + req, deferRequest, submissionTime = self.request_queue.pop(0) + deferRequest.errback(err) + + self._completed += 1 + self._errors += 1 + self.rerank() + if self.connecting: + self.connecting = False + self.clientGone(None) def close(self): """Close the connection to the peer.""" @@ -76,12 +126,12 @@ class Peer(ClientFactory): @type request: L{twisted.web2.client.http.ClientRequest} @return: deferred that will fire with the completed request """ - request.submissionTime = datetime.now() - request.deferRequest = defer.Deferred() - self.request_queue.append(request) + submissionTime = datetime.now() + deferRequest = defer.Deferred() + self.request_queue.append((request, deferRequest, submissionTime)) self.rerank() - self.processQueue() - return request.deferRequest + reactor.callLater(0, self.processQueue) + return deferRequest def processQueue(self): """Check the queue to see if new requests can be sent to the peer.""" @@ -94,36 +144,55 @@ class Peer(ClientFactory): return if self.busy and not self.pipeline: return - if self.response_queue and not self.pipeline: + if self.outstanding and not self.pipeline: + return + if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE + and not self.proto.inRequests) + or self.proto.readPersistent is PERSIST_PIPELINE): + log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' % + (self.proto.readPersistent, self.proto.inRequests)) return - req = self.request_queue.pop(0) - self.response_queue.append(req) + req, deferRequest, submissionTime = self.request_queue.pop(0) + try: + deferResponse = self.proto.submitRequest(req, False) + except: + # Try again later + log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, )) + log.err() + self.request_queue.insert(0, (request, deferRequest, submissionTime)) + ractor.callLater(1, self.processQueue) + return + + self.outstanding += 1 self.rerank() - req.deferResponse = self.proto.submitRequest(req, False) - req.deferResponse.addCallbacks(self.requestComplete, self.requestError) + deferResponse.addCallbacks(self.requestComplete, self.requestError, + callbackArgs = (req, deferRequest, submissionTime), + errbackArgs = (req, deferRequest)) - def requestComplete(self, resp): + def requestComplete(self, resp, req, deferRequest, submissionTime): """Process a completed request.""" self._processLastResponse() - req = self.response_queue.pop(0) - log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) + self.outstanding -= 1 + assert self.outstanding >= 0 + log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers)) self._completed += 1 now = datetime.now() - self._responseTimes.append((now, now - req.submissionTime)) + self._responseTimes.append((now, now - submissionTime)) self._lastResponse = (now, resp.stream.length) self.rerank() - req.deferRequest.callback(resp) + deferRequest.callback(resp) - def requestError(self, error): + def requestError(self, error, req, deferRequest): """Process a request that ended with an error.""" self._processLastResponse() - req = self.response_queue.pop(0) + self.outstanding -= 1 + assert self.outstanding >= 0 log.msg('Download of %s generated error %r' % (req.uri, error)) self._completed += 1 self._errors += 1 self.rerank() - req.deferRequest.errback(error) + deferRequest.errback(error) def hashError(self, error): """Log that a hash error occurred from the peer.""" @@ -140,28 +209,25 @@ class Peer(ClientFactory): """Try to send a new request.""" self._processLastResponse() self.busy = False - self.processQueue() + reactor.callLater(0, self.processQueue) self.rerank() def clientPipelining(self, proto): """Try to send a new request.""" self.pipeline = True - self.processQueue() + reactor.callLater(0, self.processQueue) def clientGone(self, proto): """Mark sent requests as errors.""" self._processLastResponse() - for req in self.response_queue: - req.deferRequest.errback(ProtocolError('lost connection')) self.busy = False self.pipeline = False self.closed = True self.connecting = False - self.response_queue = [] self.proto = None self.rerank() if self.request_queue: - self.processQueue() + reactor.callLater(0, self.processQueue) #{ Downloading request interface def setCommonHeaders(self): @@ -210,7 +276,7 @@ class Peer(ClientFactory): #{ Peer information def isIdle(self): """Check whether the peer is idle or not.""" - return not self.busy and not self.request_queue and not self.response_queue + return not self.busy and not self.request_queue and not self.outstanding def _processLastResponse(self): """Save the download time of the last request for speed calculations.""" @@ -277,12 +343,12 @@ class Peer(ClientFactory): rank = 1.0 if self.closed: rank *= 0.9 - rank *= exp(-(len(self.request_queue) - len(self.response_queue))) + rank *= exp(-(len(self.request_queue) + self.outstanding)) speed = self.downloadSpeed() if speed > 0.0: rank *= exp(-512.0*1024 / speed) if self._completed: - rank *= exp(-float(self._errors) / self._completed) + rank *= exp(-10.0 * self._errors / self._completed) rank *= exp(-self.responseTime() / 5.0) self.rank = rank @@ -291,16 +357,23 @@ class TestClientManager(unittest.TestCase): client = None pending_calls = [] + length = [] def gotResp(self, resp, num, expect): self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) if expect is not None: self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) - def print_(n): - pass - def printdone(n): - pass - stream_mod.readStream(resp.stream, print_).addCallback(printdone) + while len(self.length) <= num: + self.length.append(0) + self.length[num] = 0 + def addData(data, self = self, num = num): + self.length[num] += len(data) + def checkLength(resp, self = self, num = num, length = resp.stream.length): + self.failUnlessEqual(self.length[num], length) + return resp + df = stream_mod.readStream(resp.stream, addData) + df.addCallback(checkLength) + return df def test_download(self): """Tests a normal download.""" @@ -341,7 +414,7 @@ class TestClientManager(unittest.TestCase): newRequest("/rfc/rfc0801.txt", 3, 40824) # This one will probably be queued - self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070)) + self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070)) # Connection should still be open, but idle self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))