Fix some errors in the new twisted HTTP client's connectionLost() methods.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
18
19 from apt_p2p_conf import version
20
21 class PipelineError(Exception):
22     """An error has occurred in pipelining requests."""
23     
24 class LoggingHTTPClientProtocol(HTTPClientProtocol):
25     """A modified client protocol that logs the number of bytes received."""
26     
27     def __init__(self, factory, stats = None, mirror = False):
28         HTTPClientProtocol.__init__(self, factory)
29         self.stats = stats
30         self.mirror = mirror
31     
32     def lineReceived(self, line):
33         if self.stats:
34             self.stats.receivedBytes(len(line) + 2, self.mirror)
35         HTTPClientProtocol.lineReceived(self, line)
36
37     def rawDataReceived(self, data):
38         if self.stats:
39             self.stats.receivedBytes(len(data), self.mirror)
40         HTTPClientProtocol.rawDataReceived(self, data)
41
42     def setReadPersistent(self, persist):
43         self.readPersistent = persist
44         if not persist:
45             # Tell all requests but first to abort.
46             lostRequests = self.inRequests[1:]
47             del self.inRequests[1:]
48             for request in lostRequests:
49                 request.connectionLost(PipelineError('The pipelined connection was lost'))
50
51 class Peer(ClientFactory):
52     """A manager for all HTTP requests to a single peer.
53     
54     Controls all requests that go to a single peer (host and port).
55     This includes buffering requests until they can be sent and reconnecting
56     in the event of the connection being closed.
57     
58     """
59
60     implements(IHTTPClientManager)
61     
62     def __init__(self, host, port = 80, stats = None):
63         self.host = host
64         self.port = port
65         self.stats = stats
66         self.mirror = False
67         self.rank = 0.1
68         self.busy = False
69         self.pipeline = False
70         self.closed = True
71         self.connecting = False
72         self.request_queue = []
73         self.outstanding = 0
74         self.proto = None
75         self.connector = None
76         self._errors = 0
77         self._completed = 0
78         self._downloadSpeeds = []
79         self._lastResponse = None
80         self._responseTimes = []
81     
82     def __repr__(self):
83         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
84         
85     #{ Manage the request queue
86     def connect(self):
87         """Connect to the peer."""
88         assert self.closed and not self.connecting
89         self.connecting = True
90         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
91                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
92         d.addCallbacks(self.connected, self.connectionError)
93
94     def connected(self, proto):
95         """Begin processing the queued requests."""
96         self.closed = False
97         self.connecting = False
98         self.proto = proto
99         reactor.callLater(0, self.processQueue)
100         
101     def connectionError(self, err):
102         """Cancel the requests."""
103         log.msg('Failed to connect to the peer by HTTP.')
104         log.err(err)
105
106         # Remove one request so that we don't loop indefinitely
107         if self.request_queue:
108             req, deferRequest, submissionTime = self.request_queue.pop(0)
109             deferRequest.errback(err)
110             
111         self._completed += 1
112         self._errors += 1
113         self.rerank()
114         if self.connecting:
115             self.connecting = False
116             self.clientGone(None)
117         
118     def close(self):
119         """Close the connection to the peer."""
120         if not self.closed:
121             self.proto.transport.loseConnection()
122
123     def submitRequest(self, request):
124         """Add a new request to the queue.
125         
126         @type request: L{twisted.web2.client.http.ClientRequest}
127         @return: deferred that will fire with the completed request
128         """
129         submissionTime = datetime.now()
130         deferRequest = defer.Deferred()
131         self.request_queue.append((request, deferRequest, submissionTime))
132         self.rerank()
133         reactor.callLater(0, self.processQueue)
134         return deferRequest
135
136     def processQueue(self):
137         """Check the queue to see if new requests can be sent to the peer."""
138         if not self.request_queue:
139             return
140         if self.connecting:
141             return
142         if self.closed:
143             self.connect()
144             return
145         if self.busy and not self.pipeline:
146             return
147         if self.outstanding and not self.pipeline:
148             return
149         if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
150                  and not self.proto.inRequests)
151                  or self.proto.readPersistent is PERSIST_PIPELINE):
152             log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
153                     (self.proto.readPersistent, self.proto.inRequests))
154             return
155
156         req, deferRequest, submissionTime = self.request_queue.pop(0)
157         try:
158             deferResponse = self.proto.submitRequest(req, False)
159         except:
160             # Try again later
161             log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, ))
162             log.err()
163             self.request_queue.insert(0, (request, deferRequest, submissionTime))
164             ractor.callLater(1, self.processQueue)
165             return
166             
167         self.outstanding += 1
168         self.rerank()
169         deferResponse.addCallbacks(self.requestComplete, self.requestError,
170                                    callbackArgs = (req, deferRequest, submissionTime),
171                                    errbackArgs = (req, deferRequest))
172
173     def requestComplete(self, resp, req, deferRequest, submissionTime):
174         """Process a completed request."""
175         self._processLastResponse()
176         self.outstanding -= 1
177         assert self.outstanding >= 0
178         log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
179         self._completed += 1
180         now = datetime.now()
181         self._responseTimes.append((now, now - submissionTime))
182         self._lastResponse = (now, resp.stream.length)
183         self.rerank()
184         deferRequest.callback(resp)
185
186     def requestError(self, error, req, deferRequest):
187         """Process a request that ended with an error."""
188         self._processLastResponse()
189         self.outstanding -= 1
190         assert self.outstanding >= 0
191         log.msg('Download of %s generated error %r' % (req.uri, error))
192         self._completed += 1
193         self._errors += 1
194         self.rerank()
195         deferRequest.errback(error)
196         
197     def hashError(self, error):
198         """Log that a hash error occurred from the peer."""
199         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
200         self._errors += 1
201         self.rerank()
202
203     #{ IHTTPClientManager interface
204     def clientBusy(self, proto):
205         """Save the busy state."""
206         self.busy = True
207
208     def clientIdle(self, proto):
209         """Try to send a new request."""
210         self._processLastResponse()
211         self.busy = False
212         reactor.callLater(0, self.processQueue)
213         self.rerank()
214
215     def clientPipelining(self, proto):
216         """Try to send a new request."""
217         self.pipeline = True
218         reactor.callLater(0, self.processQueue)
219
220     def clientGone(self, proto):
221         """Mark sent requests as errors."""
222         self._processLastResponse()
223         self.busy = False
224         self.pipeline = False
225         self.closed = True
226         self.connecting = False
227         self.proto = None
228         self.rerank()
229         if self.request_queue:
230             reactor.callLater(0, self.processQueue)
231             
232     #{ Downloading request interface
233     def setCommonHeaders(self):
234         """Get the common HTTP headers for all requests."""
235         headers = http_headers.Headers()
236         headers.setHeader('Host', self.host)
237         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
238                           (version.short(), twisted_version.short(), web2_version.short()))
239         return headers
240     
241     def get(self, path, method="GET", modtime=None):
242         """Add a new request to the queue.
243         
244         @type path: C{string}
245         @param path: the path to request from the peer
246         @type method: C{string}
247         @param method: the HTTP method to use, 'GET' or 'HEAD'
248             (optional, defaults to 'GET')
249         @type modtime: C{int}
250         @param modtime: the modification time to use for an 'If-Modified-Since'
251             header, as seconds since the epoch
252             (optional, defaults to not sending that header)
253         """
254         headers = self.setCommonHeaders()
255         if modtime:
256             headers.setHeader('If-Modified-Since', modtime)
257         return self.submitRequest(ClientRequest(method, path, headers, None))
258     
259     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
260         """Add a new request with a Range header to the queue.
261         
262         @type path: C{string}
263         @param path: the path to request from the peer
264         @type rangeStart: C{int}
265         @param rangeStart: the byte to begin the request at
266         @type rangeEnd: C{int}
267         @param rangeEnd: the byte to end the request at (inclusive)
268         @type method: C{string}
269         @param method: the HTTP method to use, 'GET' or 'HEAD'
270             (optional, defaults to 'GET')
271         """
272         headers = self.setCommonHeaders()
273         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
274         return self.submitRequest(ClientRequest(method, path, headers, None))
275     
276     #{ Peer information
277     def isIdle(self):
278         """Check whether the peer is idle or not."""
279         return not self.busy and not self.request_queue and not self.outstanding
280     
281     def _processLastResponse(self):
282         """Save the download time of the last request for speed calculations."""
283         if self._lastResponse is not None:
284             now = datetime.now()
285             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
286             self._lastResponse = None
287             
288     def downloadSpeed(self):
289         """Gets the latest average download speed for the peer.
290         
291         The average is over the last 10 responses that occurred in the last hour.
292         """
293         total_time = 0.0
294         total_download = 0
295         now = datetime.now()
296         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
297                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
298             self._downloadSpeeds.pop(0)
299
300         # If there are none, then you get 0
301         if not self._downloadSpeeds:
302             return 0.0
303         
304         for download in self._downloadSpeeds:
305             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
306             total_download += download[2]
307
308         return total_download / total_time
309     
310     def responseTime(self):
311         """Gets the latest average response time for the peer.
312         
313         Response time is the time from receiving the request, to the time
314         the download begins. The average is over the last 10 responses that
315         occurred in the last hour.
316         """
317         total_response = 0.0
318         now = datetime.now()
319         while self._responseTimes and (len(self._responseTimes) > 10 or 
320                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
321             self._responseTimes.pop(0)
322
323         # If there are none, give it the benefit of the doubt
324         if not self._responseTimes:
325             return 0.0
326
327         for response in self._responseTimes:
328             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
329
330         return total_response / len(self._responseTimes)
331     
332     def rerank(self):
333         """Determine the ranking value for the peer.
334         
335         The ranking value is composed of 5 numbers, each exponentially
336         decreasing from 1 to 0 based on:
337          - if a connection to the peer is open
338          - the number of pending requests
339          - the time to download a single piece
340          - the number of errors
341          - the response time
342         """
343         rank = 1.0
344         if self.closed:
345             rank *= 0.9
346         rank *= exp(-(len(self.request_queue) + self.outstanding))
347         speed = self.downloadSpeed()
348         if speed > 0.0:
349             rank *= exp(-512.0*1024 / speed)
350         if self._completed:
351             rank *= exp(-10.0 * self._errors / self._completed)
352         rank *= exp(-self.responseTime() / 5.0)
353         self.rank = rank
354         
355 class TestClientManager(unittest.TestCase):
356     """Unit tests for the Peer."""
357     
358     client = None
359     pending_calls = []
360     length = []
361     
362     def gotResp(self, resp, num, expect):
363         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
364         if expect is not None:
365             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
366         while len(self.length) <= num:
367             self.length.append(0)
368         self.length[num] = 0
369         def addData(data, self = self, num = num):
370             self.length[num] += len(data)
371         def checkLength(resp, self = self, num = num, length = resp.stream.length):
372             self.failUnlessEqual(self.length[num], length)
373             return resp
374         df = stream_mod.readStream(resp.stream, addData)
375         df.addCallback(checkLength)
376         return df
377     
378     def test_download(self):
379         """Tests a normal download."""
380         host = 'www.ietf.org'
381         self.client = Peer(host, 80)
382         self.timeout = 10
383         
384         d = self.client.get('/rfc/rfc0013.txt')
385         d.addCallback(self.gotResp, 1, 1070)
386         return d
387         
388     def test_head(self):
389         """Tests a 'HEAD' request."""
390         host = 'www.ietf.org'
391         self.client = Peer(host, 80)
392         self.timeout = 10
393         
394         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
395         d.addCallback(self.gotResp, 1, 0)
396         return d
397         
398     def test_multiple_downloads(self):
399         """Tests multiple downloads with queueing and connection closing."""
400         host = 'www.ietf.org'
401         self.client = Peer(host, 80)
402         self.timeout = 120
403         lastDefer = defer.Deferred()
404         
405         def newRequest(path, num, expect, last=False):
406             d = self.client.get(path)
407             d.addCallback(self.gotResp, num, expect)
408             if last:
409                 d.addBoth(lastDefer.callback)
410
411         # 3 quick requests
412         newRequest("/rfc/rfc0006.txt", 1, 1776)
413         newRequest("/rfc/rfc2362.txt", 2, 159833)
414         newRequest("/rfc/rfc0801.txt", 3, 40824)
415         
416         # This one will probably be queued
417         self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
418         
419         # Connection should still be open, but idle
420         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
421         
422         #Connection should be closed
423         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
424         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
425         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
426         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
427         
428         # Now it should definitely be closed
429         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
430         return lastDefer
431         
432     def test_multiple_quick_downloads(self):
433         """Tests lots of multiple downloads with queueing."""
434         host = 'www.ietf.org'
435         self.client = Peer(host, 80)
436         self.timeout = 30
437         lastDefer = defer.Deferred()
438         
439         def newRequest(path, num, expect, last=False):
440             d = self.client.get(path)
441             d.addCallback(self.gotResp, num, expect)
442             if last:
443                 d.addBoth(lastDefer.callback)
444                 
445         newRequest("/rfc/rfc0006.txt", 1, 1776)
446         newRequest("/rfc/rfc2362.txt", 2, 159833)
447         newRequest("/rfc/rfc0801.txt", 3, 40824)
448         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
449         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
450         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
451         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
452         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
453         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
454         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
455         return lastDefer
456         
457     def checkInfo(self):
458         log.msg('Rank is: %r' % self.client.rank)
459         log.msg('Download speed is: %r' % self.client.downloadSpeed())
460         log.msg('Response Time is: %r' % self.client.responseTime())
461         
462     def test_peer_info(self):
463         """Test retrieving the peer info during a download."""
464         host = 'www.ietf.org'
465         self.client = Peer(host, 80)
466         self.timeout = 120
467         lastDefer = defer.Deferred()
468         
469         def newRequest(path, num, expect, last=False):
470             d = self.client.get(path)
471             d.addCallback(self.gotResp, num, expect)
472             if last:
473                 d.addBoth(lastDefer.callback)
474                 
475         newRequest("/rfc/rfc0006.txt", 1, 1776)
476         newRequest("/rfc/rfc2362.txt", 2, 159833)
477         newRequest("/rfc/rfc0801.txt", 3, 40824)
478         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
479         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
480         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
481         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
482         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
483         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
484         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
485         
486         for i in xrange(2, 122, 2):
487             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
488         
489         return lastDefer
490         
491     def test_range(self):
492         """Test a Range request."""
493         host = 'www.ietf.org'
494         self.client = Peer(host, 80)
495         self.timeout = 10
496         
497         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
498         d.addCallback(self.gotResp, 1, 100)
499         return d
500         
501     def tearDown(self):
502         for p in self.pending_calls:
503             if p.active():
504                 p.cancel()
505         self.pending_calls = []
506         if self.client:
507             self.client.close()
508             self.client = None