git.mxchange.org Git - quix0rs-apt-p2p.git/blob - HTTPDownloader.py
Add a manager for the client downloads and the tests for it.
[quix0rs-apt-p2p.git] / HTTPDownloader.py
1
2 from twisted.internet import reactor, defer, protocol
3 from twisted.internet.protocol import ClientFactory
4 from twisted.web2.client.interfaces import IHTTPClientManager
5 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
6 from twisted.trial import unittest
7 from zope.interface import implements
8
class HTTPClientManager(ClientFactory):
    """A manager for all HTTP requests to a single site.

    Controls all requests that go to a single site (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    Requests wait in C{request_queue} until the connection can accept them,
    then move to C{response_queue} until their response (or error) arrives.
    Each submitted request gets a C{deferRequest} deferred that fires with
    the response, or errbacks if the request fails, the connection is lost,
    or the connection cannot be established.
    """

    implements(IHTTPClientManager)

    def __init__(self, host, port):
        self.host = host
        self.port = port
        # Flags set by the protocol through clientBusy/clientIdle/
        # clientPipelining and reset by clientGone.
        self.busy = False
        self.pipeline = False
        # No connection exists while closed is True and connecting is False.
        self.closed = True
        self.connecting = False
        # Requests not yet sent, and requests sent but awaiting a response
        # (kept in the order they were sent).
        self.request_queue = []
        self.response_queue = []
        self.proto = None
        self.connector = None

    def connect(self):
        """Start a new TCP connection to the site."""
        assert(self.closed and not self.connecting)
        self.connecting = True
        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
        # Without an errback, a failed connection would leave 'connecting'
        # set forever and the queued requests' deferreds would never fire.
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Record the newly connected protocol and send queued requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        self.processQueue()

    def connectionError(self, err):
        """Fail every queued request after a connection attempt failed.

        @param err: the failure from the connection attempt
        """
        self.connecting = False
        # Detach the queue before firing errbacks: they run synchronously and
        # may submit new requests, which will start a fresh connection attempt.
        pending, self.request_queue = self.request_queue, []
        for req in pending:
            req.deferRequest.errback(err)

    def close(self):
        """Close the connection to the site, if one is open."""
        if not self.closed:
            self.proto.transport.loseConnection()

    def is_idle(self):
        """Return True if no requests are queued, in flight, or busy."""
        return not self.busy and not self.request_queue and not self.response_queue

    def submitRequest(self, request):
        """Queue a request for sending.

        @return: a deferred that fires with the response to C{request}
        """
        request.deferRequest = defer.Deferred()
        self.request_queue.append(request)
        self.processQueue()
        return request.deferRequest

    def processQueue(self):
        """Send the next queued request if the connection can take it.

        Starts a connection if none is open or in progress.  At most one
        request is sent per call.
        """
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.response_queue and not self.pipeline:
            return

        req = self.request_queue.pop(0)
        self.response_queue.append(req)
        req.deferResponse = self.proto.submitRequest(req, False)
        # Bind each handler to this specific request instead of assuming the
        # result belongs to the head of response_queue.  addCallbacks (rather
        # than addCallback + addErrback) keeps an exception raised inside
        # requestComplete from being routed to requestError.
        req.deferResponse.addCallbacks(self.requestComplete, self.requestError,
                                       callbackArgs=(req,), errbackArgs=(req,))

    def _finishRequest(self, req):
        """Remove a request from response_queue; return it if still unfired.

        With C{req} None, the oldest outstanding request is used.  Returns
        None when there is nothing left to fire, e.g. when clientGone has
        already errbacked the request.
        """
        if req is None:
            if not self.response_queue:
                return None
            req = self.response_queue.pop(0)
        elif req in self.response_queue:
            self.response_queue.remove(req)
        if req.deferRequest.called:
            return None
        return req

    def requestComplete(self, resp, req=None):
        """Deliver a response to the request that asked for it.

        @param resp: the response received
        @param req: the request C{resp} answers; defaults to the oldest
            outstanding request
        """
        req = self._finishRequest(req)
        if req is not None:
            req.deferRequest.callback(resp)

    def requestError(self, error, req=None):
        """Deliver a request failure to the request that asked for it.

        @param error: the failure
        @param req: the failed request; defaults to the oldest outstanding
            request
        """
        req = self._finishRequest(req)
        if req is not None:
            req.deferRequest.errback(error)

    def clientBusy(self, proto):
        self.busy = True

    def clientIdle(self, proto):
        self.busy = False
        self.processQueue()

    def clientPipelining(self, proto):
        self.pipeline = True
        self.processQueue()

    def clientGone(self, proto):
        """Handle loss of the connection.

        Fails every request still awaiting a response, resets the connection
        state, and reconnects if requests are still queued.
        """
        pending = self.response_queue
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.response_queue = []
        self.proto = None
        # Fire errbacks only after the reset: they run synchronously and may
        # submit new requests, which must not go to the lost connection.
        for req in pending:
            if not req.deferRequest.called:
                req.deferRequest.errback(ProtocolError('lost connection'))
        if self.request_queue:
            self.processQueue()
