2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
# Peer is a Twisted ClientFactory that manages every HTTP request sent to one
# (host, port). It declares IHTTPClientManager; clientBusy, clientIdle,
# clientPipelining and clientGone (the "#{ IHTTPClientManager interface"
# group) are that interface's callbacks.
# NOTE(review): this excerpt is lossy. The embedded original line numbers
# skip 22 and 26-28, so the class docstring's closing quotes are not shown;
# restore the missing lines from the original source.
20 class Peer(ClientFactory):
21 """A manager for all HTTP requests to a single peer.
23 Controls all requests that go to a single peer (host and port).
24 This includes buffering requests until they can be sent and reconnecting
25 in the event of the connection being closed.
29 implements(IHTTPClientManager)
# Per-peer state: connection flags, plus two queues. request_queue holds
# requests not yet sent; response_queue holds sent requests awaiting a
# response. The sample lists are read by downloadSpeed() and responseTime().
# NOTE(review): embedded line numbers jump 31->39 and 41->46, so original
# lines 32-38 and 42-45 are missing here. Other methods read self.host,
# self.port, self.rank, self.busy, self.pipeline, self.closed, self.proto,
# self._errors and self._completed, presumably initialised on those lines.
31 def __init__(self, host, port=80):
39 self.connecting = False
40 self.request_queue = []
41 self.response_queue = []
46 self._downloadSpeeds = []
47 self._lastResponse = None
48 self._responseTimes = []
# NOTE(review): original lines 49-50 (including the enclosing `def` line) are
# missing here; this return presumably belongs to a __repr__-style method.
# Renders the peer as "(host, port, rank)" using repr() of each value.
51 return "(%r, %r, %r)" % (self.host, self.port, self.rank)
53 #{ Manage the request queue
# Opens a TCP connection to (self.host, self.port) with HTTPClientProtocol,
# passing this Peer to the protocol as its manager. connected() runs on
# success and connectionError() on failure.
# The assert requires that no connection is open or being opened. Note that
# assert statements are stripped when Python runs with -O.
# NOTE(review): original line 54 (this method's `def` line) is missing here;
# presumably `def connect(self):`. Verify.
55 """Connect to the peer."""
56 assert self.closed and not self.connecting
57 self.connecting = True
58 d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
59 d.addCallbacks(self.connected, self.connectionError)
# Success callback of connect(): receives the connected protocol instance and
# clears the `connecting` flag.
# NOTE(review): original lines 63 and 65-67 are missing here. close() and
# processQueue() use self.proto, so these lines presumably store `proto` and,
# per the docstring, start processing queued requests. Verify.
61 def connected(self, proto):
62 """Begin processing the queued requests."""
64 self.connecting = False
# Failure callback of connect(): logs the failure and errbacks the oldest
# queued request's Deferred with `err`. Per the inline comment, only one
# request is failed per error so that processing does not loop indefinitely.
# NOTE(review): original lines 71-72, 77-81 and 83-84 are missing here. The
# condition (if any) guarding `self.connecting = False` is not visible.
68 def connectionError(self, err):
69 """Cancel the requests."""
70 log.msg('Failed to connect to the peer by HTTP.')
73 # Remove one request so that we don't loop indefinitely
74 if self.request_queue:
75 req = self.request_queue.pop(0)
76 req.deferRequest.errback(err)
82 self.connecting = False
# Drops the connection by calling loseConnection() on self.proto's transport.
# NOTE(review): original lines 85 and 87 are missing here: the `def` line and
# one more short line, possibly a guard that a connection is open. Verify
# before relying on close() when self.proto may be unset.
86 """Close the connection to the peer."""
88 self.proto.transport.loseConnection()
# Queues a request and returns a Deferred for its outcome:
# - stamps the submission time, which requestComplete() uses to measure
#   response time;
# - attaches a new Deferred as request.deferRequest; it is later fired by
#   requestComplete or errbacked by requestError, connectionError or
#   clientGone;
# - appends the request to request_queue.
# NOTE(review): original lines 92, 95 and 99-100 are missing here. Lines
# 99-100 presumably start sending the queue. Verify.
90 def submitRequest(self, request):
91 """Add a new request to the queue.
93 @type request: L{twisted.web2.client.http.ClientRequest}
94 @return: deferred that will fire with the completed request
96 request.submissionTime = datetime.now()
97 request.deferRequest = defer.Deferred()
98 self.request_queue.append(request)
101 return request.deferRequest
# Hands the next queued request to the open protocol when allowed. The
# visible conditions check for three cases: an empty request_queue, a busy
# protocol without pipelining, and outstanding responses without
# pipelining. A request that is sent moves from request_queue to
# response_queue, and its response Deferred is wired to requestComplete /
# requestError.
# NOTE(review): original lines 106-111, 113, 115-116 and 119 are missing
# here, including the bodies of the visible `if` statements, so the exact
# early-exit behaviour cannot be confirmed from this excerpt.
103 def processQueue(self):
104 """Check the queue to see if new requests can be sent to the peer."""
105 if not self.request_queue:
112 if self.busy and not self.pipeline:
114 if self.response_queue and not self.pipeline:
117 req = self.request_queue.pop(0)
118 self.response_queue.append(req)
120 req.deferResponse = self.proto.submitRequest(req, False)
121 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
# Callback for a finished response. It:
# - closes out the previous download-speed sample;
# - takes the oldest in-flight request (this assumes responses arrive in the
#   order requests were sent) and logs it;
# - records (now, time since submission) as a response-time sample;
# - starts a new speed sample (now, body length);
# - fires the request's Deferred with the response.
# NOTE(review): `now` is not assigned in the visible lines. Original lines
# 128-129 and 132 are missing; presumably `now = datetime.now()` is there.
123 def requestComplete(self, resp):
124 """Process a completed request."""
125 self._processLastResponse()
126 req = self.response_queue.pop(0)
127 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
130 self._responseTimes.append((now, now - req.submissionTime))
131 self._lastResponse = (now, resp.stream.length)
133 req.deferRequest.callback(resp)
# Errback for a failed request: closes out the previous speed sample, takes
# the oldest in-flight request (assuming in-order responses), logs the error
# and errbacks the request's Deferred with it.
# NOTE(review): original lines 140-142 are missing here.
135 def requestError(self, error):
136 """Process a request that ended with an error."""
137 self._processLastResponse()
138 req = self.response_queue.pop(0)
139 log.msg('Download of %s generated error %r' % (req.uri, error))
143 req.deferRequest.errback(error)
# Logs a hash error reported for data received from this peer.
# NOTE(review): original lines 148-150 are not shown, so any further handling
# (e.g. error accounting) cannot be confirmed from this excerpt.
145 def hashError(self, error):
146 """Log that a hash error occurred from the peer."""
147 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
151 #{ IHTTPClientManager interface
# Protocol callback: the protocol reports that it is busy.
# NOTE(review): the body (original line 154) is missing here. Per the
# docstring it records the busy state, which processQueue() and isIdle()
# read via self.busy.
152 def clientBusy(self, proto):
153 """Save the busy state."""
# Protocol callback: the protocol reports that it is idle. First closes out
# the pending download-speed sample.
# NOTE(review): original lines 159-162 are missing here. Per the docstring
# they try to send a new request.
156 def clientIdle(self, proto):
157 """Try to send a new request."""
158 self._processLastResponse()
# Protocol callback, presumably invoked when the protocol reports that it can
# pipeline requests (processQueue() reads self.pipeline).
# NOTE(review): the body (original lines 165-167) is missing here.
163 def clientPipelining(self, proto):
164 """Try to send a new request."""
# Protocol callback when the connection goes away. It:
# - closes out the pending speed sample;
# - errbacks every in-flight request with ProtocolError('lost connection');
# - resets the pipeline and connecting flags and empties response_queue.
# The body of the final `if self.request_queue:` is missing, so whether
# queued requests are re-sent is not visible here.
# NOTE(review): original lines 173, 175, 178-179 and 181-182 are missing.
168 def clientGone(self, proto):
169 """Mark sent requests as errors."""
170 self._processLastResponse()
171 for req in self.response_queue:
172 req.deferRequest.errback(ProtocolError('lost connection'))
174 self.pipeline = False
176 self.connecting = False
177 self.response_queue = []
180 if self.request_queue:
183 #{ Downloading request interface
# Builds a Headers object with two headers: Host (this peer's host) and a
# User-Agent of the form "apt-p2p/<ver> (twisted/<ver> twisted.web2/<ver>)".
# NOTE(review): original lines 190-191 are missing. get() and getRange() use
# the return value, so line 190 is presumably `return headers`.
184 def setCommonHeaders(self):
185 """Get the common HTTP headers for all requests."""
186 headers = http_headers.Headers()
187 headers.setHeader('Host', self.host)
188 headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
189 (version.short(), twisted_version.short(), web2_version.short()))
# Queues a request for `path` using the common headers plus an optional
# If-Modified-Since header. Returns the Deferred from submitRequest().
# NOTE(review): original lines 194, 204 and 206 are missing. Per the
# docstring the If-Modified-Since header is optional, so line 206 is
# presumably a guard on `modtime`. Confirm whether it was `if modtime:` or
# `if modtime is not None:` (they differ for modtime == 0).
192 def get(self, path, method="GET", modtime=None):
193 """Add a new request to the queue.
195 @type path: C{string}
196 @param path: the path to request from the peer
197 @type method: C{string}
198 @param method: the HTTP method to use, 'GET' or 'HEAD'
199 (optional, defaults to 'GET')
200 @type modtime: C{int}
201 @param modtime: the modification time to use for an 'If-Modified-Since'
202 header, as seconds since the epoch
203 (optional, defaults to not sending that header)
205 headers = self.setCommonHeaders()
207 headers.setHeader('If-Modified-Since', modtime)
208 return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a byte-range request for C{path} on this peer.

    @type path: C{string}
    @param path: the path to request from the peer
    @type rangeStart: C{int}
    @param rangeStart: the first byte of the requested range
    @type rangeEnd: C{int}
    @param rangeEnd: the last byte of the requested range (inclusive)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @return: the deferred returned by L{submitRequest}, which fires with
        the response once the request completes
    """
    # A single (start, end) pair; both ends are inclusive per HTTP byte ranges.
    byteRanges = [(rangeStart, rangeEnd)]
    headers = self.setCommonHeaders()
    headers.setHeader('Range', ('bytes', byteRanges))
    rangeRequest = ClientRequest(method, path, headers, None)
    return self.submitRequest(rangeRequest)
