Better handling and logging for intermittent HTTP client submission errors.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
index 3c57c82e58b86ec0b091f3dd030d9ebc7bf0358a..2952f48cab295ab36381b26983f1c41a58984d2e 100644 (file)
@@ -10,6 +10,7 @@ from twisted import version as twisted_version
 from twisted.python import log
 from twisted.web2.client.interfaces import IHTTPClientManager
 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
+from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
 from twisted.web2 import stream as stream_mod, http_headers
 from twisted.web2 import version as web2_version
 from twisted.trial import unittest
@@ -92,8 +93,8 @@ class Peer(ClientFactory):
 
         # Remove one request so that we don't loop indefinitely
         if self.request_queue:
-            req = self.request_queue.pop(0)
-            req.deferRequest.errback(err)
+            req, deferRequest, submissionTime = self.request_queue.pop(0)
+            deferRequest.errback(err)
             
         self._completed += 1
         self._errors += 1
@@ -113,12 +114,12 @@ class Peer(ClientFactory):
         @type request: L{twisted.web2.client.http.ClientRequest}
         @return: deferred that will fire with the completed request
         """
-        request.submissionTime = datetime.now()
-        request.deferRequest = defer.Deferred()
-        self.request_queue.append(request)
+        submissionTime = datetime.now()
+        deferRequest = defer.Deferred()
+        self.request_queue.append((request, deferRequest, submissionTime))
         self.rerank()
         reactor.callLater(0, self.processQueue)
-        return request.deferRequest
+        return deferRequest
 
     def processQueue(self):
         """Check the queue to see if new requests can be sent to the peer."""
@@ -133,28 +134,44 @@ class Peer(ClientFactory):
             return
         if self.outstanding and not self.pipeline:
             return
+        if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
+                 and not self.proto.inRequests)
+                 or self.proto.readPersistent is PERSIST_PIPELINE):
+            log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
+                    (self.proto.readPersistent, self.proto.inRequests))
+            return
 
-        req = self.request_queue.pop(0)
+        req, deferRequest, submissionTime = self.request_queue.pop(0)
+        try:
+            deferResponse = self.proto.submitRequest(req, False)
+        except:
+            # Try again later
+            log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, ))
+            log.err()
+            self.request_queue.insert(0, (request, deferRequest, submissionTime))
+            ractor.callLater(1, self.processQueue)
+            return
+            
         self.outstanding += 1
         self.rerank()
-        req.deferResponse = self.proto.submitRequest(req, False)
-        req.deferResponse.addCallbacks(self.requestComplete, self.requestError,
-                                       callbackArgs = (req, ), errbackArgs = (req, ))
+        deferResponse.addCallbacks(self.requestComplete, self.requestError,
+                                   callbackArgs = (req, deferRequest, submissionTime),
+                                   errbackArgs = (req, deferRequest))
 
-    def requestComplete(self, resp, req):
+    def requestComplete(self, resp, req, deferRequest, submissionTime):
         """Process a completed request."""
         self._processLastResponse()
         self.outstanding -= 1
         assert self.outstanding >= 0
-        log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
+        log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
         self._completed += 1
         now = datetime.now()
-        self._responseTimes.append((now, now - req.submissionTime))
+        self._responseTimes.append((now, now - submissionTime))
         self._lastResponse = (now, resp.stream.length)
         self.rerank()
-        req.deferRequest.callback(resp)
+        deferRequest.callback(resp)
 
-    def requestError(self, error, req):
+    def requestError(self, error, req, deferRequest):
         """Process a request that ended with an error."""
         self._processLastResponse()
         self.outstanding -= 1
@@ -163,7 +180,7 @@ class Peer(ClientFactory):
         self._completed += 1
         self._errors += 1
         self.rerank()
-        req.deferRequest.errback(error)
+        deferRequest.errback(error)
         
     def hashError(self, error):
         """Log that a hash error occurred from the peer."""