from twisted.python import log
from twisted.web2.client.interfaces import IHTTPClientManager
from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
+from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
from twisted.web2 import stream as stream_mod, http_headers
from twisted.web2 import version as web2_version
from twisted.trial import unittest
# Remove one request so that we don't loop indefinitely
if self.request_queue:
- req = self.request_queue.pop(0)
- req.deferRequest.errback(err)
+ req, deferRequest, submissionTime = self.request_queue.pop(0)
+ deferRequest.errback(err)
self._completed += 1
self._errors += 1
@type request: L{twisted.web2.client.http.ClientRequest}
@return: deferred that will fire with the completed request
"""
- request.submissionTime = datetime.now()
- request.deferRequest = defer.Deferred()
- self.request_queue.append(request)
+ submissionTime = datetime.now()
+ deferRequest = defer.Deferred()
+ self.request_queue.append((request, deferRequest, submissionTime))
self.rerank()
reactor.callLater(0, self.processQueue)
- return request.deferRequest
+ return deferRequest
def processQueue(self):
"""Check the queue to see if new requests can be sent to the peer."""
return
if self.outstanding and not self.pipeline:
return
+ if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
+ and not self.proto.inRequests)
+ or self.proto.readPersistent is PERSIST_PIPELINE):
+ log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
+ (self.proto.readPersistent, self.proto.inRequests))
+ return
- req = self.request_queue.pop(0)
+ req, deferRequest, submissionTime = self.request_queue.pop(0)
+ try:
+ deferResponse = self.proto.submitRequest(req, False)
+ except:
+ # Try again later
+ log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, ))
+ log.err()
+ self.request_queue.insert(0, (request, deferRequest, submissionTime))
+ ractor.callLater(1, self.processQueue)
+ return
+
self.outstanding += 1
self.rerank()
- req.deferResponse = self.proto.submitRequest(req, False)
- req.deferResponse.addCallbacks(self.requestComplete, self.requestError,
- callbackArgs = (req, ), errbackArgs = (req, ))
+ deferResponse.addCallbacks(self.requestComplete, self.requestError,
+ callbackArgs = (req, deferRequest, submissionTime),
+ errbackArgs = (req, deferRequest))
- def requestComplete(self, resp, req):
+ def requestComplete(self, resp, req, deferRequest, submissionTime):
"""Process a completed request."""
self._processLastResponse()
self.outstanding -= 1
assert self.outstanding >= 0
- log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
+ log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
self._completed += 1
now = datetime.now()
- self._responseTimes.append((now, now - req.submissionTime))
+ self._responseTimes.append((now, now - submissionTime))
self._lastResponse = (now, resp.stream.length)
self.rerank()
- req.deferRequest.callback(resp)
+ deferRequest.callback(resp)
- def requestError(self, error, req):
+ def requestError(self, error, req, deferRequest):
"""Process a request that ended with an error."""
self._processLastResponse()
self.outstanding -= 1
self._completed += 1
self._errors += 1
self.rerank()
- req.deferRequest.errback(error)
+ deferRequest.errback(error)
def hashError(self, error):
"""Log that a hash error occurred from the peer."""