Multiple peer downloading is mostly working now.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class Peer(ClientFactory):
21     """A manager for all HTTP requests to a single peer.
22     
23     Controls all requests that go to a single peer (host and port).
24     This includes buffering requests until they can be sent and reconnecting
25     in the event of the connection being closed.
26     
27     """
28
29     implements(IHTTPClientManager)
30     
31     def __init__(self, host, port=80):
32         self.host = host
33         self.port = port
34         self.rank = 0.5
35         self.busy = False
36         self.pipeline = False
37         self.closed = True
38         self.connecting = False
39         self.request_queue = []
40         self.response_queue = []
41         self.proto = None
42         self.connector = None
43         self._errors = 0
44         self._completed = 0
45         self._downloadSpeeds = []
46         self._lastResponse = None
47         self._responseTimes = []
48     
49     def __repr__(self):
50         return "(%s, %d, %0.5f)" % (self.host, self.port, self.rank)
51         
52     #{ Manage the request queue
53     def connect(self):
54         """Connect to the peer."""
55         assert self.closed and not self.connecting
56         self.connecting = True
57         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
58         d.addCallback(self.connected)
59
60     def connected(self, proto):
61         """Begin processing the queued requests."""
62         self.closed = False
63         self.connecting = False
64         self.proto = proto
65         self.processQueue()
66         
67     def close(self):
68         """Close the connection to the peer."""
69         if not self.closed:
70             self.proto.transport.loseConnection()
71
72     def submitRequest(self, request):
73         """Add a new request to the queue.
74         
75         @type request: L{twisted.web2.client.http.ClientRequest}
76         @return: deferred that will fire with the completed request
77         """
78         request.submissionTime = datetime.now()
79         request.deferRequest = defer.Deferred()
80         self.request_queue.append(request)
81         self.rerank()
82         self.processQueue()
83         return request.deferRequest
84
85     def processQueue(self):
86         """Check the queue to see if new requests can be sent to the peer."""
87         if not self.request_queue:
88             return
89         if self.connecting:
90             return
91         if self.closed:
92             self.connect()
93             return
94         if self.busy and not self.pipeline:
95             return
96         if self.response_queue and not self.pipeline:
97             return
98
99         req = self.request_queue.pop(0)
100         self.response_queue.append(req)
101         self.rerank()
102         req.deferResponse = self.proto.submitRequest(req, False)
103         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
104
105     def requestComplete(self, resp):
106         """Process a completed request."""
107         self._processLastResponse()
108         req = self.response_queue.pop(0)
109         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
110         self._completed += 1
111         now = datetime.now()
112         self._responseTimes.append((now, now - req.submissionTime))
113         self._lastResponse = (now, resp.stream.length)
114         self.rerank()
115         req.deferRequest.callback(resp)
116
117     def requestError(self, error):
118         """Process a request that ended with an error."""
119         self._processLastResponse()
120         req = self.response_queue.pop(0)
121         log.msg('Download of %s generated error %r' % (req.uri, error))
122         self._completed += 1
123         self._errors += 1
124         self.rerank()
125         req.deferRequest.errback(error)
126         
127     def hashError(self, error):
128         """Log that a hash error occurred from the peer."""
129         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
130         self._errors += 1
131         self.rerank()
132
133     #{ IHTTPClientManager interface
134     def clientBusy(self, proto):
135         """Save the busy state."""
136         self.busy = True
137
138     def clientIdle(self, proto):
139         """Try to send a new request."""
140         self._processLastResponse()
141         self.busy = False
142         self.processQueue()
143         self.rerank()
144
145     def clientPipelining(self, proto):
146         """Try to send a new request."""
147         self.pipeline = True
148         self.processQueue()
149
150     def clientGone(self, proto):
151         """Mark sent requests as errors."""
152         self._processLastResponse()
153         for req in self.response_queue:
154             req.deferRequest.errback(ProtocolError('lost connection'))
155         self.busy = False
156         self.pipeline = False
157         self.closed = True
158         self.connecting = False
159         self.response_queue = []
160         self.proto = None
161         self.rerank()
162         if self.request_queue:
163             self.processQueue()
164             
165     #{ Downloading request interface
166     def setCommonHeaders(self):
167         """Get the common HTTP headers for all requests."""
168         headers = http_headers.Headers()
169         headers.setHeader('Host', self.host)
170         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
171                           (version.short(), twisted_version.short(), web2_version.short()))
172         return headers
173     
174     def get(self, path, method="GET", modtime=None):
175         """Add a new request to the queue.
176         
177         @type path: C{string}
178         @param path: the path to request from the peer
179         @type method: C{string}
180         @param method: the HTTP method to use, 'GET' or 'HEAD'
181             (optional, defaults to 'GET')
182         @type modtime: C{int}
183         @param modtime: the modification time to use for an 'If-Modified-Since'
184             header, as seconds since the epoch
185             (optional, defaults to not sending that header)
186         """
187         headers = self.setCommonHeaders()
188         if modtime:
189             headers.setHeader('If-Modified-Since', modtime)
190         return self.submitRequest(ClientRequest(method, path, headers, None))
191     
192     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
193         """Add a new request with a Range header to the queue.
194         
195         @type path: C{string}
196         @param path: the path to request from the peer
197         @type rangeStart: C{int}
198         @param rangeStart: the byte to begin the request at
199         @type rangeEnd: C{int}
200         @param rangeEnd: the byte to end the request at (inclusive)
201         @type method: C{string}
202         @param method: the HTTP method to use, 'GET' or 'HEAD'
203             (optional, defaults to 'GET')
204         """
205         headers = self.setCommonHeaders()
206         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
207         return self.submitRequest(ClientRequest(method, path, headers, None))
208     
209     #{ Peer information
210     def isIdle(self):
211         """Check whether the peer is idle or not."""
212         return not self.busy and not self.request_queue and not self.response_queue
213     
214     def _processLastResponse(self):
215         """Save the download time of the last request for speed calculations."""
216         if self._lastResponse is not None:
217             now = datetime.now()
218             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
219             self._lastResponse = None
220             
221     def downloadSpeed(self):
222         """Gets the latest average download speed for the peer.
223         
224         The average is over the last 10 responses that occurred in the last hour.
225         """
226         total_time = 0.0
227         total_download = 0
228         now = datetime.now()
229         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
230                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
231             self._downloadSpeeds.pop(0)
232
233         # If there are none, then you get 0
234         if not self._downloadSpeeds:
235             return 0.0
236         
237         for download in self._downloadSpeeds:
238             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
239             total_download += download[2]
240
241         return total_download / total_time
242     
243     def responseTime(self):
244         """Gets the latest average response time for the peer.
245         
246         Response time is the time from receiving the request, to the time
247         the download begins. The average is over the last 10 responses that
248         occurred in the last hour.
249         """
250         total_response = 0.0
251         now = datetime.now()
252         while self._responseTimes and (len(self._responseTimes) > 10 or 
253                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
254             self._responseTimes.pop(0)
255
256         # If there are none, give it the benefit of the doubt
257         if not self._responseTimes:
258             return 0.0
259
260         for response in self._responseTimes:
261             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
262
263         return total_response / len(self._responseTimes)
264     
265     def rerank(self):
266         """Determine the ranking value for the peer.
267         
268         The ranking value is composed of 5 numbers, each exponentially
269         decreasing from 1 to 0 based on:
270          - if a connection to the peer is open
271          - the number of pending requests
272          - the time to download a single piece
273          - the number of errors
274          - the response time
275         """
276         rank = 1.0
277         if self.closed:
278             rank *= 0.9
279         rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
280         speed = self.downloadSpeed()
281         if speed > 0.0:
282             rank *= exp(-512.0*1024 / speed)
283         if self._completed:
284             rank *= exp(-float(self._errors) / self._completed)
285         rank *= exp(-self.responseTime() / 5.0)
286         self.rank = rank
287         
288 class TestClientManager(unittest.TestCase):
289     """Unit tests for the Peer."""
290     
291     client = None
292     pending_calls = []
293     
294     def gotResp(self, resp, num, expect):
295         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
296         if expect is not None:
297             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
298         def print_(n):
299             pass
300         def printdone(n):
301             pass
302         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
303     
304     def test_download(self):
305         """Tests a normal download."""
306         host = 'www.ietf.org'
307         self.client = Peer(host, 80)
308         self.timeout = 10
309         
310         d = self.client.get('/rfc/rfc0013.txt')
311         d.addCallback(self.gotResp, 1, 1070)
312         return d
313         
314     def test_head(self):
315         """Tests a 'HEAD' request."""
316         host = 'www.ietf.org'
317         self.client = Peer(host, 80)
318         self.timeout = 10
319         
320         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
321         d.addCallback(self.gotResp, 1, 0)
322         return d
323         
324     def test_multiple_downloads(self):
325         """Tests multiple downloads with queueing and connection closing."""
326         host = 'www.ietf.org'
327         self.client = Peer(host, 80)
328         self.timeout = 120
329         lastDefer = defer.Deferred()
330         
331         def newRequest(path, num, expect, last=False):
332             d = self.client.get(path)
333             d.addCallback(self.gotResp, num, expect)
334             if last:
335                 d.addBoth(lastDefer.callback)
336
337         # 3 quick requests
338         newRequest("/rfc/rfc0006.txt", 1, 1776)
339         newRequest("/rfc/rfc2362.txt", 2, 159833)
340         newRequest("/rfc/rfc0801.txt", 3, 40824)
341         
342         # This one will probably be queued
343         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
344         
345         # Connection should still be open, but idle
346         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
347         
348         #Connection should be closed
349         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
350         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
351         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
352         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
353         
354         # Now it should definitely be closed
355         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
356         return lastDefer
357         
358     def test_multiple_quick_downloads(self):
359         """Tests lots of multiple downloads with queueing."""
360         host = 'www.ietf.org'
361         self.client = Peer(host, 80)
362         self.timeout = 30
363         lastDefer = defer.Deferred()
364         
365         def newRequest(path, num, expect, last=False):
366             d = self.client.get(path)
367             d.addCallback(self.gotResp, num, expect)
368             if last:
369                 d.addBoth(lastDefer.callback)
370                 
371         newRequest("/rfc/rfc0006.txt", 1, 1776)
372         newRequest("/rfc/rfc2362.txt", 2, 159833)
373         newRequest("/rfc/rfc0801.txt", 3, 40824)
374         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
375         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
376         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
377         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
378         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
379         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
380         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
381         return lastDefer
382         
383     def checkInfo(self):
384         log.msg('Rank is: %r' % self.client.rank)
385         log.msg('Download speed is: %r' % self.client.downloadSpeed())
386         log.msg('Response Time is: %r' % self.client.responseTime())
387         
388     def test_peer_info(self):
389         """Test retrieving the peer info during a download."""
390         host = 'www.ietf.org'
391         self.client = Peer(host, 80)
392         self.timeout = 120
393         lastDefer = defer.Deferred()
394         
395         def newRequest(path, num, expect, last=False):
396             d = self.client.get(path)
397             d.addCallback(self.gotResp, num, expect)
398             if last:
399                 d.addBoth(lastDefer.callback)
400                 
401         newRequest("/rfc/rfc0006.txt", 1, 1776)
402         newRequest("/rfc/rfc2362.txt", 2, 159833)
403         newRequest("/rfc/rfc0801.txt", 3, 40824)
404         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
405         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
406         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
407         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
408         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
409         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
410         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
411         
412         for i in xrange(2, 122, 2):
413             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
414         
415         return lastDefer
416         
417     def test_range(self):
418         """Test a Range request."""
419         host = 'www.ietf.org'
420         self.client = Peer(host, 80)
421         self.timeout = 10
422         
423         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
424         d.addCallback(self.gotResp, 1, 100)
425         return d
426         
427     def tearDown(self):
428         for p in self.pending_calls:
429             if p.active():
430                 p.cancel()
431         self.pending_calls = []
432         if self.client:
433             self.client.close()
434             self.client = None