97346db878aa5adfdb06feb84c68b1dad11eb92e
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class LoggingHTTPClientProtocol(HTTPClientProtocol):
21     """A modified client protocol that logs the number of bytes received."""
22     
23     def __init__(self, factory, stats = None, mirror = False):
24         HTTPClientProtocol.__init__(self, factory)
25         self.stats = stats
26         self.mirror = mirror
27     
28     def lineReceived(self, line):
29         if self.stats:
30             self.stats.receivedBytes(len(line) + 2, self.mirror)
31         HTTPClientProtocol.lineReceived(self, line)
32
33     def rawDataReceived(self, data):
34         if self.stats:
35             self.stats.receivedBytes(len(data), self.mirror)
36         HTTPClientProtocol.rawDataReceived(self, data)
37
38 class Peer(ClientFactory):
39     """A manager for all HTTP requests to a single peer.
40     
41     Controls all requests that go to a single peer (host and port).
42     This includes buffering requests until they can be sent and reconnecting
43     in the event of the connection being closed.
44     
45     """
46
47     implements(IHTTPClientManager)
48     
49     def __init__(self, host, port = 80, stats = None):
50         self.host = host
51         self.port = port
52         self.stats = stats
53         self.mirror = False
54         self.rank = 0.1
55         self.busy = False
56         self.pipeline = False
57         self.closed = True
58         self.connecting = False
59         self.request_queue = []
60         self.response_queue = []
61         self.proto = None
62         self.connector = None
63         self._errors = 0
64         self._completed = 0
65         self._downloadSpeeds = []
66         self._lastResponse = None
67         self._responseTimes = []
68     
69     def __repr__(self):
70         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
71         
72     #{ Manage the request queue
73     def connect(self):
74         """Connect to the peer."""
75         assert self.closed and not self.connecting
76         self.connecting = True
77         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
78                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
79         d.addCallbacks(self.connected, self.connectionError)
80
81     def connected(self, proto):
82         """Begin processing the queued requests."""
83         self.closed = False
84         self.connecting = False
85         self.proto = proto
86         self.processQueue()
87         
88     def connectionError(self, err):
89         """Cancel the requests."""
90         log.msg('Failed to connect to the peer by HTTP.')
91         log.err(err)
92
93         # Remove one request so that we don't loop indefinitely
94         if self.request_queue:
95             req = self.request_queue.pop(0)
96             req.deferRequest.errback(err)
97             
98         self._completed += 1
99         self._errors += 1
100         self.rerank()
101         if self.connecting:
102             self.connecting = False
103             self.clientGone(None)
104         
105     def close(self):
106         """Close the connection to the peer."""
107         if not self.closed:
108             self.proto.transport.loseConnection()
109
110     def submitRequest(self, request):
111         """Add a new request to the queue.
112         
113         @type request: L{twisted.web2.client.http.ClientRequest}
114         @return: deferred that will fire with the completed request
115         """
116         request.submissionTime = datetime.now()
117         request.deferRequest = defer.Deferred()
118         self.request_queue.append(request)
119         self.rerank()
120         self.processQueue()
121         return request.deferRequest
122
123     def processQueue(self):
124         """Check the queue to see if new requests can be sent to the peer."""
125         if not self.request_queue:
126             return
127         if self.connecting:
128             return
129         if self.closed:
130             self.connect()
131             return
132         if self.busy and not self.pipeline:
133             return
134         if self.response_queue and not self.pipeline:
135             return
136
137         req = self.request_queue.pop(0)
138         self.response_queue.append(req)
139         self.rerank()
140         req.deferResponse = self.proto.submitRequest(req, False)
141         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
142
143     def requestComplete(self, resp):
144         """Process a completed request."""
145         self._processLastResponse()
146         req = self.response_queue.pop(0)
147         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
148         self._completed += 1
149         now = datetime.now()
150         self._responseTimes.append((now, now - req.submissionTime))
151         self._lastResponse = (now, resp.stream.length)
152         self.rerank()
153         req.deferRequest.callback(resp)
154
155     def requestError(self, error):
156         """Process a request that ended with an error."""
157         self._processLastResponse()
158         req = self.response_queue.pop(0)
159         log.msg('Download of %s generated error %r' % (req.uri, error))
160         self._completed += 1
161         self._errors += 1
162         self.rerank()
163         req.deferRequest.errback(error)
164         
165     def hashError(self, error):
166         """Log that a hash error occurred from the peer."""
167         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
168         self._errors += 1
169         self.rerank()
170
171     #{ IHTTPClientManager interface
172     def clientBusy(self, proto):
173         """Save the busy state."""
174         self.busy = True
175
176     def clientIdle(self, proto):
177         """Try to send a new request."""
178         self._processLastResponse()
179         self.busy = False
180         self.processQueue()
181         self.rerank()
182
183     def clientPipelining(self, proto):
184         """Try to send a new request."""
185         self.pipeline = True
186         self.processQueue()
187
188     def clientGone(self, proto):
189         """Mark sent requests as errors."""
190         self._processLastResponse()
191         for req in self.response_queue:
192             reactor.callLater(0, req.deferRequest.errback,
193                                  ProtocolError('lost connection'))
194         self.busy = False
195         self.pipeline = False
196         self.closed = True
197         self.connecting = False
198         self.response_queue = []
199         self.proto = None
200         self.rerank()
201         if self.request_queue:
202             self.processQueue()
203             
204     #{ Downloading request interface
205     def setCommonHeaders(self):
206         """Get the common HTTP headers for all requests."""
207         headers = http_headers.Headers()
208         headers.setHeader('Host', self.host)
209         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
210                           (version.short(), twisted_version.short(), web2_version.short()))
211         return headers
212     
213     def get(self, path, method="GET", modtime=None):
214         """Add a new request to the queue.
215         
216         @type path: C{string}
217         @param path: the path to request from the peer
218         @type method: C{string}
219         @param method: the HTTP method to use, 'GET' or 'HEAD'
220             (optional, defaults to 'GET')
221         @type modtime: C{int}
222         @param modtime: the modification time to use for an 'If-Modified-Since'
223             header, as seconds since the epoch
224             (optional, defaults to not sending that header)
225         """
226         headers = self.setCommonHeaders()
227         if modtime:
228             headers.setHeader('If-Modified-Since', modtime)
229         return self.submitRequest(ClientRequest(method, path, headers, None))
230     
231     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
232         """Add a new request with a Range header to the queue.
233         
234         @type path: C{string}
235         @param path: the path to request from the peer
236         @type rangeStart: C{int}
237         @param rangeStart: the byte to begin the request at
238         @type rangeEnd: C{int}
239         @param rangeEnd: the byte to end the request at (inclusive)
240         @type method: C{string}
241         @param method: the HTTP method to use, 'GET' or 'HEAD'
242             (optional, defaults to 'GET')
243         """
244         headers = self.setCommonHeaders()
245         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
246         return self.submitRequest(ClientRequest(method, path, headers, None))
247     
248     #{ Peer information
249     def isIdle(self):
250         """Check whether the peer is idle or not."""
251         return not self.busy and not self.request_queue and not self.response_queue
252     
253     def _processLastResponse(self):
254         """Save the download time of the last request for speed calculations."""
255         if self._lastResponse is not None:
256             now = datetime.now()
257             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
258             self._lastResponse = None
259             
260     def downloadSpeed(self):
261         """Gets the latest average download speed for the peer.
262         
263         The average is over the last 10 responses that occurred in the last hour.
264         """
265         total_time = 0.0
266         total_download = 0
267         now = datetime.now()
268         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
269                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
270             self._downloadSpeeds.pop(0)
271
272         # If there are none, then you get 0
273         if not self._downloadSpeeds:
274             return 0.0
275         
276         for download in self._downloadSpeeds:
277             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
278             total_download += download[2]
279
280         return total_download / total_time
281     
282     def responseTime(self):
283         """Gets the latest average response time for the peer.
284         
285         Response time is the time from receiving the request, to the time
286         the download begins. The average is over the last 10 responses that
287         occurred in the last hour.
288         """
289         total_response = 0.0
290         now = datetime.now()
291         while self._responseTimes and (len(self._responseTimes) > 10 or 
292                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
293             self._responseTimes.pop(0)
294
295         # If there are none, give it the benefit of the doubt
296         if not self._responseTimes:
297             return 0.0
298
299         for response in self._responseTimes:
300             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
301
302         return total_response / len(self._responseTimes)
303     
304     def rerank(self):
305         """Determine the ranking value for the peer.
306         
307         The ranking value is composed of 5 numbers, each exponentially
308         decreasing from 1 to 0 based on:
309          - if a connection to the peer is open
310          - the number of pending requests
311          - the time to download a single piece
312          - the number of errors
313          - the response time
314         """
315         rank = 1.0
316         if self.closed:
317             rank *= 0.9
318         rank *= exp(-(len(self.request_queue) + len(self.response_queue)))
319         speed = self.downloadSpeed()
320         if speed > 0.0:
321             rank *= exp(-512.0*1024 / speed)
322         if self._completed:
323             rank *= exp(-10.0 * self._errors / self._completed)
324         rank *= exp(-self.responseTime() / 5.0)
325         self.rank = rank
326         
327 class TestClientManager(unittest.TestCase):
328     """Unit tests for the Peer."""
329     
330     client = None
331     pending_calls = []
332     
333     def gotResp(self, resp, num, expect):
334         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
335         if expect is not None:
336             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
337         def print_(n):
338             pass
339         def printdone(n):
340             pass
341         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
342     
343     def test_download(self):
344         """Tests a normal download."""
345         host = 'www.ietf.org'
346         self.client = Peer(host, 80)
347         self.timeout = 10
348         
349         d = self.client.get('/rfc/rfc0013.txt')
350         d.addCallback(self.gotResp, 1, 1070)
351         return d
352         
353     def test_head(self):
354         """Tests a 'HEAD' request."""
355         host = 'www.ietf.org'
356         self.client = Peer(host, 80)
357         self.timeout = 10
358         
359         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
360         d.addCallback(self.gotResp, 1, 0)
361         return d
362         
363     def test_multiple_downloads(self):
364         """Tests multiple downloads with queueing and connection closing."""
365         host = 'www.ietf.org'
366         self.client = Peer(host, 80)
367         self.timeout = 120
368         lastDefer = defer.Deferred()
369         
370         def newRequest(path, num, expect, last=False):
371             d = self.client.get(path)
372             d.addCallback(self.gotResp, num, expect)
373             if last:
374                 d.addBoth(lastDefer.callback)
375
376         # 3 quick requests
377         newRequest("/rfc/rfc0006.txt", 1, 1776)
378         newRequest("/rfc/rfc2362.txt", 2, 159833)
379         newRequest("/rfc/rfc0801.txt", 3, 40824)
380         
381         # This one will probably be queued
382         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
383         
384         # Connection should still be open, but idle
385         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
386         
387         #Connection should be closed
388         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
389         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
390         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
391         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
392         
393         # Now it should definitely be closed
394         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
395         return lastDefer
396         
397     def test_multiple_quick_downloads(self):
398         """Tests lots of multiple downloads with queueing."""
399         host = 'www.ietf.org'
400         self.client = Peer(host, 80)
401         self.timeout = 30
402         lastDefer = defer.Deferred()
403         
404         def newRequest(path, num, expect, last=False):
405             d = self.client.get(path)
406             d.addCallback(self.gotResp, num, expect)
407             if last:
408                 d.addBoth(lastDefer.callback)
409                 
410         newRequest("/rfc/rfc0006.txt", 1, 1776)
411         newRequest("/rfc/rfc2362.txt", 2, 159833)
412         newRequest("/rfc/rfc0801.txt", 3, 40824)
413         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
414         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
415         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
416         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
417         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
418         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
419         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
420         return lastDefer
421         
422     def checkInfo(self):
423         log.msg('Rank is: %r' % self.client.rank)
424         log.msg('Download speed is: %r' % self.client.downloadSpeed())
425         log.msg('Response Time is: %r' % self.client.responseTime())
426         
427     def test_peer_info(self):
428         """Test retrieving the peer info during a download."""
429         host = 'www.ietf.org'
430         self.client = Peer(host, 80)
431         self.timeout = 120
432         lastDefer = defer.Deferred()
433         
434         def newRequest(path, num, expect, last=False):
435             d = self.client.get(path)
436             d.addCallback(self.gotResp, num, expect)
437             if last:
438                 d.addBoth(lastDefer.callback)
439                 
440         newRequest("/rfc/rfc0006.txt", 1, 1776)
441         newRequest("/rfc/rfc2362.txt", 2, 159833)
442         newRequest("/rfc/rfc0801.txt", 3, 40824)
443         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
444         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
445         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
446         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
447         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
448         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
449         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
450         
451         for i in xrange(2, 122, 2):
452             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
453         
454         return lastDefer
455         
456     def test_range(self):
457         """Test a Range request."""
458         host = 'www.ietf.org'
459         self.client = Peer(host, 80)
460         self.timeout = 10
461         
462         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
463         d.addCallback(self.gotResp, 1, 100)
464         return d
465         
466     def tearDown(self):
467         for p in self.pending_calls:
468             if p.active():
469                 p.cancel()
470         self.pending_calls = []
471         if self.client:
472             self.client.close()
473             self.client = None