Add property tracking to downloads from peers.
[quix0rs-apt-p2p.git] / apt_dht / HTTPDownloader.py
1
2 from math import exp
3 from datetime import datetime, timedelta
4
5 from twisted.internet import reactor, defer, protocol
6 from twisted.internet.protocol import ClientFactory
7 from twisted import version as twisted_version
8 from twisted.python import log
9 from twisted.web2.client.interfaces import IHTTPClientManager
10 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
11 from twisted.web2 import stream as stream_mod, http_headers
12 from twisted.web2 import version as web2_version
13 from twisted.trial import unittest
14 from zope.interface import implements
15
16 from apt_dht_conf import version
17
class Peer(ClientFactory):
    """A manager for all HTTP requests to a single peer.

    Controls all requests that go to a single peer (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    The peer also records simple statistics about the requests it has
    handled (error count, response times and download speeds), which are
    combined by rank() into a single score.
    """

    implements(IHTTPClientManager)

    # Statistics are averaged over at most this many of the latest samples ...
    STATS_MAX_SAMPLES = 10
    # ... and samples older than this are discarded.
    STATS_MAX_AGE = timedelta(seconds=3600)

    def __init__(self, host, port=80):
        """Initialize the (not yet connected) peer.

        @param host: the host name or address of the peer
        @param port: the TCP port to connect to (default 80)
        """
        self.host = host
        self.port = port
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        # Requests waiting to be sent, in submission order.
        self.request_queue = []
        # Requests that were sent and are waiting for their response.
        self.response_queue = []
        self.proto = None
        self.connector = None
        self._errors = 0
        self._completed = 0
        # Samples of (time recorded, download duration, stream length).
        self._downloadSpeeds = []
        # (time the response arrived, stream length) of the response that is
        # currently downloading, or None.
        self._lastResponse = None
        # Samples of (time recorded, response delay).
        self._responseTimes = []

    def connect(self):
        """Start connecting to the peer.

        Must only be called while the connection is closed and no other
        connection attempt is in progress.
        """
        assert self.closed and not self.connecting
        self.connecting = True
        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
        # Without the errback a failed attempt would leave 'connecting' set
        # forever and every queued request would hang.
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Save the newly connected protocol and start sending requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        self.processQueue()

    def connectionError(self, err):
        """Handle a failed connection attempt.

        The oldest queued request (if any) is failed with the connection
        error and counted as an error. Any remaining requests trigger a new
        connection attempt; since each failed attempt consumes one request,
        the retries always terminate.

        @param err: the failure describing why the connection failed
        """
        log.msg('Failed to connect to peer (%s, %d): %r' % (self.host, self.port, err))
        # Reset the state first so that handlers of the failed request can
        # safely submit new requests.
        self.connecting = False
        self.closed = True
        self.proto = None
        if self.request_queue:
            req = self.request_queue.pop(0)
            self._completed += 1
            self._errors += 1
            req.deferRequest.errback(err)
        self.processQueue()

    def close(self):
        """Close the connection to the peer, if one is open."""
        if not self.closed:
            self.proto.transport.loseConnection()

    def submitRequest(self, request):
        """Queue a request and send it as soon as possible.

        @param request: the request to send
        @return: a deferred that fires with the response, or errbacks if
            the request fails
        """
        request.submissionTime = datetime.now()
        request.deferRequest = defer.Deferred()
        self.request_queue.append(request)
        self.processQueue()
        return request.deferRequest

    def processQueue(self):
        """Send the next queued request, if the connection allows it.

        Nothing is sent while a connection attempt is in progress. If the
        connection is closed a new one is started. Without pipelining, a
        request is only sent when the protocol is not busy and no response
        is outstanding.
        """
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.response_queue and not self.pipeline:
            return

        req = self.request_queue.pop(0)
        self.response_queue.append(req)
        req.deferResponse = self.proto.submitRequest(req, False)
        req.deferResponse.addCallbacks(self.requestComplete, self.requestError)

    def requestComplete(self, resp):
        """Record statistics for a received response and pass it on."""
        self._processLastResponse()
        req = self.response_queue.pop(0)
        log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
        self._completed += 1
        if resp.code >= 400:
            self._errors += 1
        now = datetime.now()
        self._responseTimes.append((now, now - req.submissionTime))
        # The download of the body starts now; its duration is measured
        # the next time _processLastResponse() is called.
        self._lastResponse = (now, resp.stream.length)
        req.deferRequest.callback(resp)

    def requestError(self, error):
        """Record a failed request and pass the error on."""
        self._processLastResponse()
        req = self.response_queue.pop(0)
        log.msg('Download of %s generated error %r' % (req.uri, error))
        self._completed += 1
        self._errors += 1
        req.deferRequest.errback(error)

    def hashError(self, error):
        """Log that a hash error occurred from the peer."""
        log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
        self._errors += 1

    # The IHTTPClientManager interface functions
    def clientBusy(self, proto):
        """Note that the protocol is busy."""
        self.busy = True

    def clientIdle(self, proto):
        """Note that the protocol is idle and try to send the next request."""
        self._processLastResponse()
        self.busy = False
        self.processQueue()

    def clientPipelining(self, proto):
        """Note that pipelining is possible and try to send more requests."""
        self.pipeline = True
        self.processQueue()

    def clientGone(self, proto):
        """Reset the connection state after the connection was lost.

        All requests still waiting for a response are failed with a
        ProtocolError. If unsent requests remain, a reconnect is started.
        """
        self._processLastResponse()
        lost = self.response_queue
        # Reset the state before firing any errback, so that a handler that
        # submits a new request sees a closed connection (and reconnects)
        # instead of trying to use the dead protocol.
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.response_queue = []
        self.proto = None
        for req in lost:
            req.deferRequest.errback(ProtocolError('lost connection'))
        if self.request_queue:
            self.processQueue()

    # The downloading request interface functions
    def setCommonHeaders(self):
        """Create the headers that are sent with every request."""
        headers = http_headers.Headers()
        headers.setHeader('Host', self.host)
        headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' %
                          (version.short(), twisted_version.short(), web2_version.short()))
        return headers

    def get(self, path, method="GET", modtime=None):
        """Request a path from the peer.

        @param path: the path to request
        @param method: the HTTP method to use (default "GET")
        @param modtime: if given, sent as the If-Modified-Since header
        @return: a deferred that fires with the response
        """
        headers = self.setCommonHeaders()
        if modtime:
            headers.setHeader('If-Modified-Since', modtime)
        return self.submitRequest(ClientRequest(method, path, headers, None))

    def getRange(self, path, rangeStart, rangeEnd, method="GET"):
        """Request a byte range of a path from the peer.

        @param path: the path to request
        @param rangeStart: the first byte of the range
        @param rangeEnd: the last byte of the range
        @param method: the HTTP method to use (default "GET")
        @return: a deferred that fires with the response
        """
        headers = self.setCommonHeaders()
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return self.submitRequest(ClientRequest(method, path, headers, None))

    # Functions that return information about the peer
    def isIdle(self):
        """Check whether the peer is not busy and has nothing queued."""
        return not self.busy and not self.request_queue and not self.response_queue

    def _deltaSeconds(delta):
        """Convert a timedelta to a float number of seconds."""
        return delta.days*86400.0 + delta.seconds + delta.microseconds/1000000.0
    _deltaSeconds = staticmethod(_deltaSeconds)

    def _trimSamples(self, samples):
        """Drop, in place, the samples that are no longer considered.

        Only the latest STATS_MAX_SAMPLES samples that are at most
        STATS_MAX_AGE old are kept. Each sample is a tuple whose first
        element is the time it was recorded.
        """
        now = datetime.now()
        while samples and (len(samples) > self.STATS_MAX_SAMPLES or
                           now - samples[0][0] > self.STATS_MAX_AGE):
            samples.pop(0)

    def _processLastResponse(self):
        """Finish measuring the download that is currently in progress."""
        if self._lastResponse is not None:
            started, length = self._lastResponse
            self._lastResponse = None
            # A stream of unknown length can not contribute to the speed,
            # and a None length would break the sum in downloadSpeed().
            if length is not None:
                now = datetime.now()
                self._downloadSpeeds.append((now, now - started, length))

    def downloadSpeed(self):
        """Gets the latest average download speed for the peer.

        The average is over the last 10 responses that occurred in the last hour.

        @return: the stream length per second of download time, or 0.0 if
            there is nothing to measure it from
        """
        self._trimSamples(self._downloadSpeeds)

        # If there are none, then you get 0
        if not self._downloadSpeeds:
            return 0.0

        total_time = 0.0
        total_download = 0
        for download in self._downloadSpeeds:
            total_time += self._deltaSeconds(download[1])
            total_download += download[2]

        # All samples may have been recorded within the clock's resolution.
        if total_time <= 0.0:
            return 0.0

        return total_download / total_time

    def responseTime(self):
        """Gets the latest average response time for the peer.

        Response time is the time from the request being submitted to this
        peer, to the time its response is received. The average is over the
        last 10 responses that occurred in the last hour.

        @return: the average response time in seconds, or 0.0 if there are
            no recent responses
        """
        self._trimSamples(self._responseTimes)

        # If there are none, give it the benefit of the doubt
        if not self._responseTimes:
            return 0.0

        total_response = 0.0
        for response in self._responseTimes:
            total_response += self._deltaSeconds(response[1])

        return total_response / len(self._responseTimes)

    def rank(self, fastest):
        """Determine the ranking value for the peer.

        The ranking value is composed of 5 numbers:
         - 1 if a connection to the peer is open, 0.9 otherwise
         - 1 if there are no pending requests, to 0 if there are a maximum
         - 1 if the peer is the fastest of all peers, to 0 if the speed is 0
         - 1 if all requests are good, 0 if all produced errors
         - an exponentially decreasing number based on the response time

        @param fastest: the download speed of the fastest peer; the speed
            factor is skipped if it is not positive
        @return: the product of the numbers above
        """
        rank = 1.0
        if self.closed:
            rank *= 0.9
        rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0
        if fastest > 0.0:
            rank *= min(1.0, self.downloadSpeed() / fastest)
        if self._completed:
            rank *= max(0.0, 1.0 - float(self._errors) / self._completed)
        rank *= exp(-self.responseTime() / 5.0)
        return rank
