eb369326b7f309e40c22bc39485fe8ae1fc6c671
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class Peer(ClientFactory):
21     """A manager for all HTTP requests to a single peer.
22     
23     Controls all requests that go to a single peer (host and port).
24     This includes buffering requests until they can be sent and reconnecting
25     in the event of the connection being closed.
26     
27     """
28
29     implements(IHTTPClientManager)
30     
31     def __init__(self, host, port=80):
32         self.host = host
33         self.port = port
34         self.busy = False
35         self.pipeline = False
36         self.closed = True
37         self.connecting = False
38         self.request_queue = []
39         self.response_queue = []
40         self.proto = None
41         self.connector = None
42         self._errors = 0
43         self._completed = 0
44         self._downloadSpeeds = []
45         self._lastResponse = None
46         self._responseTimes = []
47         
48     #{ Manage the request queue
49     def connect(self):
50         """Connect to the peer."""
51         assert self.closed and not self.connecting
52         self.connecting = True
53         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
54         d.addCallback(self.connected)
55
56     def connected(self, proto):
57         """Begin processing the queued requests."""
58         self.closed = False
59         self.connecting = False
60         self.proto = proto
61         self.processQueue()
62         
63     def close(self):
64         """Close the connection to the peer."""
65         if not self.closed:
66             self.proto.transport.loseConnection()
67
68     def submitRequest(self, request):
69         """Add a new request to the queue.
70         
71         @type request: L{twisted.web2.client.http.ClientRequest}
72         @return: deferred that will fire with the completed request
73         """
74         request.submissionTime = datetime.now()
75         request.deferRequest = defer.Deferred()
76         self.request_queue.append(request)
77         self.processQueue()
78         return request.deferRequest
79
80     def processQueue(self):
81         """Check the queue to see if new requests can be sent to the peer."""
82         if not self.request_queue:
83             return
84         if self.connecting:
85             return
86         if self.closed:
87             self.connect()
88             return
89         if self.busy and not self.pipeline:
90             return
91         if self.response_queue and not self.pipeline:
92             return
93
94         req = self.request_queue.pop(0)
95         self.response_queue.append(req)
96         req.deferResponse = self.proto.submitRequest(req, False)
97         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
98
99     def requestComplete(self, resp):
100         """Process a completed request."""
101         self._processLastResponse()
102         req = self.response_queue.pop(0)
103         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
104         self._completed += 1
105         if resp.code >= 400:
106             self._errors += 1
107         now = datetime.now()
108         self._responseTimes.append((now, now - req.submissionTime))
109         self._lastResponse = (now, resp.stream.length)
110         req.deferRequest.callback(resp)
111
112     def requestError(self, error):
113         """Process a request that ended with an error."""
114         self._processLastResponse()
115         req = self.response_queue.pop(0)
116         log.msg('Download of %s generated error %r' % (req.uri, error))
117         self._completed += 1
118         self._errors += 1
119         req.deferRequest.errback(error)
120         
121     def hashError(self, error):
122         """Log that a hash error occurred from the peer."""
123         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
124         self._errors += 1
125
126     #{ IHTTPClientManager interface
127     def clientBusy(self, proto):
128         """Save the busy state."""
129         self.busy = True
130
131     def clientIdle(self, proto):
132         """Try to send a new request."""
133         self._processLastResponse()
134         self.busy = False
135         self.processQueue()
136
137     def clientPipelining(self, proto):
138         """Try to send a new request."""
139         self.pipeline = True
140         self.processQueue()
141
142     def clientGone(self, proto):
143         """Mark sent requests as errors."""
144         self._processLastResponse()
145         for req in self.response_queue:
146             req.deferRequest.errback(ProtocolError('lost connection'))
147         self.busy = False
148         self.pipeline = False
149         self.closed = True
150         self.connecting = False
151         self.response_queue = []
152         self.proto = None
153         if self.request_queue:
154             self.processQueue()
155             
156     #{ Downloading request interface
157     def setCommonHeaders(self):
158         """Get the common HTTP headers for all requests."""
159         headers = http_headers.Headers()
160         headers.setHeader('Host', self.host)
161         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
162                           (version.short(), twisted_version.short(), web2_version.short()))
163         return headers
164     
165     def get(self, path, method="GET", modtime=None):
166         """Add a new request to the queue.
167         
168         @type path: C{string}
169         @param path: the path to request from the peer
170         @type method: C{string}
171         @param method: the HTTP method to use, 'GET' or 'HEAD'
172             (optional, defaults to 'GET')
173         @type modtime: C{int}
174         @param modtime: the modification time to use for an 'If-Modified-Since'
175             header, as seconds since the epoch
176             (optional, defaults to not sending that header)
177         """
178         headers = self.setCommonHeaders()
179         if modtime:
180             headers.setHeader('If-Modified-Since', modtime)
181         return self.submitRequest(ClientRequest(method, path, headers, None))
182     
183     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
184         """Add a new request with a Range header to the queue.
185         
186         @type path: C{string}
187         @param path: the path to request from the peer
188         @type rangeStart: C{int}
189         @param rangeStart: the byte to begin the request at
190         @type rangeEnd: C{int}
191         @param rangeEnd: the byte to end the request at (inclusive)
192         @type method: C{string}
193         @param method: the HTTP method to use, 'GET' or 'HEAD'
194             (optional, defaults to 'GET')
195         """
196         headers = self.setCommonHeaders()
197         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
198         return self.submitRequest(ClientRequest(method, path, headers, None))
199     
200     #{ Peer information
201     def isIdle(self):
202         """Check whether the peer is idle or not."""
203         return not self.busy and not self.request_queue and not self.response_queue
204     
205     def _processLastResponse(self):
206         """Save the download time of the last request for speed calculations."""
207         if self._lastResponse is not None:
208             now = datetime.now()
209             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
210             self._lastResponse = None
211             
212     def downloadSpeed(self):
213         """Gets the latest average download speed for the peer.
214         
215         The average is over the last 10 responses that occurred in the last hour.
216         """
217         total_time = 0.0
218         total_download = 0
219         now = datetime.now()
220         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
221                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
222             self._downloadSpeeds.pop(0)
223
224         # If there are none, then you get 0
225         if not self._downloadSpeeds:
226             return 0.0
227         
228         for download in self._downloadSpeeds:
229             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
230             total_download += download[2]
231
232         return total_download / total_time
233     
234     def responseTime(self):
235         """Gets the latest average response time for the peer.
236         
237         Response time is the time from receiving the request, to the time
238         the download begins. The average is over the last 10 responses that
239         occurred in the last hour.
240         """
241         total_response = 0.0
242         now = datetime.now()
243         while self._responseTimes and (len(self._responseTimes) > 10 or 
244                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
245             self._responseTimes.pop(0)
246
247         # If there are none, give it the benefit of the doubt
248         if not self._responseTimes:
249             return 0.0
250
251         for response in self._responseTimes:
252             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
253
254         return total_response / len(self._responseTimes)
255     
256     def rank(self, fastest):
257         """Determine the ranking value for the peer.
258         
259         The ranking value is composed of 5 numbers:
260          - 1 if a connection to the peer is open, 0.9 otherwise
261          - 1 if there are no pending requests, to 0 if there are a maximum
262          - 1 if the peer is the fastest of all peers, to 0 if the speed is 0
263          - 1 if all requests are good, 0 if all produced errors
264          - an exponentially decreasing number based on the response time
265         """
266         rank = 1.0
267         if self.closed:
268             rank *= 0.9
269         rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0
270         if fastest > 0.0:
271             rank *= min(1.0, self.downloadSpeed() / fastest)
272         if self._completed:
273             rank *= max(0.0, 1.0 - float(self._errors) / self._completed)
274         rank *= exp(-self.responseTime() / 5.0)
275         return rank
276         
277 class TestClientManager(unittest.TestCase):
278     """Unit tests for the Peer."""
279     
280     client = None
281     pending_calls = []
282     
283     def gotResp(self, resp, num, expect):
284         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
285         if expect is not None:
286             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
287         def print_(n):
288             pass
289         def printdone(n):
290             pass
291         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
292     
293     def test_download(self):
294         """Tests a normal download."""
295         host = 'www.ietf.org'
296         self.client = Peer(host, 80)
297         self.timeout = 10
298         
299         d = self.client.get('/rfc/rfc0013.txt')
300         d.addCallback(self.gotResp, 1, 1070)
301         return d
302         
303     def test_head(self):
304         """Tests a 'HEAD' request."""
305         host = 'www.ietf.org'
306         self.client = Peer(host, 80)
307         self.timeout = 10
308         
309         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
310         d.addCallback(self.gotResp, 1, 0)
311         return d
312         
313     def test_multiple_downloads(self):
314         """Tests multiple downloads with queueing and connection closing."""
315         host = 'www.ietf.org'
316         self.client = Peer(host, 80)
317         self.timeout = 120
318         lastDefer = defer.Deferred()
319         
320         def newRequest(path, num, expect, last=False):
321             d = self.client.get(path)
322             d.addCallback(self.gotResp, num, expect)
323             if last:
324                 d.addBoth(lastDefer.callback)
325
326         # 3 quick requests
327         newRequest("/rfc/rfc0006.txt", 1, 1776)
328         newRequest("/rfc/rfc2362.txt", 2, 159833)
329         newRequest("/rfc/rfc0801.txt", 3, 40824)
330         
331         # This one will probably be queued
332         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
333         
334         # Connection should still be open, but idle
335         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
336         
337         #Connection should be closed
338         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
339         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
340         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
341         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
342         
343         # Now it should definitely be closed
344         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
345         return lastDefer
346         
347     def test_multiple_quick_downloads(self):
348         """Tests lots of multiple downloads with queueing."""
349         host = 'www.ietf.org'
350         self.client = Peer(host, 80)
351         self.timeout = 30
352         lastDefer = defer.Deferred()
353         
354         def newRequest(path, num, expect, last=False):
355             d = self.client.get(path)
356             d.addCallback(self.gotResp, num, expect)
357             if last:
358                 d.addBoth(lastDefer.callback)
359                 
360         newRequest("/rfc/rfc0006.txt", 1, 1776)
361         newRequest("/rfc/rfc2362.txt", 2, 159833)
362         newRequest("/rfc/rfc0801.txt", 3, 40824)
363         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
364         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
365         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
366         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
367         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
368         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
369         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
370         return lastDefer
371         
372     def checkInfo(self):
373         log.msg('Rank is: %r' % self.client.rank(250.0*1024))
374         log.msg('Download speed is: %r' % self.client.downloadSpeed())
375         log.msg('Response Time is: %r' % self.client.responseTime())
376         
377     def test_peer_info(self):
378         """Test retrieving the peer info during a download."""
379         host = 'www.ietf.org'
380         self.client = Peer(host, 80)
381         self.timeout = 120
382         lastDefer = defer.Deferred()
383         
384         def newRequest(path, num, expect, last=False):
385             d = self.client.get(path)
386             d.addCallback(self.gotResp, num, expect)
387             if last:
388                 d.addBoth(lastDefer.callback)
389                 
390         newRequest("/rfc/rfc0006.txt", 1, 1776)
391         newRequest("/rfc/rfc2362.txt", 2, 159833)
392         newRequest("/rfc/rfc0801.txt", 3, 40824)
393         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
394         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
395         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
396         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
397         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
398         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
399         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
400         
401         for i in xrange(2, 122, 2):
402             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
403         
404         return lastDefer
405         
406     def test_range(self):
407         """Test a Range request."""
408         host = 'www.ietf.org'
409         self.client = Peer(host, 80)
410         self.timeout = 10
411         
412         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
413         d.addCallback(self.gotResp, 1, 100)
414         return d
415         
416     def tearDown(self):
417         for p in self.pending_calls:
418             if p.active():
419                 p.cancel()
420         self.pending_calls = []
421         if self.client:
422             self.client.close()
423             self.client = None