Make the downloader statistics work.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class LoggingHTTPClientProtocol(HTTPClientProtocol):
21     """A modified client protocol that logs the number of bytes received."""
22     
23     def __init__(self, factory, stats = None, mirror = False):
24         HTTPClientProtocol.__init__(self, factory)
25         self.stats = stats
26         self.mirror = mirror
27     
28     def lineReceived(self, line):
29         if self.stats:
30             self.stats.receivedBytes(len(line) + 2, self.mirror)
31         HTTPClientProtocol.lineReceived(self, line)
32
33     def rawDataReceived(self, data):
34         if self.stats:
35             self.stats.receivedBytes(len(data), self.mirror)
36         HTTPClientProtocol.rawDataReceived(self, data)
37
38 class Peer(ClientFactory):
39     """A manager for all HTTP requests to a single peer.
40     
41     Controls all requests that go to a single peer (host and port).
42     This includes buffering requests until they can be sent and reconnecting
43     in the event of the connection being closed.
44     
45     """
46
47     implements(IHTTPClientManager)
48     
49     def __init__(self, host, port = 80, stats = None):
50         self.host = host
51         self.port = port
52         self.stats = stats
53         self.mirror = False
54         self.rank = 0.5
55         self.busy = False
56         self.pipeline = False
57         self.closed = True
58         self.connecting = False
59         self.request_queue = []
60         self.response_queue = []
61         self.proto = None
62         self.connector = None
63         self._errors = 0
64         self._completed = 0
65         self._downloadSpeeds = []
66         self._lastResponse = None
67         self._responseTimes = []
68     
69     def __repr__(self):
70         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
71         
72     #{ Manage the request queue
73     def connect(self):
74         """Connect to the peer."""
75         assert self.closed and not self.connecting
76         self.connecting = True
77         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
78                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
79         d.addCallbacks(self.connected, self.connectionError)
80
81     def connected(self, proto):
82         """Begin processing the queued requests."""
83         self.closed = False
84         self.connecting = False
85         self.proto = proto
86         self.processQueue()
87         
88     def connectionError(self, err):
89         """Cancel the requests."""
90         log.msg('Failed to connect to the peer by HTTP.')
91         log.err(err)
92
93         # Remove one request so that we don't loop indefinitely
94         if self.request_queue:
95             req = self.request_queue.pop(0)
96             req.deferRequest.errback(err)
97             
98         self._completed += 1
99         self._errors += 1
100         self.rerank()
101         if self.connecting:
102             self.connecting = False
103             self.clientGone(None)
104         
105     def close(self):
106         """Close the connection to the peer."""
107         if not self.closed:
108             self.proto.transport.loseConnection()
109
110     def submitRequest(self, request):
111         """Add a new request to the queue.
112         
113         @type request: L{twisted.web2.client.http.ClientRequest}
114         @return: deferred that will fire with the completed request
115         """
116         request.submissionTime = datetime.now()
117         request.deferRequest = defer.Deferred()
118         self.request_queue.append(request)
119         self.rerank()
120         self.processQueue()
121         return request.deferRequest
122
123     def processQueue(self):
124         """Check the queue to see if new requests can be sent to the peer."""
125         if not self.request_queue:
126             return
127         if self.connecting:
128             return
129         if self.closed:
130             self.connect()
131             return
132         if self.busy and not self.pipeline:
133             return
134         if self.response_queue and not self.pipeline:
135             return
136
137         req = self.request_queue.pop(0)
138         self.response_queue.append(req)
139         self.rerank()
140         req.deferResponse = self.proto.submitRequest(req, False)
141         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
142
143     def requestComplete(self, resp):
144         """Process a completed request."""
145         self._processLastResponse()
146         req = self.response_queue.pop(0)
147         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
148         self._completed += 1
149         now = datetime.now()
150         self._responseTimes.append((now, now - req.submissionTime))
151         self._lastResponse = (now, resp.stream.length)
152         self.rerank()
153         req.deferRequest.callback(resp)
154
155     def requestError(self, error):
156         """Process a request that ended with an error."""
157         self._processLastResponse()
158         req = self.response_queue.pop(0)
159         log.msg('Download of %s generated error %r' % (req.uri, error))
160         self._completed += 1
161         self._errors += 1
162         self.rerank()
163         req.deferRequest.errback(error)
164         
165     def hashError(self, error):
166         """Log that a hash error occurred from the peer."""
167         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
168         self._errors += 1
169         self.rerank()
170
171     #{ IHTTPClientManager interface
172     def clientBusy(self, proto):
173         """Save the busy state."""
174         self.busy = True
175
176     def clientIdle(self, proto):
177         """Try to send a new request."""
178         self._processLastResponse()
179         self.busy = False
180         self.processQueue()
181         self.rerank()
182
183     def clientPipelining(self, proto):
184         """Try to send a new request."""
185         self.pipeline = True
186         self.processQueue()
187
188     def clientGone(self, proto):
189         """Mark sent requests as errors."""
190         self._processLastResponse()
191         for req in self.response_queue:
192             req.deferRequest.errback(ProtocolError('lost connection'))
193         self.busy = False
194         self.pipeline = False
195         self.closed = True
196         self.connecting = False
197         self.response_queue = []
198         self.proto = None
199         self.rerank()
200         if self.request_queue:
201             self.processQueue()
202             
203     #{ Downloading request interface
204     def setCommonHeaders(self):
205         """Get the common HTTP headers for all requests."""
206         headers = http_headers.Headers()
207         headers.setHeader('Host', self.host)
208         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
209                           (version.short(), twisted_version.short(), web2_version.short()))
210         return headers
211     
212     def get(self, path, method="GET", modtime=None):
213         """Add a new request to the queue.
214         
215         @type path: C{string}
216         @param path: the path to request from the peer
217         @type method: C{string}
218         @param method: the HTTP method to use, 'GET' or 'HEAD'
219             (optional, defaults to 'GET')
220         @type modtime: C{int}
221         @param modtime: the modification time to use for an 'If-Modified-Since'
222             header, as seconds since the epoch
223             (optional, defaults to not sending that header)
224         """
225         headers = self.setCommonHeaders()
226         if modtime:
227             headers.setHeader('If-Modified-Since', modtime)
228         return self.submitRequest(ClientRequest(method, path, headers, None))
229     
230     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
231         """Add a new request with a Range header to the queue.
232         
233         @type path: C{string}
234         @param path: the path to request from the peer
235         @type rangeStart: C{int}
236         @param rangeStart: the byte to begin the request at
237         @type rangeEnd: C{int}
238         @param rangeEnd: the byte to end the request at (inclusive)
239         @type method: C{string}
240         @param method: the HTTP method to use, 'GET' or 'HEAD'
241             (optional, defaults to 'GET')
242         """
243         headers = self.setCommonHeaders()
244         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
245         return self.submitRequest(ClientRequest(method, path, headers, None))
246     
247     #{ Peer information
248     def isIdle(self):
249         """Check whether the peer is idle or not."""
250         return not self.busy and not self.request_queue and not self.response_queue
251     
252     def _processLastResponse(self):
253         """Save the download time of the last request for speed calculations."""
254         if self._lastResponse is not None:
255             now = datetime.now()
256             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
257             self._lastResponse = None
258             
259     def downloadSpeed(self):
260         """Gets the latest average download speed for the peer.
261         
262         The average is over the last 10 responses that occurred in the last hour.
263         """
264         total_time = 0.0
265         total_download = 0
266         now = datetime.now()
267         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
268                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
269             self._downloadSpeeds.pop(0)
270
271         # If there are none, then you get 0
272         if not self._downloadSpeeds:
273             return 0.0
274         
275         for download in self._downloadSpeeds:
276             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
277             total_download += download[2]
278
279         return total_download / total_time
280     
281     def responseTime(self):
282         """Gets the latest average response time for the peer.
283         
284         Response time is the time from receiving the request, to the time
285         the download begins. The average is over the last 10 responses that
286         occurred in the last hour.
287         """
288         total_response = 0.0
289         now = datetime.now()
290         while self._responseTimes and (len(self._responseTimes) > 10 or 
291                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
292             self._responseTimes.pop(0)
293
294         # If there are none, give it the benefit of the doubt
295         if not self._responseTimes:
296             return 0.0
297
298         for response in self._responseTimes:
299             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
300
301         return total_response / len(self._responseTimes)
302     
303     def rerank(self):
304         """Determine the ranking value for the peer.
305         
306         The ranking value is composed of 5 numbers, each exponentially
307         decreasing from 1 to 0 based on:
308          - if a connection to the peer is open
309          - the number of pending requests
310          - the time to download a single piece
311          - the number of errors
312          - the response time
313         """
314         rank = 1.0
315         if self.closed:
316             rank *= 0.9
317         rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
318         speed = self.downloadSpeed()
319         if speed > 0.0:
320             rank *= exp(-512.0*1024 / speed)
321         if self._completed:
322             rank *= exp(-float(self._errors) / self._completed)
323         rank *= exp(-self.responseTime() / 5.0)
324         self.rank = rank
325         
326 class TestClientManager(unittest.TestCase):
327     """Unit tests for the Peer."""
328     
329     client = None
330     pending_calls = []
331     
332     def gotResp(self, resp, num, expect):
333         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
334         if expect is not None:
335             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
336         def print_(n):
337             pass
338         def printdone(n):
339             pass
340         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
341     
342     def test_download(self):
343         """Tests a normal download."""
344         host = 'www.ietf.org'
345         self.client = Peer(host, 80)
346         self.timeout = 10
347         
348         d = self.client.get('/rfc/rfc0013.txt')
349         d.addCallback(self.gotResp, 1, 1070)
350         return d
351         
352     def test_head(self):
353         """Tests a 'HEAD' request."""
354         host = 'www.ietf.org'
355         self.client = Peer(host, 80)
356         self.timeout = 10
357         
358         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
359         d.addCallback(self.gotResp, 1, 0)
360         return d
361         
362     def test_multiple_downloads(self):
363         """Tests multiple downloads with queueing and connection closing."""
364         host = 'www.ietf.org'
365         self.client = Peer(host, 80)
366         self.timeout = 120
367         lastDefer = defer.Deferred()
368         
369         def newRequest(path, num, expect, last=False):
370             d = self.client.get(path)
371             d.addCallback(self.gotResp, num, expect)
372             if last:
373                 d.addBoth(lastDefer.callback)
374
375         # 3 quick requests
376         newRequest("/rfc/rfc0006.txt", 1, 1776)
377         newRequest("/rfc/rfc2362.txt", 2, 159833)
378         newRequest("/rfc/rfc0801.txt", 3, 40824)
379         
380         # This one will probably be queued
381         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
382         
383         # Connection should still be open, but idle
384         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
385         
386         #Connection should be closed
387         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
388         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
389         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
390         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
391         
392         # Now it should definitely be closed
393         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
394         return lastDefer
395         
396     def test_multiple_quick_downloads(self):
397         """Tests lots of multiple downloads with queueing."""
398         host = 'www.ietf.org'
399         self.client = Peer(host, 80)
400         self.timeout = 30
401         lastDefer = defer.Deferred()
402         
403         def newRequest(path, num, expect, last=False):
404             d = self.client.get(path)
405             d.addCallback(self.gotResp, num, expect)
406             if last:
407                 d.addBoth(lastDefer.callback)
408                 
409         newRequest("/rfc/rfc0006.txt", 1, 1776)
410         newRequest("/rfc/rfc2362.txt", 2, 159833)
411         newRequest("/rfc/rfc0801.txt", 3, 40824)
412         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
413         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
414         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
415         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
416         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
417         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
418         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
419         return lastDefer
420         
421     def checkInfo(self):
422         log.msg('Rank is: %r' % self.client.rank)
423         log.msg('Download speed is: %r' % self.client.downloadSpeed())
424         log.msg('Response Time is: %r' % self.client.responseTime())
425         
426     def test_peer_info(self):
427         """Test retrieving the peer info during a download."""
428         host = 'www.ietf.org'
429         self.client = Peer(host, 80)
430         self.timeout = 120
431         lastDefer = defer.Deferred()
432         
433         def newRequest(path, num, expect, last=False):
434             d = self.client.get(path)
435             d.addCallback(self.gotResp, num, expect)
436             if last:
437                 d.addBoth(lastDefer.callback)
438                 
439         newRequest("/rfc/rfc0006.txt", 1, 1776)
440         newRequest("/rfc/rfc2362.txt", 2, 159833)
441         newRequest("/rfc/rfc0801.txt", 3, 40824)
442         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
443         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
444         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
445         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
446         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
447         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
448         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
449         
450         for i in xrange(2, 122, 2):
451             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
452         
453         return lastDefer
454         
455     def test_range(self):
456         """Test a Range request."""
457         host = 'www.ietf.org'
458         self.client = Peer(host, 80)
459         self.timeout = 10
460         
461         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
462         d.addCallback(self.gotResp, 1, 100)
463         return d
464         
465     def tearDown(self):
466         for p in self.pending_calls:
467             if p.active():
468                 p.cancel()
469         self.pending_calls = []
470         if self.client:
471             self.client.close()
472             self.client = None