Store the peer rank as an attribute and recalculate it automatically.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class Peer(ClientFactory):
21     """A manager for all HTTP requests to a single peer.
22     
23     Controls all requests that go to a single peer (host and port).
24     This includes buffering requests until they can be sent and reconnecting
25     in the event of the connection being closed.
26     
27     """
28
29     implements(IHTTPClientManager)
30     
31     def __init__(self, host, port=80):
32         self.host = host
33         self.port = port
34         self.rank = 0.5
35         self.busy = False
36         self.pipeline = False
37         self.closed = True
38         self.connecting = False
39         self.request_queue = []
40         self.response_queue = []
41         self.proto = None
42         self.connector = None
43         self._errors = 0
44         self._completed = 0
45         self._downloadSpeeds = []
46         self._lastResponse = None
47         self._responseTimes = []
48         
49     #{ Manage the request queue
50     def connect(self):
51         """Connect to the peer."""
52         assert self.closed and not self.connecting
53         self.connecting = True
54         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
55         d.addCallback(self.connected)
56
57     def connected(self, proto):
58         """Begin processing the queued requests."""
59         self.closed = False
60         self.connecting = False
61         self.proto = proto
62         self.processQueue()
63         
64     def close(self):
65         """Close the connection to the peer."""
66         if not self.closed:
67             self.proto.transport.loseConnection()
68
69     def submitRequest(self, request):
70         """Add a new request to the queue.
71         
72         @type request: L{twisted.web2.client.http.ClientRequest}
73         @return: deferred that will fire with the completed request
74         """
75         request.submissionTime = datetime.now()
76         request.deferRequest = defer.Deferred()
77         self.request_queue.append(request)
78         self.rerank()
79         self.processQueue()
80         return request.deferRequest
81
82     def processQueue(self):
83         """Check the queue to see if new requests can be sent to the peer."""
84         if not self.request_queue:
85             return
86         if self.connecting:
87             return
88         if self.closed:
89             self.connect()
90             return
91         if self.busy and not self.pipeline:
92             return
93         if self.response_queue and not self.pipeline:
94             return
95
96         req = self.request_queue.pop(0)
97         self.response_queue.append(req)
98         self.rerank()
99         req.deferResponse = self.proto.submitRequest(req, False)
100         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
101
102     def requestComplete(self, resp):
103         """Process a completed request."""
104         self._processLastResponse()
105         req = self.response_queue.pop(0)
106         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
107         self._completed += 1
108         if resp.code >= 400:
109             self._errors += 1
110         now = datetime.now()
111         self._responseTimes.append((now, now - req.submissionTime))
112         self._lastResponse = (now, resp.stream.length)
113         self.rerank()
114         req.deferRequest.callback(resp)
115
116     def requestError(self, error):
117         """Process a request that ended with an error."""
118         self._processLastResponse()
119         req = self.response_queue.pop(0)
120         log.msg('Download of %s generated error %r' % (req.uri, error))
121         self._completed += 1
122         self._errors += 1
123         self.rerank()
124         req.deferRequest.errback(error)
125         
126     def hashError(self, error):
127         """Log that a hash error occurred from the peer."""
128         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
129         self._errors += 1
130         self.rerank()
131
132     #{ IHTTPClientManager interface
133     def clientBusy(self, proto):
134         """Save the busy state."""
135         self.busy = True
136
137     def clientIdle(self, proto):
138         """Try to send a new request."""
139         self._processLastResponse()
140         self.busy = False
141         self.processQueue()
142         self.rerank()
143
144     def clientPipelining(self, proto):
145         """Try to send a new request."""
146         self.pipeline = True
147         self.processQueue()
148
149     def clientGone(self, proto):
150         """Mark sent requests as errors."""
151         self._processLastResponse()
152         for req in self.response_queue:
153             req.deferRequest.errback(ProtocolError('lost connection'))
154         self.busy = False
155         self.pipeline = False
156         self.closed = True
157         self.connecting = False
158         self.response_queue = []
159         self.proto = None
160         self.rerank()
161         if self.request_queue:
162             self.processQueue()
163             
164     #{ Downloading request interface
165     def setCommonHeaders(self):
166         """Get the common HTTP headers for all requests."""
167         headers = http_headers.Headers()
168         headers.setHeader('Host', self.host)
169         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
170                           (version.short(), twisted_version.short(), web2_version.short()))
171         return headers
172     
173     def get(self, path, method="GET", modtime=None):
174         """Add a new request to the queue.
175         
176         @type path: C{string}
177         @param path: the path to request from the peer
178         @type method: C{string}
179         @param method: the HTTP method to use, 'GET' or 'HEAD'
180             (optional, defaults to 'GET')
181         @type modtime: C{int}
182         @param modtime: the modification time to use for an 'If-Modified-Since'
183             header, as seconds since the epoch
184             (optional, defaults to not sending that header)
185         """
186         headers = self.setCommonHeaders()
187         if modtime:
188             headers.setHeader('If-Modified-Since', modtime)
189         return self.submitRequest(ClientRequest(method, path, headers, None))
190     
191     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
192         """Add a new request with a Range header to the queue.
193         
194         @type path: C{string}
195         @param path: the path to request from the peer
196         @type rangeStart: C{int}
197         @param rangeStart: the byte to begin the request at
198         @type rangeEnd: C{int}
199         @param rangeEnd: the byte to end the request at (inclusive)
200         @type method: C{string}
201         @param method: the HTTP method to use, 'GET' or 'HEAD'
202             (optional, defaults to 'GET')
203         """
204         headers = self.setCommonHeaders()
205         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
206         return self.submitRequest(ClientRequest(method, path, headers, None))
207     
208     #{ Peer information
209     def isIdle(self):
210         """Check whether the peer is idle or not."""
211         return not self.busy and not self.request_queue and not self.response_queue
212     
213     def _processLastResponse(self):
214         """Save the download time of the last request for speed calculations."""
215         if self._lastResponse is not None:
216             now = datetime.now()
217             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
218             self._lastResponse = None
219             
220     def downloadSpeed(self):
221         """Gets the latest average download speed for the peer.
222         
223         The average is over the last 10 responses that occurred in the last hour.
224         """
225         total_time = 0.0
226         total_download = 0
227         now = datetime.now()
228         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
229                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
230             self._downloadSpeeds.pop(0)
231
232         # If there are none, then you get 0
233         if not self._downloadSpeeds:
234             return 0.0
235         
236         for download in self._downloadSpeeds:
237             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
238             total_download += download[2]
239
240         return total_download / total_time
241     
242     def responseTime(self):
243         """Gets the latest average response time for the peer.
244         
245         Response time is the time from receiving the request, to the time
246         the download begins. The average is over the last 10 responses that
247         occurred in the last hour.
248         """
249         total_response = 0.0
250         now = datetime.now()
251         while self._responseTimes and (len(self._responseTimes) > 10 or 
252                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
253             self._responseTimes.pop(0)
254
255         # If there are none, give it the benefit of the doubt
256         if not self._responseTimes:
257             return 0.0
258
259         for response in self._responseTimes:
260             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
261
262         return total_response / len(self._responseTimes)
263     
264     def rerank(self):
265         """Determine the ranking value for the peer.
266         
267         The ranking value is composed of 5 numbers, each exponentially
268         decreasing from 1 to 0 based on:
269          - if a connection to the peer is open
270          - the number of pending requests
271          - the time to download a single piece
272          - the number of errors
273          - the response time
274         """
275         rank = 1.0
276         if self.closed:
277             rank *= 0.9
278         rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
279         speed = self.downloadSpeed()
280         if speed > 0.0:
281             rank *= exp(-512.0*1024 / speed)
282         if self._completed:
283             rank *= exp(-float(self._errors) / self._completed)
284         rank *= exp(-self.responseTime() / 5.0)
285         self.rank = rank
286         
287 class TestClientManager(unittest.TestCase):
288     """Unit tests for the Peer."""
289     
290     client = None
291     pending_calls = []
292     
293     def gotResp(self, resp, num, expect):
294         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
295         if expect is not None:
296             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
297         def print_(n):
298             pass
299         def printdone(n):
300             pass
301         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
302     
303     def test_download(self):
304         """Tests a normal download."""
305         host = 'www.ietf.org'
306         self.client = Peer(host, 80)
307         self.timeout = 10
308         
309         d = self.client.get('/rfc/rfc0013.txt')
310         d.addCallback(self.gotResp, 1, 1070)
311         return d
312         
313     def test_head(self):
314         """Tests a 'HEAD' request."""
315         host = 'www.ietf.org'
316         self.client = Peer(host, 80)
317         self.timeout = 10
318         
319         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
320         d.addCallback(self.gotResp, 1, 0)
321         return d
322         
323     def test_multiple_downloads(self):
324         """Tests multiple downloads with queueing and connection closing."""
325         host = 'www.ietf.org'
326         self.client = Peer(host, 80)
327         self.timeout = 120
328         lastDefer = defer.Deferred()
329         
330         def newRequest(path, num, expect, last=False):
331             d = self.client.get(path)
332             d.addCallback(self.gotResp, num, expect)
333             if last:
334                 d.addBoth(lastDefer.callback)
335
336         # 3 quick requests
337         newRequest("/rfc/rfc0006.txt", 1, 1776)
338         newRequest("/rfc/rfc2362.txt", 2, 159833)
339         newRequest("/rfc/rfc0801.txt", 3, 40824)
340         
341         # This one will probably be queued
342         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
343         
344         # Connection should still be open, but idle
345         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
346         
347         #Connection should be closed
348         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
349         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
350         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
351         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
352         
353         # Now it should definitely be closed
354         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
355         return lastDefer
356         
357     def test_multiple_quick_downloads(self):
358         """Tests lots of multiple downloads with queueing."""
359         host = 'www.ietf.org'
360         self.client = Peer(host, 80)
361         self.timeout = 30
362         lastDefer = defer.Deferred()
363         
364         def newRequest(path, num, expect, last=False):
365             d = self.client.get(path)
366             d.addCallback(self.gotResp, num, expect)
367             if last:
368                 d.addBoth(lastDefer.callback)
369                 
370         newRequest("/rfc/rfc0006.txt", 1, 1776)
371         newRequest("/rfc/rfc2362.txt", 2, 159833)
372         newRequest("/rfc/rfc0801.txt", 3, 40824)
373         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
374         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
375         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
376         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
377         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
378         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
379         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
380         return lastDefer
381         
382     def checkInfo(self):
383         log.msg('Rank is: %r' % self.client.rank)
384         log.msg('Download speed is: %r' % self.client.downloadSpeed())
385         log.msg('Response Time is: %r' % self.client.responseTime())
386         
387     def test_peer_info(self):
388         """Test retrieving the peer info during a download."""
389         host = 'www.ietf.org'
390         self.client = Peer(host, 80)
391         self.timeout = 120
392         lastDefer = defer.Deferred()
393         
394         def newRequest(path, num, expect, last=False):
395             d = self.client.get(path)
396             d.addCallback(self.gotResp, num, expect)
397             if last:
398                 d.addBoth(lastDefer.callback)
399                 
400         newRequest("/rfc/rfc0006.txt", 1, 1776)
401         newRequest("/rfc/rfc2362.txt", 2, 159833)
402         newRequest("/rfc/rfc0801.txt", 3, 40824)
403         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
404         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
405         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
406         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
407         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
408         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
409         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
410         
411         for i in xrange(2, 122, 2):
412             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
413         
414         return lastDefer
415         
416     def test_range(self):
417         """Test a Range request."""
418         host = 'www.ietf.org'
419         self.client = Peer(host, 80)
420         self.timeout = 10
421         
422         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
423         d.addCallback(self.gotResp, 1, 100)
424         return d
425         
426     def tearDown(self):
427         for p in self.pending_calls:
428             if p.active():
429                 p.cancel()
430         self.pending_calls = []
431         if self.client:
432             self.client.close()
433             self.client = None