X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=HTTPDownloader.py;h=c580ed4714d946edd9dab01f1b9eec28da68737d;hb=f9400e1b17a4f83b6178f7b13f0cbe14efe8d8af;hp=3e705ddf4ceecaa9b4a41879c8e26dc9b2f93c31;hpb=e0f42c76df7c36de9a4889a429e57594f577e100;p=quix0rs-apt-p2p.git diff --git a/HTTPDownloader.py b/HTTPDownloader.py index 3e705dd..c580ed4 100644 --- a/HTTPDownloader.py +++ b/HTTPDownloader.py @@ -5,10 +5,14 @@ from twisted.web2.client.interfaces import IHTTPClientManager from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol from twisted.trial import unittest from zope.interface import implements +from twisted.web2 import stream as stream_mod, http, http_headers, responsecode class HTTPClientManager(ClientFactory): """A manager for all HTTP requests to a single site. + Controls all requests that go to a single site (host and port). + This includes buffering requests until they can be sent and reconnecting + in the event of the connection being closed. 
""" @@ -42,6 +46,9 @@ class HTTPClientManager(ClientFactory): if not self.closed: self.proto.transport.loseConnection() + def is_idle(self): + return not self.busy and not self.request_queue and not self.response_queue + def submitRequest(self, request): request.deferRequest = defer.Deferred() self.request_queue.append(request) @@ -97,16 +104,54 @@ class HTTPClientManager(ClientFactory): self.proto = None if self.request_queue: self.processQueue() + +class HTTPDownloader: + """Manages all the HTTP connections to various sites.""" + + def __init__(self): + self.clients = {} + + def setCommonHeaders(self, host): + headers = http_headers.Headers() + headers.setHeader('Host', host) + headers.setHeader('User-Agent', 'apt-dht/0.0.0 (twisted.web2 0.2.0+svn20070403)') + return headers + + def get(self, host, port, path, method="GET"): + site = host + ":" + str(port) + if site not in self.clients: + self.clients[site] = HTTPClientManager(host, port) + headers = self.setCommonHeaders(host) + return self.clients[site].submitRequest(ClientRequest(method, path, headers, None)) + + def getRange(self, host, port, path, rangeStart, rangeEnd, method="GET"): + site = host + ":" + str(port) + if site not in self.clients: + self.clients[site] = HTTPClientManager(host, port) + headers = self.setCommonHeaders(host) + headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)])) + return self.clients[site].submitRequest(ClientRequest(method, path, headers, None)) + + def closeAll(self): + for site in self.clients: + self.clients[site].close() + self.clients = {} -class TestDownloader(unittest.TestCase): +class TestClientManager(unittest.TestCase): + """Unit tests for the HTTPClientManager.""" client = None pending_calls = [] def gotResp(self, resp, num, expect): self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) - self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) - 
resp.stream.close() + if expect is not None: + self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) + def print_(n): + pass + def printdone(n): + pass + stream_mod.readStream(resp.stream, print_).addCallback(printdone) def test_download(self): host = 'www.camrdale.org' @@ -173,3 +218,84 @@ class TestDownloader(unittest.TestCase): if self.client: self.client.close() self.client = None + +class TestDownloader(unittest.TestCase): + """Unit tests for the HTTPDownloader.""" + + manager = None + pending_calls = [] + + def gotResp(self, resp, num, expect): + self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) + if expect is not None: + self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) + def print_(n): + pass + def printdone(n): + pass + stream_mod.readStream(resp.stream, print_).addCallback(printdone) + + def test_download(self): + self.manager = HTTPDownloader() + self.timeout = 10 + lastDefer = defer.Deferred() + + host = 'www.camrdale.org' + d = self.manager.get(host, 80, '/robots.txt') + d.addCallback(self.gotResp, 1, 309) + d.addBoth(lastDefer.callback) + return lastDefer + + def test_head(self): + self.manager = HTTPDownloader() + self.timeout = 10 + lastDefer = defer.Deferred() + + host = 'www.camrdale.org' + d = self.manager.get(host, 80, '/robots.txt', "HEAD") + d.addCallback(self.gotResp, 1, 0) + d.addBoth(lastDefer.callback) + return lastDefer + + def test_multiple_downloads(self): + self.manager = HTTPDownloader() + self.timeout = 120 + lastDefer = defer.Deferred() + + def newRequest(host, path, num, expect, last=False): + d = self.manager.get(host, 80, path) + d.addCallback(self.gotResp, num, expect) + if last: + d.addCallback(lastDefer.callback) + + newRequest('www.camrdale.org', "/", 1, 3433) + newRequest('www.camrdale.org', "/blog/", 2, 37121) + newRequest('www.google.ca', 
"/", 3, None) + self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None)) + self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084)) + self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None)) + self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525)) + self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381)) + self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True)) + return lastDefer + + def test_range(self): + self.manager = HTTPDownloader() + self.timeout = 10 + lastDefer = defer.Deferred() + + host = 'www.camrdale.org' + d = self.manager.getRange(host, 80, '/robots.txt', 100, 199) + d.addCallback(self.gotResp, 1, 100) + d.addBoth(lastDefer.callback) + return lastDefer + + def tearDown(self): + for p in self.pending_calls: + if p.active(): + p.cancel() + self.pending_calls = [] + if self.manager: + self.manager.closeAll() + self.manager = None