git.mxchange.org Git - quix0rs-apt-p2p.git/blob - HTTPDownloader.py
Read the response in the unit tests to prevent RST packets.
[quix0rs-apt-p2p.git] / HTTPDownloader.py
1
2 from twisted.internet import reactor, defer, protocol
3 from twisted.internet.protocol import ClientFactory
4 from twisted.web2.client.interfaces import IHTTPClientManager
5 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
6 from twisted.trial import unittest
7 from zope.interface import implements
8 from twisted.web2 import stream as stream_mod, http, http_headers, responsecode
9
class HTTPClientManager(ClientFactory):
    """A manager for all HTTP requests to a single site.

    Controls all requests that go to a single site (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    Requests wait in ``request_queue`` until they are handed to the
    protocol, at which point they move to ``response_queue`` until their
    response (or an error) arrives.

    """

    implements(IHTTPClientManager)

    def __init__(self, host, port):
        """Store the site's address and start out disconnected and idle.

        @param host: the host name of the site to connect to
        @param port: the port of the site to connect to
        """
        self.host = host
        self.port = port
        self.busy = False           # set/cleared by clientBusy/clientIdle
        self.pipeline = False       # set by clientPipelining
        self.closed = True          # no established connection yet
        self.connecting = False     # a connection attempt is in progress
        self.request_queue = []     # requests not yet given to the protocol
        self.response_queue = []    # requests awaiting their response
        self.proto = None           # the connected protocol, if any
        self.connector = None

    def connect(self):
        """Start a TCP connection attempt to the site.

        Must only be called while closed and not already connecting.
        """
        assert(self.closed and not self.connecting)
        self.connecting = True
        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
        # Handle failure too: without an errback a failed attempt would leave
        # 'connecting' set forever and the queued requests would never fire.
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Record the new connection and start sending queued requests.

        @param proto: the protocol instance for the established connection
        """
        self.closed = False
        self.connecting = False
        self.proto = proto
        self.processQueue()

    def connectionError(self, error):
        """Fail all queued requests after a connection attempt failed.

        @param error: the failure describing why the connection attempt failed
        """
        self.connecting = False
        # Detach the queue before firing errbacks so that a request submitted
        # from inside an errback is queued normally (and triggers a fresh
        # connection attempt) instead of being failed along with this batch.
        failed, self.request_queue = self.request_queue, []
        for req in failed:
            req.deferRequest.errback(error)

    def close(self):
        """Drop the connection to the site, if there is one."""
        if not self.closed:
            self.proto.transport.loseConnection()

    def is_idle(self):
        """Return True if not busy and there are no queued or outstanding requests."""
        return not self.busy and not self.request_queue and not self.response_queue

    def submitRequest(self, request):
        """Queue a request for the site and try to send it.

        A new Deferred is stored on the request as ``deferRequest``.

        @param request: the request to send
        @return: a Deferred that fires with the response, or errbacks
            if the request fails or the connection is lost
        """
        request.deferRequest = defer.Deferred()
        self.request_queue.append(request)
        self.processQueue()
        return request.deferRequest

    def processQueue(self):
        """Send the next queued request if the connection allows it.

        Connects first if needed.  Without pipelining, nothing is sent
        while busy or while a response is still outstanding.
        """
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            # connected() calls back into processQueue once connected.
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.response_queue and not self.pipeline:
            return

        req = self.request_queue.pop(0)
        self.response_queue.append(req)
        # NOTE(review): the second argument is presumably "close after this
        # request"; confirm against HTTPClientProtocol.submitRequest.
        req.deferResponse = self.proto.submitRequest(req, False)
        req.deferResponse.addCallback(self.requestComplete)
        req.deferResponse.addErrback(self.requestError)

    def requestComplete(self, resp):
        """Pass a response on to the oldest outstanding request.

        @param resp: the response received from the protocol
        """
        req = self.response_queue.pop(0)
        req.deferRequest.callback(resp)

    def requestError(self, error):
        """Pass an error on to the oldest outstanding request.

        @param error: the failure reported for the request
        """
        req = self.response_queue.pop(0)
        req.deferRequest.errback(error)

    def clientBusy(self, proto):
        """Note that the connection is busy (presumably called by the protocol)."""
        self.busy = True

    def clientIdle(self, proto):
        """Note that the connection is idle and try to send the next request."""
        self.busy = False
        self.processQueue()

    def clientPipelining(self, proto):
        """Note that pipelining is now possible and try to send the next request."""
        self.pipeline = True
        self.processQueue()

    def clientGone(self, proto):
        """Fail outstanding requests and reset state after the connection is gone.

        Requests still waiting in ``request_queue`` are kept and trigger a
        reconnection.
        """
        # Reset the connection state before firing any errback, so that a
        # request submitted from inside an errback sees a closed manager and
        # reconnects rather than being handed to the dead protocol.
        lost, self.response_queue = self.response_queue, []
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.proto = None
        for req in lost:
            req.deferRequest.errback(ProtocolError('lost connection'))
        if self.request_queue:
            self.processQueue()
107             
class HTTPDownloader:
    """Manages all the HTTP connections to various sites.

    One client manager is kept per site, keyed by the string "host:port"
    in the ``clients`` dictionary.
    """

    def __init__(self):
        """Start with no known sites."""
        self.clients = dict()

    def get(self, host, port, request):
        """Submit a request to a site, creating its manager on first use.

        @param host: the host name of the site
        @param port: the port of the site
        @param request: the request to submit
        @return: whatever the site manager's submitRequest returns
        """
        key = ":".join((host, str(port)))
        try:
            manager = self.clients[key]
        except KeyError:
            manager = self.clients[key] = HTTPClientManager(host, port)
        return manager.submitRequest(request)

    def closeAll(self):
        """Close every site's manager and forget them all."""
        for manager in self.clients.values():
            manager.close()
        self.clients = dict()
