55a818f1a135624306494c077db9ced72fca7b95
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
18
19 from apt_p2p_conf import version
20
21 class PipelineError(Exception):
22     """An error has occurred in pipelining requests."""
23
24 class FixedHTTPClientChannelRequest(HTTPClientChannelRequest):
25     """Fix the broken _error function."""
26
27     def __init__(self, channel, request, closeAfter):
28         HTTPClientChannelRequest.__init__(self, channel, request, closeAfter)
29         self.started = False
30
31     def _error(self, err):
32         """
33         Abort parsing, and depending of the status of the request, either fire
34         the C{responseDefer} if no response has been sent yet, or close the
35         stream.
36         """
37         if self.started:
38             self.abortParse()
39         if hasattr(self, 'stream') and self.stream is not None:
40             self.stream.finish(err)
41         else:
42             self.responseDefer.errback(err)
43
44     def gotInitialLine(self, initialLine):
45         self.started = True
46         HTTPClientChannelRequest.gotInitialLine(self, initialLine)
47     
48 class LoggingHTTPClientProtocol(HTTPClientProtocol):
49     """A modified client protocol that logs the number of bytes received."""
50     
51     def __init__(self, factory, stats = None, mirror = False):
52         HTTPClientProtocol.__init__(self, factory)
53         self.stats = stats
54         self.mirror = mirror
55     
56     def lineReceived(self, line):
57         if self.stats:
58             self.stats.receivedBytes(len(line) + 2, self.mirror)
59         HTTPClientProtocol.lineReceived(self, line)
60
61     def rawDataReceived(self, data):
62         if self.stats:
63             self.stats.receivedBytes(len(data), self.mirror)
64         HTTPClientProtocol.rawDataReceived(self, data)
65
66     def submitRequest(self, request, closeAfter=True):
67         """
68         @param request: The request to send to a remote server.
69         @type request: L{ClientRequest}
70
71         @param closeAfter: If True the 'Connection: close' header will be sent,
72             otherwise 'Connection: keep-alive'
73         @type closeAfter: C{bool}
74
75         @rtype: L{twisted.internet.defer.Deferred}
76         @return: A Deferred which will be called back with the
77             L{twisted.web2.http.Response} from the server.
78         """
79
80         # Assert we're in a valid state to submit more
81         assert self.outRequest is None
82         assert ((self.readPersistent is PERSIST_NO_PIPELINE
83                  and not self.inRequests)
84                 or self.readPersistent is PERSIST_PIPELINE)
85
86         self.manager.clientBusy(self)
87         if closeAfter:
88             self.readPersistent = False
89
90         self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self,
91                                             request, closeAfter)
92         self.inRequests.append(chanRequest)
93
94         chanRequest.submit()
95         return chanRequest.responseDefer
96
97     def setReadPersistent(self, persist):
98         self.readPersistent = persist
99         if not persist:
100             # Tell all requests but first to abort.
101             lostRequests = self.inRequests[1:]
102             del self.inRequests[1:]
103             for request in lostRequests:
104                 request.connectionLost(PipelineError('The pipelined connection was lost'))
105
106     def connectionLost(self, reason):
107         self.readPersistent = False
108         self.setTimeout(None)
109         self.manager.clientGone(self)
110         # Cancel the current request
111         if self.inRequests and self.inRequests[0] is not None:
112             self.inRequests[0].connectionLost(reason)
113         # Tell all remaining requests to abort.
114         lostRequests = self.inRequests[1:]
115         del self.inRequests[1:]
116         for request in lostRequests:
117             if request is not None:
118                 request.connectionLost(reason)
119                 
120 class Peer(ClientFactory):
121     """A manager for all HTTP requests to a single peer.
122     
123     Controls all requests that go to a single peer (host and port).
124     This includes buffering requests until they can be sent and reconnecting
125     in the event of the connection being closed.
126     
127     """
128
129     implements(IHTTPClientManager)
130     
131     def __init__(self, host, port = 80, stats = None):
132         self.host = host
133         self.port = port
134         self.stats = stats
135         self.mirror = False
136         self.rank = 0.01
137         self.busy = False
138         self.pipeline = False
139         self.closed = True
140         self.connecting = False
141         self.request_queue = []
142         self.outstanding = 0
143         self.proto = None
144         self.connector = None
145         self._errors = 0
146         self._completed = 0
147         self._downloadSpeeds = []
148         self._lastResponse = None
149         self._responseTimes = []
150     
151     def __repr__(self):
152         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
153         
154     #{ Manage the request queue
155     def connect(self):
156         """Connect to the peer."""
157         assert self.closed and not self.connecting
158         log.msg('Connecting to (%s, %d)' % (self.host, self.port))
159         self.connecting = True
160         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
161                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
162         d.addCallbacks(self.connected, self.connectionError)
163
164     def connected(self, proto):
165         """Begin processing the queued requests."""
166         log.msg('Connected to (%s, %d)' % (self.host, self.port))
167         self.closed = False
168         self.connecting = False
169         self.proto = proto
170         reactor.callLater(0, self.processQueue)
171         
172     def connectionError(self, err):
173         """Cancel the requests."""
174         log.msg('Failed to connect to the peer by HTTP.')
175         log.err(err)
176
177         # Remove one request so that we don't loop indefinitely
178         if self.request_queue:
179             req, deferRequest, submissionTime = self.request_queue.pop(0)
180             deferRequest.errback(err)
181             
182         self._completed += 1
183         self._errors += 1
184         self.rerank()
185         if self.connecting:
186             self.connecting = False
187             self.clientGone(None)
188         
189     def close(self):
190         """Close the connection to the peer."""
191         if not self.closed:
192             self.proto.transport.loseConnection()
193
194     def submitRequest(self, request):
195         """Add a new request to the queue.
196         
197         @type request: L{twisted.web2.client.http.ClientRequest}
198         @return: deferred that will fire with the completed request
199         """
200         submissionTime = datetime.now()
201         deferRequest = defer.Deferred()
202         self.request_queue.append((request, deferRequest, submissionTime))
203         self.rerank()
204         reactor.callLater(0, self.processQueue)
205         return deferRequest
206
207     def processQueue(self):
208         """Check the queue to see if new requests can be sent to the peer."""
209         if not self.request_queue:
210             return
211         if self.connecting:
212             return
213         if self.closed:
214             self.connect()
215             return
216         if self.busy and not self.pipeline:
217             return
218         if self.outstanding and not self.pipeline:
219             return
220         if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
221                  and not self.proto.inRequests)
222                  or self.proto.readPersistent is PERSIST_PIPELINE):
223             log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
224                     (self.proto.readPersistent, self.proto.inRequests))
225             return
226
227         req, deferRequest, submissionTime = self.request_queue.pop(0)
228         try:
229             deferResponse = self.proto.submitRequest(req, False)
230         except:
231             # Try again later
232             log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, ))
233             log.err()
234             self.request_queue.insert(0, (request, deferRequest, submissionTime))
235             ractor.callLater(1, self.processQueue)
236             return
237             
238         self.outstanding += 1
239         self.rerank()
240         deferResponse.addCallbacks(self.requestComplete, self.requestError,
241                                    callbackArgs = (req, deferRequest, submissionTime),
242                                    errbackArgs = (req, deferRequest))
243
244     def requestComplete(self, resp, req, deferRequest, submissionTime):
245         """Process a completed request."""
246         self._processLastResponse()
247         self.outstanding -= 1
248         assert self.outstanding >= 0
249         log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
250         self._completed += 1
251         now = datetime.now()
252         self._responseTimes.append((now, now - submissionTime))
253         self._lastResponse = (now, resp.stream.length)
254         self.rerank()
255         deferRequest.callback(resp)
256
257     def requestError(self, error, req, deferRequest):
258         """Process a request that ended with an error."""
259         self._processLastResponse()
260         self.outstanding -= 1
261         assert self.outstanding >= 0
262         log.msg('Download of %s generated error %r' % (req.uri, error))
263         self._completed += 1
264         self._errors += 1
265         self.rerank()
266         deferRequest.errback(error)
267         
268     def hashError(self, error):
269         """Log that a hash error occurred from the peer."""
270         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
271         self._errors += 1
272         self.rerank()
273
274     #{ IHTTPClientManager interface
275     def clientBusy(self, proto):
276         """Save the busy state."""
277         self.busy = True
278
279     def clientIdle(self, proto):
280         """Try to send a new request."""
281         self._processLastResponse()
282         self.busy = False
283         reactor.callLater(0, self.processQueue)
284         self.rerank()
285
286     def clientPipelining(self, proto):
287         """Try to send a new request."""
288         self.pipeline = True
289         reactor.callLater(0, self.processQueue)
290
291     def clientGone(self, proto):
292         """Mark sent requests as errors."""
293         self._processLastResponse()
294         log.msg('Lost the connection to (%s, %d)' % (self.host, self.port))
295         self.busy = False
296         self.pipeline = False
297         self.closed = True
298         self.connecting = False
299         self.proto = None
300         self.rerank()
301         if self.request_queue:
302             reactor.callLater(0, self.processQueue)
303             
304     #{ Downloading request interface
305     def setCommonHeaders(self):
306         """Get the common HTTP headers for all requests."""
307         headers = http_headers.Headers()
308         headers.setHeader('Host', self.host)
309         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
310                           (version.short(), twisted_version.short(), web2_version.short()))
311         return headers
312     
313     def get(self, path, method="GET", modtime=None):
314         """Add a new request to the queue.
315         
316         @type path: C{string}
317         @param path: the path to request from the peer
318         @type method: C{string}
319         @param method: the HTTP method to use, 'GET' or 'HEAD'
320             (optional, defaults to 'GET')
321         @type modtime: C{int}
322         @param modtime: the modification time to use for an 'If-Modified-Since'
323             header, as seconds since the epoch
324             (optional, defaults to not sending that header)
325         """
326         headers = self.setCommonHeaders()
327         if modtime:
328             headers.setHeader('If-Modified-Since', modtime)
329         return self.submitRequest(ClientRequest(method, path, headers, None))
330     
331     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
332         """Add a new request with a Range header to the queue.
333         
334         @type path: C{string}
335         @param path: the path to request from the peer
336         @type rangeStart: C{int}
337         @param rangeStart: the byte to begin the request at
338         @type rangeEnd: C{int}
339         @param rangeEnd: the byte to end the request at (inclusive)
340         @type method: C{string}
341         @param method: the HTTP method to use, 'GET' or 'HEAD'
342             (optional, defaults to 'GET')
343         """
344         headers = self.setCommonHeaders()
345         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
346         return self.submitRequest(ClientRequest(method, path, headers, None))
347     
348     #{ Peer information
349     def isIdle(self):
350         """Check whether the peer is idle or not."""
351         return not self.busy and not self.request_queue and not self.outstanding
352     
353     def _processLastResponse(self):
354         """Save the download time of the last request for speed calculations."""
355         if self._lastResponse is not None:
356             now = datetime.now()
357             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
358             self._lastResponse = None
359             
360     def downloadSpeed(self):
361         """Gets the latest average download speed for the peer.
362         
363         The average is over the last 10 responses that occurred in the last hour.
364         """
365         total_time = 0.0
366         total_download = 0
367         now = datetime.now()
368         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
369                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
370             self._downloadSpeeds.pop(0)
371
372         # If there are none, then you get 0
373         if not self._downloadSpeeds:
374             return 0.0
375         
376         for download in self._downloadSpeeds:
377             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
378             total_download += download[2]
379
380         return total_download / total_time
381     
382     def responseTime(self):
383         """Gets the latest average response time for the peer.
384         
385         Response time is the time from receiving the request, to the time
386         the download begins. The average is over the last 10 responses that
387         occurred in the last hour.
388         """
389         total_response = 0.0
390         now = datetime.now()
391         while self._responseTimes and (len(self._responseTimes) > 10 or 
392                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
393             self._responseTimes.pop(0)
394
395         # If there are none, give it the benefit of the doubt
396         if not self._responseTimes:
397             return 0.0
398
399         for response in self._responseTimes:
400             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
401
402         return total_response / len(self._responseTimes)
403     
404     def rerank(self):
405         """Determine the ranking value for the peer.
406         
407         The ranking value is composed of 5 numbers, each exponentially
408         decreasing from 1 to 0 based on:
409          - if a connection to the peer is open
410          - the number of pending requests
411          - the time to download a single piece
412          - the number of errors
413          - the response time
414         """
415         rank = 1.0
416         if self.closed:
417             rank *= 0.9
418         rank *= exp(-(len(self.request_queue) + self.outstanding))
419         speed = self.downloadSpeed()
420         if speed > 0.0:
421             rank *= exp(-512.0*1024 / speed)
422         if self._completed:
423             rank *= exp(-10.0 * self._errors / self._completed)
424         rank *= exp(-self.responseTime() / 5.0)
425         self.rank = rank
426         
427 class TestClientManager(unittest.TestCase):
428     """Unit tests for the Peer."""
429     
430     client = None
431     pending_calls = []
432     length = []
433     
434     def gotResp(self, resp, num, expect):
435         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
436         if expect is not None:
437             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
438         while len(self.length) <= num:
439             self.length.append(0)
440         self.length[num] = 0
441         def addData(data, self = self, num = num):
442             self.length[num] += len(data)
443         def checkLength(resp, self = self, num = num, length = resp.stream.length):
444             self.failUnlessEqual(self.length[num], length)
445             return resp
446         df = stream_mod.readStream(resp.stream, addData)
447         df.addCallback(checkLength)
448         return df
449     
450     def test_download(self):
451         """Tests a normal download."""
452         host = 'www.ietf.org'
453         self.client = Peer(host, 80)
454         self.timeout = 10
455         
456         d = self.client.get('/rfc/rfc0013.txt')
457         d.addCallback(self.gotResp, 1, 1070)
458         return d
459         
460     def test_head(self):
461         """Tests a 'HEAD' request."""
462         host = 'www.ietf.org'
463         self.client = Peer(host, 80)
464         self.timeout = 10
465         
466         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
467         d.addCallback(self.gotResp, 1, 0)
468         return d
469         
470     def test_multiple_downloads(self):
471         """Tests multiple downloads with queueing and connection closing."""
472         host = 'www.ietf.org'
473         self.client = Peer(host, 80)
474         self.timeout = 120
475         lastDefer = defer.Deferred()
476         
477         def newRequest(path, num, expect, last=False):
478             d = self.client.get(path)
479             d.addCallback(self.gotResp, num, expect)
480             if last:
481                 d.addBoth(lastDefer.callback)
482
483         # 3 quick requests
484         newRequest("/rfc/rfc0006.txt", 1, 1776)
485         newRequest("/rfc/rfc2362.txt", 2, 159833)
486         newRequest("/rfc/rfc0801.txt", 3, 40824)
487         
488         # This one will probably be queued
489         self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
490         
491         # Connection should still be open, but idle
492         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
493         
494         #Connection should be closed
495         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
496         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
497         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
498         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
499         
500         # Now it should definitely be closed
501         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
502         return lastDefer
503         
504     def test_multiple_quick_downloads(self):
505         """Tests lots of multiple downloads with queueing."""
506         host = 'www.ietf.org'
507         self.client = Peer(host, 80)
508         self.timeout = 30
509         lastDefer = defer.Deferred()
510         
511         def newRequest(path, num, expect, last=False):
512             d = self.client.get(path)
513             d.addCallback(self.gotResp, num, expect)
514             if last:
515                 d.addBoth(lastDefer.callback)
516                 
517         newRequest("/rfc/rfc0006.txt", 1, 1776)
518         newRequest("/rfc/rfc2362.txt", 2, 159833)
519         newRequest("/rfc/rfc0801.txt", 3, 40824)
520         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
521         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
522         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
523         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
524         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
525         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
526         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
527         return lastDefer
528         
529     def checkInfo(self):
530         log.msg('Rank is: %r' % self.client.rank)
531         log.msg('Download speed is: %r' % self.client.downloadSpeed())
532         log.msg('Response Time is: %r' % self.client.responseTime())
533         
534     def test_peer_info(self):
535         """Test retrieving the peer info during a download."""
536         host = 'www.ietf.org'
537         self.client = Peer(host, 80)
538         self.timeout = 120
539         lastDefer = defer.Deferred()
540         
541         def newRequest(path, num, expect, last=False):
542             d = self.client.get(path)
543             d.addCallback(self.gotResp, num, expect)
544             if last:
545                 d.addBoth(lastDefer.callback)
546                 
547         newRequest("/rfc/rfc0006.txt", 1, 1776)
548         newRequest("/rfc/rfc2362.txt", 2, 159833)
549         newRequest("/rfc/rfc0801.txt", 3, 40824)
550         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
551         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
552         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
553         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
554         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
555         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
556         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
557         
558         for i in xrange(2, 122, 2):
559             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
560         
561         return lastDefer
562         
563     def test_range(self):
564         """Test a Range request."""
565         host = 'www.ietf.org'
566         self.client = Peer(host, 80)
567         self.timeout = 10
568         
569         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
570         d.addCallback(self.gotResp, 1, 100)
571         return d
572         
573     def tearDown(self):
574         for p in self.pending_calls:
575             if p.active():
576                 p.cancel()
577         self.pending_calls = []
578         if self.client:
579             self.client.close()
580             self.client = None