# NOTE(review): original lines 226-228 (including this method's `def` line)
# are missing from this excerpt.
# The peer counts as idle only when the protocol is not busy, nothing is
# queued, and nothing is in flight.
229 """Check whether the peer is idle or not."""
230 return not self.busy and not self.request_queue and not self.response_queue
# Turns the pending (time, length) sample left by requestComplete() into a
# download-speed entry (now, time elapsed since that response, length), then
# clears it. Called from requestComplete, requestError, clientIdle and
# clientGone.
# NOTE(review): `now` is not assigned in the visible lines. Original line 235
# is missing; presumably `now = datetime.now()`.
232 def _processLastResponse(self):
233 """Save the download time of the last request for speed calculations."""
234 if self._lastResponse is not None:
236 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
237 self._lastResponse = None
# Average speed over the retained samples. First drops the oldest samples
# until at most 10 remain and none is more than an hour old. Then divides
# the total stream length by the total elapsed seconds (the timedelta is
# converted by hand; timedelta.total_seconds() would be equivalent).
# NOTE(review): original lines 241, 243-246, 250, 253-254 and 258 are
# missing. They cover the docstring end, the `now` / accumulator
# initialisation and the empty-case return promised by the comment.
# NOTE(review): if every retained sample has zero elapsed time, the final
# division raises ZeroDivisionError. Consider guarding total_time.
239 def downloadSpeed(self):
240 """Gets the latest average download speed for the peer.
242 The average is over the last 10 responses that occurred in the last hour.
247 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
248 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
249 self._downloadSpeeds.pop(0)
251 # If there are none, then you get 0
252 if not self._downloadSpeeds:
255 for download in self._downloadSpeeds:
256 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
257 total_download += download[2]
259 return total_download / total_time
# Average seconds between submitRequest() and requestComplete(), taken over
# at most the last 10 samples from the past hour. Older samples are dropped
# first, the same way downloadSpeed() prunes.
# NOTE(review): original lines 263, 267-269, 273, 276-277 and 280 are
# missing. They cover the docstring end, the `now` / accumulator
# initialisation and the empty-case return value ("benefit of the doubt").
261 def responseTime(self):
262 """Gets the latest average response time for the peer.
264 Response time is the time from receiving the request, to the time
265 the download begins. The average is over the last 10 responses that
266 occurred in the last hour.
270 while self._responseTimes and (len(self._responseTimes) > 10 or
271 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
272 self._responseTimes.pop(0)
274 # If there are none, give it the benefit of the doubt
275 if not self._responseTimes:
278 for response in self._responseTimes:
279 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
281 return total_response / len(self._responseTimes)
# NOTE(review): the `def` line (around original line 283) and lines 285,
# 292-296, 299, 301 and 304-305 are missing from this excerpt. This is
# presumably the method that recomputes self.rank (printed by __repr__).
# Verify.
# The visible code multiplies the rank by exp(-x) factors for:
# - the request-queue sizes;
# - the time to fetch 512 KiB at the measured download speed;
# - the error/completion ratio;
# - the average response time, scaled by 5 s.
# NOTE(review): processQueue() moves each request from request_queue to
# response_queue, so the two lists are disjoint. While requests are in
# flight, len(request_queue) - len(response_queue) can go negative, making
# that factor exceed 1. This contradicts "decreasing from 1 to 0" in the
# docstring; a sum was probably intended. Confirm before changing.
# NOTE(review): `exp` is used here but is not in the visible import block;
# presumably `from math import exp` (original line 4) was dropped. The
# divisions by `speed` and `self._completed` need non-zero guards, which
# would sit on the missing lines 299/301.
284 """Determine the ranking value for the peer.
286 The ranking value is composed of 5 numbers, each exponentially
287 decreasing from 1 to 0 based on:
288 - if a connection to the peer is open
289 - the number of pending requests
290 - the time to download a single piece
291 - the number of errors
297 rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
298 speed = self.downloadSpeed()
300 rank *= exp(-512.0*1024 / speed)
302 rank *= exp(-float(self._errors) / self._completed)
303 rank *= exp(-self.responseTime() / 5.0)
# Twisted Trial tests for Peer.
# NOTE(review): these tests download RFC files from www.ietf.org, so they
# need network access and assume the remote files still have the hard-coded
# lengths. Original lines 308-311 are missing; the methods use self.client
# and self.pending_calls, presumably declared there.
306 class TestClientManager(unittest.TestCase):
307 """Unit tests for the Peer."""
# Shared callback: asserts a 2xx status and, when `expect` is not None, the
# expected stream length. Then reads the body with stream_mod.readStream.
# NOTE(review): original lines 316-319 are missing, so print_ and printdone
# are not defined in the visible lines. `num` is unused in what is shown.
312 def gotResp(self, resp, num, expect):
313 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
314 if expect is not None:
315 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
320 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
# Downloads RFC 13 and checks for a 2xx response of length 1070.
# NOTE(review): original lines 326-327 and 330 are missing. Trial only waits
# for the download if the test returns `d`; confirm that line 330 does.
322 def test_download(self):
323 """Tests a normal download."""
324 host = 'www.ietf.org'
325 self.client = Peer(host, 80)
328 d = self.client.get('/rfc/rfc0013.txt')
329 d.addCallback(self.gotResp, 1, 1070)
# NOTE(review): original lines 331-332 (including this test's `def` line),
# 336-337 and 340 are missing from this excerpt.
# Sends a HEAD request for RFC 13 and expects a 2xx response of length 0.
333 """Tests a 'HEAD' request."""
334 host = 'www.ietf.org'
335 self.client = Peer(host, 80)
338 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
339 d.addCallback(self.gotResp, 1, 0)
# Starts three downloads immediately and schedules more via
# reactor.callLater at 1 s, 10 s, 30-32 s and 62 s. Per the inline comments,
# the later requests exercise queueing, an idle connection and a closed one.
# NOTE(review): original lines 346, 348, 352, 354-355, 359, 362, 365, 371
# and 374-375 are missing. They include the guard before
# `d.addBoth(lastDefer.callback)` (presumably `if last:`, so only the request
# flagged last=True fires lastDefer) and the test's return value.
342 def test_multiple_downloads(self):
343 """Tests multiple downloads with queueing and connection closing."""
344 host = 'www.ietf.org'
345 self.client = Peer(host, 80)
347 lastDefer = defer.Deferred()
349 def newRequest(path, num, expect, last=False):
350 d = self.client.get(path)
351 d.addCallback(self.gotResp, num, expect)
353 d.addBoth(lastDefer.callback)
356 newRequest("/rfc/rfc0006.txt", 1, 1776)
357 newRequest("/rfc/rfc2362.txt", 2, 159833)
358 newRequest("/rfc/rfc0801.txt", 3, 40824)
360 # This one will probably be queued
361 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
363 # Connection should still be open, but idle
364 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
366 #Connection should be closed
367 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
368 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
369 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
370 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
372 # Now it should definitely be closed
373 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# Starts three downloads immediately and seven more on the next reactor
# iteration (callLater(0, ...)), exercising queueing on one peer.
# NOTE(review): original lines 380, 382, 386, 388 and 399 are missing. They
# include the guard before `d.addBoth(lastDefer.callback)` (presumably
# `if last:`) and the test's return value.
376 def test_multiple_quick_downloads(self):
377 """Tests lots of multiple downloads with queueing."""
378 host = 'www.ietf.org'
379 self.client = Peer(host, 80)
381 lastDefer = defer.Deferred()
383 def newRequest(path, num, expect, last=False):
384 d = self.client.get(path)
385 d.addCallback(self.gotResp, num, expect)
387 d.addBoth(lastDefer.callback)
389 newRequest("/rfc/rfc0006.txt", 1, 1776)
390 newRequest("/rfc/rfc2362.txt", 2, 159833)
391 newRequest("/rfc/rfc0801.txt", 3, 40824)
392 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
393 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
394 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
395 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
396 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
397 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
398 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# NOTE(review): original lines 400-401 (including the `def` line) are
# missing. test_peer_info schedules self.checkInfo, presumably this method.
# Logs the peer's current rank, download speed and response time.
402 log.msg('Rank is: %r' % self.client.rank)
403 log.msg('Download speed is: %r' % self.client.downloadSpeed())
404 log.msg('Response Time is: %r' % self.client.responseTime())
# Uses the same download schedule as test_multiple_downloads. It also calls
# checkInfo every 2 s from 2 s to 120 s to log the peer's statistics while
# downloads are in progress.
# NOTE(review): original lines 410, 412, 416, 418, 429 and 432-434 are
# missing. They include the guard before `d.addBoth(lastDefer.callback)`
# and the test's return value.
406 def test_peer_info(self):
407 """Test retrieving the peer info during a download."""
408 host = 'www.ietf.org'
409 self.client = Peer(host, 80)
411 lastDefer = defer.Deferred()
413 def newRequest(path, num, expect, last=False):
414 d = self.client.get(path)
415 d.addCallback(self.gotResp, num, expect)
417 d.addBoth(lastDefer.callback)
419 newRequest("/rfc/rfc0006.txt", 1, 1776)
420 newRequest("/rfc/rfc2362.txt", 2, 159833)
421 newRequest("/rfc/rfc0801.txt", 3, 40824)
422 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
423 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
424 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
425 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
426 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
427 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
428 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
430 for i in xrange(2, 122, 2):
431 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
# Requests bytes 100-199 (inclusive) of RFC 13 and expects a 2xx response of
# length 100.
# NOTE(review): original lines 439-440 and 443-444 are missing, including
# whatever the test returns for Trial to wait on.
435 def test_range(self):
436 """Test a Range request."""
437 host = 'www.ietf.org'
438 self.client = Peer(host, 80)
441 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
442 d.addCallback(self.gotResp, 1, 100)
446 for p in self.pending_calls:
449 self.pending_calls = []