057b5a2befdc6783d1c2e295d8c6b9024c782231
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class Peer(ClientFactory):
21     """A manager for all HTTP requests to a single peer.
22     
23     Controls all requests that go to a single peer (host and port).
24     This includes buffering requests until they can be sent and reconnecting
25     in the event of the connection being closed.
26     
27     """
28
29     implements(IHTTPClientManager)
30     
31     def __init__(self, host, port=80):
32         self.host = host
33         self.port = port
34         self.mirror = False
35         self.rank = 0.5
36         self.busy = False
37         self.pipeline = False
38         self.closed = True
39         self.connecting = False
40         self.request_queue = []
41         self.response_queue = []
42         self.proto = None
43         self.connector = None
44         self._errors = 0
45         self._completed = 0
46         self._downloadSpeeds = []
47         self._lastResponse = None
48         self._responseTimes = []
49     
50     def __repr__(self):
51         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
52         
53     #{ Manage the request queue
54     def connect(self):
55         """Connect to the peer."""
56         assert self.closed and not self.connecting
57         self.connecting = True
58         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
59         d.addCallback(self.connected)
60
61     def connected(self, proto):
62         """Begin processing the queued requests."""
63         self.closed = False
64         self.connecting = False
65         self.proto = proto
66         self.processQueue()
67         
68     def close(self):
69         """Close the connection to the peer."""
70         if not self.closed:
71             self.proto.transport.loseConnection()
72
73     def submitRequest(self, request):
74         """Add a new request to the queue.
75         
76         @type request: L{twisted.web2.client.http.ClientRequest}
77         @return: deferred that will fire with the completed request
78         """
79         request.submissionTime = datetime.now()
80         request.deferRequest = defer.Deferred()
81         self.request_queue.append(request)
82         self.rerank()
83         self.processQueue()
84         return request.deferRequest
85
86     def processQueue(self):
87         """Check the queue to see if new requests can be sent to the peer."""
88         if not self.request_queue:
89             return
90         if self.connecting:
91             return
92         if self.closed:
93             self.connect()
94             return
95         if self.busy and not self.pipeline:
96             return
97         if self.response_queue and not self.pipeline:
98             return
99
100         req = self.request_queue.pop(0)
101         self.response_queue.append(req)
102         self.rerank()
103         req.deferResponse = self.proto.submitRequest(req, False)
104         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
105
106     def requestComplete(self, resp):
107         """Process a completed request."""
108         self._processLastResponse()
109         req = self.response_queue.pop(0)
110         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
111         self._completed += 1
112         now = datetime.now()
113         self._responseTimes.append((now, now - req.submissionTime))
114         self._lastResponse = (now, resp.stream.length)
115         self.rerank()
116         req.deferRequest.callback(resp)
117
118     def requestError(self, error):
119         """Process a request that ended with an error."""
120         self._processLastResponse()
121         req = self.response_queue.pop(0)
122         log.msg('Download of %s generated error %r' % (req.uri, error))
123         self._completed += 1
124         self._errors += 1
125         self.rerank()
126         req.deferRequest.errback(error)
127         
128     def hashError(self, error):
129         """Log that a hash error occurred from the peer."""
130         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
131         self._errors += 1
132         self.rerank()
133
134     #{ IHTTPClientManager interface
135     def clientBusy(self, proto):
136         """Save the busy state."""
137         self.busy = True
138
139     def clientIdle(self, proto):
140         """Try to send a new request."""
141         self._processLastResponse()
142         self.busy = False
143         self.processQueue()
144         self.rerank()
145
146     def clientPipelining(self, proto):
147         """Try to send a new request."""
148         self.pipeline = True
149         self.processQueue()
150
151     def clientGone(self, proto):
152         """Mark sent requests as errors."""
153         self._processLastResponse()
154         for req in self.response_queue:
155             req.deferRequest.errback(ProtocolError('lost connection'))
156         self.busy = False
157         self.pipeline = False
158         self.closed = True
159         self.connecting = False
160         self.response_queue = []
161         self.proto = None
162         self.rerank()
163         if self.request_queue:
164             self.processQueue()
165             
166     #{ Downloading request interface
167     def setCommonHeaders(self):
168         """Get the common HTTP headers for all requests."""
169         headers = http_headers.Headers()
170         headers.setHeader('Host', self.host)
171         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
172                           (version.short(), twisted_version.short(), web2_version.short()))
173         return headers
174     
175     def get(self, path, method="GET", modtime=None):
176         """Add a new request to the queue.
177         
178         @type path: C{string}
179         @param path: the path to request from the peer
180         @type method: C{string}
181         @param method: the HTTP method to use, 'GET' or 'HEAD'
182             (optional, defaults to 'GET')
183         @type modtime: C{int}
184         @param modtime: the modification time to use for an 'If-Modified-Since'
185             header, as seconds since the epoch
186             (optional, defaults to not sending that header)
187         """
188         headers = self.setCommonHeaders()
189         if modtime:
190             headers.setHeader('If-Modified-Since', modtime)
191         return self.submitRequest(ClientRequest(method, path, headers, None))
192     
193     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
194         """Add a new request with a Range header to the queue.
195         
196         @type path: C{string}
197         @param path: the path to request from the peer
198         @type rangeStart: C{int}
199         @param rangeStart: the byte to begin the request at
200         @type rangeEnd: C{int}
201         @param rangeEnd: the byte to end the request at (inclusive)
202         @type method: C{string}
203         @param method: the HTTP method to use, 'GET' or 'HEAD'
204             (optional, defaults to 'GET')
205         """
206         headers = self.setCommonHeaders()
207         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
208         return self.submitRequest(ClientRequest(method, path, headers, None))
209     
210     #{ Peer information
211     def isIdle(self):
212         """Check whether the peer is idle or not."""
213         return not self.busy and not self.request_queue and not self.response_queue
214     
215     def _processLastResponse(self):
216         """Save the download time of the last request for speed calculations."""
217         if self._lastResponse is not None:
218             now = datetime.now()
219             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
220             self._lastResponse = None
221             
222     def downloadSpeed(self):
223         """Gets the latest average download speed for the peer.
224         
225         The average is over the last 10 responses that occurred in the last hour.
226         """
227         total_time = 0.0
228         total_download = 0
229         now = datetime.now()
230         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
231                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
232             self._downloadSpeeds.pop(0)
233
234         # If there are none, then you get 0
235         if not self._downloadSpeeds:
236             return 0.0
237         
238         for download in self._downloadSpeeds:
239             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
240             total_download += download[2]
241
242         return total_download / total_time
243     
244     def responseTime(self):
245         """Gets the latest average response time for the peer.
246         
247         Response time is the time from receiving the request, to the time
248         the download begins. The average is over the last 10 responses that
249         occurred in the last hour.
250         """
251         total_response = 0.0
252         now = datetime.now()
253         while self._responseTimes and (len(self._responseTimes) > 10 or 
254                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
255             self._responseTimes.pop(0)
256
257         # If there are none, give it the benefit of the doubt
258         if not self._responseTimes:
259             return 0.0
260
261         for response in self._responseTimes:
262             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
263
264         return total_response / len(self._responseTimes)
265     
266     def rerank(self):
267         """Determine the ranking value for the peer.
268         
269         The ranking value is composed of 5 numbers, each exponentially
270         decreasing from 1 to 0 based on:
271          - if a connection to the peer is open
272          - the number of pending requests
273          - the time to download a single piece
274          - the number of errors
275          - the response time
276         """
277         rank = 1.0
278         if self.closed:
279             rank *= 0.9
280         rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
281         speed = self.downloadSpeed()
282         if speed > 0.0:
283             rank *= exp(-512.0*1024 / speed)
284         if self._completed:
285             rank *= exp(-float(self._errors) / self._completed)
286         rank *= exp(-self.responseTime() / 5.0)
287         self.rank = rank
288         
289 class TestClientManager(unittest.TestCase):
290     """Unit tests for the Peer."""
291     
292     client = None
293     pending_calls = []
294     
295     def gotResp(self, resp, num, expect):
296         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
297         if expect is not None:
298             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
299         def print_(n):
300             pass
301         def printdone(n):
302             pass
303         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
304     
305     def test_download(self):
306         """Tests a normal download."""
307         host = 'www.ietf.org'
308         self.client = Peer(host, 80)
309         self.timeout = 10
310         
311         d = self.client.get('/rfc/rfc0013.txt')
312         d.addCallback(self.gotResp, 1, 1070)
313         return d
314         
315     def test_head(self):
316         """Tests a 'HEAD' request."""
317         host = 'www.ietf.org'
318         self.client = Peer(host, 80)
319         self.timeout = 10
320         
321         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
322         d.addCallback(self.gotResp, 1, 0)
323         return d
324         
325     def test_multiple_downloads(self):
326         """Tests multiple downloads with queueing and connection closing."""
327         host = 'www.ietf.org'
328         self.client = Peer(host, 80)
329         self.timeout = 120
330         lastDefer = defer.Deferred()
331         
332         def newRequest(path, num, expect, last=False):
333             d = self.client.get(path)
334             d.addCallback(self.gotResp, num, expect)
335             if last:
336                 d.addBoth(lastDefer.callback)
337
338         # 3 quick requests
339         newRequest("/rfc/rfc0006.txt", 1, 1776)
340         newRequest("/rfc/rfc2362.txt", 2, 159833)
341         newRequest("/rfc/rfc0801.txt", 3, 40824)
342         
343         # This one will probably be queued
344         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
345         
346         # Connection should still be open, but idle
347         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
348         
349         #Connection should be closed
350         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
351         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
352         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
353         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
354         
355         # Now it should definitely be closed
356         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
357         return lastDefer
358         
359     def test_multiple_quick_downloads(self):
360         """Tests lots of multiple downloads with queueing."""
361         host = 'www.ietf.org'
362         self.client = Peer(host, 80)
363         self.timeout = 30
364         lastDefer = defer.Deferred()
365         
366         def newRequest(path, num, expect, last=False):
367             d = self.client.get(path)
368             d.addCallback(self.gotResp, num, expect)
369             if last:
370                 d.addBoth(lastDefer.callback)
371                 
372         newRequest("/rfc/rfc0006.txt", 1, 1776)
373         newRequest("/rfc/rfc2362.txt", 2, 159833)
374         newRequest("/rfc/rfc0801.txt", 3, 40824)
375         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
376         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
377         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
378         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
379         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
380         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
381         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
382         return lastDefer
383         
384     def checkInfo(self):
385         log.msg('Rank is: %r' % self.client.rank)
386         log.msg('Download speed is: %r' % self.client.downloadSpeed())
387         log.msg('Response Time is: %r' % self.client.responseTime())
388         
389     def test_peer_info(self):
390         """Test retrieving the peer info during a download."""
391         host = 'www.ietf.org'
392         self.client = Peer(host, 80)
393         self.timeout = 120
394         lastDefer = defer.Deferred()
395         
396         def newRequest(path, num, expect, last=False):
397             d = self.client.get(path)
398             d.addCallback(self.gotResp, num, expect)
399             if last:
400                 d.addBoth(lastDefer.callback)
401                 
402         newRequest("/rfc/rfc0006.txt", 1, 1776)
403         newRequest("/rfc/rfc2362.txt", 2, 159833)
404         newRequest("/rfc/rfc0801.txt", 3, 40824)
405         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
406         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
407         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
408         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
409         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
410         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
411         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
412         
413         for i in xrange(2, 122, 2):
414             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
415         
416         return lastDefer
417         
418     def test_range(self):
419         """Test a Range request."""
420         host = 'www.ietf.org'
421         self.client = Peer(host, 80)
422         self.timeout = 10
423         
424         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
425         d.addCallback(self.gotResp, 1, 100)
426         return d
427         
428     def tearDown(self):
429         for p in self.pending_calls:
430             if p.active():
431                 p.cancel()
432         self.pending_calls = []
433         if self.client:
434             self.client.close()
435             self.client = None