]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - HTTPDownloader.py
AptPackages only takes a single cache directory.
[quix0rs-apt-p2p.git] / HTTPDownloader.py
1
2 from twisted.internet import reactor, defer, protocol
3 from twisted.internet.protocol import ClientFactory
4 from twisted.web2.client.interfaces import IHTTPClientManager
5 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
6 from twisted.trial import unittest
7 from zope.interface import implements
8 from twisted.web2 import stream as stream_mod, http, http_headers, responsecode
9
class HTTPClientManager(ClientFactory):
    """A manager for all HTTP requests to a single site.

    Controls all requests that go to a single site (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    State kept on the instance:
      - request_queue: requests accepted but not yet handed to the protocol.
      - response_queue: requests handed to the protocol, awaiting a response,
        in the order they were sent.
      - busy / pipeline: flags toggled by clientBusy/clientIdle and
        clientPipelining.
      - closed / connecting: connection life-cycle flags.
      - proto: the connected protocol instance, or None.
    """

    implements(IHTTPClientManager)

    def __init__(self, host, port):
        """Remember the site's address; no connection is opened yet."""
        self.host = host
        self.port = port
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.request_queue = []
        self.response_queue = []
        self.proto = None
        self.connector = None

    def connect(self):
        """Start a TCP connection to the site.

        Must only be called while closed and not already connecting.
        """
        assert(self.closed and not self.connecting)
        self.connecting = True
        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
        # addCallbacks (not addCallback + addErrback) so that an exception
        # raised inside connected() is not mistaken for a connection failure.
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Record the new protocol and start sending queued requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        self.processQueue()

    def connectionError(self, err):
        """Fail every queued request when the connection attempt fails.

        Without this the manager would stay in the 'connecting' state
        forever and the callers' deferreds would never fire.
        """
        self.connecting = False
        self.closed = True
        self.proto = None
        # Detach the queue before firing errbacks: a caller's errback may
        # submit a new request, which must go into a fresh queue and is
        # free to trigger a new connection attempt.
        failed, self.request_queue = self.request_queue, []
        for req in failed:
            req.deferRequest.errback(err)

    def close(self):
        """Drop the connection, if one is currently open."""
        if not self.closed:
            self.proto.transport.loseConnection()

    def is_idle(self):
        """Return True when not busy and both queues are empty."""
        return not self.busy and not self.request_queue and not self.response_queue

    def submitRequest(self, request):
        """Queue a request and return a deferred for its response.

        The deferred is also stored on the request as ``deferRequest``.
        """
        request.deferRequest = defer.Deferred()
        self.request_queue.append(request)
        self.processQueue()
        return request.deferRequest

    def processQueue(self):
        """Send the next queued request if the connection state allows it.

        Connects first if currently closed. Without pipelining, nothing is
        sent while the client is busy or a response is still outstanding.
        """
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.response_queue and not self.pipeline:
            return

        req = self.request_queue.pop(0)
        self.response_queue.append(req)
        req.deferResponse = self.proto.submitRequest(req, False)
        req.deferResponse.addCallback(self.requestComplete)
        req.deferResponse.addErrback(self.requestError)

    def requestComplete(self, resp):
        """Deliver a response to the oldest outstanding request."""
        req = self.response_queue.pop(0)
        req.deferRequest.callback(resp)

    def requestError(self, error):
        """Deliver an error to the oldest outstanding request."""
        if not self.response_queue:
            # clientGone() has already failed and cleared every outstanding
            # request; the protocol may also fail its own per-request
            # deferred when the connection drops, so there is nothing left
            # to notify here.
            return
        req = self.response_queue.pop(0)
        req.deferRequest.errback(error)

    def clientBusy(self, proto):
        """Note that the client is busy."""
        self.busy = True

    def clientIdle(self, proto):
        """Note that the client is idle and try to send the next request."""
        self.busy = False
        self.processQueue()

    def clientPipelining(self, proto):
        """Note that pipelining is possible and try to send more requests."""
        self.pipeline = True
        self.processQueue()

    def clientGone(self, proto):
        """Handle loss of the connection.

        Outstanding requests are failed with a ProtocolError; requests that
        were still queued are retried on a new connection.
        """
        # Reset all state *before* notifying callers, so that an errback
        # which submits a new request sees a closed manager (and reconnects)
        # instead of being dispatched to the dead protocol.
        lost, self.response_queue = self.response_queue, []
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.proto = None
        for req in lost:
            req.deferRequest.errback(ProtocolError('lost connection'))
        if self.request_queue:
            self.processQueue()
