From: Cameron Dale Date: Fri, 25 Apr 2008 05:18:58 +0000 (-0700) Subject: More fixes for the broken twisted HTTP client. X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=f295cae59d5df6a377ec886ed2f279a57e3e776e;p=quix0rs-apt-p2p.git More fixes for the broken twisted HTTP client. --- diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 35847f7..55a818f 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -9,7 +9,7 @@ from twisted.internet.protocol import ClientFactory from twisted import version as twisted_version from twisted.python import log from twisted.web2.client.interfaces import IHTTPClientManager -from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol +from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE from twisted.web2 import stream as stream_mod, http_headers from twisted.web2 import version as web2_version @@ -20,6 +20,30 @@ from apt_p2p_conf import version class PipelineError(Exception): """An error has occurred in pipelining requests.""" + +class FixedHTTPClientChannelRequest(HTTPClientChannelRequest): + """Fix the broken _error function.""" + + def __init__(self, channel, request, closeAfter): + HTTPClientChannelRequest.__init__(self, channel, request, closeAfter) + self.started = False + + def _error(self, err): + """ + Abort parsing, and depending of the status of the request, either fire + the C{responseDefer} if no response has been sent yet, or close the + stream. + """ + if self.started: + self.abortParse() + if hasattr(self, 'stream') and self.stream is not None: + self.stream.finish(err) + else: + self.responseDefer.errback(err) + + def gotInitialLine(self, initialLine): + self.started = True + HTTPClientChannelRequest.gotInitialLine(self, initialLine) class LoggingHTTPClientProtocol(HTTPClientProtocol): """A modified client protocol that logs the number of bytes received.""" @@ -39,6 +63,37 @@ class LoggingHTTPClientProtocol(HTTPClientProtocol): self.stats.receivedBytes(len(data), self.mirror) HTTPClientProtocol.rawDataReceived(self, data) + def submitRequest(self, request, closeAfter=True): + """ + @param request: The request to send to a remote server. + @type request: L{ClientRequest} + + @param closeAfter: If True the 'Connection: close' header will be sent, + otherwise 'Connection: keep-alive' + @type closeAfter: C{bool} + + @rtype: L{twisted.internet.defer.Deferred} + @return: A Deferred which will be called back with the + L{twisted.web2.http.Response} from the server. + """ + + # Assert we're in a valid state to submit more + assert self.outRequest is None + assert ((self.readPersistent is PERSIST_NO_PIPELINE + and not self.inRequests) + or self.readPersistent is PERSIST_PIPELINE) + + self.manager.clientBusy(self) + if closeAfter: + self.readPersistent = False + + self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self, + request, closeAfter) + self.inRequests.append(chanRequest) + + chanRequest.submit() + return chanRequest.responseDefer + def setReadPersistent(self, persist): self.readPersistent = persist if not persist: @@ -48,6 +103,20 @@ class LoggingHTTPClientProtocol(HTTPClientProtocol): for request in lostRequests: request.connectionLost(PipelineError('The pipelined connection was lost')) + def connectionLost(self, reason): + self.readPersistent = False + self.setTimeout(None) + self.manager.clientGone(self) + # Cancel the current request + if self.inRequests and self.inRequests[0] is not None: + self.inRequests[0].connectionLost(reason) + # Tell all remaining requests to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + if request is not None: + request.connectionLost(reason) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -64,7 +133,7 @@ class Peer(ClientFactory): self.port = port self.stats = stats self.mirror = False - self.rank = 0.1 + self.rank = 0.01 self.busy = False self.pipeline = False self.closed = True @@ -86,6 +155,7 @@ class Peer(ClientFactory): def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting + log.msg('Connecting to (%s, %d)' % (self.host, self.port)) self.connecting = True d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) @@ -93,6 +163,7 @@ class Peer(ClientFactory): def connected(self, proto): """Begin processing the queued requests.""" + log.msg('Connected to (%s, %d)' % (self.host, self.port)) self.closed = False self.connecting = False self.proto = proto @@ -220,6 +291,7 @@ class Peer(ClientFactory): def clientGone(self, proto): """Mark sent requests as errors.""" self._processLastResponse() + log.msg('Lost the connection to (%s, %d)' % (self.host, self.port)) self.busy = False self.pipeline = False self.closed = True