Scanning cache directories on startup waits for DHT storeValue to return.
[quix0rs-apt-p2p.git] / apt_dht / HTTPDownloader.py
1
2 from twisted.internet import reactor, defer, protocol
3 from twisted.internet.protocol import ClientFactory
4 from twisted import version as twisted_version
5 from twisted.python import log
6 from twisted.web2.client.interfaces import IHTTPClientManager
7 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
8 from twisted.web2 import stream as stream_mod, http_headers
9 from twisted.web2 import version as web2_version
10 from twisted.trial import unittest
11 from zope.interface import implements
12
13 from apt_dht_conf import version
14
15 class HTTPClientManager(ClientFactory):
16     """A manager for all HTTP requests to a single site.
17     
18     Controls all requests that got to a single site (host and port).
19     This includes buffering requests until they can be sent and reconnecting
20     in the even of the connection being closed.
21     
22     """
23
24     implements(IHTTPClientManager)
25     
26     def __init__(self, host, port=80):
27         self.host = host
28         self.port = port
29         self.busy = False
30         self.pipeline = False
31         self.closed = True
32         self.connecting = False
33         self.request_queue = []
34         self.response_queue = []
35         self.proto = None
36         self.connector = None
37         
38     def connect(self):
39         assert self.closed and not self.connecting
40         self.connecting = True
41         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
42         d.addCallback(self.connected)
43
44     def connected(self, proto):
45         self.closed = False
46         self.connecting = False
47         self.proto = proto
48         self.processQueue()
49         
50     def close(self):
51         if not self.closed:
52             self.proto.transport.loseConnection()
53
54     def is_idle(self):
55         return not self.busy and not self.request_queue and not self.response_queue
56     
57     def submitRequest(self, request):
58         request.deferRequest = defer.Deferred()
59         self.request_queue.append(request)
60         self.processQueue()
61         return request.deferRequest
62
63     def processQueue(self):
64         if not self.request_queue:
65             return
66         if self.connecting:
67             return
68         if self.closed:
69             self.connect()
70             return
71         if self.busy and not self.pipeline:
72             return
73         if self.response_queue and not self.pipeline:
74             return
75
76         req = self.request_queue.pop(0)
77         self.response_queue.append(req)
78         req.deferResponse = self.proto.submitRequest(req, False)
79         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
80
81     def requestComplete(self, resp):
82         req = self.response_queue.pop(0)
83         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
84         req.deferRequest.callback(resp)
85
86     def requestError(self, error):
87         req = self.response_queue.pop(0)
88         log.msg('Download of %s generated error %r' % (req.uri, error))
89         req.deferRequest.errback(error)
90
91     def clientBusy(self, proto):
92         self.busy = True
93
94     def clientIdle(self, proto):
95         self.busy = False
96         self.processQueue()
97
98     def clientPipelining(self, proto):
99         self.pipeline = True
100         self.processQueue()
101
102     def clientGone(self, proto):
103         for req in self.response_queue:
104             req.deferRequest.errback(ProtocolError('lost connection'))
105         self.busy = False
106         self.pipeline = False
107         self.closed = True
108         self.connecting = False
109         self.response_queue = []
110         self.proto = None
111         if self.request_queue:
112             self.processQueue()
113             
114     def setCommonHeaders(self):
115         headers = http_headers.Headers()
116         headers.setHeader('Host', self.host)
117         headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' % 
118                           (version.short(), twisted_version.short(), web2_version.short()))
119         return headers
120     
121     def get(self, path, method="GET", modtime=None):
122         headers = self.setCommonHeaders()
123         if modtime:
124             headers.setHeader('If-Modified-Since', modtime)
125         return self.submitRequest(ClientRequest(method, path, headers, None))
126     
127     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
128         headers = self.setCommonHeaders()
129         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
130         return self.submitRequest(ClientRequest(method, path, headers, None))
131     
132 class TestClientManager(unittest.TestCase):
133     """Unit tests for the HTTPClientManager."""
134     
135     client = None
136     pending_calls = []
137     
138     def gotResp(self, resp, num, expect):
139         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
140         if expect is not None:
141             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
142         def print_(n):
143             pass
144         def printdone(n):
145             pass
146         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
147     
148     def test_download(self):
149         host = 'www.camrdale.org'
150         self.client = HTTPClientManager(host, 80)
151         self.timeout = 10
152         
153         d = self.client.get('/robots.txt')
154         d.addCallback(self.gotResp, 1, 309)
155         return d
156         
157     def test_head(self):
158         host = 'www.camrdale.org'
159         self.client = HTTPClientManager(host, 80)
160         self.timeout = 10
161         
162         d = self.client.get('/robots.txt', "HEAD")
163         d.addCallback(self.gotResp, 1, 0)
164         return d
165         
166     def test_multiple_downloads(self):
167         host = 'www.camrdale.org'
168         self.client = HTTPClientManager(host, 80)
169         self.timeout = 120
170         lastDefer = defer.Deferred()
171         
172         def newRequest(path, num, expect, last=False):
173             d = self.client.get(path)
174             d.addCallback(self.gotResp, num, expect)
175             if last:
176                 d.addBoth(lastDefer.callback)
177                 
178         newRequest("/", 1, 3433)
179         newRequest("/blog/", 2, 39152)
180         newRequest("/camrdale.html", 3, 2234)
181         self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309))
182         self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084))
183         self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4756))
184         self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783))
185         self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525))
186         self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381))
187         self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 313470, True))
188         return lastDefer
189         
190     def test_multiple_quick_downloads(self):
191         host = 'www.camrdale.org'
192         self.client = HTTPClientManager(host, 80)
193         self.timeout = 30
194         lastDefer = defer.Deferred()
195         
196         def newRequest(path, num, expect, last=False):
197             d = self.client.get(path)
198             d.addCallback(self.gotResp, num, expect)
199             if last:
200                 d.addBoth(lastDefer.callback)
201                 
202         newRequest("/", 1, 3433)
203         newRequest("/blog/", 2, 39152)
204         newRequest("/camrdale.html", 3, 2234)
205         self.pending_calls.append(reactor.callLater(0, newRequest, '/robots.txt', 4, 309))
206         self.pending_calls.append(reactor.callLater(0, newRequest, '/wikilink.html', 5, 3084))
207         self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4756))
208         self.pending_calls.append(reactor.callLater(0, newRequest, '/PlanetLab.html', 7, 2783))
209         self.pending_calls.append(reactor.callLater(0, newRequest, '/openid.html', 8, 2525))
210         self.pending_calls.append(reactor.callLater(0, newRequest, '/subpage.html', 9, 2381))
211         self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 313470, True))
212         return lastDefer
213         
214     def test_range(self):
215         host = 'www.camrdale.org'
216         self.client = HTTPClientManager(host, 80)
217         self.timeout = 10
218         
219         d = self.client.getRange('/robots.txt', 100, 199)
220         d.addCallback(self.gotResp, 1, 100)
221         return d
222         
223     def tearDown(self):
224         for p in self.pending_calls:
225             if p.active():
226                 p.cancel()
227         self.pending_calls = []
228         if self.client:
229             self.client.close()
230             self.client = None