124
class TestClientManager(unittest.TestCase):
    """Unit tests for the HTTPClientManager.

    These tests make real HTTP requests to an external site.
    """

    client = None

    def setUp(self):
        """Create the per-test list of delayed calls.

        The list is created here rather than as a class attribute, since a
        class-level list would be shared (and mutated) by every test instance.
        """
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check a response's status and length, then read its body.

        @param resp: the response to check
        @param num: the number of the request (unused here)
        @param expect: the expected stream length, or None to skip the check
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        def print_(n):
            pass
        def printdone(n):
            pass
        # Read (and discard) the response body.
        stream_mod.readStream(resp.stream, print_).addCallback(printdone)

    def test_download(self):
        """Download a single file with a GET request."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("GET", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 309)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_head(self):
        """Send a HEAD request and expect an empty body."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("HEAD", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 0)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_multiple_downloads(self):
        """Send several requests to one site, spread out over time."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 120
        lastDefer = defer.Deferred()

        def newRequest(path, num, expect, last=False):
            d = self.client.submitRequest(ClientRequest("GET", path, {'Host':host}, None))
            d.addCallback(self.gotResp, num, expect)
            if last:
                # addBoth, as in the other tests: a failure of the last
                # request then fails the test instead of leaving lastDefer
                # unfired until the timeout.
                d.addBoth(lastDefer.callback)

        newRequest("/", 1, 3433)
        newRequest("/blog/", 2, 37121)
        newRequest("/camrdale.html", 3, 2234)
        self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309))
        self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084))
        self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750))
        self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783))
        self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525))
        self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381))
        self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True))
        return lastDefer

    def test_range(self):
        """Request a byte range of a file."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10
        lastDefer = defer.Deferred()

        d = self.client.submitRequest(ClientRequest("GET", '/robots.txt', {'Host':host, 'Range': ('bytes', [(100, 199)])}, None))
        d.addCallback(self.gotResp, 1, 100)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def tearDown(self):
        """Cancel any still-pending delayed calls and close the client."""
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client:
            self.client.close()
            self.client = None
206
class TestDownloader(unittest.TestCase):
    """Unit tests for the HTTPDownloader.

    These tests make real HTTP requests to external sites.
    """

    manager = None

    def setUp(self):
        """Create the per-test list of delayed calls.

        The list is created here rather than as a class attribute, since a
        class-level list would be shared (and mutated) by every test instance.
        """
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check a response's status and length, then read its body.

        @param resp: the response to check
        @param num: the number of the request (unused here)
        @param expect: the expected stream length, or None to skip the check
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        def print_(n):
            pass
        def printdone(n):
            pass
        # Read (and discard) the response body.
        stream_mod.readStream(resp.stream, print_).addCallback(printdone)

    def test_download(self):
        """Download a single file with a GET request."""
        self.manager = HTTPDownloader()
        self.timeout = 10
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.get(host, 80, ClientRequest("GET", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 309)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_head(self):
        """Send a HEAD request and expect an empty body."""
        self.manager = HTTPDownloader()
        self.timeout = 10
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.get(host, 80, ClientRequest("HEAD", '/robots.txt', {'Host':host}, None))
        d.addCallback(self.gotResp, 1, 0)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def test_multiple_downloads(self):
        """Send several requests to several sites, spread out over time."""
        self.manager = HTTPDownloader()
        self.timeout = 120
        lastDefer = defer.Deferred()

        def newRequest(host, path, num, expect, last=False):
            d = self.manager.get(host, 80, ClientRequest("GET", path, {'Host':host}, None))
            d.addCallback(self.gotResp, num, expect)
            if last:
                # addBoth, as in the other tests: a failure of the last
                # request then fails the test instead of leaving lastDefer
                # unfired until the timeout.
                d.addBoth(lastDefer.callback)

        newRequest('www.camrdale.org', "/", 1, 3433)
        newRequest('www.camrdale.org', "/blog/", 2, 37121)
        newRequest('www.google.ca', "/", 3, None)
        self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
        self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084))
        self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750))
        self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525))
        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381))
        self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
        return lastDefer

    def test_range(self):
        """Request a byte range of a file."""
        self.manager = HTTPDownloader()
        self.timeout = 10
        lastDefer = defer.Deferred()

        host = 'www.camrdale.org'
        d = self.manager.get(host, 80, ClientRequest("GET", '/robots.txt', {'Host':host, 'Range': ('bytes', [(100, 199)])}, None))
        d.addCallback(self.gotResp, 1, 100)
        d.addBoth(lastDefer.callback)
        return lastDefer

    def tearDown(self):
        """Cancel any still-pending delayed calls and close all connections."""
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.manager:
            self.manager.closeAll()
            self.manager = None