X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=d1ea0a93cebe1ee9095355d3f6e9eb2edc4eab27;hb=94e031b2636053bc0a7896b253e35e70d8d981cd;hp=10831f1ba8bc9a47d217c27fd3b724c75165a924;hpb=63552e45682929a84e8f7f1df9f381965b024f31;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 10831f1..d1ea0a9 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -9,7 +9,8 @@ from twisted.internet.protocol import ClientFactory from twisted import version as twisted_version from twisted.python import log from twisted.web2.client.interfaces import IHTTPClientManager -from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol +from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest +from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE from twisted.web2 import stream as stream_mod, http_headers from twisted.web2 import version as web2_version from twisted.trial import unittest @@ -17,6 +18,33 @@ from zope.interface import implements from apt_p2p_conf import version +class PipelineError(Exception): + """An error has occurred in pipelining requests.""" + +class FixedHTTPClientChannelRequest(HTTPClientChannelRequest): + """Fix the broken _error function.""" + + def __init__(self, channel, request, closeAfter): + HTTPClientChannelRequest.__init__(self, channel, request, closeAfter) + self.started = False + + def _error(self, err): + """ + Abort parsing, and depending of the status of the request, either fire + the C{responseDefer} if no response has been sent yet, or close the + stream. + """ + if self.started: + self.abortParse() + if hasattr(self, 'stream') and self.stream is not None: + self.stream.finish(err) + else: + self.responseDefer.errback(err) + + def gotInitialLine(self, initialLine): + self.started = True + HTTPClientChannelRequest.gotInitialLine(self, initialLine) + class LoggingHTTPClientProtocol(HTTPClientProtocol): """A modified client protocol that logs the number of bytes received.""" @@ -35,6 +63,65 @@ class LoggingHTTPClientProtocol(HTTPClientProtocol): self.stats.receivedBytes(len(data), self.mirror) HTTPClientProtocol.rawDataReceived(self, data) + def submitRequest(self, request, closeAfter=True): + """ + @param request: The request to send to a remote server. + @type request: L{ClientRequest} + + @param closeAfter: If True the 'Connection: close' header will be sent, + otherwise 'Connection: keep-alive' + @type closeAfter: C{bool} + + @rtype: L{twisted.internet.defer.Deferred} + @return: A Deferred which will be called back with the + L{twisted.web2.http.Response} from the server. + """ + + # Assert we're in a valid state to submit more + assert self.outRequest is None + assert ((self.readPersistent is PERSIST_NO_PIPELINE + and not self.inRequests) + or self.readPersistent is PERSIST_PIPELINE) + + self.manager.clientBusy(self) + if closeAfter: + self.readPersistent = False + + self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self, + request, closeAfter) + self.inRequests.append(chanRequest) + + chanRequest.submit() + return chanRequest.responseDefer + + def setReadPersistent(self, persist): + oldPersist = self.readPersistent + self.readPersistent = persist + if not persist: + # Tell all requests but first to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + request.connectionLost(PipelineError('Pipelined connection was closed.')) + elif (oldPersist is PERSIST_NO_PIPELINE and + persist is PERSIST_PIPELINE and + self.outRequest is None): + self.manager.clientPipelining(self) + + def connectionLost(self, reason): + self.readPersistent = False + self.setTimeout(None) + self.manager.clientGone(self) + # Cancel the current request + if self.inRequests and self.inRequests[0] is not None: + self.inRequests[0].connectionLost(reason) + # Tell all remaining requests to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + if request is not None: + request.connectionLost(PipelineError('Pipelined connection was closed.')) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -51,7 +138,7 @@ class Peer(ClientFactory): self.port = port self.stats = stats self.mirror = False - self.rank = 0.1 + self.rank = 0.01 self.busy = False self.pipeline = False self.closed = True @@ -73,6 +160,7 @@ class Peer(ClientFactory): def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting + log.msg('Connecting to (%s, %d)' % (self.host, self.port)) self.connecting = True d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) @@ -80,6 +168,7 @@ class Peer(ClientFactory): def connected(self, proto): """Begin processing the queued requests.""" + log.msg('Connected to (%s, %d)' % (self.host, self.port)) self.closed = False self.connecting = False self.proto = proto @@ -133,11 +222,26 @@ class Peer(ClientFactory): return if self.outstanding and not self.pipeline: return + if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE + and not self.proto.inRequests) + or self.proto.readPersistent is PERSIST_PIPELINE): + log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' % + (self.proto.readPersistent, self.proto.inRequests)) + return req, deferRequest, submissionTime = self.request_queue.pop(0) + try: + deferResponse = self.proto.submitRequest(req, False) + except: + # Try again later + log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, )) + log.err() + self.request_queue.insert(0, (request, deferRequest, submissionTime)) + ractor.callLater(1, self.processQueue) + return + self.outstanding += 1 self.rerank() - deferResponse = self.proto.submitRequest(req, False) deferResponse.addCallbacks(self.requestComplete, self.requestError, callbackArgs = (req, deferRequest, submissionTime), errbackArgs = (req, deferRequest)) @@ -147,7 +251,7 @@ class Peer(ClientFactory): self._processLastResponse() self.outstanding -= 1 assert self.outstanding >= 0 - log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) + log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers)) self._completed += 1 now = datetime.now() self._responseTimes.append((now, now - submissionTime)) @@ -192,6 +296,7 @@ class Peer(ClientFactory): def clientGone(self, proto): """Mark sent requests as errors.""" self._processLastResponse() + log.msg('Lost the connection to (%s, %d)' % (self.host, self.port)) self.busy = False self.pipeline = False self.closed = True