More multiple peer downloading work, finished but still untested.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class Peer(ClientFactory):
21     """A manager for all HTTP requests to a single peer.
22     
23     Controls all requests that go to a single peer (host and port).
24     This includes buffering requests until they can be sent and reconnecting
25     in the event of the connection being closed.
26     
27     """
28
29     implements(IHTTPClientManager)
30     
31     def __init__(self, host, port=80):
32         self.host = host
33         self.port = port
34         self.rank = 0.5
35         self.busy = False
36         self.pipeline = False
37         self.closed = True
38         self.connecting = False
39         self.request_queue = []
40         self.response_queue = []
41         self.proto = None
42         self.connector = None
43         self._errors = 0
44         self._completed = 0
45         self._downloadSpeeds = []
46         self._lastResponse = None
47         self._responseTimes = []
48         
49     #{ Manage the request queue
50     def connect(self):
51         """Connect to the peer."""
52         assert self.closed and not self.connecting
53         self.connecting = True
54         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
55         d.addCallback(self.connected)
56
57     def connected(self, proto):
58         """Begin processing the queued requests."""
59         self.closed = False
60         self.connecting = False
61         self.proto = proto
62         self.processQueue()
63         
64     def close(self):
65         """Close the connection to the peer."""
66         if not self.closed:
67             self.proto.transport.loseConnection()
68
69     def submitRequest(self, request):
70         """Add a new request to the queue.
71         
72         @type request: L{twisted.web2.client.http.ClientRequest}
73         @return: deferred that will fire with the completed request
74         """
75         request.submissionTime = datetime.now()
76         request.deferRequest = defer.Deferred()
77         self.request_queue.append(request)
78         self.rerank()
79         self.processQueue()
80         return request.deferRequest
81
82     def processQueue(self):
83         """Check the queue to see if new requests can be sent to the peer."""
84         if not self.request_queue:
85             return
86         if self.connecting:
87             return
88         if self.closed:
89             self.connect()
90             return
91         if self.busy and not self.pipeline:
92             return
93         if self.response_queue and not self.pipeline:
94             return
95
96         req = self.request_queue.pop(0)
97         self.response_queue.append(req)
98         self.rerank()
99         req.deferResponse = self.proto.submitRequest(req, False)
100         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
101
102     def requestComplete(self, resp):
103         """Process a completed request."""
104         self._processLastResponse()
105         req = self.response_queue.pop(0)
106         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
107         self._completed += 1
108         now = datetime.now()
109         self._responseTimes.append((now, now - req.submissionTime))
110         self._lastResponse = (now, resp.stream.length)
111         self.rerank()
112         req.deferRequest.callback(resp)
113
114     def requestError(self, error):
115         """Process a request that ended with an error."""
116         self._processLastResponse()
117         req = self.response_queue.pop(0)
118         log.msg('Download of %s generated error %r' % (req.uri, error))
119         self._completed += 1
120         self._errors += 1
121         self.rerank()
122         req.deferRequest.errback(error)
123         
124     def hashError(self, error):
125         """Log that a hash error occurred from the peer."""
126         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
127         self._errors += 1
128         self.rerank()
129
130     #{ IHTTPClientManager interface
131     def clientBusy(self, proto):
132         """Save the busy state."""
133         self.busy = True
134
135     def clientIdle(self, proto):
136         """Try to send a new request."""
137         self._processLastResponse()
138         self.busy = False
139         self.processQueue()
140         self.rerank()
141
142     def clientPipelining(self, proto):
143         """Try to send a new request."""
144         self.pipeline = True
145         self.processQueue()
146
147     def clientGone(self, proto):
148         """Mark sent requests as errors."""
149         self._processLastResponse()
150         for req in self.response_queue:
151             req.deferRequest.errback(ProtocolError('lost connection'))
152         self.busy = False
153         self.pipeline = False
154         self.closed = True
155         self.connecting = False
156         self.response_queue = []
157         self.proto = None
158         self.rerank()
159         if self.request_queue:
160             self.processQueue()
161             
162     #{ Downloading request interface
163     def setCommonHeaders(self):
164         """Get the common HTTP headers for all requests."""
165         headers = http_headers.Headers()
166         headers.setHeader('Host', self.host)
167         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
168                           (version.short(), twisted_version.short(), web2_version.short()))
169         return headers
170     
171     def get(self, path, method="GET", modtime=None):
172         """Add a new request to the queue.
173         
174         @type path: C{string}
175         @param path: the path to request from the peer
176         @type method: C{string}
177         @param method: the HTTP method to use, 'GET' or 'HEAD'
178             (optional, defaults to 'GET')
179         @type modtime: C{int}
180         @param modtime: the modification time to use for an 'If-Modified-Since'
181             header, as seconds since the epoch
182             (optional, defaults to not sending that header)
183         """
184         headers = self.setCommonHeaders()
185         if modtime:
186             headers.setHeader('If-Modified-Since', modtime)
187         return self.submitRequest(ClientRequest(method, path, headers, None))
188     
189     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
190         """Add a new request with a Range header to the queue.
191         
192         @type path: C{string}
193         @param path: the path to request from the peer
194         @type rangeStart: C{int}
195         @param rangeStart: the byte to begin the request at
196         @type rangeEnd: C{int}
197         @param rangeEnd: the byte to end the request at (inclusive)
198         @type method: C{string}
199         @param method: the HTTP method to use, 'GET' or 'HEAD'
200             (optional, defaults to 'GET')
201         """
202         headers = self.setCommonHeaders()
203         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
204         return self.submitRequest(ClientRequest(method, path, headers, None))
205     
206     #{ Peer information
207     def isIdle(self):
208         """Check whether the peer is idle or not."""
209         return not self.busy and not self.request_queue and not self.response_queue
210     
211     def _processLastResponse(self):
212         """Save the download time of the last request for speed calculations."""
213         if self._lastResponse is not None:
214             now = datetime.now()
215             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
216             self._lastResponse = None
217             
218     def downloadSpeed(self):
219         """Gets the latest average download speed for the peer.
220         
221         The average is over the last 10 responses that occurred in the last hour.
222         """
223         total_time = 0.0
224         total_download = 0
225         now = datetime.now()
226         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
227                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
228             self._downloadSpeeds.pop(0)
229
230         # If there are none, then you get 0
231         if not self._downloadSpeeds:
232             return 0.0
233         
234         for download in self._downloadSpeeds:
235             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
236             total_download += download[2]
237
238         return total_download / total_time
239     
240     def responseTime(self):
241         """Gets the latest average response time for the peer.
242         
243         Response time is the time from receiving the request, to the time
244         the download begins. The average is over the last 10 responses that
245         occurred in the last hour.
246         """
247         total_response = 0.0
248         now = datetime.now()
249         while self._responseTimes and (len(self._responseTimes) > 10 or 
250                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
251             self._responseTimes.pop(0)
252
253         # If there are none, give it the benefit of the doubt
254         if not self._responseTimes:
255             return 0.0
256
257         for response in self._responseTimes:
258             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
259
260         return total_response / len(self._responseTimes)
261     
262     def rerank(self):
263         """Determine the ranking value for the peer.
264         
265         The ranking value is composed of 5 numbers, each exponentially
266         decreasing from 1 to 0 based on:
267          - if a connection to the peer is open
268          - the number of pending requests
269          - the time to download a single piece
270          - the number of errors
271          - the response time
272         """
273         rank = 1.0
274         if self.closed:
275             rank *= 0.9
276         rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
277         speed = self.downloadSpeed()
278         if speed > 0.0:
279             rank *= exp(-512.0*1024 / speed)
280         if self._completed:
281             rank *= exp(-float(self._errors) / self._completed)
282         rank *= exp(-self.responseTime() / 5.0)
283         self.rank = rank
284         
285 class TestClientManager(unittest.TestCase):
286     """Unit tests for the Peer."""
287     
288     client = None
289     pending_calls = []
290     
291     def gotResp(self, resp, num, expect):
292         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
293         if expect is not None:
294             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
295         def print_(n):
296             pass
297         def printdone(n):
298             pass
299         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
300     
301     def test_download(self):
302         """Tests a normal download."""
303         host = 'www.ietf.org'
304         self.client = Peer(host, 80)
305         self.timeout = 10
306         
307         d = self.client.get('/rfc/rfc0013.txt')
308         d.addCallback(self.gotResp, 1, 1070)
309         return d
310         
311     def test_head(self):
312         """Tests a 'HEAD' request."""
313         host = 'www.ietf.org'
314         self.client = Peer(host, 80)
315         self.timeout = 10
316         
317         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
318         d.addCallback(self.gotResp, 1, 0)
319         return d
320         
321     def test_multiple_downloads(self):
322         """Tests multiple downloads with queueing and connection closing."""
323         host = 'www.ietf.org'
324         self.client = Peer(host, 80)
325         self.timeout = 120
326         lastDefer = defer.Deferred()
327         
328         def newRequest(path, num, expect, last=False):
329             d = self.client.get(path)
330             d.addCallback(self.gotResp, num, expect)
331             if last:
332                 d.addBoth(lastDefer.callback)
333
334         # 3 quick requests
335         newRequest("/rfc/rfc0006.txt", 1, 1776)
336         newRequest("/rfc/rfc2362.txt", 2, 159833)
337         newRequest("/rfc/rfc0801.txt", 3, 40824)
338         
339         # This one will probably be queued
340         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
341         
342         # Connection should still be open, but idle
343         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
344         
345         #Connection should be closed
346         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
347         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
348         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
349         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
350         
351         # Now it should definitely be closed
352         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
353         return lastDefer
354         
355     def test_multiple_quick_downloads(self):
356         """Tests lots of multiple downloads with queueing."""
357         host = 'www.ietf.org'
358         self.client = Peer(host, 80)
359         self.timeout = 30
360         lastDefer = defer.Deferred()
361         
362         def newRequest(path, num, expect, last=False):
363             d = self.client.get(path)
364             d.addCallback(self.gotResp, num, expect)
365             if last:
366                 d.addBoth(lastDefer.callback)
367                 
368         newRequest("/rfc/rfc0006.txt", 1, 1776)
369         newRequest("/rfc/rfc2362.txt", 2, 159833)
370         newRequest("/rfc/rfc0801.txt", 3, 40824)
371         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
372         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
373         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
374         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
375         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
376         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
377         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
378         return lastDefer
379         
380     def checkInfo(self):
381         log.msg('Rank is: %r' % self.client.rank)
382         log.msg('Download speed is: %r' % self.client.downloadSpeed())
383         log.msg('Response Time is: %r' % self.client.responseTime())
384         
385     def test_peer_info(self):
386         """Test retrieving the peer info during a download."""
387         host = 'www.ietf.org'
388         self.client = Peer(host, 80)
389         self.timeout = 120
390         lastDefer = defer.Deferred()
391         
392         def newRequest(path, num, expect, last=False):
393             d = self.client.get(path)
394             d.addCallback(self.gotResp, num, expect)
395             if last:
396                 d.addBoth(lastDefer.callback)
397                 
398         newRequest("/rfc/rfc0006.txt", 1, 1776)
399         newRequest("/rfc/rfc2362.txt", 2, 159833)
400         newRequest("/rfc/rfc0801.txt", 3, 40824)
401         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
402         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
403         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
404         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
405         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
406         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
407         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
408         
409         for i in xrange(2, 122, 2):
410             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
411         
412         return lastDefer
413         
414     def test_range(self):
415         """Test a Range request."""
416         host = 'www.ietf.org'
417         self.client = Peer(host, 80)
418         self.timeout = 10
419         
420         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
421         d.addCallback(self.gotResp, 1, 100)
422         return d
423         
424     def tearDown(self):
425         for p in self.pending_calls:
426             if p.active():
427                 p.cancel()
428         self.pending_calls = []
429         if self.client:
430             self.client.close()
431             self.client = None