X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=HTTPDownloader.py;h=7e5a06cb6880a10bcb8ce0a20c43ef56e1862ab9;hp=7d9644419700933d732273619cefefed1f6ea9fd;hb=de97c2578c709b19e96c0419c9482a5487ab20ce;hpb=4efc0d25095680d57493fee290c2ee4b4a2f44bb diff --git a/HTTPDownloader.py b/HTTPDownloader.py index 7d96444..7e5a06c 100644 --- a/HTTPDownloader.py +++ b/HTTPDownloader.py @@ -1,59 +1,226 @@ -from twisted.web2.client import http -from twisted.internet.defer import Deferred +from twisted.internet import reactor, defer, protocol +from twisted.internet.protocol import ClientFactory +from twisted import version as twisted_version +from twisted.web2.client.interfaces import IHTTPClientManager +from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol +from twisted.web2 import stream as stream_mod, http_headers +from twisted.web2 import version as web2_version +from twisted.trial import unittest +from zope.interface import implements -class HTTPClientManager(object): - """A manager for all HTTP requests to a site. +from apt_dht_conf import version + +class HTTPClientManager(ClientFactory): + """A manager for all HTTP requests to a single site. + Controls all requests that got to a single site (host and port). + This includes buffering requests until they can be sent and reconnecting + in the even of the connection being closed. """ implements(IHTTPClientManager) - def __init__(self, host, port): + def __init__(self, host, port=80): self.host = host self.port = port - self.client = http.HTTPClientProtocol(self) self.busy = False self.pipeline = False - self.closed = False - self.pending_requests = [] + self.closed = True + self.connecting = False + self.request_queue = [] + self.response_queue = [] + self.proto = None + self.connector = None - def get(self, path): - uri = 'http://' + self.host + ':' + self.port + path - request = http.ClientRequest('GET', uri, {}, None) - request.responseDefer = Deferred() - self.pending_requests.append(request) - if not self.busy: - self._submitRequest() + def connect(self): + assert(self.closed and not self.connecting) + self.connecting = True + d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) + d.addCallback(self.connected) + + def connected(self, proto): + self.closed = False + self.connecting = False + self.proto = proto + self.processQueue() - return request.responseDefer + def close(self): + if not self.closed: + self.proto.transport.loseConnection() + + def is_idle(self): + return not self.busy and not self.request_queue and not self.response_queue - def _submitRequest(self): - assert self.pending_requests + def submitRequest(self, request): + request.deferRequest = defer.Deferred() + self.request_queue.append(request) + self.processQueue() + return request.deferRequest + + def processQueue(self): + if not self.request_queue: + return + if self.connecting: + return if self.closed: - del self.client - self.client = http.HTTPClientProtocol(self) - - request = self.pending_requests.pop() - d = self.client.submitRequest(request, False) - d.addCallback(request.responseDefer.callback) + self.connect() + return + if self.busy and not self.pipeline: + return + if self.response_queue and not self.pipeline: + return + + req = self.request_queue.pop(0) + self.response_queue.append(req) + req.deferResponse = self.proto.submitRequest(req, False) + req.deferResponse.addCallback(self.requestComplete) + req.deferResponse.addErrback(self.requestError) + + def requestComplete(self, resp): + req = self.response_queue.pop(0) + req.deferRequest.callback(resp) + + def requestError(self, error): + req = self.response_queue.pop(0) + req.deferRequest.errback(error) def clientBusy(self, proto): self.busy = True def clientIdle(self, proto): self.busy = False - if self.pending_requests: - self._submitRequest() + self.processQueue() def clientPipelining(self, proto): self.pipeline = True + self.processQueue() def clientGone(self, proto): - self.closed = True + for req in self.response_queue: + req.deferRequest.errback(ProtocolError('lost connection')) self.busy = False self.pipeline = False - del self.client - if self.pending_requests: - self._submitRequest() + self.closed = True + self.connecting = False + self.response_queue = [] + self.proto = None + if self.request_queue: + self.processQueue() + + def setCommonHeaders(self): + headers = http_headers.Headers() + headers.setHeader('Host', self.host) + headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' % + (version.short(), twisted_version.short(), web2_version.short())) + return headers + + def get(self, path, method="GET"): + headers = self.setCommonHeaders() + return self.submitRequest(ClientRequest(method, path, headers, None)) + + def getRange(self, path, rangeStart, rangeEnd, method="GET"): + headers = self.setCommonHeaders() + headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)])) + return self.submitRequest(ClientRequest(method, path, headers, None)) + +class TestClientManager(unittest.TestCase): + """Unit tests for the HTTPClientManager.""" + + client = None + pending_calls = [] + + def gotResp(self, resp, num, expect): + self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) + if expect is not None: + self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) + def print_(n): + pass + def printdone(n): + pass + stream_mod.readStream(resp.stream, print_).addCallback(printdone) + + def test_download(self): + host = 'www.camrdale.org' + self.client = HTTPClientManager(host, 80) + self.timeout = 10 + + d = self.client.get('/robots.txt') + d.addCallback(self.gotResp, 1, 309) + return d + + def test_head(self): + host = 'www.camrdale.org' + self.client = HTTPClientManager(host, 80) + self.timeout = 10 + + d = self.client.get('/robots.txt', "HEAD") + d.addCallback(self.gotResp, 1, 0) + return d + + def test_multiple_downloads(self): + host = 'www.camrdale.org' + self.client = HTTPClientManager(host, 80) + self.timeout = 120 + lastDefer = defer.Deferred() + + def newRequest(path, num, expect, last=False): + d = self.client.get(path) + d.addCallback(self.gotResp, num, expect) + if last: + d.addCallback(lastDefer.callback) + + newRequest("/", 1, 3433) + newRequest("/blog/", 2, 37121) + newRequest("/camrdale.html", 3, 2234) + self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309)) + self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084)) + self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783)) + self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525)) + self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381)) + self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True)) + return lastDefer + + def test_multiple_quick_downloads(self): + host = 'www.camrdale.org' + self.client = HTTPClientManager(host, 80) + self.timeout = 30 + lastDefer = defer.Deferred() + + def newRequest(path, num, expect, last=False): + d = self.client.get(path) + d.addCallback(self.gotResp, num, expect) + if last: + d.addCallback(lastDefer.callback) + + newRequest("/", 1, 3433) + newRequest("/blog/", 2, 37121) + newRequest("/camrdale.html", 3, 2234) + self.pending_calls.append(reactor.callLater(0, newRequest, '/robots.txt', 4, 309)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/wikilink.html', 5, 3084)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/PlanetLab.html', 7, 2783)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/openid.html', 8, 2525)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/subpage.html', 9, 2381)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 302362, True)) + return lastDefer + + def test_range(self): + host = 'www.camrdale.org' + self.client = HTTPClientManager(host, 80) + self.timeout = 10 + + d = self.client.getRange('/robots.txt', 100, 199) + d.addCallback(self.gotResp, 1, 100) + return d + + def tearDown(self): + for p in self.pending_calls: + if p.active(): + p.cancel() + self.pending_calls = [] + if self.client: + self.client.close() + self.client = None