231         
class TestClientManager(unittest.TestCase):
    """Unit tests for the Peer.

    NOTE: these tests download documents from www.ietf.org, so they need
    network access and depend on the sizes of the documents found there.
    """

    client = None
    pending_calls = []

    # The (path, request number, expected length) of the requests that the
    # multiple download tests issue immediately ...
    _IMMEDIATE_REQUESTS = (
        ('/rfc/rfc0006.txt', 1, 1776),
        ('/rfc/rfc2362.txt', 2, 159833),
        ('/rfc/rfc0801.txt', 3, 40824),
    )
    # ... and of those they issue after a delay (the last one ends the test).
    _DELAYED_REQUESTS = (
        ('/rfc/rfc0013.txt', 4, 1070),
        ('/rfc/rfc0022.txt', 5, 4606),
        ('/rfc/rfc0048.txt', 6, 41696),
        ('/rfc/rfc3261.txt', 7, 647976),
        ('/rfc/rfc0014.txt', 8, 27),
        ('/rfc/rfc0001.txt', 9, 21088),
        ('/rfc/rfc2801.txt', 0, 598794),
    )
    # The delays (in seconds) of the delayed requests in the slow tests.
    _SLOW_DELAYS = (1, 10, 30, 31, 32, 32, 62)

    def setUp(self):
        # Use a list of our own: appending to the class level list would
        # share (and accumulate) the delayed calls across all test instances.
        self.pending_calls = []

    def gotResp(self, resp, num, expect):
        """Check the code and length of a response, then read its stream.

        @param resp: the response that was received
        @param num: the number of the request (not used)
        @param expect: the expected stream length, or None to not check it
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        # Read and discard the body.
        stream_mod.readStream(resp.stream, lambda data: None)

    def _startDownloads(self, delays):
        """Issue the standard set of requests to the client.

        The immediate requests are sent right away, each of the delayed
        requests is scheduled after the corresponding delay.

        @param delays: the delay in seconds for each of the delayed requests
        @return: a deferred that fires once the last delayed request is done
        """
        lastDefer = defer.Deferred()

        def newRequest(path, num, expect, last=False):
            d = self.client.get(path)
            d.addCallback(self.gotResp, num, expect)
            if last:
                d.addBoth(lastDefer.callback)

        for path, num, expect in self._IMMEDIATE_REQUESTS:
            newRequest(path, num, expect)

        final = len(self._DELAYED_REQUESTS) - 1
        for i, (path, num, expect) in enumerate(self._DELAYED_REQUESTS):
            self.pending_calls.append(reactor.callLater(delays[i], newRequest,
                                                        path, num, expect, i == final))
        return lastDefer

    def test_download(self):
        host = 'www.ietf.org'
        self.client = Peer(host, 80)
        self.timeout = 10

        d = self.client.get('/rfc/rfc0013.txt')
        d.addCallback(self.gotResp, 1, 1070)
        return d

    def test_head(self):
        host = 'www.ietf.org'
        self.client = Peer(host, 80)
        self.timeout = 10

        d = self.client.get('/rfc/rfc0013.txt', "HEAD")
        d.addCallback(self.gotResp, 1, 0)
        return d

    def test_multiple_downloads(self):
        host = 'www.ietf.org'
        self.client = Peer(host, 80)
        self.timeout = 120
        return self._startDownloads(self._SLOW_DELAYS)

    def test_multiple_quick_downloads(self):
        host = 'www.ietf.org'
        self.client = Peer(host, 80)
        self.timeout = 30
        # Schedule all the delayed requests for the next reactor iteration.
        return self._startDownloads((0,) * len(self._DELAYED_REQUESTS))

    def checkInfo(self):
        """Log the current statistics of the client."""
        log.msg('Rank is: %r' % self.client.rank(250.0*1024))
        log.msg('Download speed is: %r' % self.client.downloadSpeed())
        log.msg('Response Time is: %r' % self.client.responseTime())

    def test_peer_info(self):
        host = 'www.ietf.org'
        self.client = Peer(host, 80)
        self.timeout = 120
        lastDefer = self._startDownloads(self._SLOW_DELAYS)

        # Log the statistics every 2 seconds while the downloads run.
        for i in xrange(2, 122, 2):
            self.pending_calls.append(reactor.callLater(i, self.checkInfo))

        return lastDefer

    def test_range(self):
        host = 'www.ietf.org'
        self.client = Peer(host, 80)
        self.timeout = 10

        d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
        d.addCallback(self.gotResp, 1, 100)
        return d

    def tearDown(self):
        # Cancel the delayed calls that did not run yet, so they can not
        # fire after the test ended.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client:
            self.client.close()
            self.client = None