106             
class HTTPDownloader:
    """Manages all the HTTP connections to various sites.

    One HTTPClientManager is kept per site, keyed by the string "host:port",
    and is created the first time that site is requested.
    """

    def __init__(self):
        # "host:port" -> HTTPClientManager for that site.
        self.clients = {}

    def get(self, host, port, request):
        """Submit request to the site at host:port.

        @return: the deferred from that site's manager's submitRequest
        """
        key = ":".join((host, str(port)))
        try:
            manager = self.clients[key]
        except KeyError:
            manager = HTTPClientManager(host, port)
            self.clients[key] = manager
        return manager.submitRequest(request)

    def closeAll(self):
        """Close every site's connection and forget all managers."""
        for manager in self.clients.values():
            manager.close()
        self.clients = {}
123
class TestClientManager(unittest.TestCase):
    """Unit tests for the HTTPClientManager.

    NOTE(review): these tests fetch live pages from www.camrdale.org and check
    exact body lengths, so they need network access and break if the site's
    content changes.
    """

    client = None
    # trial reads the timeout before the test method runs, so it must be a
    # class or method attribute; assigning self.timeout inside the test
    # does not affect that test.
    timeout = 10

    def setUp(self):
        # Per-test list; a class-level list would be shared by every test.
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check for a 2xx response whose body is expect bytes, then close it."""
        self.failUnless(200 <= resp.code < 300, "Got a non-200 response: %r" % resp.code)
        self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        resp.stream.close()

    def test_download(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("GET", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 309)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_head(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("HEAD", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 0)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_multiple_downloads(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        lastDefer = defer.Deferred()

        def newRequest(path, num, expect, last=False):
            d = self.client.submitRequest(ClientRequest("GET", path, {'Host':host}, None))
            d.addCallback(self.gotResp, num, expect)
            if last:
                # addBoth so that a failure of the last request fails the
                # test instead of leaving it to hang until the timeout.
                d.addBoth(lastDefer.callback)

        newRequest("/", 1, 3433)
        newRequest("/blog/", 2, 37121)
        newRequest("/camrdale.html", 3, 2234)
        self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309))
        self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084))
        self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750))
        self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783))
        self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525))
        self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381))
        self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True))
        return lastDefer
    # The last request is scheduled 62 seconds in, so allow longer.
    test_multiple_downloads.timeout = 120

    def test_range(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("GET", '/robots.txt', {'Host':host, 'Range': ('bytes', [(100, 199)])}, None))
        d.addCallback(self.gotResp, 1, 100)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def tearDown(self):
        # Cancel requests scheduled with callLater that have not run yet.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client is not None:
            self.client.close()
            self.client = None
200
class TestDownloader(unittest.TestCase):
    """Unit tests for the HTTPDownloader.

    NOTE(review): these tests fetch live pages from www.camrdale.org,
    www.google.ca and www.sfu.ca, so they need network access; the
    camrdale.org checks also depend on exact body lengths.
    """

    manager = None
    # trial reads the timeout before the test method runs, so it must be a
    # class or method attribute; assigning self.timeout inside the test
    # does not affect that test.
    timeout = 10

    def setUp(self):
        # Per-test list; a class-level list would be shared by every test.
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check for a 2xx response, and its body length unless expect is None."""
        self.failUnless(200 <= resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        resp.stream.close()

    def test_download(self):
        self.manager = HTTPDownloader()
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.get(host, 80, ClientRequest("GET", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 309)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_head(self):
        self.manager = HTTPDownloader()
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.get(host, 80, ClientRequest("HEAD", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 0)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_multiple_downloads(self):
        self.manager = HTTPDownloader()
        lastDefer = defer.Deferred()

        def newRequest(host, path, num, expect, last=False):
            d = self.manager.get(host, 80, ClientRequest("GET", path, {'Host':host}, None))
            d.addCallback(self.gotResp, num, expect)
            if last:
                # addBoth so that a failure of the last request fails the
                # test instead of leaving it to hang until the timeout.
                d.addBoth(lastDefer.callback)

        newRequest('www.camrdale.org', "/", 1, 3433)
        newRequest('www.camrdale.org', "/blog/", 2, 37121)
        newRequest('www.google.ca', "/", 3, None)
        self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
        self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084))
        self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750))
        self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525))
        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381))
        self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
        return lastDefer
    # The last request is scheduled 62 seconds in, so allow longer.
    test_multiple_downloads.timeout = 120

    def test_range(self):
        self.manager = HTTPDownloader()
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.get(host, 80, ClientRequest("GET", '/robots.txt', {'Host':host, 'Range': ('bytes', [(100, 199)])}, None))
        d.addCallback(self.gotResp, 1, 100)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def tearDown(self):
        # Cancel requests scheduled with callLater that have not run yet.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.manager is not None:
            self.manager.closeAll()
            self.manager = None