More fixes for the broken twisted HTTP client.
authorCameron Dale <camrdale@gmail.com>
Fri, 25 Apr 2008 05:18:58 +0000 (22:18 -0700)
committerCameron Dale <camrdale@gmail.com>
Fri, 25 Apr 2008 05:18:58 +0000 (22:18 -0700)
apt_p2p/HTTPDownloader.py

index 35847f7..55a818f 100644 (file)
@@ -9,7 +9,7 @@ from twisted.internet.protocol import ClientFactory
 from twisted import version as twisted_version
 from twisted.python import log
 from twisted.web2.client.interfaces import IHTTPClientManager
-from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
+from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest
 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
 from twisted.web2 import stream as stream_mod, http_headers
 from twisted.web2 import version as web2_version
@@ -20,6 +20,30 @@ from apt_p2p_conf import version
 
 class PipelineError(Exception):
     """An error has occurred in pipelining requests."""
+
+class FixedHTTPClientChannelRequest(HTTPClientChannelRequest):
+    """Fix the broken _error function."""
+
+    def __init__(self, channel, request, closeAfter):
+        HTTPClientChannelRequest.__init__(self, channel, request, closeAfter)
+        self.started = False
+
+    def _error(self, err):
+        """
+        Abort parsing, and depending of the status of the request, either fire
+        the C{responseDefer} if no response has been sent yet, or close the
+        stream.
+        """
+        if self.started:
+            self.abortParse()
+        if hasattr(self, 'stream') and self.stream is not None:
+            self.stream.finish(err)
+        else:
+            self.responseDefer.errback(err)
+
+    def gotInitialLine(self, initialLine):
+        self.started = True
+        HTTPClientChannelRequest.gotInitialLine(self, initialLine)
     
 class LoggingHTTPClientProtocol(HTTPClientProtocol):
     """A modified client protocol that logs the number of bytes received."""
@@ -39,6 +63,37 @@ class LoggingHTTPClientProtocol(HTTPClientProtocol):
             self.stats.receivedBytes(len(data), self.mirror)
         HTTPClientProtocol.rawDataReceived(self, data)
 
+    def submitRequest(self, request, closeAfter=True):
+        """
+        @param request: The request to send to a remote server.
+        @type request: L{ClientRequest}
+
+        @param closeAfter: If True the 'Connection: close' header will be sent,
+            otherwise 'Connection: keep-alive'
+        @type closeAfter: C{bool}
+
+        @rtype: L{twisted.internet.defer.Deferred}
+        @return: A Deferred which will be called back with the
+            L{twisted.web2.http.Response} from the server.
+        """
+
+        # Assert we're in a valid state to submit more
+        assert self.outRequest is None
+        assert ((self.readPersistent is PERSIST_NO_PIPELINE
+                 and not self.inRequests)
+                or self.readPersistent is PERSIST_PIPELINE)
+
+        self.manager.clientBusy(self)
+        if closeAfter:
+            self.readPersistent = False
+
+        self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self,
+                                            request, closeAfter)
+        self.inRequests.append(chanRequest)
+
+        chanRequest.submit()
+        return chanRequest.responseDefer
+
     def setReadPersistent(self, persist):
         self.readPersistent = persist
         if not persist:
@@ -48,6 +103,20 @@ class LoggingHTTPClientProtocol(HTTPClientProtocol):
             for request in lostRequests:
                 request.connectionLost(PipelineError('The pipelined connection was lost'))
 
+    def connectionLost(self, reason):
+        self.readPersistent = False
+        self.setTimeout(None)
+        self.manager.clientGone(self)
+        # Cancel the current request
+        if self.inRequests and self.inRequests[0] is not None:
+            self.inRequests[0].connectionLost(reason)
+        # Tell all remaining requests to abort.
+        lostRequests = self.inRequests[1:]
+        del self.inRequests[1:]
+        for request in lostRequests:
+            if request is not None:
+                request.connectionLost(reason)
+                
 class Peer(ClientFactory):
     """A manager for all HTTP requests to a single peer.
     
@@ -64,7 +133,7 @@ class Peer(ClientFactory):
         self.port = port
         self.stats = stats
         self.mirror = False
-        self.rank = 0.1
+        self.rank = 0.01
         self.busy = False
         self.pipeline = False
         self.closed = True
@@ -86,6 +155,7 @@ class Peer(ClientFactory):
     def connect(self):
         """Connect to the peer."""
         assert self.closed and not self.connecting
+        log.msg('Connecting to (%s, %d)' % (self.host, self.port))
         self.connecting = True
         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
@@ -93,6 +163,7 @@ class Peer(ClientFactory):
 
     def connected(self, proto):
         """Begin processing the queued requests."""
+        log.msg('Connected to (%s, %d)' % (self.host, self.port))
         self.closed = False
         self.connecting = False
         self.proto = proto
@@ -220,6 +291,7 @@ class Peer(ClientFactory):
     def clientGone(self, proto):
         """Mark sent requests as errors."""
         self._processLastResponse()
+        log.msg('Lost the connection to (%s, %d)' % (self.host, self.port))
         self.busy = False
         self.pipeline = False
         self.closed = True