]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p/HTTPDownloader.py
Better handling and logging for intermittent HTTP client submission errors.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
18
19 from apt_p2p_conf import version
20
21 class LoggingHTTPClientProtocol(HTTPClientProtocol):
22     """A modified client protocol that logs the number of bytes received."""
23     
24     def __init__(self, factory, stats = None, mirror = False):
25         HTTPClientProtocol.__init__(self, factory)
26         self.stats = stats
27         self.mirror = mirror
28     
29     def lineReceived(self, line):
30         if self.stats:
31             self.stats.receivedBytes(len(line) + 2, self.mirror)
32         HTTPClientProtocol.lineReceived(self, line)
33
34     def rawDataReceived(self, data):
35         if self.stats:
36             self.stats.receivedBytes(len(data), self.mirror)
37         HTTPClientProtocol.rawDataReceived(self, data)
38
39 class Peer(ClientFactory):
40     """A manager for all HTTP requests to a single peer.
41     
42     Controls all requests that go to a single peer (host and port).
43     This includes buffering requests until they can be sent and reconnecting
44     in the event of the connection being closed.
45     
46     """
47
48     implements(IHTTPClientManager)
49     
50     def __init__(self, host, port = 80, stats = None):
51         self.host = host
52         self.port = port
53         self.stats = stats
54         self.mirror = False
55         self.rank = 0.1
56         self.busy = False
57         self.pipeline = False
58         self.closed = True
59         self.connecting = False
60         self.request_queue = []
61         self.outstanding = 0
62         self.proto = None
63         self.connector = None
64         self._errors = 0
65         self._completed = 0
66         self._downloadSpeeds = []
67         self._lastResponse = None
68         self._responseTimes = []
69     
70     def __repr__(self):
71         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
72         
73     #{ Manage the request queue
74     def connect(self):
75         """Connect to the peer."""
76         assert self.closed and not self.connecting
77         self.connecting = True
78         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
79                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
80         d.addCallbacks(self.connected, self.connectionError)
81
82     def connected(self, proto):
83         """Begin processing the queued requests."""
84         self.closed = False
85         self.connecting = False
86         self.proto = proto
87         reactor.callLater(0, self.processQueue)
88         
89     def connectionError(self, err):
90         """Cancel the requests."""
91         log.msg('Failed to connect to the peer by HTTP.')
92         log.err(err)
93
94         # Remove one request so that we don't loop indefinitely
95         if self.request_queue:
96             req, deferRequest, submissionTime = self.request_queue.pop(0)
97             deferRequest.errback(err)
98             
99         self._completed += 1
100         self._errors += 1
101         self.rerank()
102         if self.connecting:
103             self.connecting = False
104             self.clientGone(None)
105         
106     def close(self):
107         """Close the connection to the peer."""
108         if not self.closed:
109             self.proto.transport.loseConnection()
110
111     def submitRequest(self, request):
112         """Add a new request to the queue.
113         
114         @type request: L{twisted.web2.client.http.ClientRequest}
115         @return: deferred that will fire with the completed request
116         """
117         submissionTime = datetime.now()
118         deferRequest = defer.Deferred()
119         self.request_queue.append((request, deferRequest, submissionTime))
120         self.rerank()
121         reactor.callLater(0, self.processQueue)
122         return deferRequest
123
124     def processQueue(self):
125         """Check the queue to see if new requests can be sent to the peer."""
126         if not self.request_queue:
127             return
128         if self.connecting:
129             return
130         if self.closed:
131             self.connect()
132             return
133         if self.busy and not self.pipeline:
134             return
135         if self.outstanding and not self.pipeline:
136             return
137         if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
138                  and not self.proto.inRequests)
139                  or self.proto.readPersistent is PERSIST_PIPELINE):
140             log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
141                     (self.proto.readPersistent, self.proto.inRequests))
142             return
143
144         req, deferRequest, submissionTime = self.request_queue.pop(0)
145         try:
146             deferResponse = self.proto.submitRequest(req, False)
147         except:
148             # Try again later
149             log.msg('Got an error trying to submit a new HTTP request %s' % (request.uri, ))
150             log.err()
151             self.request_queue.insert(0, (request, deferRequest, submissionTime))
152             ractor.callLater(1, self.processQueue)
153             return
154             
155         self.outstanding += 1
156         self.rerank()
157         deferResponse.addCallbacks(self.requestComplete, self.requestError,
158                                    callbackArgs = (req, deferRequest, submissionTime),
159                                    errbackArgs = (req, deferRequest))
160
161     def requestComplete(self, resp, req, deferRequest, submissionTime):
162         """Process a completed request."""
163         self._processLastResponse()
164         self.outstanding -= 1
165         assert self.outstanding >= 0
166         log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
167         self._completed += 1
168         now = datetime.now()
169         self._responseTimes.append((now, now - submissionTime))
170         self._lastResponse = (now, resp.stream.length)
171         self.rerank()
172         deferRequest.callback(resp)
173
174     def requestError(self, error, req, deferRequest):
175         """Process a request that ended with an error."""
176         self._processLastResponse()
177         self.outstanding -= 1
178         assert self.outstanding >= 0
179         log.msg('Download of %s generated error %r' % (req.uri, error))
180         self._completed += 1
181         self._errors += 1
182         self.rerank()
183         deferRequest.errback(error)
184         
185     def hashError(self, error):
186         """Log that a hash error occurred from the peer."""
187         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
188         self._errors += 1
189         self.rerank()
190
191     #{ IHTTPClientManager interface
192     def clientBusy(self, proto):
193         """Save the busy state."""
194         self.busy = True
195
196     def clientIdle(self, proto):
197         """Try to send a new request."""
198         self._processLastResponse()
199         self.busy = False
200         reactor.callLater(0, self.processQueue)
201         self.rerank()
202
203     def clientPipelining(self, proto):
204         """Try to send a new request."""
205         self.pipeline = True
206         reactor.callLater(0, self.processQueue)
207
208     def clientGone(self, proto):
209         """Mark sent requests as errors."""
210         self._processLastResponse()
211         self.busy = False
212         self.pipeline = False
213         self.closed = True
214         self.connecting = False
215         self.proto = None
216         self.rerank()
217         if self.request_queue:
218             reactor.callLater(0, self.processQueue)
219             
220     #{ Downloading request interface
221     def setCommonHeaders(self):
222         """Get the common HTTP headers for all requests."""
223         headers = http_headers.Headers()
224         headers.setHeader('Host', self.host)
225         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
226                           (version.short(), twisted_version.short(), web2_version.short()))
227         return headers
228     
229     def get(self, path, method="GET", modtime=None):
230         """Add a new request to the queue.
231         
232         @type path: C{string}
233         @param path: the path to request from the peer
234         @type method: C{string}
235         @param method: the HTTP method to use, 'GET' or 'HEAD'
236             (optional, defaults to 'GET')
237         @type modtime: C{int}
238         @param modtime: the modification time to use for an 'If-Modified-Since'
239             header, as seconds since the epoch
240             (optional, defaults to not sending that header)
241         """
242         headers = self.setCommonHeaders()
243         if modtime:
244             headers.setHeader('If-Modified-Since', modtime)
245         return self.submitRequest(ClientRequest(method, path, headers, None))
246     
247     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
248         """Add a new request with a Range header to the queue.
249         
250         @type path: C{string}
251         @param path: the path to request from the peer
252         @type rangeStart: C{int}
253         @param rangeStart: the byte to begin the request at
254         @type rangeEnd: C{int}
255         @param rangeEnd: the byte to end the request at (inclusive)
256         @type method: C{string}
257         @param method: the HTTP method to use, 'GET' or 'HEAD'
258             (optional, defaults to 'GET')
259         """
260         headers = self.setCommonHeaders()
261         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
262         return self.submitRequest(ClientRequest(method, path, headers, None))
263     
264     #{ Peer information
265     def isIdle(self):
266         """Check whether the peer is idle or not."""
267         return not self.busy and not self.request_queue and not self.outstanding
268     
269     def _processLastResponse(self):
270         """Save the download time of the last request for speed calculations."""
271         if self._lastResponse is not None:
272             now = datetime.now()
273             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
274             self._lastResponse = None
275             
276     def downloadSpeed(self):
277         """Gets the latest average download speed for the peer.
278         
279         The average is over the last 10 responses that occurred in the last hour.
280         """
281         total_time = 0.0
282         total_download = 0
283         now = datetime.now()
284         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
285                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
286             self._downloadSpeeds.pop(0)
287
288         # If there are none, then you get 0
289         if not self._downloadSpeeds:
290             return 0.0
291         
292         for download in self._downloadSpeeds:
293             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
294             total_download += download[2]
295
296         return total_download / total_time
297     
298     def responseTime(self):
299         """Gets the latest average response time for the peer.
300         
301         Response time is the time from receiving the request, to the time
302         the download begins. The average is over the last 10 responses that
303         occurred in the last hour.
304         """
305         total_response = 0.0
306         now = datetime.now()
307         while self._responseTimes and (len(self._responseTimes) > 10 or 
308                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
309             self._responseTimes.pop(0)
310
311         # If there are none, give it the benefit of the doubt
312         if not self._responseTimes:
313             return 0.0
314
315         for response in self._responseTimes:
316             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
317
318         return total_response / len(self._responseTimes)
319     
320     def rerank(self):
321         """Determine the ranking value for the peer.
322         
323         The ranking value is composed of 5 numbers, each exponentially
324         decreasing from 1 to 0 based on:
325          - if a connection to the peer is open
326          - the number of pending requests
327          - the time to download a single piece
328          - the number of errors
329          - the response time
330         """
331         rank = 1.0
332         if self.closed:
333             rank *= 0.9
334         rank *= exp(-(len(self.request_queue) + self.outstanding))
335         speed = self.downloadSpeed()
336         if speed > 0.0:
337             rank *= exp(-512.0*1024 / speed)
338         if self._completed:
339             rank *= exp(-10.0 * self._errors / self._completed)
340         rank *= exp(-self.responseTime() / 5.0)
341         self.rank = rank
342         
343 class TestClientManager(unittest.TestCase):
344     """Unit tests for the Peer."""
345     
346     client = None
347     pending_calls = []
348     length = []
349     
350     def gotResp(self, resp, num, expect):
351         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
352         if expect is not None:
353             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
354         while len(self.length) <= num:
355             self.length.append(0)
356         self.length[num] = 0
357         def addData(data, self = self, num = num):
358             self.length[num] += len(data)
359         def checkLength(resp, self = self, num = num, length = resp.stream.length):
360             self.failUnlessEqual(self.length[num], length)
361             return resp
362         df = stream_mod.readStream(resp.stream, addData)
363         df.addCallback(checkLength)
364         return df
365     
366     def test_download(self):
367         """Tests a normal download."""
368         host = 'www.ietf.org'
369         self.client = Peer(host, 80)
370         self.timeout = 10
371         
372         d = self.client.get('/rfc/rfc0013.txt')
373         d.addCallback(self.gotResp, 1, 1070)
374         return d
375         
376     def test_head(self):
377         """Tests a 'HEAD' request."""
378         host = 'www.ietf.org'
379         self.client = Peer(host, 80)
380         self.timeout = 10
381         
382         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
383         d.addCallback(self.gotResp, 1, 0)
384         return d
385         
386     def test_multiple_downloads(self):
387         """Tests multiple downloads with queueing and connection closing."""
388         host = 'www.ietf.org'
389         self.client = Peer(host, 80)
390         self.timeout = 120
391         lastDefer = defer.Deferred()
392         
393         def newRequest(path, num, expect, last=False):
394             d = self.client.get(path)
395             d.addCallback(self.gotResp, num, expect)
396             if last:
397                 d.addBoth(lastDefer.callback)
398
399         # 3 quick requests
400         newRequest("/rfc/rfc0006.txt", 1, 1776)
401         newRequest("/rfc/rfc2362.txt", 2, 159833)
402         newRequest("/rfc/rfc0801.txt", 3, 40824)
403         
404         # This one will probably be queued
405         self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
406         
407         # Connection should still be open, but idle
408         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
409         
410         #Connection should be closed
411         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
412         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
413         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
414         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
415         
416         # Now it should definitely be closed
417         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
418         return lastDefer
419         
420     def test_multiple_quick_downloads(self):
421         """Tests lots of multiple downloads with queueing."""
422         host = 'www.ietf.org'
423         self.client = Peer(host, 80)
424         self.timeout = 30
425         lastDefer = defer.Deferred()
426         
427         def newRequest(path, num, expect, last=False):
428             d = self.client.get(path)
429             d.addCallback(self.gotResp, num, expect)
430             if last:
431                 d.addBoth(lastDefer.callback)
432                 
433         newRequest("/rfc/rfc0006.txt", 1, 1776)
434         newRequest("/rfc/rfc2362.txt", 2, 159833)
435         newRequest("/rfc/rfc0801.txt", 3, 40824)
436         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
437         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
438         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
439         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
440         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
441         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
442         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
443         return lastDefer
444         
445     def checkInfo(self):
446         log.msg('Rank is: %r' % self.client.rank)
447         log.msg('Download speed is: %r' % self.client.downloadSpeed())
448         log.msg('Response Time is: %r' % self.client.responseTime())
449         
450     def test_peer_info(self):
451         """Test retrieving the peer info during a download."""
452         host = 'www.ietf.org'
453         self.client = Peer(host, 80)
454         self.timeout = 120
455         lastDefer = defer.Deferred()
456         
457         def newRequest(path, num, expect, last=False):
458             d = self.client.get(path)
459             d.addCallback(self.gotResp, num, expect)
460             if last:
461                 d.addBoth(lastDefer.callback)
462                 
463         newRequest("/rfc/rfc0006.txt", 1, 1776)
464         newRequest("/rfc/rfc2362.txt", 2, 159833)
465         newRequest("/rfc/rfc0801.txt", 3, 40824)
466         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
467         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
468         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
469         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
470         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
471         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
472         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
473         
474         for i in xrange(2, 122, 2):
475             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
476         
477         return lastDefer
478         
479     def test_range(self):
480         """Test a Range request."""
481         host = 'www.ietf.org'
482         self.client = Peer(host, 80)
483         self.timeout = 10
484         
485         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
486         d.addCallback(self.gotResp, 1, 100)
487         return d
488         
489     def tearDown(self):
490         for p in self.pending_calls:
491             if p.active():
492                 p.cancel()
493         self.pending_calls = []
494         if self.client:
495             self.client.close()
496             self.client = None