git.mxchange.org Git - quix0rs-apt-p2p.git/blob - HTTPDownloader.py
Moved the HTTPDownloader to the new PeerManager.
[quix0rs-apt-p2p.git] / HTTPDownloader.py
1
2 from twisted.internet import reactor, defer, protocol
3 from twisted.internet.protocol import ClientFactory
4 from twisted.web2.client.interfaces import IHTTPClientManager
5 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
6 from twisted.trial import unittest
7 from zope.interface import implements
8 from twisted.web2 import stream as stream_mod, http_headers
9
class HTTPClientManager(ClientFactory):
    """A manager for all HTTP requests to a single site.

    Controls all requests that go to a single site (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    Requests move through two lists: ``request_queue`` holds requests that
    have not been handed to the protocol yet, ``response_queue`` holds
    requests that were sent and are awaiting a response.
    """

    implements(IHTTPClientManager)

    def __init__(self, host, port=80):
        """Remember the site to contact; no connection is opened yet.

        @param host: host name of the site, also sent as the Host header
        @param port: TCP port of the site (default 80)
        """
        self.host = host
        self.port = port
        self.busy = False        # set by clientBusy(), cleared by clientIdle()
        self.pipeline = False    # set by clientPipelining()
        self.closed = True       # True while there is no connected protocol
        self.connecting = False  # True while a connection attempt is pending
        self.request_queue = []
        self.response_queue = []
        self.proto = None
        self.connector = None

    def connect(self):
        """Start a connection attempt to the site.

        Must only be called while the manager is closed and no other attempt
        is in progress.
        """
        assert self.closed and not self.connecting
        self.connecting = True
        creator = protocol.ClientCreator(reactor, HTTPClientProtocol, self)
        d = creator.connectTCP(self.host, self.port)
        # Without the errback a failed attempt would leave ``connecting`` set
        # forever, and processQueue() would never send or fail anything again.
        d.addCallbacks(self.connected, self.connectError)

    def connected(self, proto):
        """Record the newly connected protocol and start sending requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        self.processQueue()

    def connectError(self, error):
        """Fail every queued request after a connection attempt failed.

        @param error: the failure describing why the connection attempt failed
        """
        self.connecting = False
        # Swap the queue out first so that an errback handler which submits a
        # new request appends to a fresh list instead of the one iterated here.
        failed, self.request_queue = self.request_queue, []
        for req in failed:
            req.deferRequest.errback(error)

    def close(self):
        """Drop the connection, if one is currently open."""
        if not self.closed:
            self.proto.transport.loseConnection()

    def is_idle(self):
        """Return True when nothing is queued, in flight or being processed."""
        return not self.busy and not self.request_queue and not self.response_queue

    def submitRequest(self, request):
        """Queue a request and try to send it.

        @param request: the request object to send; a ``deferRequest``
            attribute is attached to it
        @return: a deferred that fires with the response, or errbacks if the
            request fails
        """
        request.deferRequest = defer.Deferred()
        self.request_queue.append(request)
        self.processQueue()
        return request.deferRequest

    def processQueue(self):
        """Send the next queued request if the connection can accept one.

        Connects first when the manager is closed. While the protocol is busy
        or a response is outstanding, another request is only sent if
        pipelining has been enabled.
        """
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.response_queue and not self.pipeline:
            return

        req = self.request_queue.pop(0)
        self.response_queue.append(req)
        req.deferResponse = self.proto.submitRequest(req, False)
        req.deferResponse.addCallback(self.requestComplete)
        req.deferResponse.addErrback(self.requestError)

    def requestComplete(self, resp):
        """Pass a response on to the oldest request awaiting one."""
        if not self.response_queue:
            # clientGone() empties the response queue after failing its
            # requests; if the protocol fires a deferred afterwards there is
            # no request left to complete.
            return
        req = self.response_queue.pop(0)
        req.deferRequest.callback(resp)

    def requestError(self, error):
        """Pass an error on to the oldest request awaiting a response."""
        if not self.response_queue:
            # Same situation as in requestComplete(): the request was already
            # failed when the connection went away.
            return
        req = self.response_queue.pop(0)
        req.deferRequest.errback(error)

    def clientBusy(self, proto):
        """Note that the protocol is busy."""
        self.busy = True

    def clientIdle(self, proto):
        """Note that the protocol is idle and try to send the next request."""
        self.busy = False
        self.processQueue()

    def clientPipelining(self, proto):
        """Note that pipelining is possible and try to send more requests."""
        self.pipeline = True
        self.processQueue()

    def clientGone(self, proto):
        """Handle a lost connection.

        All requests still awaiting a response are failed with a
        ProtocolError; requests that were never sent stay queued and trigger
        a reconnect.
        """
        # Reset the state before firing any errback: a handler that submits a
        # new request re-enters processQueue(), which must already see the
        # manager as closed rather than use the dead protocol.
        lost, self.response_queue = self.response_queue, []
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.proto = None
        for req in lost:
            req.deferRequest.errback(ProtocolError('lost connection'))
        if self.request_queue:
            self.processQueue()

    def setCommonHeaders(self):
        """Build the headers that are sent with every request.

        @return: a new Headers object containing Host and User-Agent
        """
        headers = http_headers.Headers()
        host = self.host
        if self.port != 80:
            # The Host header has to carry the port when it is not the
            # default one for HTTP.
            host = '%s:%s' % (self.host, self.port)
        headers.setHeader('Host', host)
        headers.setHeader('User-Agent', 'apt-dht/0.0.0 (twisted.web2 0.2.0+svn20070403)')
        return headers

    def get(self, path, method="GET"):
        """Request a path from the site.

        @param path: the path to request
        @param method: the HTTP method to use (default "GET")
        @return: the deferred returned by submitRequest()
        """
        headers = self.setCommonHeaders()
        return self.submitRequest(ClientRequest(method, path, headers, None))

    def getRange(self, path, rangeStart, rangeEnd, method="GET"):
        """Request a byte range of a path from the site.

        @param path: the path to request
        @param rangeStart: first value of the requested bytes range
        @param rangeEnd: second value of the requested bytes range
        @param method: the HTTP method to use (default "GET")
        @return: the deferred returned by submitRequest()
        """
        headers = self.setCommonHeaders()
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return self.submitRequest(ClientRequest(method, path, headers, None))
122     
class TestClientManager(unittest.TestCase):
    """Unit tests for the HTTPClientManager.

    These tests contact www.camrdale.org and compare the responses against
    hard-coded content lengths.

    NOTE(review): trial may read ``timeout`` before the test method runs, in
    which case the ``self.timeout`` assignments below have no effect - confirm.
    """

    client = None

    def setUp(self):
        # Per-instance list; a class-level list would be shared by (and keep
        # growing across) every test instance.
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check a response and read its body.

        @param resp: the response to check
        @param num: number identifying the request (not used by the checks)
        @param expect: expected stream length, or None to skip that check
        @return: the deferred from reading the response stream, so that the
            caller's deferred also waits for the body
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        def print_(n):
            pass
        def printdone(n):
            pass
        return stream_mod.readStream(resp.stream, print_).addCallback(printdone)

    def _newRequest(self, lastDefer, path, num, expect, last=False):
        """Request a path now and check the response with gotResp().

        When ``last`` is true the outcome, success or failure, is forwarded
        to ``lastDefer`` so that the test ends instead of waiting for its
        timeout.
        """
        d = self.client.get(path)
        d.addCallback(self.gotResp, num, expect)
        if last:
            d.addCallbacks(lastDefer.callback, lastDefer.errback)

    def _scheduleRequest(self, delay, lastDefer, path, num, expect, last=False):
        """Run _newRequest() after ``delay`` seconds, tracked for tearDown()."""
        call = reactor.callLater(delay, self._newRequest, lastDefer, path, num, expect, last)
        self.pending_calls.append(call)

    def test_download(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10

        d = self.client.get('/robots.txt')
        d.addCallback(self.gotResp, 1, 309)
        return d

    def test_head(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10

        d = self.client.get('/robots.txt', "HEAD")
        d.addCallback(self.gotResp, 1, 0)
        return d

    def test_multiple_downloads(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 120
        lastDefer = defer.Deferred()

        self._newRequest(lastDefer, "/", 1, 3433)
        self._newRequest(lastDefer, "/blog/", 2, 37121)
        self._newRequest(lastDefer, "/camrdale.html", 3, 2234)
        self._scheduleRequest(1, lastDefer, '/robots.txt', 4, 309)
        self._scheduleRequest(10, lastDefer, '/wikilink.html', 5, 3084)
        self._scheduleRequest(30, lastDefer, '/sitemap.html', 6, 4750)
        self._scheduleRequest(31, lastDefer, '/PlanetLab.html', 7, 2783)
        self._scheduleRequest(32, lastDefer, '/openid.html', 8, 2525)
        self._scheduleRequest(32, lastDefer, '/subpage.html', 9, 2381)
        self._scheduleRequest(62, lastDefer, '/sitemap2.rss', 0, 302362, True)
        return lastDefer

    def test_multiple_quick_downloads(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 30
        lastDefer = defer.Deferred()

        self._newRequest(lastDefer, "/", 1, 3433)
        self._newRequest(lastDefer, "/blog/", 2, 37121)
        self._newRequest(lastDefer, "/camrdale.html", 3, 2234)
        self._scheduleRequest(0, lastDefer, '/robots.txt', 4, 309)
        self._scheduleRequest(0, lastDefer, '/wikilink.html', 5, 3084)
        self._scheduleRequest(0, lastDefer, '/sitemap.html', 6, 4750)
        self._scheduleRequest(0, lastDefer, '/PlanetLab.html', 7, 2783)
        self._scheduleRequest(0, lastDefer, '/openid.html', 8, 2525)
        self._scheduleRequest(0, lastDefer, '/subpage.html', 9, 2381)
        self._scheduleRequest(0, lastDefer, '/sitemap2.rss', 0, 302362, True)
        return lastDefer

    def test_range(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10

        d = self.client.getRange('/robots.txt', 100, 199)
        d.addCallback(self.gotResp, 1, 100)
        return d

    def tearDown(self):
        # Cancel delayed requests that have not run yet, then drop the
        # connection.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client:
            self.client.close()
            self.client = None
219         if self.client:
220             self.client.close()
221             self.client = None