+"""Manage all download requests to a single site."""
+
from math import exp
from datetime import datetime, timedelta
self._lastResponse = None
self._responseTimes = []
+ #{ Manage the request queue
def connect(self):
+ """Connect to the peer."""
assert self.closed and not self.connecting
self.connecting = True
d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
d.addCallback(self.connected)
def connected(self, proto):
+ """Begin processing the queued requests."""
self.closed = False
self.connecting = False
self.proto = proto
self.processQueue()
def close(self):
+ """Close the connection to the peer."""
if not self.closed:
self.proto.transport.loseConnection()
def submitRequest(self, request):
+ """Add a new request to the queue.
+
+ @type request: L{twisted.web2.client.http.ClientRequest}
+ @return: deferred that will fire with the completed request
+ """
request.submissionTime = datetime.now()
request.deferRequest = defer.Deferred()
self.request_queue.append(request)
return request.deferRequest
def processQueue(self):
+ """Check the queue to see if new requests can be sent to the peer."""
if not self.request_queue:
return
if self.connecting:
req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
def requestComplete(self, resp):
+ """Process a completed request."""
self._processLastResponse()
req = self.response_queue.pop(0)
log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
req.deferRequest.callback(resp)
def requestError(self, error):
+ """Process a request that ended with an error."""
self._processLastResponse()
req = self.response_queue.pop(0)
log.msg('Download of %s generated error %r' % (req.uri, error))
log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
self._errors += 1
- # The IHTTPClientManager interface functions
+ #{ IHTTPClientManager interface
def clientBusy(self, proto):
+ """Save the busy state."""
self.busy = True
def clientIdle(self, proto):
+ """Try to send a new request."""
self._processLastResponse()
self.busy = False
self.processQueue()
def clientPipelining(self, proto):
+ """Try to send a new request."""
self.pipeline = True
self.processQueue()
def clientGone(self, proto):
+ """Mark sent requests as errors."""
self._processLastResponse()
for req in self.response_queue:
req.deferRequest.errback(ProtocolError('lost connection'))
if self.request_queue:
self.processQueue()
- # The downloading request interface functions
+ #{ Downloading request interface
def setCommonHeaders(self):
+ """Get the common HTTP headers for all requests."""
headers = http_headers.Headers()
headers.setHeader('Host', self.host)
headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' %
return headers
def get(self, path, method="GET", modtime=None):
+ """Add a new request to the queue.
+
+ @type path: C{string}
+ @param path: the path to request from the peer
+ @type method: C{string}
+ @param method: the HTTP method to use, 'GET' or 'HEAD'
+ (optional, defaults to 'GET')
+ @type modtime: C{int}
+ @param modtime: the modification time to use for an 'If-Modified-Since'
+ header, as seconds since the epoch
+ (optional, defaults to not sending that header)
+ """
headers = self.setCommonHeaders()
if modtime:
headers.setHeader('If-Modified-Since', modtime)
return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
+ """Add a new request with a Range header to the queue.
+
+ @type path: C{string}
+ @param path: the path to request from the peer
+ @type rangeStart: C{int}
+ @param rangeStart: the byte to begin the request at
+ @type rangeEnd: C{int}
+ @param rangeEnd: the byte to end the request at (inclusive)
+ @type method: C{string}
+ @param method: the HTTP method to use, 'GET' or 'HEAD'
+ (optional, defaults to 'GET')
+ """
headers = self.setCommonHeaders()
headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
return self.submitRequest(ClientRequest(method, path, headers, None))
- # Functions that return information about the peer
+ #{ Peer information
def isIdle(self):
+ """Check whether the peer is idle or not."""
return not self.busy and not self.request_queue and not self.response_queue
def _processLastResponse(self):
+ """Save the download time of the last request for speed calculations."""
if self._lastResponse is not None:
now = datetime.now()
self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
stream_mod.readStream(resp.stream, print_).addCallback(printdone)
def test_download(self):
+ """Tests a normal download."""
host = 'www.ietf.org'
self.client = Peer(host, 80)
self.timeout = 10
return d
def test_head(self):
+ """Tests a 'HEAD' request."""
host = 'www.ietf.org'
self.client = Peer(host, 80)
self.timeout = 10
return d
def test_multiple_downloads(self):
+ """Tests multiple downloads with queueing and connection closing."""
host = 'www.ietf.org'
self.client = Peer(host, 80)
self.timeout = 120
d.addCallback(self.gotResp, num, expect)
if last:
d.addBoth(lastDefer.callback)
-
+
+ # 3 quick requests
newRequest("/rfc/rfc0006.txt", 1, 1776)
newRequest("/rfc/rfc2362.txt", 2, 159833)
newRequest("/rfc/rfc0801.txt", 3, 40824)
+
+ # This one will probably be queued
self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
+
+ # Connection should still be open, but idle
self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
+
+ #Connection should be closed
self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
+
+ # Now it should definitely be closed
self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
return lastDefer
def test_multiple_quick_downloads(self):
+ """Tests lots of multiple downloads with queueing."""
host = 'www.ietf.org'
self.client = Peer(host, 80)
self.timeout = 30
log.msg('Response Time is: %r' % self.client.responseTime())
def test_peer_info(self):
+ """Test retrieving the peer info during a download."""
host = 'www.ietf.org'
self.client = Peer(host, 80)
self.timeout = 120
return lastDefer
def test_range(self):
+ """Test a Range request."""
host = 'www.ietf.org'
self.client = Peer(host, 80)
self.timeout = 10