Fix an HTTP download bug that caused HEAD requests to not pipeline.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class LoggingHTTPClientProtocol(HTTPClientProtocol):
21     """A modified client protocol that logs the number of bytes received."""
22     
23     def __init__(self, factory, stats = None, mirror = False):
24         HTTPClientProtocol.__init__(self, factory)
25         self.stats = stats
26         self.mirror = mirror
27     
28     def lineReceived(self, line):
29         if self.stats:
30             self.stats.receivedBytes(len(line) + 2, self.mirror)
31         HTTPClientProtocol.lineReceived(self, line)
32
33     def rawDataReceived(self, data):
34         if self.stats:
35             self.stats.receivedBytes(len(data), self.mirror)
36         HTTPClientProtocol.rawDataReceived(self, data)
37
38 class Peer(ClientFactory):
39     """A manager for all HTTP requests to a single peer.
40     
41     Controls all requests that go to a single peer (host and port).
42     This includes buffering requests until they can be sent and reconnecting
43     in the event of the connection being closed.
44     
45     """
46
47     implements(IHTTPClientManager)
48     
49     def __init__(self, host, port = 80, stats = None):
50         self.host = host
51         self.port = port
52         self.stats = stats
53         self.mirror = False
54         self.rank = 0.1
55         self.busy = False
56         self.pipeline = False
57         self.closed = True
58         self.connecting = False
59         self.request_queue = []
60         self.response_queue = []
61         self.proto = None
62         self.connector = None
63         self._errors = 0
64         self._completed = 0
65         self._downloadSpeeds = []
66         self._lastResponse = None
67         self._responseTimes = []
68     
69     def __repr__(self):
70         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
71         
72     #{ Manage the request queue
73     def connect(self):
74         """Connect to the peer."""
75         assert self.closed and not self.connecting
76         self.connecting = True
77         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
78                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
79         d.addCallbacks(self.connected, self.connectionError)
80
81     def connected(self, proto):
82         """Begin processing the queued requests."""
83         self.closed = False
84         self.connecting = False
85         self.proto = proto
86         reactor.callLater(0, self.processQueue)
87         
88     def connectionError(self, err):
89         """Cancel the requests."""
90         log.msg('Failed to connect to the peer by HTTP.')
91         log.err(err)
92
93         # Remove one request so that we don't loop indefinitely
94         if self.request_queue:
95             req = self.request_queue.pop(0)
96             req.deferRequest.errback(err)
97             
98         self._completed += 1
99         self._errors += 1
100         self.rerank()
101         if self.connecting:
102             self.connecting = False
103             self.clientGone(None)
104         
105     def close(self):
106         """Close the connection to the peer."""
107         if not self.closed:
108             self.proto.transport.loseConnection()
109
110     def submitRequest(self, request):
111         """Add a new request to the queue.
112         
113         @type request: L{twisted.web2.client.http.ClientRequest}
114         @return: deferred that will fire with the completed request
115         """
116         request.submissionTime = datetime.now()
117         request.deferRequest = defer.Deferred()
118         self.request_queue.append(request)
119         self.rerank()
120         reactor.callLater(0, self.processQueue)
121         return request.deferRequest
122
123     def processQueue(self):
124         """Check the queue to see if new requests can be sent to the peer."""
125         if not self.request_queue:
126             return
127         if self.connecting:
128             return
129         if self.closed:
130             self.connect()
131             return
132         if self.busy and not self.pipeline:
133             return
134         if self.response_queue and not self.pipeline:
135             return
136
137         req = self.request_queue.pop(0)
138         self.response_queue.append(req)
139         self.rerank()
140         req.deferResponse = self.proto.submitRequest(req, False)
141         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
142
143     def requestComplete(self, resp):
144         """Process a completed request."""
145         self._processLastResponse()
146         req = self.response_queue.pop(0)
147         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
148         self._completed += 1
149         now = datetime.now()
150         self._responseTimes.append((now, now - req.submissionTime))
151         self._lastResponse = (now, resp.stream.length)
152         self.rerank()
153         req.deferRequest.callback(resp)
154
155     def requestError(self, error):
156         """Process a request that ended with an error."""
157         self._processLastResponse()
158         req = self.response_queue.pop(0)
159         log.msg('Download of %s generated error %r' % (req.uri, error))
160         self._completed += 1
161         self._errors += 1
162         self.rerank()
163         req.deferRequest.errback(error)
164         
165     def hashError(self, error):
166         """Log that a hash error occurred from the peer."""
167         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
168         self._errors += 1
169         self.rerank()
170
171     #{ IHTTPClientManager interface
172     def clientBusy(self, proto):
173         """Save the busy state."""
174         self.busy = True
175
176     def clientIdle(self, proto):
177         """Try to send a new request."""
178         self._processLastResponse()
179         self.busy = False
180         reactor.callLater(0, self.processQueue)
181         self.rerank()
182
183     def clientPipelining(self, proto):
184         """Try to send a new request."""
185         self.pipeline = True
186         reactor.callLater(0, self.processQueue)
187
188     def clientGone(self, proto):
189         """Mark sent requests as errors."""
190         self._processLastResponse()
191         for req in self.response_queue:
192             reactor.callLater(0, req.deferRequest.errback,
193                                  ProtocolError('lost connection'))
194         self.busy = False
195         self.pipeline = False
196         self.closed = True
197         self.connecting = False
198         self.response_queue = []
199         self.proto = None
200         self.rerank()
201         if self.request_queue:
202             reactor.callLater(0, self.processQueue)
203             
204     #{ Downloading request interface
205     def setCommonHeaders(self):
206         """Get the common HTTP headers for all requests."""
207         headers = http_headers.Headers()
208         headers.setHeader('Host', self.host)
209         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
210                           (version.short(), twisted_version.short(), web2_version.short()))
211         return headers
212     
213     def get(self, path, method="GET", modtime=None):
214         """Add a new request to the queue.
215         
216         @type path: C{string}
217         @param path: the path to request from the peer
218         @type method: C{string}
219         @param method: the HTTP method to use, 'GET' or 'HEAD'
220             (optional, defaults to 'GET')
221         @type modtime: C{int}
222         @param modtime: the modification time to use for an 'If-Modified-Since'
223             header, as seconds since the epoch
224             (optional, defaults to not sending that header)
225         """
226         headers = self.setCommonHeaders()
227         if modtime:
228             headers.setHeader('If-Modified-Since', modtime)
229         return self.submitRequest(ClientRequest(method, path, headers, None))
230     
231     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
232         """Add a new request with a Range header to the queue.
233         
234         @type path: C{string}
235         @param path: the path to request from the peer
236         @type rangeStart: C{int}
237         @param rangeStart: the byte to begin the request at
238         @type rangeEnd: C{int}
239         @param rangeEnd: the byte to end the request at (inclusive)
240         @type method: C{string}
241         @param method: the HTTP method to use, 'GET' or 'HEAD'
242             (optional, defaults to 'GET')
243         """
244         headers = self.setCommonHeaders()
245         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
246         return self.submitRequest(ClientRequest(method, path, headers, None))
247     
248     #{ Peer information
249     def isIdle(self):
250         """Check whether the peer is idle or not."""
251         return not self.busy and not self.request_queue and not self.response_queue
252     
253     def _processLastResponse(self):
254         """Save the download time of the last request for speed calculations."""
255         if self._lastResponse is not None:
256             now = datetime.now()
257             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
258             self._lastResponse = None
259             
260     def downloadSpeed(self):
261         """Gets the latest average download speed for the peer.
262         
263         The average is over the last 10 responses that occurred in the last hour.
264         """
265         total_time = 0.0
266         total_download = 0
267         now = datetime.now()
268         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
269                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
270             self._downloadSpeeds.pop(0)
271
272         # If there are none, then you get 0
273         if not self._downloadSpeeds:
274             return 0.0
275         
276         for download in self._downloadSpeeds:
277             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
278             total_download += download[2]
279
280         return total_download / total_time
281     
282     def responseTime(self):
283         """Gets the latest average response time for the peer.
284         
285         Response time is the time from receiving the request, to the time
286         the download begins. The average is over the last 10 responses that
287         occurred in the last hour.
288         """
289         total_response = 0.0
290         now = datetime.now()
291         while self._responseTimes and (len(self._responseTimes) > 10 or 
292                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
293             self._responseTimes.pop(0)
294
295         # If there are none, give it the benefit of the doubt
296         if not self._responseTimes:
297             return 0.0
298
299         for response in self._responseTimes:
300             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
301
302         return total_response / len(self._responseTimes)
303     
304     def rerank(self):
305         """Determine the ranking value for the peer.
306         
307         The ranking value is composed of 5 numbers, each exponentially
308         decreasing from 1 to 0 based on:
309          - if a connection to the peer is open
310          - the number of pending requests
311          - the time to download a single piece
312          - the number of errors
313          - the response time
314         """
315         rank = 1.0
316         if self.closed:
317             rank *= 0.9
318         rank *= exp(-(len(self.request_queue) + len(self.response_queue)))
319         speed = self.downloadSpeed()
320         if speed > 0.0:
321             rank *= exp(-512.0*1024 / speed)
322         if self._completed:
323             rank *= exp(-10.0 * self._errors / self._completed)
324         rank *= exp(-self.responseTime() / 5.0)
325         self.rank = rank
326         
327 class TestClientManager(unittest.TestCase):
328     """Unit tests for the Peer."""
329     
330     client = None
331     pending_calls = []
332     length = []
333     
334     def gotResp(self, resp, num, expect):
335         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
336         if expect is not None:
337             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
338         while len(self.length) <= num:
339             self.length.append(0)
340         self.length[num] = 0
341         def addData(data, self = self, num = num):
342             self.length[num] += len(data)
343         def checkLength(resp, self = self, num = num, length = resp.stream.length):
344             self.failUnlessEqual(self.length[num], length)
345             return resp
346         df = stream_mod.readStream(resp.stream, addData)
347         df.addCallback(checkLength)
348         return df
349     
350     def test_download(self):
351         """Tests a normal download."""
352         host = 'www.ietf.org'
353         self.client = Peer(host, 80)
354         self.timeout = 10
355         
356         d = self.client.get('/rfc/rfc0013.txt')
357         d.addCallback(self.gotResp, 1, 1070)
358         return d
359         
360     def test_head(self):
361         """Tests a 'HEAD' request."""
362         host = 'www.ietf.org'
363         self.client = Peer(host, 80)
364         self.timeout = 10
365         
366         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
367         d.addCallback(self.gotResp, 1, 0)
368         return d
369         
370     def test_multiple_downloads(self):
371         """Tests multiple downloads with queueing and connection closing."""
372         host = 'www.ietf.org'
373         self.client = Peer(host, 80)
374         self.timeout = 120
375         lastDefer = defer.Deferred()
376         
377         def newRequest(path, num, expect, last=False):
378             d = self.client.get(path)
379             d.addCallback(self.gotResp, num, expect)
380             if last:
381                 d.addBoth(lastDefer.callback)
382
383         # 3 quick requests
384         newRequest("/rfc/rfc0006.txt", 1, 1776)
385         newRequest("/rfc/rfc2362.txt", 2, 159833)
386         newRequest("/rfc/rfc0801.txt", 3, 40824)
387         
388         # This one will probably be queued
389         self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
390         
391         # Connection should still be open, but idle
392         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
393         
394         #Connection should be closed
395         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
396         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
397         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
398         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
399         
400         # Now it should definitely be closed
401         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
402         return lastDefer
403         
404     def test_multiple_quick_downloads(self):
405         """Tests lots of multiple downloads with queueing."""
406         host = 'www.ietf.org'
407         self.client = Peer(host, 80)
408         self.timeout = 30
409         lastDefer = defer.Deferred()
410         
411         def newRequest(path, num, expect, last=False):
412             d = self.client.get(path)
413             d.addCallback(self.gotResp, num, expect)
414             if last:
415                 d.addBoth(lastDefer.callback)
416                 
417         newRequest("/rfc/rfc0006.txt", 1, 1776)
418         newRequest("/rfc/rfc2362.txt", 2, 159833)
419         newRequest("/rfc/rfc0801.txt", 3, 40824)
420         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
421         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
422         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
423         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
424         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
425         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
426         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
427         return lastDefer
428         
429     def checkInfo(self):
430         log.msg('Rank is: %r' % self.client.rank)
431         log.msg('Download speed is: %r' % self.client.downloadSpeed())
432         log.msg('Response Time is: %r' % self.client.responseTime())
433         
434     def test_peer_info(self):
435         """Test retrieving the peer info during a download."""
436         host = 'www.ietf.org'
437         self.client = Peer(host, 80)
438         self.timeout = 120
439         lastDefer = defer.Deferred()
440         
441         def newRequest(path, num, expect, last=False):
442             d = self.client.get(path)
443             d.addCallback(self.gotResp, num, expect)
444             if last:
445                 d.addBoth(lastDefer.callback)
446                 
447         newRequest("/rfc/rfc0006.txt", 1, 1776)
448         newRequest("/rfc/rfc2362.txt", 2, 159833)
449         newRequest("/rfc/rfc0801.txt", 3, 40824)
450         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
451         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
452         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
453         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
454         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
455         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
456         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
457         
458         for i in xrange(2, 122, 2):
459             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
460         
461         return lastDefer
462         
463     def test_range(self):
464         """Test a Range request."""
465         host = 'www.ietf.org'
466         self.client = Peer(host, 80)
467         self.timeout = 10
468         
469         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
470         d.addCallback(self.gotResp, 1, 100)
471         return d
472         
473     def tearDown(self):
474         for p in self.pending_calls:
475             if p.active():
476                 p.cancel()
477         self.pending_calls = []
478         if self.client:
479             self.client.close()
480             self.client = None