107             
class HTTPDownloader:
    """Manages all the HTTP connections to various sites.

    One HTTPClientManager is kept per "host:port" site in ``self.clients``
    and is created lazily on the first request to that site.
    """

    def __init__(self):
        self.clients = {}

    def setCommonHeaders(self, host, port=None):
        """Build the headers sent with every request.

        @param host: the host name being contacted
        @param port: optional port of the site; when given and not 80 it is
            appended to the Host header, since a Host value without a port
            implies the default port.
        @return: a new Headers object with Host and User-Agent set
        """
        headers = http_headers.Headers()
        if port is None or port == 80:
            headers.setHeader('Host', host)
        else:
            headers.setHeader('Host', '%s:%s' % (host, port))
        headers.setHeader('User-Agent', 'apt-dht/0.0.0 (twisted.web2 0.2.0+svn20070403)')
        return headers

    def _getClient(self, host, port):
        """Return the manager for a site, creating it on first use."""
        site = host + ":" + str(port)
        if site not in self.clients:
            self.clients[site] = HTTPClientManager(host, port)
        return self.clients[site]

    def get(self, host, port, path, method="GET"):
        """Request a path from a site.

        @return: the deferred returned by the site manager's submitRequest
        """
        client = self._getClient(host, port)
        headers = self.setCommonHeaders(host, port)
        return client.submitRequest(ClientRequest(method, path, headers, None))

    def getRange(self, host, port, path, rangeStart, rangeEnd, method="GET"):
        """Request a byte range of a path from a site.

        rangeStart and rangeEnd are passed through unchanged as a single
        (start, end) pair in a 'bytes' Range header.

        @return: the deferred returned by the site manager's submitRequest
        """
        client = self._getClient(host, port)
        headers = self.setCommonHeaders(host, port)
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return client.submitRequest(ClientRequest(method, path, headers, None))

    def closeAll(self):
        """Close every site manager and forget them all."""
        for client in self.clients.values():
            client.close()
        self.clients = {}
139
class TestClientManager(unittest.TestCase):
    """Unit tests for the HTTPClientManager.

    NOTE: these tests make real HTTP requests to www.camrdale.org and
    compare response lengths against hard-coded values.
    """

    client = None

    def setUp(self):
        # Per-instance list of delayed calls. A class-level list would be
        # shared (and mutated) across every test instance.
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check a response's status code and, optionally, its length.

        @param resp: the response object
        @param num: a label for the request (not used in the checks)
        @param expect: the expected stream length, or None to skip the check
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        # Read the whole stream and discard the data.
        def print_(n):
            pass
        def printdone(n):
            pass
        stream_mod.readStream(resp.stream, print_).addCallback(printdone)

    def test_download(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("GET", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 309)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_head(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("HEAD", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 0)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_multiple_downloads(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 120
        lastDefer = defer.Deferred()

        def newRequest(path, num, expect, last=False):
            d = self.client.submitRequest(ClientRequest("GET", path, {'Host':host}, None))
            d.addCallback(self.gotResp, num, expect)
            if last:
                # addBoth so that a failure of the final request ends the
                # test with that failure instead of waiting for the timeout.
                d.addBoth(lastDefer.callback)

        # Three immediate requests, then more spread over the next minute.
        newRequest("/", 1, 3433)
        newRequest("/blog/", 2, 37121)
        newRequest("/camrdale.html", 3, 2234)
        self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309))
        self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084))
        self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750))
        self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783))
        self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525))
        self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381))
        self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True))
        return lastDefer

    def test_range(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("GET", '/robots.txt', {'Host':host, 'Range': ('bytes', [(100, 199)])}, None))
        d.addCallback(self.gotResp, 1, 100)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def tearDown(self):
        # Cancel any delayed requests that have not fired yet, then close
        # the client.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client:
            self.client.close()
            self.client = None
221
class TestDownloader(unittest.TestCase):
    """Unit tests for the HTTPDownloader.

    NOTE: these tests make real HTTP requests to external sites and, for
    www.camrdale.org, compare response lengths against hard-coded values.
    """

    manager = None

    def setUp(self):
        # Per-instance list of delayed calls. A class-level list would be
        # shared (and mutated) across every test instance.
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check a response's status code and, optionally, its length.

        @param resp: the response object
        @param num: a label for the request (not used in the checks)
        @param expect: the expected stream length, or None to skip the check
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        # Read the whole stream and discard the data.
        def print_(n):
            pass
        def printdone(n):
            pass
        stream_mod.readStream(resp.stream, print_).addCallback(printdone)

    def test_download(self):
        self.manager = HTTPDownloader()
        self.timeout = 10
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.get(host, 80, '/robots.txt')
        d.addCallback(self.gotResp, 1, 309)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_head(self):
        self.manager = HTTPDownloader()
        self.timeout = 10
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.get(host, 80, '/robots.txt', "HEAD")
        d.addCallback(self.gotResp, 1, 0)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_multiple_downloads(self):
        self.manager = HTTPDownloader()
        self.timeout = 120
        lastDefer = defer.Deferred()

        def newRequest(host, path, num, expect, last=False):
            d = self.manager.get(host, 80, path)
            d.addCallback(self.gotResp, num, expect)
            if last:
                # addBoth so that a failure of the final request ends the
                # test with that failure instead of waiting for the timeout.
                d.addBoth(lastDefer.callback)

        # Requests to several sites: three immediately, then more spread
        # over the next minute.
        newRequest('www.camrdale.org', "/", 1, 3433)
        newRequest('www.camrdale.org', "/blog/", 2, 37121)
        newRequest('www.google.ca', "/", 3, None)
        self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
        self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084))
        self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750))
        self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525))
        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381))
        self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
        return lastDefer

    def test_range(self):
        self.manager = HTTPDownloader()
        self.timeout = 10
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.getRange(host, 80, '/robots.txt', 100, 199)
        d.addCallback(self.gotResp, 1, 100)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def tearDown(self):
        # Cancel any delayed requests that have not fired yet, then close
        # every connection the downloader opened.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.manager:
            self.manager.closeAll()
            self.manager = None