X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=1a0752c4e087a46a8bd562634d986c34a897576f;hb=855f99e45331dd8c96bc241fd984d3fd97b05071;hp=3c57c82e58b86ec0b091f3dd030d9ebc7bf0358a;hpb=3a40bdc7125e5a4595779fe663b1f4a83c3b2a03;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 3c57c82..1a0752c 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -9,7 +9,8 @@ from twisted.internet.protocol import ClientFactory from twisted import version as twisted_version from twisted.python import log from twisted.web2.client.interfaces import IHTTPClientManager -from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol +from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest +from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE from twisted.web2 import stream as stream_mod, http_headers from twisted.web2 import version as web2_version from twisted.trial import unittest @@ -17,6 +18,33 @@ from zope.interface import implements from apt_p2p_conf import version +class PipelineError(Exception): + """An error has occurred in pipelining requests.""" + +class FixedHTTPClientChannelRequest(HTTPClientChannelRequest): + """Fix the broken _error function.""" + + def __init__(self, channel, request, closeAfter): + HTTPClientChannelRequest.__init__(self, channel, request, closeAfter) + self.started = False + + def _error(self, err): + """ + Abort parsing, and depending of the status of the request, either fire + the C{responseDefer} if no response has been sent yet, or close the + stream. + """ + if self.started: + self.abortParse() + if hasattr(self, 'stream') and self.stream is not None: + self.stream.finish(err) + else: + self.responseDefer.errback(err) + + def gotInitialLine(self, initialLine): + self.started = True + HTTPClientChannelRequest.gotInitialLine(self, initialLine) + class LoggingHTTPClientProtocol(HTTPClientProtocol): """A modified client protocol that logs the number of bytes received.""" @@ -35,6 +63,65 @@ class LoggingHTTPClientProtocol(HTTPClientProtocol): self.stats.receivedBytes(len(data), self.mirror) HTTPClientProtocol.rawDataReceived(self, data) + def submitRequest(self, request, closeAfter=True): + """ + @param request: The request to send to a remote server. + @type request: L{ClientRequest} + + @param closeAfter: If True the 'Connection: close' header will be sent, + otherwise 'Connection: keep-alive' + @type closeAfter: C{bool} + + @rtype: L{twisted.internet.defer.Deferred} + @return: A Deferred which will be called back with the + L{twisted.web2.http.Response} from the server. + """ + + # Assert we're in a valid state to submit more + assert self.outRequest is None + assert ((self.readPersistent is PERSIST_NO_PIPELINE + and not self.inRequests) + or self.readPersistent is PERSIST_PIPELINE) + + self.manager.clientBusy(self) + if closeAfter: + self.readPersistent = False + + self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self, + request, closeAfter) + self.inRequests.append(chanRequest) + + chanRequest.submit() + return chanRequest.responseDefer + + def setReadPersistent(self, persist): + oldPersist = self.readPersistent + self.readPersistent = persist + if not persist: + # Tell all requests but first to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + request.connectionLost(PipelineError('Pipelined connection was closed.')) + elif (oldPersist is PERSIST_NO_PIPELINE and + persist is PERSIST_PIPELINE and + self.outRequest is None): + self.manager.clientPipelining(self) + + def connectionLost(self, reason): + self.readPersistent = False + self.setTimeout(None) + self.manager.clientGone(self) + # Cancel the current request + if self.inRequests and self.inRequests[0] is not None: + self.inRequests[0].connectionLost(reason) + # Tell all remaining requests to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + if request is not None: + request.connectionLost(PipelineError('Pipelined connection was closed.')) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -51,7 +138,7 @@ class Peer(ClientFactory): self.port = port self.stats = stats self.mirror = False - self.rank = 0.1 + self.rank = 0.01 self.busy = False self.pipeline = False self.closed = True @@ -73,6 +160,7 @@ class Peer(ClientFactory): def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting + log.msg('Connecting to (%s, %d)' % (self.host, self.port)) self.connecting = True d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) @@ -80,6 +168,7 @@ class Peer(ClientFactory): def connected(self, proto): """Begin processing the queued requests.""" + log.msg('Connected to (%s, %d)' % (self.host, self.port)) self.closed = False self.connecting = False self.proto = proto @@ -92,8 +181,8 @@ class Peer(ClientFactory): # Remove one request so that we don't loop indefinitely if self.request_queue: - req = self.request_queue.pop(0) - req.deferRequest.errback(err) + req, deferRequest, submissionTime = self.request_queue.pop(0) + deferRequest.errback(err) self._completed += 1 self._errors += 1 @@ -113,12 +202,12 @@ class Peer(ClientFactory): @type request: L{twisted.web2.client.http.ClientRequest} @return: deferred that will fire with the completed request """ - request.submissionTime = datetime.now() - request.deferRequest = defer.Deferred() - self.request_queue.append(request) + submissionTime = datetime.now() + deferRequest = defer.Deferred() + self.request_queue.append((request, deferRequest, submissionTime)) self.rerank() reactor.callLater(0, self.processQueue) - return request.deferRequest + return deferRequest def processQueue(self): """Check the queue to see if new requests can be sent to the peer.""" @@ -133,28 +222,44 @@ class Peer(ClientFactory): return if self.outstanding and not self.pipeline: return + if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE + and not self.proto.inRequests) + or self.proto.readPersistent is PERSIST_PIPELINE): + log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' % + (self.proto.readPersistent, self.proto.inRequests)) + return - req = self.request_queue.pop(0) + req, deferRequest, submissionTime = self.request_queue.pop(0) + try: + deferResponse = self.proto.submitRequest(req, False) + except: + # Try again later + log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, )) + log.err() + self.request_queue.insert(0, (request, deferRequest, submissionTime)) + ractor.callLater(1, self.processQueue) + return + self.outstanding += 1 self.rerank() - req.deferResponse = self.proto.submitRequest(req, False) - req.deferResponse.addCallbacks(self.requestComplete, self.requestError, - callbackArgs = (req, ), errbackArgs = (req, )) + deferResponse.addCallbacks(self.requestComplete, self.requestError, + callbackArgs = (req, deferRequest, submissionTime), + errbackArgs = (req, deferRequest)) - def requestComplete(self, resp, req): + def requestComplete(self, resp, req, deferRequest, submissionTime): """Process a completed request.""" self._processLastResponse() self.outstanding -= 1 assert self.outstanding >= 0 - log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) + log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers)) self._completed += 1 now = datetime.now() - self._responseTimes.append((now, now - req.submissionTime)) + self._responseTimes.append((now, now - submissionTime)) self._lastResponse = (now, resp.stream.length) self.rerank() - req.deferRequest.callback(resp) + deferRequest.callback(resp) - def requestError(self, error, req): + def requestError(self, error, req, deferRequest): """Process a request that ended with an error.""" self._processLastResponse() self.outstanding -= 1 @@ -163,7 +268,7 @@ class Peer(ClientFactory): self._completed += 1 self._errors += 1 self.rerank() - req.deferRequest.errback(error) + deferRequest.errback(error) def hashError(self, error): """Log that a hash error occurred from the peer.""" @@ -191,6 +296,7 @@ class Peer(ClientFactory): def clientGone(self, proto): """Mark sent requests as errors.""" self._processLastResponse() + log.msg('Lost the connection to (%s, %d)' % (self.host, self.port)) self.busy = False self.pipeline = False self.closed = True @@ -252,8 +358,9 @@ class Peer(ClientFactory): def _processLastResponse(self): """Save the download time of the last request for speed calculations.""" if self._lastResponse is not None: - now = datetime.now() - self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1])) + if self._lastResponse[1] is not None: + now = datetime.now() + self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1])) self._lastResponse = None def downloadSpeed(self):