]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p/HTTPDownloader.py
HTTP client no longer keeps a response queue of requests.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class LoggingHTTPClientProtocol(HTTPClientProtocol):
21     """A modified client protocol that logs the number of bytes received."""
22     
23     def __init__(self, factory, stats = None, mirror = False):
24         HTTPClientProtocol.__init__(self, factory)
25         self.stats = stats
26         self.mirror = mirror
27     
28     def lineReceived(self, line):
29         if self.stats:
30             self.stats.receivedBytes(len(line) + 2, self.mirror)
31         HTTPClientProtocol.lineReceived(self, line)
32
33     def rawDataReceived(self, data):
34         if self.stats:
35             self.stats.receivedBytes(len(data), self.mirror)
36         HTTPClientProtocol.rawDataReceived(self, data)
37
38 class Peer(ClientFactory):
39     """A manager for all HTTP requests to a single peer.
40     
41     Controls all requests that go to a single peer (host and port).
42     This includes buffering requests until they can be sent and reconnecting
43     in the event of the connection being closed.
44     
45     """
46
47     implements(IHTTPClientManager)
48     
49     def __init__(self, host, port = 80, stats = None):
50         self.host = host
51         self.port = port
52         self.stats = stats
53         self.mirror = False
54         self.rank = 0.1
55         self.busy = False
56         self.pipeline = False
57         self.closed = True
58         self.connecting = False
59         self.request_queue = []
60         self.outstanding = 0
61         self.proto = None
62         self.connector = None
63         self._errors = 0
64         self._completed = 0
65         self._downloadSpeeds = []
66         self._lastResponse = None
67         self._responseTimes = []
68     
69     def __repr__(self):
70         return "(%r, %r, %r)" % (self.host, self.port, self.rank)
71         
72     #{ Manage the request queue
73     def connect(self):
74         """Connect to the peer."""
75         assert self.closed and not self.connecting
76         self.connecting = True
77         d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
78                                    stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
79         d.addCallbacks(self.connected, self.connectionError)
80
81     def connected(self, proto):
82         """Begin processing the queued requests."""
83         self.closed = False
84         self.connecting = False
85         self.proto = proto
86         reactor.callLater(0, self.processQueue)
87         
88     def connectionError(self, err):
89         """Cancel the requests."""
90         log.msg('Failed to connect to the peer by HTTP.')
91         log.err(err)
92
93         # Remove one request so that we don't loop indefinitely
94         if self.request_queue:
95             req = self.request_queue.pop(0)
96             req.deferRequest.errback(err)
97             
98         self._completed += 1
99         self._errors += 1
100         self.rerank()
101         if self.connecting:
102             self.connecting = False
103             self.clientGone(None)
104         
105     def close(self):
106         """Close the connection to the peer."""
107         if not self.closed:
108             self.proto.transport.loseConnection()
109
110     def submitRequest(self, request):
111         """Add a new request to the queue.
112         
113         @type request: L{twisted.web2.client.http.ClientRequest}
114         @return: deferred that will fire with the completed request
115         """
116         request.submissionTime = datetime.now()
117         request.deferRequest = defer.Deferred()
118         self.request_queue.append(request)
119         self.rerank()
120         reactor.callLater(0, self.processQueue)
121         return request.deferRequest
122
123     def processQueue(self):
124         """Check the queue to see if new requests can be sent to the peer."""
125         if not self.request_queue:
126             return
127         if self.connecting:
128             return
129         if self.closed:
130             self.connect()
131             return
132         if self.busy and not self.pipeline:
133             return
134         if self.outstanding and not self.pipeline:
135             return
136
137         req = self.request_queue.pop(0)
138         self.outstanding += 1
139         self.rerank()
140         req.deferResponse = self.proto.submitRequest(req, False)
141         req.deferResponse.addCallbacks(self.requestComplete, self.requestError,
142                                        callbackArgs = (req, ), errbackArgs = (req, ))
143
144     def requestComplete(self, resp, req):
145         """Process a completed request."""
146         self._processLastResponse()
147         self.outstanding -= 1
148         assert self.outstanding >= 0
149         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
150         self._completed += 1
151         now = datetime.now()
152         self._responseTimes.append((now, now - req.submissionTime))
153         self._lastResponse = (now, resp.stream.length)
154         self.rerank()
155         req.deferRequest.callback(resp)
156
157     def requestError(self, error, req):
158         """Process a request that ended with an error."""
159         self._processLastResponse()
160         self.outstanding -= 1
161         assert self.outstanding >= 0
162         log.msg('Download of %s generated error %r' % (req.uri, error))
163         self._completed += 1
164         self._errors += 1
165         self.rerank()
166         req.deferRequest.errback(error)
167         
168     def hashError(self, error):
169         """Log that a hash error occurred from the peer."""
170         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
171         self._errors += 1
172         self.rerank()
173
174     #{ IHTTPClientManager interface
175     def clientBusy(self, proto):
176         """Save the busy state."""
177         self.busy = True
178
179     def clientIdle(self, proto):
180         """Try to send a new request."""
181         self._processLastResponse()
182         self.busy = False
183         reactor.callLater(0, self.processQueue)
184         self.rerank()
185
186     def clientPipelining(self, proto):
187         """Try to send a new request."""
188         self.pipeline = True
189         reactor.callLater(0, self.processQueue)
190
191     def clientGone(self, proto):
192         """Mark sent requests as errors."""
193         self._processLastResponse()
194         self.busy = False
195         self.pipeline = False
196         self.closed = True
197         self.connecting = False
198         self.proto = None
199         self.rerank()
200         if self.request_queue:
201             reactor.callLater(0, self.processQueue)
202             
203     #{ Downloading request interface
204     def setCommonHeaders(self):
205         """Get the common HTTP headers for all requests."""
206         headers = http_headers.Headers()
207         headers.setHeader('Host', self.host)
208         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
209                           (version.short(), twisted_version.short(), web2_version.short()))
210         return headers
211     
212     def get(self, path, method="GET", modtime=None):
213         """Add a new request to the queue.
214         
215         @type path: C{string}
216         @param path: the path to request from the peer
217         @type method: C{string}
218         @param method: the HTTP method to use, 'GET' or 'HEAD'
219             (optional, defaults to 'GET')
220         @type modtime: C{int}
221         @param modtime: the modification time to use for an 'If-Modified-Since'
222             header, as seconds since the epoch
223             (optional, defaults to not sending that header)
224         """
225         headers = self.setCommonHeaders()
226         if modtime:
227             headers.setHeader('If-Modified-Since', modtime)
228         return self.submitRequest(ClientRequest(method, path, headers, None))
229     
230     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
231         """Add a new request with a Range header to the queue.
232         
233         @type path: C{string}
234         @param path: the path to request from the peer
235         @type rangeStart: C{int}
236         @param rangeStart: the byte to begin the request at
237         @type rangeEnd: C{int}
238         @param rangeEnd: the byte to end the request at (inclusive)
239         @type method: C{string}
240         @param method: the HTTP method to use, 'GET' or 'HEAD'
241             (optional, defaults to 'GET')
242         """
243         headers = self.setCommonHeaders()
244         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
245         return self.submitRequest(ClientRequest(method, path, headers, None))
246     
247     #{ Peer information
248     def isIdle(self):
249         """Check whether the peer is idle or not."""
250         return not self.busy and not self.request_queue and not self.outstanding
251     
252     def _processLastResponse(self):
253         """Save the download time of the last request for speed calculations."""
254         if self._lastResponse is not None:
255             now = datetime.now()
256             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
257             self._lastResponse = None
258             
259     def downloadSpeed(self):
260         """Gets the latest average download speed for the peer.
261         
262         The average is over the last 10 responses that occurred in the last hour.
263         """
264         total_time = 0.0
265         total_download = 0
266         now = datetime.now()
267         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
268                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
269             self._downloadSpeeds.pop(0)
270
271         # If there are none, then you get 0
272         if not self._downloadSpeeds:
273             return 0.0
274         
275         for download in self._downloadSpeeds:
276             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
277             total_download += download[2]
278
279         return total_download / total_time
280     
281     def responseTime(self):
282         """Gets the latest average response time for the peer.
283         
284         Response time is the time from receiving the request, to the time
285         the download begins. The average is over the last 10 responses that
286         occurred in the last hour.
287         """
288         total_response = 0.0
289         now = datetime.now()
290         while self._responseTimes and (len(self._responseTimes) > 10 or 
291                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
292             self._responseTimes.pop(0)
293
294         # If there are none, give it the benefit of the doubt
295         if not self._responseTimes:
296             return 0.0
297
298         for response in self._responseTimes:
299             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
300
301         return total_response / len(self._responseTimes)
302     
303     def rerank(self):
304         """Determine the ranking value for the peer.
305         
306         The ranking value is composed of 5 numbers, each exponentially
307         decreasing from 1 to 0 based on:
308          - if a connection to the peer is open
309          - the number of pending requests
310          - the time to download a single piece
311          - the number of errors
312          - the response time
313         """
314         rank = 1.0
315         if self.closed:
316             rank *= 0.9
317         rank *= exp(-(len(self.request_queue) + self.outstanding))
318         speed = self.downloadSpeed()
319         if speed > 0.0:
320             rank *= exp(-512.0*1024 / speed)
321         if self._completed:
322             rank *= exp(-10.0 * self._errors / self._completed)
323         rank *= exp(-self.responseTime() / 5.0)
324         self.rank = rank
325         
326 class TestClientManager(unittest.TestCase):
327     """Unit tests for the Peer."""
328     
329     client = None
330     pending_calls = []
331     length = []
332     
333     def gotResp(self, resp, num, expect):
334         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
335         if expect is not None:
336             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
337         while len(self.length) <= num:
338             self.length.append(0)
339         self.length[num] = 0
340         def addData(data, self = self, num = num):
341             self.length[num] += len(data)
342         def checkLength(resp, self = self, num = num, length = resp.stream.length):
343             self.failUnlessEqual(self.length[num], length)
344             return resp
345         df = stream_mod.readStream(resp.stream, addData)
346         df.addCallback(checkLength)
347         return df
348     
349     def test_download(self):
350         """Tests a normal download."""
351         host = 'www.ietf.org'
352         self.client = Peer(host, 80)
353         self.timeout = 10
354         
355         d = self.client.get('/rfc/rfc0013.txt')
356         d.addCallback(self.gotResp, 1, 1070)
357         return d
358         
359     def test_head(self):
360         """Tests a 'HEAD' request."""
361         host = 'www.ietf.org'
362         self.client = Peer(host, 80)
363         self.timeout = 10
364         
365         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
366         d.addCallback(self.gotResp, 1, 0)
367         return d
368         
369     def test_multiple_downloads(self):
370         """Tests multiple downloads with queueing and connection closing."""
371         host = 'www.ietf.org'
372         self.client = Peer(host, 80)
373         self.timeout = 120
374         lastDefer = defer.Deferred()
375         
376         def newRequest(path, num, expect, last=False):
377             d = self.client.get(path)
378             d.addCallback(self.gotResp, num, expect)
379             if last:
380                 d.addBoth(lastDefer.callback)
381
382         # 3 quick requests
383         newRequest("/rfc/rfc0006.txt", 1, 1776)
384         newRequest("/rfc/rfc2362.txt", 2, 159833)
385         newRequest("/rfc/rfc0801.txt", 3, 40824)
386         
387         # This one will probably be queued
388         self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
389         
390         # Connection should still be open, but idle
391         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
392         
393         #Connection should be closed
394         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
395         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
396         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
397         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
398         
399         # Now it should definitely be closed
400         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
401         return lastDefer
402         
403     def test_multiple_quick_downloads(self):
404         """Tests lots of multiple downloads with queueing."""
405         host = 'www.ietf.org'
406         self.client = Peer(host, 80)
407         self.timeout = 30
408         lastDefer = defer.Deferred()
409         
410         def newRequest(path, num, expect, last=False):
411             d = self.client.get(path)
412             d.addCallback(self.gotResp, num, expect)
413             if last:
414                 d.addBoth(lastDefer.callback)
415                 
416         newRequest("/rfc/rfc0006.txt", 1, 1776)
417         newRequest("/rfc/rfc2362.txt", 2, 159833)
418         newRequest("/rfc/rfc0801.txt", 3, 40824)
419         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
420         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
421         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
422         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
423         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
424         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
425         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
426         return lastDefer
427         
428     def checkInfo(self):
429         log.msg('Rank is: %r' % self.client.rank)
430         log.msg('Download speed is: %r' % self.client.downloadSpeed())
431         log.msg('Response Time is: %r' % self.client.responseTime())
432         
433     def test_peer_info(self):
434         """Test retrieving the peer info during a download."""
435         host = 'www.ietf.org'
436         self.client = Peer(host, 80)
437         self.timeout = 120
438         lastDefer = defer.Deferred()
439         
440         def newRequest(path, num, expect, last=False):
441             d = self.client.get(path)
442             d.addCallback(self.gotResp, num, expect)
443             if last:
444                 d.addBoth(lastDefer.callback)
445                 
446         newRequest("/rfc/rfc0006.txt", 1, 1776)
447         newRequest("/rfc/rfc2362.txt", 2, 159833)
448         newRequest("/rfc/rfc0801.txt", 3, 40824)
449         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
450         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
451         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
452         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
453         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
454         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
455         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
456         
457         for i in xrange(2, 122, 2):
458             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
459         
460         return lastDefer
461         
462     def test_range(self):
463         """Test a Range request."""
464         host = 'www.ietf.org'
465         self.client = Peer(host, 80)
466         self.timeout = 10
467         
468         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
469         d.addCallback(self.gotResp, 1, 100)
470         return d
471         
472     def tearDown(self):
473         for p in self.pending_calls:
474             if p.active():
475                 p.cancel()
476         self.pending_calls = []
477         if self.client:
478             self.client.close()
479             self.client = None