From: Cameron Dale Date: Sat, 1 Mar 2008 02:43:12 +0000 (-0800) Subject: Documented the HTTPDownloader module. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=222c430337a9fe573aba00223d6322ff7e0b3fb2 Documented the HTTPDownloader module. --- diff --git a/apt_dht/HTTPDownloader.py b/apt_dht/HTTPDownloader.py index 2d91816..49bab65 100644 --- a/apt_dht/HTTPDownloader.py +++ b/apt_dht/HTTPDownloader.py @@ -1,4 +1,6 @@ +"""Manage all download requests to a single site.""" + from math import exp from datetime import datetime, timedelta @@ -43,23 +45,32 @@ class Peer(ClientFactory): self._lastResponse = None self._responseTimes = [] + #{ Manage the request queue def connect(self): + """Connect to the peer.""" assert self.closed and not self.connecting self.connecting = True d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) d.addCallback(self.connected) def connected(self, proto): + """Begin processing the queued requests.""" self.closed = False self.connecting = False self.proto = proto self.processQueue() def close(self): + """Close the connection to the peer.""" if not self.closed: self.proto.transport.loseConnection() def submitRequest(self, request): + """Add a new request to the queue. + + @type request: L{twisted.web2.client.http.ClientRequest} + @return: deferred that will fire with the completed request + """ request.submissionTime = datetime.now() request.deferRequest = defer.Deferred() self.request_queue.append(request) @@ -67,6 +78,7 @@ class Peer(ClientFactory): return request.deferRequest def processQueue(self): + """Check the queue to see if new requests can be sent to the peer.""" if not self.request_queue: return if self.connecting: @@ -85,6 +97,7 @@ class Peer(ClientFactory): req.deferResponse.addCallbacks(self.requestComplete, self.requestError) def requestComplete(self, resp): + """Process a completed request.""" self._processLastResponse() req = self.response_queue.pop(0) log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) @@ -97,6 +110,7 @@ class Peer(ClientFactory): req.deferRequest.callback(resp) def requestError(self, error): + """Process a request that ended with an error.""" self._processLastResponse() req = self.response_queue.pop(0) log.msg('Download of %s generated error %r' % (req.uri, error)) @@ -109,20 +123,24 @@ class Peer(ClientFactory): log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error)) self._errors += 1 - # The IHTTPClientManager interface functions + #{ IHTTPClientManager interface def clientBusy(self, proto): + """Save the busy state.""" self.busy = True def clientIdle(self, proto): + """Try to send a new request.""" self._processLastResponse() self.busy = False self.processQueue() def clientPipelining(self, proto): + """Try to send a new request.""" self.pipeline = True self.processQueue() def clientGone(self, proto): + """Mark sent requests as errors.""" self._processLastResponse() for req in self.response_queue: req.deferRequest.errback(ProtocolError('lost connection')) @@ -135,8 +153,9 @@ class Peer(ClientFactory): if self.request_queue: self.processQueue() - # The downloading request interface functions + #{ Downloading request interface def setCommonHeaders(self): + """Get the common HTTP headers for all requests.""" headers = http_headers.Headers() headers.setHeader('Host', self.host) headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' % @@ -144,21 +163,47 @@ class Peer(ClientFactory): return headers def get(self, path, method="GET", modtime=None): + """Add a new request to the queue. + + @type path: C{string} + @param path: the path to request from the peer + @type method: C{string} + @param method: the HTTP method to use, 'GET' or 'HEAD' + (optional, defaults to 'GET') + @type modtime: C{int} + @param modtime: the modification time to use for an 'If-Modified-Since' + header, as seconds since the epoch + (optional, defaults to not sending that header) + """ headers = self.setCommonHeaders() if modtime: headers.setHeader('If-Modified-Since', modtime) return self.submitRequest(ClientRequest(method, path, headers, None)) def getRange(self, path, rangeStart, rangeEnd, method="GET"): + """Add a new request with a Range header to the queue. + + @type path: C{string} + @param path: the path to request from the peer + @type rangeStart: C{int} + @param rangeStart: the byte to begin the request at + @type rangeEnd: C{int} + @param rangeEnd: the byte to end the request at (inclusive) + @type method: C{string} + @param method: the HTTP method to use, 'GET' or 'HEAD' + (optional, defaults to 'GET') + """ headers = self.setCommonHeaders() headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)])) return self.submitRequest(ClientRequest(method, path, headers, None)) - # Functions that return information about the peer + #{ Peer information def isIdle(self): + """Check whether the peer is idle or not.""" return not self.busy and not self.request_queue and not self.response_queue def _processLastResponse(self): + """Save the download time of the last request for speed calculations.""" if self._lastResponse is not None: now = datetime.now() self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1])) @@ -246,6 +291,7 @@ class TestClientManager(unittest.TestCase): stream_mod.readStream(resp.stream, print_).addCallback(printdone) def test_download(self): + """Tests a normal download.""" host = 'www.ietf.org' self.client = Peer(host, 80) self.timeout = 10 @@ -255,6 +301,7 @@ class TestClientManager(unittest.TestCase): return d def test_head(self): + """Tests a 'HEAD' request.""" host = 'www.ietf.org' self.client = Peer(host, 80) self.timeout = 10 @@ -264,6 +311,7 @@ class TestClientManager(unittest.TestCase): return d def test_multiple_downloads(self): + """Tests multiple downloads with queueing and connection closing.""" host = 'www.ietf.org' self.client = Peer(host, 80) self.timeout = 120 @@ -274,20 +322,30 @@ class TestClientManager(unittest.TestCase): d.addCallback(self.gotResp, num, expect) if last: d.addBoth(lastDefer.callback) - + + # 3 quick requests newRequest("/rfc/rfc0006.txt", 1, 1776) newRequest("/rfc/rfc2362.txt", 2, 159833) newRequest("/rfc/rfc0801.txt", 3, 40824) + + # This one will probably be queued self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070)) + + # Connection should still be open, but idle self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606)) + + #Connection should be closed self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696)) self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976)) self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27)) self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088)) + + # Now it should definitely be closed self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True)) return lastDefer def test_multiple_quick_downloads(self): + """Tests lots of multiple downloads with queueing.""" host = 'www.ietf.org' self.client = Peer(host, 80) self.timeout = 30 @@ -317,6 +375,7 @@ class TestClientManager(unittest.TestCase): log.msg('Response Time is: %r' % self.client.responseTime()) def test_peer_info(self): + """Test retrieving the peer info during a download.""" host = 'www.ietf.org' self.client = Peer(host, 80) self.timeout = 120 @@ -345,6 +404,7 @@ class TestClientManager(unittest.TestCase): return lastDefer def test_range(self): + """Test a Range request.""" host = 'www.ietf.org' self.client = Peer(host, 80) self.timeout = 10