]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p/HTTPDownloader.py
Make all the peer rankings exponentially decreasing.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
1
2 """Manage all download requests to a single site."""
3
4 from math import exp
5 from datetime import datetime, timedelta
6
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
17
18 from apt_p2p_conf import version
19
20 class Peer(ClientFactory):
21     """A manager for all HTTP requests to a single peer.
22     
23     Controls all requests that go to a single peer (host and port).
24     This includes buffering requests until they can be sent and reconnecting
25     in the event of the connection being closed.
26     
27     """
28
29     implements(IHTTPClientManager)
30     
31     def __init__(self, host, port=80):
32         self.host = host
33         self.port = port
34         self.busy = False
35         self.pipeline = False
36         self.closed = True
37         self.connecting = False
38         self.request_queue = []
39         self.response_queue = []
40         self.proto = None
41         self.connector = None
42         self._errors = 0
43         self._completed = 0
44         self._downloadSpeeds = []
45         self._lastResponse = None
46         self._responseTimes = []
47         
48     #{ Manage the request queue
49     def connect(self):
50         """Connect to the peer."""
51         assert self.closed and not self.connecting
52         self.connecting = True
53         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
54         d.addCallback(self.connected)
55
56     def connected(self, proto):
57         """Begin processing the queued requests."""
58         self.closed = False
59         self.connecting = False
60         self.proto = proto
61         self.processQueue()
62         
63     def close(self):
64         """Close the connection to the peer."""
65         if not self.closed:
66             self.proto.transport.loseConnection()
67
68     def submitRequest(self, request):
69         """Add a new request to the queue.
70         
71         @type request: L{twisted.web2.client.http.ClientRequest}
72         @return: deferred that will fire with the completed request
73         """
74         request.submissionTime = datetime.now()
75         request.deferRequest = defer.Deferred()
76         self.request_queue.append(request)
77         self.processQueue()
78         return request.deferRequest
79
80     def processQueue(self):
81         """Check the queue to see if new requests can be sent to the peer."""
82         if not self.request_queue:
83             return
84         if self.connecting:
85             return
86         if self.closed:
87             self.connect()
88             return
89         if self.busy and not self.pipeline:
90             return
91         if self.response_queue and not self.pipeline:
92             return
93
94         req = self.request_queue.pop(0)
95         self.response_queue.append(req)
96         req.deferResponse = self.proto.submitRequest(req, False)
97         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
98
99     def requestComplete(self, resp):
100         """Process a completed request."""
101         self._processLastResponse()
102         req = self.response_queue.pop(0)
103         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
104         self._completed += 1
105         if resp.code >= 400:
106             self._errors += 1
107         now = datetime.now()
108         self._responseTimes.append((now, now - req.submissionTime))
109         self._lastResponse = (now, resp.stream.length)
110         req.deferRequest.callback(resp)
111
112     def requestError(self, error):
113         """Process a request that ended with an error."""
114         self._processLastResponse()
115         req = self.response_queue.pop(0)
116         log.msg('Download of %s generated error %r' % (req.uri, error))
117         self._completed += 1
118         self._errors += 1
119         req.deferRequest.errback(error)
120         
121     def hashError(self, error):
122         """Log that a hash error occurred from the peer."""
123         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
124         self._errors += 1
125
126     #{ IHTTPClientManager interface
127     def clientBusy(self, proto):
128         """Save the busy state."""
129         self.busy = True
130
131     def clientIdle(self, proto):
132         """Try to send a new request."""
133         self._processLastResponse()
134         self.busy = False
135         self.processQueue()
136
137     def clientPipelining(self, proto):
138         """Try to send a new request."""
139         self.pipeline = True
140         self.processQueue()
141
142     def clientGone(self, proto):
143         """Mark sent requests as errors."""
144         self._processLastResponse()
145         for req in self.response_queue:
146             req.deferRequest.errback(ProtocolError('lost connection'))
147         self.busy = False
148         self.pipeline = False
149         self.closed = True
150         self.connecting = False
151         self.response_queue = []
152         self.proto = None
153         if self.request_queue:
154             self.processQueue()
155             
156     #{ Downloading request interface
157     def setCommonHeaders(self):
158         """Get the common HTTP headers for all requests."""
159         headers = http_headers.Headers()
160         headers.setHeader('Host', self.host)
161         headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % 
162                           (version.short(), twisted_version.short(), web2_version.short()))
163         return headers
164     
165     def get(self, path, method="GET", modtime=None):
166         """Add a new request to the queue.
167         
168         @type path: C{string}
169         @param path: the path to request from the peer
170         @type method: C{string}
171         @param method: the HTTP method to use, 'GET' or 'HEAD'
172             (optional, defaults to 'GET')
173         @type modtime: C{int}
174         @param modtime: the modification time to use for an 'If-Modified-Since'
175             header, as seconds since the epoch
176             (optional, defaults to not sending that header)
177         """
178         headers = self.setCommonHeaders()
179         if modtime:
180             headers.setHeader('If-Modified-Since', modtime)
181         return self.submitRequest(ClientRequest(method, path, headers, None))
182     
183     def getRange(self, path, rangeStart, rangeEnd, method="GET"):
184         """Add a new request with a Range header to the queue.
185         
186         @type path: C{string}
187         @param path: the path to request from the peer
188         @type rangeStart: C{int}
189         @param rangeStart: the byte to begin the request at
190         @type rangeEnd: C{int}
191         @param rangeEnd: the byte to end the request at (inclusive)
192         @type method: C{string}
193         @param method: the HTTP method to use, 'GET' or 'HEAD'
194             (optional, defaults to 'GET')
195         """
196         headers = self.setCommonHeaders()
197         headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
198         return self.submitRequest(ClientRequest(method, path, headers, None))
199     
200     #{ Peer information
201     def isIdle(self):
202         """Check whether the peer is idle or not."""
203         return not self.busy and not self.request_queue and not self.response_queue
204     
205     def _processLastResponse(self):
206         """Save the download time of the last request for speed calculations."""
207         if self._lastResponse is not None:
208             now = datetime.now()
209             self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
210             self._lastResponse = None
211             
212     def downloadSpeed(self):
213         """Gets the latest average download speed for the peer.
214         
215         The average is over the last 10 responses that occurred in the last hour.
216         """
217         total_time = 0.0
218         total_download = 0
219         now = datetime.now()
220         while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or 
221                                         now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
222             self._downloadSpeeds.pop(0)
223
224         # If there are none, then you get 0
225         if not self._downloadSpeeds:
226             return 0.0
227         
228         for download in self._downloadSpeeds:
229             total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
230             total_download += download[2]
231
232         return total_download / total_time
233     
234     def responseTime(self):
235         """Gets the latest average response time for the peer.
236         
237         Response time is the time from receiving the request, to the time
238         the download begins. The average is over the last 10 responses that
239         occurred in the last hour.
240         """
241         total_response = 0.0
242         now = datetime.now()
243         while self._responseTimes and (len(self._responseTimes) > 10 or 
244                                        now - self._responseTimes[0][0] > timedelta(seconds=3600)):
245             self._responseTimes.pop(0)
246
247         # If there are none, give it the benefit of the doubt
248         if not self._responseTimes:
249             return 0.0
250
251         for response in self._responseTimes:
252             total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
253
254         return total_response / len(self._responseTimes)
255     
256     def rank(self):
257         """Determine the ranking value for the peer.
258         
259         The ranking value is composed of 5 numbers, each exponentially
260         decreasing from 1 to 0 based on:
261          - if a connection to the peer is open
262          - the number of pending requests
263          - the time to download a single piece
264          - the number of errors
265          - the response time
266         """
267         rank = 1.0
268         if self.closed:
269             rank *= 0.9
270         rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
271         speed = self.downloadSpeed()
272         if speed > 0.0:
273             rank *= exp(-512.0*1024 / speed)
274         if self._completed:
275             rank *= exp(-float(self._errors) / self._completed)
276         rank *= exp(-self.responseTime() / 5.0)
277         return rank
278         
279 class TestClientManager(unittest.TestCase):
280     """Unit tests for the Peer."""
281     
282     client = None
283     pending_calls = []
284     
285     def gotResp(self, resp, num, expect):
286         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
287         if expect is not None:
288             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
289         def print_(n):
290             pass
291         def printdone(n):
292             pass
293         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
294     
295     def test_download(self):
296         """Tests a normal download."""
297         host = 'www.ietf.org'
298         self.client = Peer(host, 80)
299         self.timeout = 10
300         
301         d = self.client.get('/rfc/rfc0013.txt')
302         d.addCallback(self.gotResp, 1, 1070)
303         return d
304         
305     def test_head(self):
306         """Tests a 'HEAD' request."""
307         host = 'www.ietf.org'
308         self.client = Peer(host, 80)
309         self.timeout = 10
310         
311         d = self.client.get('/rfc/rfc0013.txt', "HEAD")
312         d.addCallback(self.gotResp, 1, 0)
313         return d
314         
315     def test_multiple_downloads(self):
316         """Tests multiple downloads with queueing and connection closing."""
317         host = 'www.ietf.org'
318         self.client = Peer(host, 80)
319         self.timeout = 120
320         lastDefer = defer.Deferred()
321         
322         def newRequest(path, num, expect, last=False):
323             d = self.client.get(path)
324             d.addCallback(self.gotResp, num, expect)
325             if last:
326                 d.addBoth(lastDefer.callback)
327
328         # 3 quick requests
329         newRequest("/rfc/rfc0006.txt", 1, 1776)
330         newRequest("/rfc/rfc2362.txt", 2, 159833)
331         newRequest("/rfc/rfc0801.txt", 3, 40824)
332         
333         # This one will probably be queued
334         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
335         
336         # Connection should still be open, but idle
337         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
338         
339         #Connection should be closed
340         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
341         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
342         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
343         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
344         
345         # Now it should definitely be closed
346         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
347         return lastDefer
348         
349     def test_multiple_quick_downloads(self):
350         """Tests lots of multiple downloads with queueing."""
351         host = 'www.ietf.org'
352         self.client = Peer(host, 80)
353         self.timeout = 30
354         lastDefer = defer.Deferred()
355         
356         def newRequest(path, num, expect, last=False):
357             d = self.client.get(path)
358             d.addCallback(self.gotResp, num, expect)
359             if last:
360                 d.addBoth(lastDefer.callback)
361                 
362         newRequest("/rfc/rfc0006.txt", 1, 1776)
363         newRequest("/rfc/rfc2362.txt", 2, 159833)
364         newRequest("/rfc/rfc0801.txt", 3, 40824)
365         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
366         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
367         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
368         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
369         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
370         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
371         self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
372         return lastDefer
373         
374     def checkInfo(self):
375         log.msg('Rank is: %r' % self.client.rank(250.0*1024))
376         log.msg('Download speed is: %r' % self.client.downloadSpeed())
377         log.msg('Response Time is: %r' % self.client.responseTime())
378         
379     def test_peer_info(self):
380         """Test retrieving the peer info during a download."""
381         host = 'www.ietf.org'
382         self.client = Peer(host, 80)
383         self.timeout = 120
384         lastDefer = defer.Deferred()
385         
386         def newRequest(path, num, expect, last=False):
387             d = self.client.get(path)
388             d.addCallback(self.gotResp, num, expect)
389             if last:
390                 d.addBoth(lastDefer.callback)
391                 
392         newRequest("/rfc/rfc0006.txt", 1, 1776)
393         newRequest("/rfc/rfc2362.txt", 2, 159833)
394         newRequest("/rfc/rfc0801.txt", 3, 40824)
395         self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
396         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
397         self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
398         self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
399         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
400         self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
401         self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
402         
403         for i in xrange(2, 122, 2):
404             self.pending_calls.append(reactor.callLater(i, self.checkInfo))
405         
406         return lastDefer
407         
408     def test_range(self):
409         """Test a Range request."""
410         host = 'www.ietf.org'
411         self.client = Peer(host, 80)
412         self.timeout = 10
413         
414         d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
415         d.addCallback(self.gotResp, 1, 100)
416         return d
417         
418     def tearDown(self):
419         for p in self.pending_calls:
420             if p.active():
421                 p.cancel()
422         self.pending_calls = []
423         if self.client:
424             self.client.close()
425             self.client = None