More strict use of errbacks when using deferreds.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class Peer(ClientFactory):
21     """A manager for all HTTP requests to a single peer.
22     
23     Controls all requests that go to a single peer (host and port).
24     This includes buffering requests until they can be sent and reconnecting
25     in the event of the connection being closed.
26     
27     """
28
29     implements(IHTTPClientManager)
30     
31     def __init__(self, host, port=80):
32         self.host = host
33         self.port = port
34         self.mirror = False
35         self.rank = 0.5
36         self.busy = False
37         self.pipeline = False
38         self.closed = True
39         self.connecting = False
40         self.request_queue = []
41         self.response_queue = []
42         self.proto = None
43         self.connector = None
44         self._errors = 0
45         self._completed = 0
46         self._downloadSpeeds = []
47         self._lastResponse = None
48         self._responseTimes = []
49     
50     def __repr__(self):
51         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
52         
53     #{ Manage the request queue
54     def connect(self):
55         """Connect to the peer."""
56         assert self.closed and not self.connecting
57         self.connecting = True
58         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
59         d.addCallbacks(self.connected, self.connectionError)
60
61     def connected(self, proto):
62         """Begin processing the queued requests."""
63         self.closed = False
64         self.connecting = False
65         self.proto = proto
66         self.processQueue()
67         
68     def connectionError(self, err):
69         """Cancel the requests."""
70         log.msg('Failed to connect to the peer by HTTP.')
71         log.err(err)
72
73         # Remove one request so that we don't loop indefinitely
74         if self.request_queue:
75             req = self.request_queue.pop(0)
76             req.deferRequest.errback(err)
77             
78         self._completed += 1
79         self._errors += 1
80         self.rerank()
81         if self.connecting:
82             self.connecting = False
83             self.clientGone(None)
84         
85     def close(self):
86         """Close the connection to the peer."""
87         if not self.closed:
88             self.proto.transport.loseConnection()
89
90     def submitRequest(self, request):
91         """Add a new request to the queue.
92         
93         @type request: L{twisted.web2.client.http.ClientRequest}
94         @return: deferred that will fire with the completed request
95         """
96         request.submissionTime = datetime.now()
97         request.deferRequest = defer.Deferred()
98         self.request_queue.append(request)
99         self.rerank()
100         self.processQueue()
101         return request.deferRequest
102
103     def processQueue(self):
104         """Check the queue to see if new requests can be sent to the peer."""
105         if not self.request_queue:
106             return
107         if self.connecting:
108             return
109         if self.closed:
110             self.connect()
111             return
112         if self.busy and not self.pipeline:
113             return
114         if self.response_queue and not self.pipeline:
115             return
116
117         req = self.request_queue.pop(0)
118         self.response_queue.append(req)
119         self.rerank()
120         req.deferResponse = self.proto.submitRequest(req, False)
121         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
122
123     def requestComplete(self, resp):
124         """Process a completed request."""
125         self._processLastResponse()
126         req = self.response_queue.pop(0)
127         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
128         self._completed += 1
129         now = datetime.now()
130         self._responseTimes.append((now, now - req.submissionTime))
131         self._lastResponse = (now, resp.stream.length)
132         self.rerank()
133         req.deferRequest.callback(resp)
134
135     def requestError(self, error):
136         """Process a request that ended with an error."""
137         self._processLastResponse()
138         req = self.response_queue.pop(0)
139         log.msg('Download of %s generated error %r' % (req.uri, error))
140         self._completed += 1
141         self._errors += 1
142         self.rerank()
143         req.deferRequest.errback(error)
144         
145     def hashError(self, error):
146         """Log that a hash error occurred from the peer."""
147         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
148         self._errors += 1
149         self.rerank()
150
151     #{ IHTTPClientManager interface
152     def clientBusy(self, proto):
153         """Save the busy state."""
154         self.busy = True
155
156     def clientIdle(self, proto):
157         """Try to send a new request."""
158         self._processLastResponse()
159         self.busy = False
160         self.processQueue()
161         self.rerank()
162
163     def clientPipelining(self, proto):
164         """Try to send a new request."""
165         self.pipeline = True
166         self.processQueue()
167
168     def clientGone(self, proto):
169         """Mark sent requests as errors."""
170         self._processLastResponse()
171         for req in self.response_queue:
172             req.deferRequest.errback(ProtocolError('lost connection'))
173         self.busy = False
174         self.pipeline = False
175         self.closed = True
176         self.connecting = False
177         self.response_queue = []
178         self.proto = None
179         self.rerank()
180         if self.request_queue:
181             self.processQueue()
182             
183     #{ Downloading request interface
184     def setCommonHeaders(self):
185         """Get the common HTTP headers for all requests."""
186         headers = http_headers.Headers()
187         headers.setHeader('Host', self.host)
188         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
189                           (version.short(), twisted_version.short(), web2_version.short()))
190         return headers
191     
192     def get(self, path, method="GET", modtime=None):
193         """Add a new request to the queue.
194         
195         @type path: C{string}
196         @param path: the path to request from the peer
197         @type method: C{string}
198         @param method: the HTTP method to use, 'GET' or 'HEAD'
199             (optional, defaults to 'GET')
200         @type modtime: C{int}
201         @param modtime: the modification time to use for an 'If-Modified-Since'
202             header, as seconds since the epoch
203             (optional, defaults to not sending that header)
204         """
205         headers = self.setCommonHeaders()
206         if modtime:
207             headers.setHeader('If-Modified-Since', modtime)
208         return self.submitRequest(ClientRequest(method, path, headers, None))
209     
210     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
211         """Add a new request with a Range header to the queue.
212         
213         @type path: C{string}
214         @param path: the path to request from the peer
215         @type rangeStart: C{int}
216         @param rangeStart: the byte to begin the request at
217         @type rangeEnd: C{int}
218         @param rangeEnd: the byte to end the request at (inclusive)
219         @type method: C{string}
220         @param method: the HTTP method to use, 'GET' or 'HEAD'
221             (optional, defaults to 'GET')
222         """
223         headers = self.setCommonHeaders()
224         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
225         return self.submitRequest(ClientRequest(method, path, headers, None))
226     
227     #{ Peer information
228     def isIdle(self):
229         """Check whether the peer is idle or not."""
230         return not self.busy and not self.request_queue and not self.response_queue
231     
232     def _processLastResponse(self):
233         """Save the download time of the last request for speed calculations."""
234         if self._lastResponse is not None:
235             now = datetime.now()
236             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
237             self._lastResponse = None
238             
239     def downloadSpeed(self):
240         """Gets the latest average download speed for the peer.
241         
242         The average is over the last 10 responses that occurred in the last hour.
243         """
244         total_time = 0.0
245         total_download = 0
246         now = datetime.now()
247         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
248                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
249             self._downloadSpeeds.pop(0)
250
251         # If there are none, then you get 0
252         if not self._downloadSpeeds:
253             return 0.0
254         
255         for download in self._downloadSpeeds:
256             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
257             total_download += download[2]
258
259         return total_download / total_time
260     
261     def responseTime(self):
262         """Gets the latest average response time for the peer.
263         
264         Response time is the time from receiving the request, to the time
265         the download begins. The average is over the last 10 responses that
266         occurred in the last hour.
267         """
268         total_response = 0.0
269         now = datetime.now()
270         while self._responseTimes and (len(self._responseTimes) > 10 or 
271                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
272             self._responseTimes.pop(0)
273
274         # If there are none, give it the benefit of the doubt
275         if not self._responseTimes:
276             return 0.0
277
278         for response in self._responseTimes:
279             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
280
281         return total_response / len(self._responseTimes)
282     
283     def rerank(self):
284         """Determine the ranking value for the peer.
285         
286         The ranking value is composed of 5 numbers, each exponentially
287         decreasing from 1 to 0 based on:
288          - if a connection to the peer is open
289          - the number of pending requests
290          - the time to download a single piece
291          - the number of errors
292          - the response time
293         """
294         rank = 1.0
295         if self.closed:
296             rank *= 0.9
297         rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
298         speed = self.downloadSpeed()
299         if speed > 0.0:
300             rank *= exp(-512.0*1024 / speed)
301         if self._completed:
302             rank *= exp(-float(self._errors) / self._completed)
303         rank *= exp(-self.responseTime() / 5.0)
304         self.rank = rank
305         
306 class TestClientManager(unittest.TestCase):
307     """Unit tests for the Peer."""
308     
309     client = None
310     pending_calls = []
311     
312     def gotResp(self, resp, num, expect):
313         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
314         if expect is not None:
315             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
316         def print_(n):
317             pass
318         def printdone(n):
319             pass
320         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
321     
322     def test_download(self):
323         """Tests a normal download."""
324         host = 'www.ietf.org'
325         self.client = Peer(host, 80)
326         self.timeout = 10
327         
328         d = self.client.get('/rfc/rfc0013.txt')
329         d.addCallback(self.gotResp, 1, 1070)
330         return d
331         
332     def test_head(self):
333         """Tests a 'HEAD' request."""
334         host = 'www.ietf.org'
335         self.client = Peer(host, 80)
336         self.timeout = 10
337         
338         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
339         d.addCallback(self.gotResp, 1, 0)
340         return d
341         
342     def test_multiple_downloads(self):
343         """Tests multiple downloads with queueing and connection closing."""
344         host = 'www.ietf.org'
345         self.client = Peer(host, 80)
346         self.timeout = 120
347         lastDefer = defer.Deferred()
348         
349         def newRequest(path, num, expect, last=False):
350             d = self.client.get(path)
351             d.addCallback(self.gotResp, num, expect)
352             if last:
353                 d.addBoth(lastDefer.callback)
354
355         # 3 quick requests
356         newRequest("/rfc/rfc0006.txt", 1, 1776)
357         newRequest("/rfc/rfc2362.txt", 2, 159833)
358         newRequest("/rfc/rfc0801.txt", 3, 40824)
359         
360         # This one will probably be queued
361         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
362         
363         # Connection should still be open, but idle
364         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
365         
366         #Connection should be closed
367         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
368         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
369         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
370         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
371         
372         # Now it should definitely be closed
373         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
374         return lastDefer
375         
376     def test_multiple_quick_downloads(self):
377         """Tests lots of multiple downloads with queueing."""
378         host = 'www.ietf.org'
379         self.client = Peer(host, 80)
380         self.timeout = 30
381         lastDefer = defer.Deferred()
382         
383         def newRequest(path, num, expect, last=False):
384             d = self.client.get(path)
385             d.addCallback(self.gotResp, num, expect)
386             if last:
387                 d.addBoth(lastDefer.callback)
388                 
389         newRequest("/rfc/rfc0006.txt", 1, 1776)
390         newRequest("/rfc/rfc2362.txt", 2, 159833)
391         newRequest("/rfc/rfc0801.txt", 3, 40824)
392         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
393         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
394         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
395         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
396         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
397         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
398         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
399         return lastDefer
400         
401     def checkInfo(self):
402         log.msg('Rank is: %r' % self.client.rank)
403         log.msg('Download speed is: %r' % self.client.downloadSpeed())
404         log.msg('Response Time is: %r' % self.client.responseTime())
405         
406     def test_peer_info(self):
407         """Test retrieving the peer info during a download."""
408         host = 'www.ietf.org'
409         self.client = Peer(host, 80)
410         self.timeout = 120
411         lastDefer = defer.Deferred()
412         
413         def newRequest(path, num, expect, last=False):
414             d = self.client.get(path)
415             d.addCallback(self.gotResp, num, expect)
416             if last:
417                 d.addBoth(lastDefer.callback)
418                 
419         newRequest("/rfc/rfc0006.txt", 1, 1776)
420         newRequest("/rfc/rfc2362.txt", 2, 159833)
421         newRequest("/rfc/rfc0801.txt", 3, 40824)
422         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
423         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
424         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
425         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
426         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
427         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
428         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
429         
430         for i in xrange(2, 122, 2):
431             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
432         
433         return lastDefer
434         
435     def test_range(self):
436         """Test a Range request."""
437         host = 'www.ietf.org'
438         self.client = Peer(host, 80)
439         self.timeout = 10
440         
441         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
442         d.addCallback(self.gotResp, 1, 100)
443         return d
444         
445     def tearDown(self):
446         for p in self.pending_calls:
447             if p.active():
448                 p.cancel()
449         self.pending_calls = []
450         if self.client:
451             self.client.close()
452             self.client = None