]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht/HTTPDownloader.py
f43b6e1c6f8f041b1d405acd3b5102f8e7a03460
[quix0rs-apt-p2p.git] / apt_dht / HTTPDownloader.py
1
2 from twisted.internet import reactor, defer, protocol
3 from twisted.internet.protocol import ClientFactory
4 from twisted import version as twisted_version
5 from twisted.python import log
6 from twisted.web2.client.interfaces import IHTTPClientManager
7 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
8 from twisted.web2 import stream as stream_mod, http_headers
9 from twisted.web2 import version as web2_version
10 from twisted.trial import unittest
11 from zope.interface import implements
12
13 from apt_dht_conf import version
14
class HTTPClientManager(ClientFactory):
    """A manager for all HTTP requests to a single site.
    
    Controls all requests that go to a single site (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.
    
    """

    implements(IHTTPClientManager)
    
    def __init__(self, host, port=80):
        """Initialize the manager; no connection is made until a request is queued.

        @param host: the host name or address of the site
        @param port: the TCP port of the site (defaults to 80)
        """
        self.host = host
        self.port = port
        # Flags reported by the protocol through clientBusy/clientIdle/
        # clientPipelining.
        self.busy = False
        self.pipeline = False
        # Connection state: closed (no protocol), connecting, or connected
        # (closed is False and proto is set).
        self.closed = True
        self.connecting = False
        # Requests not yet handed to the protocol, in submission order.
        self.request_queue = []
        # Requests handed to the protocol that are still awaiting a result.
        self.response_queue = []
        self.proto = None
        self.connector = None
        
    def connect(self):
        """Start an asynchronous TCP connection to the site.

        connected() is called on success and connectionError() on failure.
        """
        assert(self.closed and not self.connecting)
        self.connecting = True
        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
        # Without an errback a failed connection would leave 'connecting'
        # set forever and every queued request would never complete.
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Record the newly connected protocol and start sending requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        self.processQueue()

    def connectionError(self, err):
        """Fail every queued request after a connection attempt failed.

        @param err: the failure produced by the connection attempt
        """
        self.connecting = False
        self.closed = True
        self.proto = None
        log.msg('Connection to %s:%s failed: %r' % (self.host, self.port, err))
        # Detach the queue before firing errbacks, so a handler that submits
        # a new request starts a fresh connection attempt instead of having
        # its request failed by this loop.
        failed, self.request_queue = self.request_queue, []
        for req in failed:
            req.deferRequest.errback(err)
        
    def close(self):
        """Drop the current connection, if there is one."""
        if not self.closed:
            self.proto.transport.loseConnection()

    def is_idle(self):
        """Return True if nothing is in progress, queued, or awaiting a response."""
        return not self.busy and not self.request_queue and not self.response_queue
    
    def submitRequest(self, request):
        """Queue a request for sending.

        @param request: the ClientRequest to send
        @return: a deferred that fires with the response, or errbacks with
            the error that prevented it
        """
        request.deferRequest = defer.Deferred()
        self.request_queue.append(request)
        self.processQueue()
        return request.deferRequest

    def processQueue(self):
        """Send the next queued request if the connection state allows it.

        Connects first when needed. Only one request is outstanding at a time
        unless the protocol has reported that pipelining is possible.
        """
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.response_queue and not self.pipeline:
            return

        req = self.request_queue.pop(0)
        self.response_queue.append(req)
        req.deferResponse = self.proto.submitRequest(req, False)
        # Hand the request itself to the result handlers so each result is
        # matched to the right request. addCallbacks (rather than
        # addCallback + addErrback) keeps an exception raised inside
        # requestComplete from being routed into requestError.
        req.deferResponse.addCallbacks(self.requestComplete, self.requestError,
                                       callbackArgs=(req,), errbackArgs=(req,))

    def _takeRequest(self, req):
        """Remove a request from the outstanding list; return it if its result is still wanted.

        @param req: the request a result belongs to, or None to take the
            oldest outstanding request
        @return: the request, or None when there is none or its deferred has
            already fired (e.g. clientGone() may already have failed it)
        """
        if req is None:
            if not self.response_queue:
                return None
            req = self.response_queue.pop(0)
        elif req in self.response_queue:
            self.response_queue.remove(req)
        if req.deferRequest.called:
            return None
        return req

    def requestComplete(self, resp, req=None):
        """Deliver a successful response to whoever submitted the request.

        @param resp: the response from the protocol
        @param req: the request being answered; None means the oldest
            outstanding request (the original calling convention)
        """
        req = self._takeRequest(req)
        if req is None:
            return
        log.msg('Download of %s completed with code %d' % (req.uri, resp.code))
        req.deferRequest.callback(resp)

    def requestError(self, error, req=None):
        """Deliver a request failure to whoever submitted the request.

        @param error: the failure from the protocol
        @param req: the failed request; None means the oldest outstanding
            request (the original calling convention)
        """
        req = self._takeRequest(req)
        if req is None:
            # Already failed elsewhere. Returning None consumes the error so
            # it is not left unhandled on the response deferred.
            return
        log.msg('Download of %s generated error %r' % (req.uri, error))
        req.deferRequest.errback(error)

    def clientBusy(self, proto):
        """Protocol callback: a request is being processed."""
        self.busy = True

    def clientIdle(self, proto):
        """Protocol callback: the connection is idle; try to send more."""
        self.busy = False
        self.processQueue()

    def clientPipelining(self, proto):
        """Protocol callback: pipelining is allowed; try to send more."""
        self.pipeline = True
        self.processQueue()

    def clientGone(self, proto):
        """Protocol callback: the connection is gone.

        Fails every request that was sent but not answered, and reconnects
        if requests are still waiting to be sent.
        """
        # Reset all connection state BEFORE firing errbacks: a handler that
        # submits a new request must see a closed manager, not the dead proto.
        lost, self.response_queue = self.response_queue, []
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.proto = None
        for req in lost:
            if not req.deferRequest.called:
                req.deferRequest.errback(ProtocolError('lost connection'))
        if self.request_queue:
            self.processQueue()
            
    def setCommonHeaders(self):
        """Build the headers sent with every request.

        The Host header carries the port whenever it is not the HTTP default
        of 80, as required for virtual hosting on non-default ports.
        """
        headers = http_headers.Headers()
        if self.port == 80:
            host = self.host
        else:
            host = '%s:%s' % (self.host, self.port)
        headers.setHeader('Host', host)
        headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' % 
                          (version.short(), twisted_version.short(), web2_version.short()))
        return headers
    
    def get(self, path, method="GET", modtime=None):
        """Request a path from the site.

        @param path: the path to request
        @param method: the HTTP method (e.g. "GET" or "HEAD")
        @param modtime: if set, sent as the If-Modified-Since header
        @return: a deferred firing with the response
        """
        headers = self.setCommonHeaders()
        if modtime:
            headers.setHeader('If-Modified-Since', modtime)
        return self.submitRequest(ClientRequest(method, path, headers, None))
    
    def getRange(self, path, rangeStart, rangeEnd, method="GET"):
        """Request a byte range of a path from the site.

        @param path: the path to request
        @param rangeStart: the first byte offset to request
        @param rangeEnd: the last byte offset to request (sent as-is in the
            Range header, which HTTP treats as inclusive)
        @param method: the HTTP method
        @return: a deferred firing with the response
        """
        headers = self.setCommonHeaders()
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return self.submitRequest(ClientRequest(method, path, headers, None))
132     
class TestClientManager(unittest.TestCase):
    """Unit tests for the HTTPClientManager (these need network access)."""
    
    client = None

    def setUp(self):
        # Per-test state. A class-level list would be shared by, and keep
        # growing across, every test instance trial creates.
        self.client = None
        self.pending_calls = []
    
    def gotResp(self, resp, num, expect):
        """Check a response's status and declared length, then read its body.

        @param resp: the response to check
        @param num: an identifier for the request (unused in the checks)
        @param expect: the expected body length, or None to skip that check
        @return: a deferred that fires once the whole body has been read, so
            the test does not finish while data is still arriving
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        def print_(n):
            pass
        def printdone(n):
            pass
        return stream_mod.readStream(resp.stream, print_).addCallback(printdone)

    def _request(self, path, num, expect, lastDefer=None):
        """Submit a GET for path and check its response.

        If lastDefer is given, it is fired with the outcome of this request
        (success or failure) once the response body has been read.
        """
        d = self.client.get(path)
        d.addCallback(self.gotResp, num, expect)
        if lastDefer is not None:
            d.addBoth(lastDefer.callback)
        return d
    
    def test_download(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        
        d = self.client.get('/robots.txt')
        d.addCallback(self.gotResp, 1, 309)
        return d
    # trial reads the timeout before running the test method, so it must be
    # set on the method rather than on self inside the test body.
    test_download.timeout = 10
        
    def test_head(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        
        d = self.client.get('/robots.txt', "HEAD")
        d.addCallback(self.gotResp, 1, 0)
        return d
    test_head.timeout = 10
        
    def test_multiple_downloads(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        lastDefer = defer.Deferred()
        
        self._request("/", 1, 3433)
        self._request("/blog/", 2, 37121)
        self._request("/camrdale.html", 3, 2234)
        self.pending_calls.append(reactor.callLater(1, self._request, '/robots.txt', 4, 309))
        self.pending_calls.append(reactor.callLater(10, self._request, '/wikilink.html', 5, 3084))
        self.pending_calls.append(reactor.callLater(30, self._request, '/sitemap.html', 6, 4750))
        self.pending_calls.append(reactor.callLater(31, self._request, '/PlanetLab.html', 7, 2783))
        self.pending_calls.append(reactor.callLater(32, self._request, '/openid.html', 8, 2525))
        self.pending_calls.append(reactor.callLater(32, self._request, '/subpage.html', 9, 2381))
        self.pending_calls.append(reactor.callLater(62, self._request, '/sitemap2.rss', 0, 302362, lastDefer))
        return lastDefer
    test_multiple_downloads.timeout = 120
        
    def test_multiple_quick_downloads(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        lastDefer = defer.Deferred()
        
        self._request("/", 1, 3433)
        self._request("/blog/", 2, 37121)
        self._request("/camrdale.html", 3, 2234)
        self.pending_calls.append(reactor.callLater(0, self._request, '/robots.txt', 4, 309))
        self.pending_calls.append(reactor.callLater(0, self._request, '/wikilink.html', 5, 3084))
        self.pending_calls.append(reactor.callLater(0, self._request, '/sitemap.html', 6, 4750))
        self.pending_calls.append(reactor.callLater(0, self._request, '/PlanetLab.html', 7, 2783))
        self.pending_calls.append(reactor.callLater(0, self._request, '/openid.html', 8, 2525))
        self.pending_calls.append(reactor.callLater(0, self._request, '/subpage.html', 9, 2381))
        self.pending_calls.append(reactor.callLater(0, self._request, '/sitemap2.rss', 0, 302362, lastDefer))
        return lastDefer
    test_multiple_quick_downloads.timeout = 30
        
    def test_range(self):
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        
        d = self.client.getRange('/robots.txt', 100, 199)
        d.addCallback(self.gotResp, 1, 100)
        return d
    test_range.timeout = 10
        
    def tearDown(self):
        """Cancel scheduled requests that have not run and close the connection."""
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client:
            self.client.close()
            self.client = None