2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
20 class Peer(ClientFactory):
21 """A manager for all HTTP requests to a single peer.
23 Controls all requests that go to a single peer (host and port).
24 This includes buffering requests until they can be sent and reconnecting
25 in the event of the connection being closed.
29 implements(IHTTPClientManager)
# Set up the initial state for managing the requests to one peer.
#
# NOTE(review): several lines of this method are absent from this listing.
# Attributes that other methods read (self.host, self.port, self.busy,
# self.pipeline, self.closed, self.proto, self._errors, self._completed)
# are presumably initialised on those lines -- confirm against the full file.
31 def __init__(self, host, port=80):
# Set to True when a connection attempt is started and reset to False by
# connected() and clientGone().
37 self.connecting = False
# Requests accepted by submitRequest() that have not yet been handed to
# the protocol.
38 self.request_queue = []
# Requests handed to the protocol whose responses are still outstanding;
# processQueue() moves entries here from the front of request_queue.
39 self.response_queue = []
# (timestamp, duration, length) tuples appended by _processLastResponse().
44 self._downloadSpeeds = []
# (timestamp, stream length) of the most recent response, or None once
# _processLastResponse() has folded it into _downloadSpeeds.
45 self._lastResponse = None
# (timestamp, delay since submission) tuples appended by requestComplete().
46 self._responseTimes = []
48 #{ Manage the request queue
50 """Connect to the peer."""
51 assert self.closed and not self.connecting
52 self.connecting = True
53 d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
54 d.addCallback(self.connected)
# Callback for the deferred returned by the connectTCP() call earlier in this
# class, where it is registered with d.addCallback(self.connected).
#   proto: the value that deferred fires with (presumably the connected
#       HTTPClientProtocol instance -- confirm against the Twisted docs).
# NOTE(review): only one statement of this method is part of this listing;
# the lines that store proto and start working through the queue are
# presumably among the absent ones -- confirm against the full file.
56 def connected(self, proto):
57 """Begin processing the queued requests."""
# The connection attempt has finished, so clear the in-progress flag.
59 self.connecting = False
64 """Close the connection to the peer."""
66 self.proto.transport.loseConnection()
# Queue a request for this peer and return a deferred for its outcome.
#
# Two attributes are attached to the request object before it is queued:
#   submissionTime - datetime.now() at submission; requestComplete()
#       subtracts it from the completion time to record a response time.
#   deferRequest - the Deferred returned to the caller; requestComplete()
#       fires its callback, requestError() and clientGone() fire its errback.
# The request is appended to the end of request_queue, so requests are sent
# in submission order (processQueue() pops from the front).
# NOTE(review): any statement between the append and the return is not part
# of this listing; presumably the queue is processed there -- confirm.
68 def submitRequest(self, request):
69 """Add a new request to the queue.
71 @type request: L{twisted.web2.client.http.ClientRequest}
72 @return: deferred that will fire with the completed request
74 request.submissionTime = datetime.now()
75 request.deferRequest = defer.Deferred()
76 self.request_queue.append(request)
78 return request.deferRequest
# Hand the oldest queued request to the protocol when the state allows it.
80 def processQueue(self):
81 """Check the queue to see if new requests can be sent to the peer."""
# NOTE(review): the bodies of the guards below, and any statements between
# them, are not part of this listing; presumably each guard stops processing
# for now -- confirm against the full file.
# Guard: nothing is waiting to be sent.
82 if not self.request_queue:
# Guard: the busy flag is set and pipelining is not enabled.
89 if self.busy and not self.pipeline:
# Guard: a response is still outstanding and pipelining is not enabled.
91 if self.response_queue and not self.pipeline:
# Take the oldest request and track it in response_queue until its response
# or error arrives; the handlers pop index 0, so order must be preserved.
94 req = self.request_queue.pop(0)
95 self.response_queue.append(req)
# NOTE(review): the second argument is presumably web2's closeAfter flag, with
# False keeping the connection open for later requests -- confirm against the
# twisted.web2 HTTPClientProtocol documentation.
96 req.deferResponse = self.proto.submitRequest(req, False)
97 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
# Success callback attached by processQueue() to the protocol's deferred.
#   resp: the response object; this method reads resp.code and
#       resp.stream.length and passes resp on to the original caller.
99 def requestComplete(self, resp):
100 """Process a completed request."""
# Fold any previously remembered response into the download-speed samples
# before recording this one.
101 self._processLastResponse()
# Responses are matched to requests in the order the requests were sent.
102 req = self.response_queue.pop(0)
103 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
# NOTE(review): `now` is assigned on a line that is not part of this listing
# (presumably datetime.now()) -- confirm against the full file.
# Response-time sample: (completion time, time elapsed since submitRequest()).
108 self._responseTimes.append((now, now - req.submissionTime))
# Remember when this response arrived and its stream length;
# _processLastResponse() later turns this into a download-speed sample.
109 self._lastResponse = (now, resp.stream.length)
# Deliver the response to whoever called submitRequest().
110 req.deferRequest.callback(resp)
# Error callback attached by processQueue() to the protocol's deferred.
#   error: the failure value; it is logged with %r and forwarded unchanged to
#       the errback of the deferred returned by submitRequest().
112 def requestError(self, error):
113 """Process a request that ended with an error."""
# Fold any previously remembered response into the download-speed samples.
114 self._processLastResponse()
# The failed request is assumed to be the oldest outstanding one.
115 req = self.response_queue.pop(0)
116 log.msg('Download of %s generated error %r' % (req.uri, error))
# NOTE(review): lines between the log call and the errback are not part of
# this listing; rank() reads self._errors and self._completed, which are
# presumably updated there -- confirm against the full file.
119 req.deferRequest.errback(error)
# Write a log entry identifying this peer (host, port) and the given error.
#   error: the hash error to report; formatted with %r.
# NOTE(review): anything after the log call is not part of this listing, so
# it cannot be told from here whether error counters are also updated.
121 def hashError(self, error):
122 """Log that a hash error occurred from the peer."""
123 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
126 #{ IHTTPClientManager interface
# IHTTPClientManager callback.
#   proto: the protocol making the call.
# NOTE(review): the statements of this method are not part of this listing.
# Per the docstring it records the busy state; processQueue() and the idle
# check read self.busy, so presumably that flag is set here -- confirm.
127 def clientBusy(self, proto):
128 """Save the busy state."""
# IHTTPClientManager callback.
#   proto: the protocol making the call; not used by the visible line.
# NOTE(review): the remaining statements are not part of this listing; per
# the docstring they attempt to send the next queued request -- confirm.
131 def clientIdle(self, proto):
132 """Try to send a new request."""
# Convert the remembered last response (if any) into a download-speed sample.
133 self._processLastResponse()
# IHTTPClientManager callback.
#   proto: the protocol making the call.
# NOTE(review): the statements of this method are not part of this listing.
# processQueue() consults self.pipeline before sending while a response is
# outstanding, so presumably that flag is set here -- confirm.
137 def clientPipelining(self, proto):
138 """Try to send a new request."""
# IHTTPClientManager callback, invoked when the connection has gone away.
#   proto: the protocol making the call; not used by the visible lines.
142 def clientGone(self, proto):
143 """Mark sent requests as errors."""
# Convert the remembered last response (if any) into a download-speed sample.
144 self._processLastResponse()
# Every request that was sent but not yet answered fails with ProtocolError.
145 for req in self.response_queue:
146 req.deferRequest.errback(ProtocolError('lost connection'))
# Reset the connection-related state. Other flags are presumably reset on the
# lines absent from this listing -- confirm against the full file.
148 self.pipeline = False
150 self.connecting = False
151 self.response_queue = []
# The visible lines leave request_queue untouched, so unsent requests survive.
# NOTE(review): the body of this check is not part of this listing; presumably
# it starts a new connection for the remaining requests -- confirm.
153 if self.request_queue:
156 #{ Downloading request interface
# Build a fresh Headers object carrying the headers shared by all requests.
# NOTE(review): the return statement is not part of this listing; get() and
# getRange() use the result as the Headers object, so it presumably returns
# `headers` -- confirm against the full file.
157 def setCommonHeaders(self):
158 """Get the common HTTP headers for all requests."""
159 headers = http_headers.Headers()
# Host is the peer host name only; the port is not included here.
160 headers.setHeader('Host', self.host)
# User-Agent advertises the apt-p2p, twisted and twisted.web2 versions.
161 headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
162 (version.short(), twisted_version.short(), web2_version.short()))
# Queue a request for `path` on this peer without a Range header.
#
# Builds ClientRequest(method, path, headers, None) on top of the headers
# from setCommonHeaders() and passes it to submitRequest(); the return value
# is the deferred that submitRequest() returns.
# NOTE(review): the docstring describes If-Modified-Since as optional, but the
# condition guarding the setHeader call is not part of this listing -- confirm
# that the header is only set when modtime is supplied.
# NOTE(review): the final None argument is presumably the request body stream
# (no body) -- confirm against the twisted.web2 ClientRequest documentation.
165 def get(self, path, method="GET", modtime=None):
166 """Add a new request to the queue.
168 @type path: C{string}
169 @param path: the path to request from the peer
170 @type method: C{string}
171 @param method: the HTTP method to use, 'GET' or 'HEAD'
172 (optional, defaults to 'GET')
173 @type modtime: C{int}
174 @param modtime: the modification time to use for an 'If-Modified-Since'
175 header, as seconds since the epoch
176 (optional, defaults to not sending that header)
178 headers = self.setCommonHeaders()
180 headers.setHeader('If-Modified-Since', modtime)
181 return self.submitRequest(ClientRequest(method, path, headers, None))
# Queue a request for part of `path` on this peer.
#
# Same flow as get(): common headers, then a ClientRequest handed to
# submitRequest(), whose deferred is returned. The difference is the Range
# header, set to the structured value ('bytes', [(rangeStart, rangeEnd)]).
# The docstring states that rangeEnd is inclusive; the tests in this file
# expect a length of 100 for the range 100 to 199.
# NOTE(review): no validation of rangeStart or rangeEnd is visible here.
183 def getRange(self, path, rangeStart, rangeEnd, method="GET"):
184 """Add a new request with a Range header to the queue.
186 @type path: C{string}
187 @param path: the path to request from the peer
188 @type rangeStart: C{int}
189 @param rangeStart: the byte to begin the request at
190 @type rangeEnd: C{int}
191 @param rangeEnd: the byte to end the request at (inclusive)
192 @type method: C{string}
193 @param method: the HTTP method to use, 'GET' or 'HEAD'
194 (optional, defaults to 'GET')
196 headers = self.setCommonHeaders()
197 headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
198 return self.submitRequest(ClientRequest(method, path, headers, None))
202 """Check whether the peer is idle or not."""
203 return not self.busy and not self.request_queue and not self.response_queue
# Turn the response remembered by requestComplete() into a speed sample.
# Called at the start of requestComplete(), requestError(), clientIdle() and
# clientGone(), i.e. at the points where the previous download is over.
205 def _processLastResponse(self):
206 """Save the download time of the last request for speed calculations."""
# _lastResponse is (arrival time, stream length), or None if already used.
207 if self._lastResponse is not None:
# NOTE(review): `now` is assigned on a line that is not part of this listing
# (presumably datetime.now()) -- confirm against the full file.
# Sample layout: (time recorded, time since the response arrived, length).
209 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
# Clear the remembered response so it is not counted twice.
210 self._lastResponse = None
# Average download speed over the retained samples in _downloadSpeeds.
#
# Each sample is (timestamp, duration as a timedelta, length), as appended by
# _processLastResponse(). The while loop drops samples from the front until
# at most 10 remain and the oldest is no more than 3600 seconds old. The
# result is the summed lengths divided by the summed durations in seconds.
# NOTE(review): lengths come from resp.stream.length, presumably bytes, which
# would make the result bytes per second -- confirm.
# NOTE(review): the assignment of `now`, the initial values of total_time and
# total_download, and the body of the empty-list check are not part of this
# listing; per the comment the empty case yields 0 -- confirm.
# NOTE(review): no guard against total_time being zero is visible; if every
# retained duration is zero the final division would raise -- confirm.
212 def downloadSpeed(self):
213 """Gets the latest average download speed for the peer.
215 The average is over the last 10 responses that occurred in the last hour.
220 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
221 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
222 self._downloadSpeeds.pop(0)
224 # If there are none, then you get 0
225 if not self._downloadSpeeds:
228 for download in self._downloadSpeeds:
229 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
230 total_download += download[2]
232 return total_download / total_time
# Average response time, in seconds, over the retained samples.
#
# Each sample in _responseTimes is (timestamp, delay as a timedelta), as
# appended by requestComplete(), where the delay runs from submitRequest() to
# completion. The while loop drops samples from the front until at most 10
# remain and the oldest is no more than 3600 seconds old. Each delay is
# converted to float seconds and the mean of those values is returned.
# NOTE(review): the assignment of `now`, the initial value of total_response
# and the value returned when no samples remain are not part of this listing
# -- confirm against the full file.
234 def responseTime(self):
235 """Gets the latest average response time for the peer.
237 Response time is the time from receiving the request, to the time
238 the download begins. The average is over the last 10 responses that
239 occurred in the last hour.
243 while self._responseTimes and (len(self._responseTimes) > 10 or
244 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
245 self._responseTimes.pop(0)
247 # If there are none, give it the benefit of the doubt
248 if not self._responseTimes:
251 for response in self._responseTimes:
252 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
254 return total_response / len(self._responseTimes)
# Combine several factors into one ranking value for this peer by repeated
# multiplication.
#   fastest: the speed to compare downloadSpeed() against; per the docstring
#       it is the speed of the fastest of all peers.
#
# Factors visible in this listing, in order:
#   - queue load: 1.0 with nothing pending, falling linearly to 0.0 once the
#     two queues hold ten or more requests between them
#   - relative speed: downloadSpeed() / fastest, capped at 1.0
#   - reliability: 1.0 - errors / completed, floored at 0.0
#   - responsiveness: exp(-responseTime() / 5.0)
# NOTE(review): the initial value of `rank`, the open-connection factor from
# the docstring and the return statement are not part of this listing.
# NOTE(review): no guard is visible for fastest == 0 or self._completed == 0,
# either of which would make a division here raise -- confirm that the absent
# lines protect these statements.
# NOTE(review): `exp` is not brought into scope by any import line visible in
# this listing -- confirm that the full file imports it (e.g. from math).
256 def rank(self, fastest):
257 """Determine the ranking value for the peer.
259 The ranking value is composed of 5 numbers:
260 - 1 if a connection to the peer is open, 0.9 otherwise
261 - 1 if there are no pending requests, to 0 if there are a maximum
262 - 1 if the peer is the fastest of all peers, to 0 if the speed is 0
263 - 1 if all requests are good, 0 if all produced errors
264 - an exponentially decreasing number based on the response time
269 rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0
271 rank *= min(1.0, self.downloadSpeed() / fastest)
273 rank *= max(0.0, 1.0 - float(self._errors) / self._completed)
274 rank *= exp(-self.responseTime() / 5.0)
277 class TestClientManager(unittest.TestCase):
278 """Unit tests for the Peer."""
# Shared success callback used by the tests for each request deferred.
#   resp: the response; its code must be in the range 200 to 299.
#   num: the request number supplied by the test; not used by visible lines.
#   expect: the expected resp.stream.length, or None to skip the length check.
283 def gotResp(self, resp, num, expect):
284 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
285 if expect is not None:
286 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
# NOTE(review): print_ and printdone are defined on lines that are not part of
# this listing -- confirm what they do with the data.
# Read the response stream, passing the data to print_, then run printdone.
291 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
# Fetches /rfc/rfc0013.txt from www.ietf.org port 80 with a plain GET and
# checks, via gotResp, for a 2xx code and a stream length of 1070.
# NOTE(review): this test needs network access to a live third-party server,
# and the expected length is tied to that server's current content.
# NOTE(review): lines of this method are absent from this listing, including
# whatever returns the deferred to trial -- confirm against the full file.
293 def test_download(self):
294 """Tests a normal download."""
295 host = 'www.ietf.org'
296 self.client = Peer(host, 80)
299 d = self.client.get('/rfc/rfc0013.txt')
300 d.addCallback(self.gotResp, 1, 1070)
304 """Tests a 'HEAD' request."""
305 host = 'www.ietf.org'
306 self.client = Peer(host, 80)
309 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
310 d.addCallback(self.gotResp, 1, 0)
# Issues ten GETs against www.ietf.org at staggered times, so that some are
# queued behind others and some are sent after the connection has been idle.
# NOTE(review): this test needs network access to a live third-party server,
# and the expected lengths are tied to that server's current content.
# NOTE(review): lines of this method are absent from this listing, including
# the use of the `last` argument and the method's return value. Presumably
# only the final request fires lastDefer and the test returns it -- confirm.
313 def test_multiple_downloads(self):
314 """Tests multiple downloads with queueing and connection closing."""
315 host = 'www.ietf.org'
316 self.client = Peer(host, 80)
318 lastDefer = defer.Deferred()
# Helper: issue one GET and check the response with gotResp(num, expect).
320 def newRequest(path, num, expect, last=False):
321 d = self.client.get(path)
322 d.addCallback(self.gotResp, num, expect)
324 d.addBoth(lastDefer.callback)
# Three requests are submitted immediately.
327 newRequest("/rfc/rfc0006.txt", 1, 1776)
328 newRequest("/rfc/rfc2362.txt", 2, 159833)
329 newRequest("/rfc/rfc0801.txt", 3, 40824)
# Delayed calls are collected in self.pending_calls; code at the end of this
# class iterates over that list (presumably to cancel them -- confirm).
331 # This one will probably be queued
332 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
334 # Connection should still be open, but idle
335 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
337 #Connection should be closed
338 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
339 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
340 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
341 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
343 # Now it should definitely be closed
344 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# Issues ten GETs against www.ietf.org with no spacing: three directly and
# seven through reactor.callLater with a delay of 0, to exercise queueing.
# NOTE(review): this test needs network access to a live third-party server,
# and the expected lengths are tied to that server's current content.
# NOTE(review): lines of this method are absent from this listing, including
# the use of the `last` argument and the method's return value. Presumably
# only the final request fires lastDefer and the test returns it -- confirm.
347 def test_multiple_quick_downloads(self):
348 """Tests lots of multiple downloads with queueing."""
349 host = 'www.ietf.org'
350 self.client = Peer(host, 80)
352 lastDefer = defer.Deferred()
# Helper: issue one GET and check the response with gotResp(num, expect).
354 def newRequest(path, num, expect, last=False):
355 d = self.client.get(path)
356 d.addCallback(self.gotResp, num, expect)
358 d.addBoth(lastDefer.callback)
# Three requests are submitted immediately.
360 newRequest("/rfc/rfc0006.txt", 1, 1776)
361 newRequest("/rfc/rfc2362.txt", 2, 159833)
362 newRequest("/rfc/rfc0801.txt", 3, 40824)
# The rest are scheduled with a delay of 0, so they are submitted from the
# reactor rather than from this method's own call stack.
363 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
364 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
365 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
366 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
367 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
368 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
369 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
373 log.msg('Rank is: %r' % self.client.rank(250.0*1024))
374 log.msg('Download speed is: %r' % self.client.downloadSpeed())
375 log.msg('Response Time is: %r' % self.client.responseTime())
# Runs the same staggered schedule of ten GETs as test_multiple_downloads
# and, alongside it, calls self.checkInfo every 2 seconds from 2 to 120.
# NOTE(review): this test needs network access to a live third-party server,
# and the expected lengths are tied to that server's current content.
# NOTE(review): xrange is Python 2 only.
# NOTE(review): lines of this method are absent from this listing, including
# the use of the `last` argument and the method's return value. Presumably
# only the final request fires lastDefer and the test returns it -- confirm.
377 def test_peer_info(self):
378 """Test retrieving the peer info during a download."""
379 host = 'www.ietf.org'
380 self.client = Peer(host, 80)
382 lastDefer = defer.Deferred()
# Helper: issue one GET and check the response with gotResp(num, expect).
384 def newRequest(path, num, expect, last=False):
385 d = self.client.get(path)
386 d.addCallback(self.gotResp, num, expect)
388 d.addBoth(lastDefer.callback)
# Three requests are submitted immediately; the rest at 1 to 62 seconds.
390 newRequest("/rfc/rfc0006.txt", 1, 1776)
391 newRequest("/rfc/rfc2362.txt", 2, 159833)
392 newRequest("/rfc/rfc0801.txt", 3, 40824)
393 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
394 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
395 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
396 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
397 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
398 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
399 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# Schedule the periodic info checks: 60 calls at 2, 4, ..., 120 seconds.
401 for i in xrange(2, 122, 2):
402 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
# Requests bytes 100 to 199 of /rfc/rfc0013.txt from www.ietf.org and checks,
# via gotResp, for a 2xx code and a stream length of 100 (inclusive range).
# NOTE(review): this test needs network access to a live third-party server.
# NOTE(review): lines of this method are absent from this listing, including
# whatever returns the deferred to trial -- confirm against the full file.
406 def test_range(self):
407 """Test a Range request."""
408 host = 'www.ietf.org'
409 self.client = Peer(host, 80)
412 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
413 d.addCallback(self.gotResp, 1, 100)
417 for p in self.pending_calls:
420 self.pending_calls = []