Add an optimization to the HTTP client protocol so that pieplining begins sooner.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
18
19 from apt_p2p_conf import version
20
21 class PipelineError(Exception):
22     """An error has occurred in pipelining requests."""
23
24 class FixedHTTPClientChannelRequest(HTTPClientChannelRequest):
25     """Fix the broken _error function."""
26
27     def __init__(self, channel, request, closeAfter):
28         HTTPClientChannelRequest.__init__(self, channel, request, closeAfter)
29         self.started = False
30
31     def _error(self, err):
32         """
33         Abort parsing, and depending of the status of the request, either fire
34         the C{responseDefer} if no response has been sent yet, or close the
35         stream.
36         """
37         if self.started:
38             self.abortParse()
39         if hasattr(self, 'stream') and self.stream is not None:
40             self.stream.finish(err)
41         else:
42             self.responseDefer.errback(err)
43
44     def gotInitialLine(self, initialLine):
45         self.started = True
46         HTTPClientChannelRequest.gotInitialLine(self, initialLine)
47     
48 class LoggingHTTPClientProtocol(HTTPClientProtocol):
49     """A modified client protocol that logs the number of bytes received."""
50     
51     def __init__(self, factory, stats = None, mirror = False):
52         HTTPClientProtocol.__init__(self, factory)
53         self.stats = stats
54         self.mirror = mirror
55     
56     def lineReceived(self, line):
57         if self.stats:
58             self.stats.receivedBytes(len(line) + 2, self.mirror)
59         HTTPClientProtocol.lineReceived(self, line)
60
61     def rawDataReceived(self, data):
62         if self.stats:
63             self.stats.receivedBytes(len(data), self.mirror)
64         HTTPClientProtocol.rawDataReceived(self, data)
65
66     def submitRequest(self, request, closeAfter=True):
67         """
68         @param request: The request to send to a remote server.
69         @type request: L{ClientRequest}
70
71         @param closeAfter: If True the 'Connection: close' header will be sent,
72             otherwise 'Connection: keep-alive'
73         @type closeAfter: C{bool}
74
75         @rtype: L{twisted.internet.defer.Deferred}
76         @return: A Deferred which will be called back with the
77             L{twisted.web2.http.Response} from the server.
78         """
79
80         # Assert we're in a valid state to submit more
81         assert self.outRequest is None
82         assert ((self.readPersistent is PERSIST_NO_PIPELINE
83                  and not self.inRequests)
84                 or self.readPersistent is PERSIST_PIPELINE)
85
86         self.manager.clientBusy(self)
87         if closeAfter:
88             self.readPersistent = False
89
90         self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self,
91                                             request, closeAfter)
92         self.inRequests.append(chanRequest)
93
94         chanRequest.submit()
95         return chanRequest.responseDefer
96
97     def setReadPersistent(self, persist):
98         oldPersist = self.readPersistent
99         self.readPersistent = persist
100         if not persist:
101             # Tell all requests but first to abort.
102             lostRequests = self.inRequests[1:]
103             del self.inRequests[1:]
104             for request in lostRequests:
105                 request.connectionLost(PipelineError('Pipelined connection was closed.'))
106         elif (oldPersist is PERSIST_NO_PIPELINE and
107               persist is PERSIST_PIPELINE and
108               self.outRequest is None):
109             self.manager.clientPipelining(self)
110
111     def connectionLost(self, reason):
112         self.readPersistent = False
113         self.setTimeout(None)
114         self.manager.clientGone(self)
115         # Cancel the current request
116         if self.inRequests and self.inRequests[0] is not None:
117             self.inRequests[0].connectionLost(reason)
118         # Tell all remaining requests to abort.
119         lostRequests = self.inRequests[1:]
120         del self.inRequests[1:]
121         for request in lostRequests:
122             if request is not None:
123                 request.connectionLost(PipelineError('Pipelined connection was closed.'))
124                 
125 class Peer(ClientFactory):
126     """A manager for all HTTP requests to a single peer.
127     
128     Controls all requests that go to a single peer (host and port).
129     This includes buffering requests until they can be sent and reconnecting
130     in the event of the connection being closed.
131     
132     """
133
134     implements(IHTTPClientManager)
135     
136     def __init__(self, host, port = 80, stats = None):
137         self.host = host
138         self.port = port
139         self.stats = stats
140         self.mirror = False
141         self.rank = 0.01
142         self.busy = False
143         self.pipeline = False
144         self.closed = True
145         self.connecting = False
146         self.request_queue = []
147         self.outstanding = 0
148         self.proto = None
149         self.connector = None
150         self._errors = 0
151         self._completed = 0
152         self._downloadSpeeds = []
153         self._lastResponse = None
154         self._responseTimes = []
155     
156     def __repr__(self):
157         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
158         
159     #{ Manage the request queue
160     def connect(self):
161         """Connect to the peer."""
162         assert self.closed and not self.connecting
163         log.msg('Connecting to (%s, %d)' % (self.host, self.port))
164         self.connecting = True
165         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
166                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
167         d.addCallbacks(self.connected, self.connectionError)
168
169     def connected(self, proto):
170         """Begin processing the queued requests."""
171         log.msg('Connected to (%s, %d)' % (self.host, self.port))
172         self.closed = False
173         self.connecting = False
174         self.proto = proto
175         reactor.callLater(0, self.processQueue)
176         
177     def connectionError(self, err):
178         """Cancel the requests."""
179         log.msg('Failed to connect to the peer by HTTP.')
180         log.err(err)
181
182         # Remove one request so that we don't loop indefinitely
183         if self.request_queue:
184             req, deferRequest, submissionTime = self.request_queue.pop(0)
185             deferRequest.errback(err)
186             
187         self._completed += 1
188         self._errors += 1
189         self.rerank()
190         if self.connecting:
191             self.connecting = False
192             self.clientGone(None)
193         
194     def close(self):
195         """Close the connection to the peer."""
196         if not self.closed:
197             self.proto.transport.loseConnection()
198
199     def submitRequest(self, request):
200         """Add a new request to the queue.
201         
202         @type request: L{twisted.web2.client.http.ClientRequest}
203         @return: deferred that will fire with the completed request
204         """
205         submissionTime = datetime.now()
206         deferRequest = defer.Deferred()
207         self.request_queue.append((request, deferRequest, submissionTime))
208         self.rerank()
209         reactor.callLater(0, self.processQueue)
210         return deferRequest
211
212     def processQueue(self):
213         """Check the queue to see if new requests can be sent to the peer."""
214         if not self.request_queue:
215             return
216         if self.connecting:
217             return
218         if self.closed:
219             self.connect()
220             return
221         if self.busy and not self.pipeline:
222             return
223         if self.outstanding and not self.pipeline:
224             return
225         if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
226                  and not self.proto.inRequests)
227                  or self.proto.readPersistent is PERSIST_PIPELINE):
228             log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
229                     (self.proto.readPersistent, self.proto.inRequests))
230             return
231
232         req, deferRequest, submissionTime = self.request_queue.pop(0)
233         try:
234             deferResponse = self.proto.submitRequest(req, False)
235         except:
236             # Try again later
237             log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, ))
238             log.err()
239             self.request_queue.insert(0, (request, deferRequest, submissionTime))
240             ractor.callLater(1, self.processQueue)
241             return
242             
243         self.outstanding += 1
244         self.rerank()
245         deferResponse.addCallbacks(self.requestComplete, self.requestError,
246                                    callbackArgs = (req, deferRequest, submissionTime),
247                                    errbackArgs = (req, deferRequest))
248
249     def requestComplete(self, resp, req, deferRequest, submissionTime):
250         """Process a completed request."""
251         self._processLastResponse()
252         self.outstanding -= 1
253         assert self.outstanding >= 0
254         log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
255         self._completed += 1
256         now = datetime.now()
257         self._responseTimes.append((now, now - submissionTime))
258         self._lastResponse = (now, resp.stream.length)
259         self.rerank()
260         deferRequest.callback(resp)
261
262     def requestError(self, error, req, deferRequest):
263         """Process a request that ended with an error."""
264         self._processLastResponse()
265         self.outstanding -= 1
266         assert self.outstanding >= 0
267         log.msg('Download of %s generated error %r' % (req.uri, error))
268         self._completed += 1
269         self._errors += 1
270         self.rerank()
271         deferRequest.errback(error)
272         
273     def hashError(self, error):
274         """Log that a hash error occurred from the peer."""
275         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
276         self._errors += 1
277         self.rerank()
278
279     #{ IHTTPClientManager interface
280     def clientBusy(self, proto):
281         """Save the busy state."""
282         self.busy = True
283
284     def clientIdle(self, proto):
285         """Try to send a new request."""
286         self._processLastResponse()
287         self.busy = False
288         reactor.callLater(0, self.processQueue)
289         self.rerank()
290
291     def clientPipelining(self, proto):
292         """Try to send a new request."""
293         self.pipeline = True
294         reactor.callLater(0, self.processQueue)
295
296     def clientGone(self, proto):
297         """Mark sent requests as errors."""
298         self._processLastResponse()
299         log.msg('Lost the connection to (%s, %d)' % (self.host, self.port))
300         self.busy = False
301         self.pipeline = False
302         self.closed = True
303         self.connecting = False
304         self.proto = None
305         self.rerank()
306         if self.request_queue:
307             reactor.callLater(0, self.processQueue)
308             
309     #{ Downloading request interface
310     def setCommonHeaders(self):
311         """Get the common HTTP headers for all requests."""
312         headers = http_headers.Headers()
313         headers.setHeader('Host', self.host)
314         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
315                           (version.short(), twisted_version.short(), web2_version.short()))
316         return headers
317     
318     def get(self, path, method="GET", modtime=None):
319         """Add a new request to the queue.
320         
321         @type path: C{string}
322         @param path: the path to request from the peer
323         @type method: C{string}
324         @param method: the HTTP method to use, 'GET' or 'HEAD'
325             (optional, defaults to 'GET')
326         @type modtime: C{int}
327         @param modtime: the modification time to use for an 'If-Modified-Since'
328             header, as seconds since the epoch
329             (optional, defaults to not sending that header)
330         """
331         headers = self.setCommonHeaders()
332         if modtime:
333             headers.setHeader('If-Modified-Since', modtime)
334         return self.submitRequest(ClientRequest(method, path, headers, None))
335     
336     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
337         """Add a new request with a Range header to the queue.
338         
339         @type path: C{string}
340         @param path: the path to request from the peer
341         @type rangeStart: C{int}
342         @param rangeStart: the byte to begin the request at
343         @type rangeEnd: C{int}
344         @param rangeEnd: the byte to end the request at (inclusive)
345         @type method: C{string}
346         @param method: the HTTP method to use, 'GET' or 'HEAD'
347             (optional, defaults to 'GET')
348         """
349         headers = self.setCommonHeaders()
350         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
351         return self.submitRequest(ClientRequest(method, path, headers, None))
352     
353     #{ Peer information
354     def isIdle(self):
355         """Check whether the peer is idle or not."""
356         return not self.busy and not self.request_queue and not self.outstanding
357     
358     def _processLastResponse(self):
359         """Save the download time of the last request for speed calculations."""
360         if self._lastResponse is not None:
361             now = datetime.now()
362             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
363             self._lastResponse = None
364             
365     def downloadSpeed(self):
366         """Gets the latest average download speed for the peer.
367         
368         The average is over the last 10 responses that occurred in the last hour.
369         """
370         total_time = 0.0
371         total_download = 0
372         now = datetime.now()
373         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
374                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
375             self._downloadSpeeds.pop(0)
376
377         # If there are none, then you get 0
378         if not self._downloadSpeeds:
379             return 0.0
380         
381         for download in self._downloadSpeeds:
382             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
383             total_download += download[2]
384
385         return total_download / total_time
386     
387     def responseTime(self):
388         """Gets the latest average response time for the peer.
389         
390         Response time is the time from receiving the request, to the time
391         the download begins. The average is over the last 10 responses that
392         occurred in the last hour.
393         """
394         total_response = 0.0
395         now = datetime.now()
396         while self._responseTimes and (len(self._responseTimes) > 10 or 
397                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
398             self._responseTimes.pop(0)
399
400         # If there are none, give it the benefit of the doubt
401         if not self._responseTimes:
402             return 0.0
403
404         for response in self._responseTimes:
405             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
406
407         return total_response / len(self._responseTimes)
408     
409     def rerank(self):
410         """Determine the ranking value for the peer.
411         
412         The ranking value is composed of 5 numbers, each exponentially
413         decreasing from 1 to 0 based on:
414          - if a connection to the peer is open
415          - the number of pending requests
416          - the time to download a single piece
417          - the number of errors
418          - the response time
419         """
420         rank = 1.0
421         if self.closed:
422             rank *= 0.9
423         rank *= exp(-(len(self.request_queue) + self.outstanding))
424         speed = self.downloadSpeed()
425         if speed > 0.0:
426             rank *= exp(-512.0*1024 / speed)
427         if self._completed:
428             rank *= exp(-10.0 * self._errors / self._completed)
429         rank *= exp(-self.responseTime() / 5.0)
430         self.rank = rank
431         
432 class TestClientManager(unittest.TestCase):
433     """Unit tests for the Peer."""
434     
435     client = None
436     pending_calls = []
437     length = []
438     
439     def gotResp(self, resp, num, expect):
440         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
441         if expect is not None:
442             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
443         while len(self.length) <= num:
444             self.length.append(0)
445         self.length[num] = 0
446         def addData(data, self = self, num = num):
447             self.length[num] += len(data)
448         def checkLength(resp, self = self, num = num, length = resp.stream.length):
449             self.failUnlessEqual(self.length[num], length)
450             return resp
451         df = stream_mod.readStream(resp.stream, addData)
452         df.addCallback(checkLength)
453         return df
454     
455     def test_download(self):
456         """Tests a normal download."""
457         host = 'www.ietf.org'
458         self.client = Peer(host, 80)
459         self.timeout = 10
460         
461         d = self.client.get('/rfc/rfc0013.txt')
462         d.addCallback(self.gotResp, 1, 1070)
463         return d
464         
465     def test_head(self):
466         """Tests a 'HEAD' request."""
467         host = 'www.ietf.org'
468         self.client = Peer(host, 80)
469         self.timeout = 10
470         
471         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
472         d.addCallback(self.gotResp, 1, 0)
473         return d
474         
475     def test_multiple_downloads(self):
476         """Tests multiple downloads with queueing and connection closing."""
477         host = 'www.ietf.org'
478         self.client = Peer(host, 80)
479         self.timeout = 120
480         lastDefer = defer.Deferred()
481         
482         def newRequest(path, num, expect, last=False):
483             d = self.client.get(path)
484             d.addCallback(self.gotResp, num, expect)
485             if last:
486                 d.addBoth(lastDefer.callback)
487
488         # 3 quick requests
489         newRequest("/rfc/rfc0006.txt", 1, 1776)
490         newRequest("/rfc/rfc2362.txt", 2, 159833)
491         newRequest("/rfc/rfc0801.txt", 3, 40824)
492         
493         # This one will probably be queued
494         self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
495         
496         # Connection should still be open, but idle
497         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
498         
499         #Connection should be closed
500         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
501         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
502         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
503         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
504         
505         # Now it should definitely be closed
506         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
507         return lastDefer
508         
509     def test_multiple_quick_downloads(self):
510         """Tests lots of multiple downloads with queueing."""
511         host = 'www.ietf.org'
512         self.client = Peer(host, 80)
513         self.timeout = 30
514         lastDefer = defer.Deferred()
515         
516         def newRequest(path, num, expect, last=False):
517             d = self.client.get(path)
518             d.addCallback(self.gotResp, num, expect)
519             if last:
520                 d.addBoth(lastDefer.callback)
521                 
522         newRequest("/rfc/rfc0006.txt", 1, 1776)
523         newRequest("/rfc/rfc2362.txt", 2, 159833)
524         newRequest("/rfc/rfc0801.txt", 3, 40824)
525         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
526         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
527         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
528         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
529         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
530         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
531         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
532         return lastDefer
533         
534     def checkInfo(self):
535         log.msg('Rank is: %r' % self.client.rank)
536         log.msg('Download speed is: %r' % self.client.downloadSpeed())
537         log.msg('Response Time is: %r' % self.client.responseTime())
538         
539     def test_peer_info(self):
540         """Test retrieving the peer info during a download."""
541         host = 'www.ietf.org'
542         self.client = Peer(host, 80)
543         self.timeout = 120
544         lastDefer = defer.Deferred()
545         
546         def newRequest(path, num, expect, last=False):
547             d = self.client.get(path)
548             d.addCallback(self.gotResp, num, expect)
549             if last:
550                 d.addBoth(lastDefer.callback)
551                 
552         newRequest("/rfc/rfc0006.txt", 1, 1776)
553         newRequest("/rfc/rfc2362.txt", 2, 159833)
554         newRequest("/rfc/rfc0801.txt", 3, 40824)
555         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
556         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
557         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
558         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
559         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
560         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
561         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
562         
563         for i in xrange(2, 122, 2):
564             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
565         
566         return lastDefer
567         
568     def test_range(self):
569         """Test a Range request."""
570         host = 'www.ietf.org'
571         self.client = Peer(host, 80)
572         self.timeout = 10
573         
574         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
575         d.addCallback(self.gotResp, 1, 100)
576         return d
577         
578     def tearDown(self):
579         for p in self.pending_calls:
580             if p.active():
581                 p.cancel()
582         self.pending_calls = []
583         if self.client:
584             self.client.close()
585             self.client = None