Add some of the fetching logic.
[quix0rs-apt-p2p.git] / apt_dht / HTTPDownloader.py
1
2 from twisted.internet import reactor, defer, protocol
3 from twisted.internet.protocol import ClientFactory
4 from twisted import version as twisted_version
5 from twisted.web2.client.interfaces import IHTTPClientManager
6 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
7 from twisted.web2 import stream as stream_mod, http_headers
8 from twisted.web2 import version as web2_version
9 from twisted.trial import unittest
10 from zope.interface import implements
11
12 from apt_dht_conf import version
13
class HTTPClientManager(ClientFactory):
    """A manager for all HTTP requests to a single site.

    Controls all requests that go to a single site (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    Every submitted request gets a C{deferRequest} attribute (the deferred
    handed back to the caller) and, once it has been passed to the protocol,
    a C{deferResponse} attribute (the deferred returned by the protocol).

    @ivar host: the host name to connect to (also sent as the Host header)
    @ivar port: the TCP port to connect to
    @ivar busy: whether the protocol last reported itself busy
    @ivar pipeline: whether the protocol has signalled that another request
        may be pipelined on the connection
    @ivar closed: whether there is currently no established connection
    @ivar connecting: whether a connection attempt is in progress
    @ivar request_queue: requests waiting to be sent, oldest first
    @ivar response_queue: requests sent to the protocol and not yet
        answered, oldest first
    @ivar proto: the connected protocol instance, or None
    @ivar connector: initialised to None and not otherwise used here
    """

    implements(IHTTPClientManager)

    def __init__(self, host, port=80):
        """Set up an idle, unconnected manager for C{host}:C{port}."""
        self.host = host
        self.port = port
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.request_queue = []
        self.response_queue = []
        self.proto = None
        self.connector = None

    def connect(self):
        """Start connecting to the site.

        Must only be called while closed and not already connecting.
        """
        assert self.closed and not self.connecting
        self.connecting = True
        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
        # Without the errback a failed connection attempt would leave
        # 'connecting' set forever and every queued request would hang.
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Record the newly connected protocol and start sending requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        self.processQueue()

    def connectionError(self, error):
        """Clean up after a failed connection attempt.

        All requests that were waiting for the connection are failed with
        C{error}; a request submitted afterwards triggers a fresh attempt.
        """
        self.connecting = False
        self.closed = True
        self.proto = None
        # Swap the queue out first so that errbacks which submit new
        # requests do not have them failed by this loop.
        waiting, self.request_queue = self.request_queue, []
        for req in waiting:
            req.deferRequest.errback(error)

    def close(self):
        """Drop the connection, if one is established."""
        if not self.closed and self.proto is not None:
            self.proto.transport.loseConnection()

    def is_idle(self):
        """Return True if nothing is queued, in flight or being processed."""
        return not self.busy and not self.request_queue and not self.response_queue

    def submitRequest(self, request):
        """Queue a request and return a deferred for its response.

        The deferred is also stored on the request as C{deferRequest}.
        """
        request.deferRequest = defer.Deferred()
        self.request_queue.append(request)
        self.processQueue()
        return request.deferRequest

    def processQueue(self):
        """Send the next queued request if the connection can take it.

        Does nothing if there is nothing to send or a connection attempt is
        in progress; starts connecting if the connection is closed.  Unless
        pipelining has been signalled, a request is only sent when the
        protocol is not busy and no response is outstanding.
        """
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.response_queue and not self.pipeline:
            return

        req = self.request_queue.pop(0)
        self.response_queue.append(req)
        req.deferResponse = self.proto.submitRequest(req, False)
        # Bind the request to its own callbacks so a late or out-of-order
        # notification can never be attributed to a different request.
        req.deferResponse.addCallbacks(self.requestComplete, self.requestError,
                                       callbackArgs=(req,), errbackArgs=(req,))

    def _takeOutstanding(self, req=None):
        """Remove a request from the response queue.

        With C{req} given, remove exactly that request; without it, remove
        the oldest outstanding request.  Returns the removed request, or
        None if it is no longer outstanding (for example because
        L{clientGone} has already failed it).
        """
        queue = self.response_queue
        if req is None:
            if queue:
                return queue.pop(0)
            return None
        for index, queued in enumerate(queue):
            if queued is req:
                del queue[index]
                return req
        return None

    def requestComplete(self, resp, req=None):
        """Pass a received response on to the request's caller.

        C{req} is the request being answered; when omitted, the oldest
        outstanding request is assumed.
        """
        req = self._takeOutstanding(req)
        if req is not None:
            req.deferRequest.callback(resp)

    def requestError(self, error, req=None):
        """Pass a request failure on to the request's caller.

        C{req} is the request that failed; when omitted, the oldest
        outstanding request is assumed.  A failure for a request that has
        already been dealt with is ignored.
        """
        req = self._takeOutstanding(req)
        if req is not None:
            req.deferRequest.errback(error)

    def clientBusy(self, proto):
        """IHTTPClientManager notification: the protocol is busy."""
        self.busy = True

    def clientIdle(self, proto):
        """IHTTPClientManager notification: the protocol is idle again."""
        self.busy = False
        self.processQueue()

    def clientPipelining(self, proto):
        """IHTTPClientManager notification: another request may be pipelined."""
        self.pipeline = True
        self.processQueue()

    def clientGone(self, proto):
        """IHTTPClientManager notification: the connection has been lost.

        Fails every request that was awaiting a response and, if requests
        are still queued, starts reconnecting to send them.
        """
        # Reset the state before firing any errbacks so that a handler which
        # immediately submits a new request sees a closed connection rather
        # than the dead protocol.
        lost, self.response_queue = self.response_queue, []
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.proto = None
        for req in lost:
            req.deferRequest.errback(ProtocolError('lost connection'))
        if self.request_queue:
            self.processQueue()

    def setCommonHeaders(self):
        """Return new headers containing the Host and User-Agent fields."""
        headers = http_headers.Headers()
        headers.setHeader('Host', self.host)
        headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' %
                          (version.short(), twisted_version.short(), web2_version.short()))
        return headers

    def get(self, path, method="GET", modtime=None):
        """Request C{path} from the site.

        @param path: the path of the resource to request
        @param method: the HTTP method to use
        @param modtime: if set, sent as the If-Modified-Since header
        @return: a deferred that fires with the response
        """
        headers = self.setCommonHeaders()
        if modtime:
            headers.setHeader('If-Modified-Since', modtime)
        return self.submitRequest(ClientRequest(method, path, headers, None))

    def getRange(self, path, rangeStart, rangeEnd, method="GET"):
        """Request a byte range of C{path} from the site.

        @param path: the path of the resource to request
        @param rangeStart: first value of the byte range sent in the Range header
        @param rangeEnd: second value of the byte range sent in the Range header
        @param method: the HTTP method to use
        @return: a deferred that fires with the response
        """
        headers = self.setCommonHeaders()
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return self.submitRequest(ClientRequest(method, path, headers, None))
129     
class TestClientManager(unittest.TestCase):
    """Unit tests for the HTTPClientManager.

    These tests make real requests to www.camrdale.org and compare the
    response lengths with hard-coded values, so they need network access
    and depend on the current content of that site.

    NOTE(review): each test assigns C{self.timeout} from inside the test
    method; trial may already have read the timeout by then, in which case
    the assignment has no effect -- confirm against the trial in use.
    """

    client = None

    def setUp(self):
        """Give every test its own list of scheduled calls."""
        # Created per instance: a class-level list would be shared (and keep
        # growing) across all the tests in the class.
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check a response and read its body.

        @param resp: the response to check
        @param num: the number of the request (unused, kept for callers)
        @param expect: the expected stream length, or None to skip the check
        @return: a deferred that fires with None once the body has been read
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))

        def discard(data):
            pass

        # Return the deferred so the test waits for the whole body and any
        # error while reading it is reported, instead of racing tearDown.
        d = stream_mod.readStream(resp.stream, discard)
        d.addCallback(lambda result: None)
        return d

    def _multipleDownloads(self, delays):
        """Request ten pages, three at once and seven after C{delays}.

        @param delays: seven delays in seconds, one for each of the
            later requests
        @return: a deferred that fires when the final request has completed,
            or fails with the first error from any of the requests
        """
        lastDefer = defer.Deferred()

        def finish(result):
            if not lastDefer.called:
                lastDefer.callback(result)

        def fail(err):
            # Report the first failure through the test's deferred so the
            # test fails with the real error instead of timing out.  Later
            # failures are dropped: the test has already failed by then.
            if not lastDefer.called:
                lastDefer.errback(err)

        def newRequest(path, num, expect, last=False):
            d = self.client.get(path)
            d.addCallback(self.gotResp, num, expect)
            if last:
                d.addCallback(finish)
            d.addErrback(fail)

        newRequest("/", 1, 3433)
        newRequest("/blog/", 2, 37121)
        newRequest("/camrdale.html", 3, 2234)

        later = [('/robots.txt', 4, 309, False),
                 ('/wikilink.html', 5, 3084, False),
                 ('/sitemap.html', 6, 4750, False),
                 ('/PlanetLab.html', 7, 2783, False),
                 ('/openid.html', 8, 2525, False),
                 ('/subpage.html', 9, 2381, False),
                 ('/sitemap2.rss', 0, 302362, True)]
        for delay, (path, num, expect, last) in zip(delays, later):
            self.pending_calls.append(reactor.callLater(delay, newRequest, path, num, expect, last))
        return lastDefer

    def test_download(self):
        """A single GET returns the whole file."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10

        d = self.client.get('/robots.txt')
        d.addCallback(self.gotResp, 1, 309)
        return d

    def test_head(self):
        """A HEAD request returns an empty body."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10

        d = self.client.get('/robots.txt', "HEAD")
        d.addCallback(self.gotResp, 1, 0)
        return d

    def test_multiple_downloads(self):
        """Requests spread out over a minute are all answered."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 120

        return self._multipleDownloads([1, 10, 30, 31, 32, 32, 62])

    def test_multiple_quick_downloads(self):
        """Requests submitted in quick succession are all answered."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 30

        return self._multipleDownloads([0, 0, 0, 0, 0, 0, 0])

    def test_range(self):
        """A range request returns only the requested bytes."""
        host = 'www.camrdale.org'
        self.client = HTTPClientManager(host, 80)
        self.timeout = 10

        d = self.client.getRange('/robots.txt', 100, 199)
        d.addCallback(self.gotResp, 1, 100)
        return d

    def tearDown(self):
        """Cancel any requests still scheduled and close the connection."""
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client:
            self.client.close()
            self.client = None