from twisted.python import log
from twisted.web2.client.interfaces import IHTTPClientManager
from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
+from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
from twisted.web2 import stream as stream_mod, http_headers
from twisted.web2 import version as web2_version
from twisted.trial import unittest
self.port = port
self.stats = stats
self.mirror = False
- self.rank = 0.5
+ self.rank = 0.1
self.busy = False
self.pipeline = False
self.closed = True
self.connecting = False
self.request_queue = []
- self.response_queue = []
+ self.outstanding = 0
self.proto = None
self.connector = None
self._errors = 0
self.closed = False
self.connecting = False
self.proto = proto
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
def connectionError(self, err):
"""Cancel the requests."""
# Remove one request so that we don't loop indefinitely
if self.request_queue:
- req = self.request_queue.pop(0)
- req.deferRequest.errback(err)
+ req, deferRequest, submissionTime = self.request_queue.pop(0)
+ deferRequest.errback(err)
self._completed += 1
self._errors += 1
@type request: L{twisted.web2.client.http.ClientRequest}
@return: deferred that will fire with the completed request
"""
- request.submissionTime = datetime.now()
- request.deferRequest = defer.Deferred()
- self.request_queue.append(request)
+ submissionTime = datetime.now()
+ deferRequest = defer.Deferred()
+ self.request_queue.append((request, deferRequest, submissionTime))
self.rerank()
- self.processQueue()
- return request.deferRequest
+ reactor.callLater(0, self.processQueue)
+ return deferRequest
def processQueue(self):
"""Check the queue to see if new requests can be sent to the peer."""
return
if self.busy and not self.pipeline:
return
- if self.response_queue and not self.pipeline:
+ if self.outstanding and not self.pipeline:
+ return
+ if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
+ and not self.proto.inRequests)
+ or self.proto.readPersistent is PERSIST_PIPELINE):
+ log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
+ (self.proto.readPersistent, self.proto.inRequests))
return
- req = self.request_queue.pop(0)
- self.response_queue.append(req)
+ req, deferRequest, submissionTime = self.request_queue.pop(0)
+ try:
+ deferResponse = self.proto.submitRequest(req, False)
+ except:
+ # Try again later
+ log.msg('Got an error trying to submit a new HTTP request %s' % (req.uri, ))
+ log.err()
+ self.request_queue.insert(0, (req, deferRequest, submissionTime))
+ reactor.callLater(1, self.processQueue)
+ return
+
+ self.outstanding += 1
self.rerank()
- req.deferResponse = self.proto.submitRequest(req, False)
- req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
+ deferResponse.addCallbacks(self.requestComplete, self.requestError,
+ callbackArgs = (req, deferRequest, submissionTime),
+ errbackArgs = (req, deferRequest))
- def requestComplete(self, resp):
+ def requestComplete(self, resp, req, deferRequest, submissionTime):
"""Process a completed request."""
self._processLastResponse()
- req = self.response_queue.pop(0)
- log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
+ self.outstanding -= 1
+ assert self.outstanding >= 0
+ log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
self._completed += 1
now = datetime.now()
- self._responseTimes.append((now, now - req.submissionTime))
+ self._responseTimes.append((now, now - submissionTime))
self._lastResponse = (now, resp.stream.length)
self.rerank()
- req.deferRequest.callback(resp)
+ deferRequest.callback(resp)
- def requestError(self, error):
+ def requestError(self, error, req, deferRequest):
"""Process a request that ended with an error."""
self._processLastResponse()
- req = self.response_queue.pop(0)
+ self.outstanding -= 1
+ assert self.outstanding >= 0
log.msg('Download of %s generated error %r' % (req.uri, error))
self._completed += 1
self._errors += 1
self.rerank()
- req.deferRequest.errback(error)
+ deferRequest.errback(error)
def hashError(self, error):
"""Log that a hash error occurred from the peer."""
"""Try to send a new request."""
self._processLastResponse()
self.busy = False
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
self.rerank()
def clientPipelining(self, proto):
"""Try to send a new request."""
self.pipeline = True
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
def clientGone(self, proto):
"""Mark sent requests as errors."""
self._processLastResponse()
- for req in self.response_queue:
- req.deferRequest.errback(ProtocolError('lost connection'))
self.busy = False
self.pipeline = False
self.closed = True
self.connecting = False
- self.response_queue = []
self.proto = None
self.rerank()
if self.request_queue:
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
#{ Downloading request interface
def setCommonHeaders(self):
#{ Peer information
def isIdle(self):
"""Check whether the peer is idle or not."""
- return not self.busy and not self.request_queue and not self.response_queue
+ return not self.busy and not self.request_queue and not self.outstanding
def _processLastResponse(self):
"""Save the download time of the last request for speed calculations."""
rank = 1.0
if self.closed:
rank *= 0.9
- rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
+ rank *= exp(-(len(self.request_queue) + self.outstanding))
speed = self.downloadSpeed()
if speed > 0.0:
rank *= exp(-512.0*1024 / speed)
if self._completed:
- rank *= exp(-float(self._errors) / self._completed)
+ rank *= exp(-10.0 * self._errors / self._completed)
rank *= exp(-self.responseTime() / 5.0)
self.rank = rank
client = None
pending_calls = []
+ length = []
def gotResp(self, resp, num, expect):
self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
if expect is not None:
self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
- def print_(n):
- pass
- def printdone(n):
- pass
- stream_mod.readStream(resp.stream, print_).addCallback(printdone)
+ while len(self.length) <= num:
+ self.length.append(0)
+ self.length[num] = 0
+ def addData(data, self = self, num = num):
+ self.length[num] += len(data)
+ def checkLength(resp, self = self, num = num, length = resp.stream.length):
+ self.failUnlessEqual(self.length[num], length)
+ return resp
+ df = stream_mod.readStream(resp.stream, addData)
+ df.addCallback(checkLength)
+ return df
def test_download(self):
"""Tests a normal download."""
newRequest("/rfc/rfc0801.txt", 3, 40824)
# This one will probably be queued
- self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
+ self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
# Connection should still be open, but idle
self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))