Final version of INFOCOM paper.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
18
19 from apt_p2p_conf import version
20
21 class PipelineError(Exception):
22     """An error has occurred in pipelining requests."""
23
24 class FixedHTTPClientChannelRequest(HTTPClientChannelRequest):
25     """Fix the broken _error function."""
26
27     def __init__(self, channel, request, closeAfter):
28         HTTPClientChannelRequest.__init__(self, channel, request, closeAfter)
29         self.started = False
30
31     def _error(self, err):
32         """
33         Abort parsing, and depending of the status of the request, either fire
34         the C{responseDefer} if no response has been sent yet, or close the
35         stream.
36         """
37         if self.started:
38             self.abortParse()
39         if hasattr(self, 'stream') and self.stream is not None:
40             self.stream.finish(err)
41         else:
42             self.responseDefer.errback(err)
43
44     def gotInitialLine(self, initialLine):
45         self.started = True
46         HTTPClientChannelRequest.gotInitialLine(self, initialLine)
47     
48 class LoggingHTTPClientProtocol(HTTPClientProtocol):
49     """A modified client protocol that logs the number of bytes received."""
50     
51     def __init__(self, factory, stats = None, mirror = False):
52         HTTPClientProtocol.__init__(self, factory)
53         self.stats = stats
54         self.mirror = mirror
55     
56     def lineReceived(self, line):
57         if self.stats:
58             self.stats.receivedBytes(len(line) + 2, self.mirror)
59         HTTPClientProtocol.lineReceived(self, line)
60
61     def rawDataReceived(self, data):
62         if self.stats:
63             self.stats.receivedBytes(len(data), self.mirror)
64         HTTPClientProtocol.rawDataReceived(self, data)
65
66     def submitRequest(self, request, closeAfter=True):
67         """
68         @param request: The request to send to a remote server.
69         @type request: L{ClientRequest}
70
71         @param closeAfter: If True the 'Connection: close' header will be sent,
72             otherwise 'Connection: keep-alive'
73         @type closeAfter: C{bool}
74
75         @rtype: L{twisted.internet.defer.Deferred}
76         @return: A Deferred which will be called back with the
77             L{twisted.web2.http.Response} from the server.
78         """
79
80         # Assert we're in a valid state to submit more
81         assert self.outRequest is None
82         assert ((self.readPersistent is PERSIST_NO_PIPELINE
83                  and not self.inRequests)
84                 or self.readPersistent is PERSIST_PIPELINE)
85
86         self.manager.clientBusy(self)
87         if closeAfter:
88             self.readPersistent = False
89
90         self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self,
91                                             request, closeAfter)
92         self.inRequests.append(chanRequest)
93
94         chanRequest.submit()
95         return chanRequest.responseDefer
96
97     def setReadPersistent(self, persist):
98         oldPersist = self.readPersistent
99         self.readPersistent = persist
100         if not persist:
101             # Tell all requests but first to abort.
102             lostRequests = self.inRequests[1:]
103             del self.inRequests[1:]
104             for request in lostRequests:
105                 request.connectionLost(PipelineError('Pipelined connection was closed.'))
106         elif (oldPersist is PERSIST_NO_PIPELINE and
107               persist is PERSIST_PIPELINE and
108               self.outRequest is None):
109             self.manager.clientPipelining(self)
110
111     def connectionLost(self, reason):
112         self.readPersistent = False
113         self.setTimeout(None)
114         self.manager.clientGone(self)
115         # Cancel the current request
116         if self.inRequests and self.inRequests[0] is not None:
117             self.inRequests[0].connectionLost(reason)
118         # Tell all remaining requests to abort.
119         lostRequests = self.inRequests[1:]
120         del self.inRequests[1:]
121         for request in lostRequests:
122             if request is not None:
123                 request.connectionLost(PipelineError('Pipelined connection was closed.'))
124                 
125 class Peer(ClientFactory):
126     """A manager for all HTTP requests to a single peer.
127     
128     Controls all requests that go to a single peer (host and port).
129     This includes buffering requests until they can be sent and reconnecting
130     in the event of the connection being closed.
131     
132     """
133
134     implements(IHTTPClientManager)
135     
136     def __init__(self, host, port = 80, stats = None):
137         self.host = host
138         self.port = port
139         self.stats = stats
140         self.mirror = False
141         self.rank = 0.01
142         self.busy = False
143         self.pipeline = False
144         self.closed = True
145         self.connecting = False
146         self.request_queue = []
147         self.outstanding = 0
148         self.proto = None
149         self.connector = None
150         self._errors = 0
151         self._completed = 0
152         self._downloadSpeeds = []
153         self._lastResponse = None
154         self._responseTimes = []
155     
156     def __repr__(self):
157         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
158         
159     #{ Manage the request queue
160     def connect(self):
161         """Connect to the peer."""
162         assert self.closed and not self.connecting
163         log.msg('Connecting to (%s, %d)' % (self.host, self.port))
164         self.connecting = True
165         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
166                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port, timeout = 10)
167         d.addCallbacks(self.connected, self.connectionError)
168
169     def connected(self, proto):
170         """Begin processing the queued requests."""
171         log.msg('Connected to (%s, %d)' % (self.host, self.port))
172         self.closed = False
173         self.connecting = False
174         self.proto = proto
175         reactor.callLater(0, self.processQueue)
176         
177     def connectionError(self, err):
178         """Cancel the requests."""
179         log.msg('Failed to connect to the peer by HTTP.')
180         log.err(err)
181
182         # Remove one request so that we don't loop indefinitely
183         if self.request_queue:
184             req, deferRequest, submissionTime = self.request_queue.pop(0)
185             deferRequest.errback(err)
186             
187         self._completed += 1
188         self._errors += 1
189         self.rerank()
190         if self.connecting:
191             self.connecting = False
192             self.clientGone(None)
193         
194     def close(self):
195         """Close the connection to the peer."""
196         if not self.closed:
197             self.proto.transport.loseConnection()
198
199     def submitRequest(self, request):
200         """Add a new request to the queue.
201         
202         @type request: L{twisted.web2.client.http.ClientRequest}
203         @return: deferred that will fire with the completed request
204         """
205         submissionTime = datetime.now()
206         deferRequest = defer.Deferred()
207         self.request_queue.append((request, deferRequest, submissionTime))
208         self.rerank()
209         reactor.callLater(0, self.processQueue)
210         return deferRequest
211
212     def processQueue(self):
213         """Check the queue to see if new requests can be sent to the peer."""
214         if not self.request_queue:
215             return
216         if self.connecting:
217             return
218         if self.closed:
219             self.connect()
220             return
221         if self.busy and not self.pipeline:
222             return
223         if self.outstanding and not self.pipeline:
224             return
225         if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
226                  and not self.proto.inRequests)
227                  or self.proto.readPersistent is PERSIST_PIPELINE):
228             log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
229                     (self.proto.readPersistent, self.proto.inRequests))
230             return
231
232         req, deferRequest, submissionTime = self.request_queue.pop(0)
233         try:
234             deferResponse = self.proto.submitRequest(req, False)
235         except:
236             # Try again later
237             log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, ))
238             log.err()
239             self.request_queue.insert(0, (request, deferRequest, submissionTime))
240             ractor.callLater(1, self.processQueue)
241             return
242             
243         self.outstanding += 1
244         self.rerank()
245         deferResponse.addCallbacks(self.requestComplete, self.requestError,
246                                    callbackArgs = (req, deferRequest, submissionTime),
247                                    errbackArgs = (req, deferRequest))
248
249     def requestComplete(self, resp, req, deferRequest, submissionTime):
250         """Process a completed request."""
251         self._processLastResponse()
252         self.outstanding -= 1
253         assert self.outstanding >= 0
254         log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
255         self._completed += 1
256         now = datetime.now()
257         self._responseTimes.append((now, now - submissionTime))
258         self._lastResponse = (now, resp.stream.length)
259         self.rerank()
260         deferRequest.callback(resp)
261
262     def requestError(self, error, req, deferRequest):
263         """Process a request that ended with an error."""
264         self._processLastResponse()
265         self.outstanding -= 1
266         assert self.outstanding >= 0
267         log.msg('Download of %s generated error %r' % (req.uri, error))
268         self._completed += 1
269         self._errors += 1
270         self.rerank()
271         deferRequest.errback(error)
272         
273     def hashError(self, error):
274         """Log that a hash error occurred from the peer."""
275         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
276         self._errors += 1
277         self.rerank()
278
279     #{ IHTTPClientManager interface
280     def clientBusy(self, proto):
281         """Save the busy state."""
282         self.busy = True
283
284     def clientIdle(self, proto):
285         """Try to send a new request."""
286         self._processLastResponse()
287         self.busy = False
288         reactor.callLater(0, self.processQueue)
289         self.rerank()
290
291     def clientPipelining(self, proto):
292         """Try to send a new request."""
293         self.pipeline = True
294         reactor.callLater(0, self.processQueue)
295
296     def clientGone(self, proto):
297         """Mark sent requests as errors."""
298         self._processLastResponse()
299         log.msg('Lost the connection to (%s, %d)' % (self.host, self.port))
300         self.busy = False
301         self.pipeline = False
302         self.closed = True
303         self.connecting = False
304         self.proto = None
305         self.rerank()
306         if self.request_queue:
307             reactor.callLater(0, self.processQueue)
308             
309     #{ Downloading request interface
310     def setCommonHeaders(self):
311         """Get the common HTTP headers for all requests."""
312         headers = http_headers.Headers()
313         headers.setHeader('Host', self.host)
314         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
315                           (version.short(), twisted_version.short(), web2_version.short()))
316         return headers
317     
318     def get(self, path, method="GET", modtime=None):
319         """Add a new request to the queue.
320         
321         @type path: C{string}
322         @param path: the path to request from the peer
323         @type method: C{string}
324         @param method: the HTTP method to use, 'GET' or 'HEAD'
325             (optional, defaults to 'GET')
326         @type modtime: C{int}
327         @param modtime: the modification time to use for an 'If-Modified-Since'
328             header, as seconds since the epoch
329             (optional, defaults to not sending that header)
330         """
331         headers = self.setCommonHeaders()
332         if modtime:
333             headers.setHeader('If-Modified-Since', modtime)
334         return self.submitRequest(ClientRequest(method, path, headers, None))
335     
336     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
337         """Add a new request with a Range header to the queue.
338         
339         @type path: C{string}
340         @param path: the path to request from the peer
341         @type rangeStart: C{int}
342         @param rangeStart: the byte to begin the request at
343         @type rangeEnd: C{int}
344         @param rangeEnd: the byte to end the request at (inclusive)
345         @type method: C{string}
346         @param method: the HTTP method to use, 'GET' or 'HEAD'
347             (optional, defaults to 'GET')
348         """
349         headers = self.setCommonHeaders()
350         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
351         return self.submitRequest(ClientRequest(method, path, headers, None))
352     
353     #{ Peer information
354     def isIdle(self):
355         """Check whether the peer is idle or not."""
356         return not self.busy and not self.request_queue and not self.outstanding
357     
358     def _processLastResponse(self):
359         """Save the download time of the last request for speed calculations."""
360         if self._lastResponse is not None:
361             if self._lastResponse[1] is not None:
362                 now = datetime.now()
363                 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
364             self._lastResponse = None
365             
366     def downloadSpeed(self):
367         """Gets the latest average download speed for the peer.
368         
369         The average is over the last 10 responses that occurred in the last hour.
370         """
371         total_time = 0.0
372         total_download = 0
373         now = datetime.now()
374         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
375                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
376             self._downloadSpeeds.pop(0)
377
378         # If there are none, then you get 0
379         if not self._downloadSpeeds:
380             return 150000.0
381         
382         for download in self._downloadSpeeds:
383             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
384             total_download += download[2]
385
386         return total_download / total_time
387     
388     def responseTime(self):
389         """Gets the latest average response time for the peer.
390         
391         Response time is the time from receiving the request, to the time
392         the download begins. The average is over the last 10 responses that
393         occurred in the last hour.
394         """
395         total_response = 0.0
396         now = datetime.now()
397         while self._responseTimes and (len(self._responseTimes) > 10 or 
398                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
399             self._responseTimes.pop(0)
400
401         # If there are none, give it the benefit of the doubt
402         if not self._responseTimes:
403             return 0.1
404
405         for response in self._responseTimes:
406             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
407
408         return total_response / len(self._responseTimes)
409     
410     def rerank(self):
411         """Determine the ranking value for the peer.
412         
413         The ranking value is composed of 5 numbers, each exponentially
414         decreasing from 1 to 0 based on:
415          - if a connection to the peer is open
416          - the number of pending requests
417          - the time to download a single piece
418          - the number of errors
419          - the response time
420         """
421         rank = 1.0
422         if self.closed:
423             rank *= 0.9
424         rank *= exp(-(len(self.request_queue) + self.outstanding))
425         speed = self.downloadSpeed()
426         if speed > 0.0:
427             rank *= exp(-512.0*1024 / speed)
428         if self._completed:
429             rank *= exp(-10.0 * self._errors / self._completed)
430         rank *= exp(-self.responseTime() / 5.0)
431         self.rank = rank
432         
433 class TestClientManager(unittest.TestCase):
434     """Unit tests for the Peer."""
435     
436     client = None
437     pending_calls = []
438     length = []
439     
440     def gotResp(self, resp, num, expect):
441         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
442         if expect is not None:
443             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
444         while len(self.length) <= num:
445             self.length.append(0)
446         self.length[num] = 0
447         def addData(data, self = self, num = num):
448             self.length[num] += len(data)
449         def checkLength(resp, self = self, num = num, length = resp.stream.length):
450             self.failUnlessEqual(self.length[num], length)
451             return resp
452         df = stream_mod.readStream(resp.stream, addData)
453         df.addCallback(checkLength)
454         return df
455     
456     def test_download(self):
457         """Tests a normal download."""
458         host = 'www.ietf.org'
459         self.client = Peer(host, 80)
460         self.timeout = 10
461         
462         d = self.client.get('/rfc/rfc0013.txt')
463         d.addCallback(self.gotResp, 1, 1070)
464         return d
465         
466     def test_head(self):
467         """Tests a 'HEAD' request."""
468         host = 'www.ietf.org'
469         self.client = Peer(host, 80)
470         self.timeout = 10
471         
472         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
473         d.addCallback(self.gotResp, 1, 0)
474         return d
475         
476     def test_multiple_downloads(self):
477         """Tests multiple downloads with queueing and connection closing."""
478         host = 'www.ietf.org'
479         self.client = Peer(host, 80)
480         self.timeout = 120
481         lastDefer = defer.Deferred()
482         
483         def newRequest(path, num, expect, last=False):
484             d = self.client.get(path)
485             d.addCallback(self.gotResp, num, expect)
486             if last:
487                 d.addBoth(lastDefer.callback)
488
489         # 3 quick requests
490         newRequest("/rfc/rfc0006.txt", 1, 1776)
491         newRequest("/rfc/rfc2362.txt", 2, 159833)
492         newRequest("/rfc/rfc0801.txt", 3, 40824)
493         
494         # This one will probably be queued
495         self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
496         
497         # Connection should still be open, but idle
498         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
499         
500         #Connection should be closed
501         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
502         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
503         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
504         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
505         
506         # Now it should definitely be closed
507         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
508         return lastDefer
509         
510     def test_multiple_quick_downloads(self):
511         """Tests lots of multiple downloads with queueing."""
512         host = 'www.ietf.org'
513         self.client = Peer(host, 80)
514         self.timeout = 30
515         lastDefer = defer.Deferred()
516         
517         def newRequest(path, num, expect, last=False):
518             d = self.client.get(path)
519             d.addCallback(self.gotResp, num, expect)
520             if last:
521                 d.addBoth(lastDefer.callback)
522                 
523         newRequest("/rfc/rfc0006.txt", 1, 1776)
524         newRequest("/rfc/rfc2362.txt", 2, 159833)
525         newRequest("/rfc/rfc0801.txt", 3, 40824)
526         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
527         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
528         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
529         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
530         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
531         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
532         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
533         return lastDefer
534         
535     def checkInfo(self):
536         log.msg('Rank is: %r' % self.client.rank)
537         log.msg('Download speed is: %r' % self.client.downloadSpeed())
538         log.msg('Response Time is: %r' % self.client.responseTime())
539         
540     def test_peer_info(self):
541         """Test retrieving the peer info during a download."""
542         host = 'www.ietf.org'
543         self.client = Peer(host, 80)
544         self.timeout = 120
545         lastDefer = defer.Deferred()
546         
547         def newRequest(path, num, expect, last=False):
548             d = self.client.get(path)
549             d.addCallback(self.gotResp, num, expect)
550             if last:
551                 d.addBoth(lastDefer.callback)
552                 
553         newRequest("/rfc/rfc0006.txt", 1, 1776)
554         newRequest("/rfc/rfc2362.txt", 2, 159833)
555         newRequest("/rfc/rfc0801.txt", 3, 40824)
556         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
557         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
558         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
559         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
560         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
561         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
562         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
563         
564         for i in xrange(2, 122, 2):
565             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
566         
567         return lastDefer
568         
569     def test_range(self):
570         """Test a Range request."""
571         host = 'www.ietf.org'
572         self.client = Peer(host, 80)
573         self.timeout = 10
574         
575         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
576         d.addCallback(self.gotResp, 1, 100)
577         return d
578         
579     def test_timeout(self):
580         """Tests a connection timeout."""
581         from twisted.internet.error import TimeoutError
582         host = 'steveholt.hopto.org'
583         self.client = Peer(host, 80)
584         self.timeout = 60
585         
586         d = self.client.get('/rfc/rfc0013.txt')
587         d.addCallback(self.gotResp, 1, 1070)
588         d = self.failUnlessFailure(d, TimeoutError)
589         d.addCallback(lambda a: self.flushLoggedErrors(TimeoutError))
590         return d
591         
592     def test_dnserror(self):
593         """Tests a connection timeout."""
594         from twisted.internet.error import DNSLookupError
595         host = 'hureyfnvbfha.debian.net'
596         self.client = Peer(host, 80)
597         self.timeout = 5
598         
599         d = self.client.get('/rfc/rfc0013.txt')
600         d.addCallback(self.gotResp, 1, 1070)
601         d = self.failUnlessFailure(d, DNSLookupError)
602         d.addCallback(lambda a: self.flushLoggedErrors(DNSLookupError))
603         return d
604         
605     def test_noroute(self):
606         """Tests a connection timeout."""
607         from twisted.internet.error import NoRouteError
608         host = '1.2.3.4'
609         self.client = Peer(host, 80)
610         self.timeout = 5
611         
612         d = self.client.get('/rfc/rfc0013.txt')
613         d.addCallback(self.gotResp, 1, 1070)
614         d = self.failUnlessFailure(d, NoRouteError)
615         d.addCallback(lambda a: self.flushLoggedErrors(NoRouteError))
616         return d
617         
618     def tearDown(self):
619         for p in self.pending_calls:
620             if p.active():
621                 p.cancel()
622         self.pending_calls = []
623         if self.client:
624             self.client.close()
625             self.client = None