2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
19 from apt_p2p_conf import version
class PipelineError(Exception):
    """An error has occurred in pipelining requests.

    Handed to the C{connectionLost} method of every pipelined request that
    is dropped together with its connection.
    """
class LoggingHTTPClientProtocol(HTTPClientProtocol):
    """A modified client protocol that logs the number of bytes received.

    Each received line and raw data chunk is counted and reported to an
    optional statistics object before being processed as usual by
    L{HTTPClientProtocol}.

    @ivar stats: the object to report received byte counts to, or None
    @ivar mirror: passed as the second argument of C{stats.receivedBytes}
    """

    def __init__(self, factory, stats=None, mirror=False):
        """Initialize the protocol.

        @param factory: passed unchanged to L{HTTPClientProtocol.__init__}
        @param stats: object whose C{receivedBytes(count, mirror)} method is
            called for all received data, or None to disable the accounting
            (optional, defaults to None)
        @param mirror: forwarded to C{stats.receivedBytes} with every count
            (optional, defaults to False)
        """
        HTTPClientProtocol.__init__(self, factory)
        self.stats = stats
        self.mirror = mirror

    def _countBytes(self, count):
        """Report C{count} received bytes, if a statistics object was given."""
        if self.stats is not None:
            self.stats.receivedBytes(count, self.mirror)

    def lineReceived(self, line):
        """Count the bytes of a received line, then process it normally."""
        # NOTE(review): the 2 extra bytes presumably cover the line
        # delimiter that is stripped before delivery -- confirm.
        self._countBytes(len(line) + 2)
        HTTPClientProtocol.lineReceived(self, line)

    def rawDataReceived(self, data):
        """Count the bytes of a raw data chunk, then process it normally."""
        self._countBytes(len(data))
        HTTPClientProtocol.rawDataReceived(self, data)

    def setReadPersistent(self, persist):
        """Save the persistence state, aborting pipelined requests if needed.

        @param persist: the new value for C{readPersistent}; when it is
            false all requests except the first are told that the
            connection was lost, using a L{PipelineError}
        """
        self.readPersistent = persist
        if not persist:
            # Tell all requests but first to abort.
            lostRequests = self.inRequests[1:]
            del self.inRequests[1:]
            for request in lostRequests:
                request.connectionLost(PipelineError('The pipelined connection was lost'))
class Peer(ClientFactory):
    """A manager for all HTTP requests to a single peer.

    Controls all requests that go to a single peer (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    @ivar request_queue: the requests waiting to be sent, as tuples of
        (request, deferred to fire with the result, submission time)
    @ivar outstanding: the number of requests sent but not yet answered
    @ivar rank: the current ranking value of the peer, see L{rerank}
    """

    implements(IHTTPClientManager)

    # Limits on the history used for the speed and response time averages
    _HISTORY_SIZE = 10
    _HISTORY_AGE = timedelta(seconds=3600)

    def __init__(self, host, port=80, stats=None):
        """Initialize the peer without connecting to it.

        @param host: the host name of the peer
        @param port: the port to connect to
            (optional, defaults to 80)
        @param stats: passed to every L{LoggingHTTPClientProtocol} created
            for this peer (optional, defaults to None)
        """
        self.host = host
        self.port = port
        self.stats = stats
        self.mirror = False
        # NOTE(review): confirm the initial ranking value.
        self.rank = 0.01
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.request_queue = []
        self.outstanding = 0
        self.proto = None
        # Not used by this class; kept for code that may read it.
        self.connector = None
        self._errors = 0
        self._completed = 0
        self._downloadSpeeds = []
        self._lastResponse = None
        self._responseTimes = []

    def __repr__(self):
        return "(%r, %r, %r)" % (self.host, self.port, self.rank)

    #{ Manage the request queue
    def connect(self):
        """Connect to the peer."""
        assert self.closed and not self.connecting
        self.connecting = True
        creator = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
                                         stats=self.stats, mirror=self.mirror)
        d = creator.connectTCP(self.host, self.port)
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Begin processing the queued requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        reactor.callLater(0, self.processQueue)

    def connectionError(self, err):
        """Cancel the requests."""
        log.msg('Failed to connect to the peer by HTTP.')
        log.err(err)

        # Remove one request so that we don't loop indefinitely
        if self.request_queue:
            req, deferRequest, submissionTime = self.request_queue.pop(0)
            deferRequest.errback(err)

        self._completed += 1
        self._errors += 1
        self.rerank()
        if self.connecting:
            self.connecting = False
            self.clientGone(None)

    def close(self):
        """Close the connection to the peer."""
        # There is no protocol to close while disconnected
        if not self.closed and self.proto is not None:
            self.proto.transport.loseConnection()

    def submitRequest(self, request):
        """Add a new request to the queue.

        @type request: L{twisted.web2.client.http.ClientRequest}
        @return: deferred that will fire with the completed request
        """
        submissionTime = datetime.now()
        deferRequest = defer.Deferred()
        self.request_queue.append((request, deferRequest, submissionTime))
        self.rerank()
        reactor.callLater(0, self.processQueue)
        return deferRequest

    def processQueue(self):
        """Check the queue to see if new requests can be sent to the peer."""
        if not self.request_queue:
            return
        if self.connecting:
            # The queue is processed again when the connection attempt ends
            return
        if self.closed:
            self.connect()
            return
        if not self.pipeline and (self.busy or self.outstanding):
            # Only one request at a time unless the peer supports pipelining
            return

        proto = self.proto
        ready = (proto.readPersistent is PERSIST_PIPELINE
                 or (proto.readPersistent is PERSIST_NO_PIPELINE
                     and not proto.inRequests))
        if not ready:
            log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
                    (proto.readPersistent, proto.inRequests))
            return

        req, deferRequest, submissionTime = self.request_queue.pop(0)
        try:
            # NOTE(review): the False is presumably twisted.web2's
            # closeAfter flag, keeping the connection open -- confirm.
            deferResponse = proto.submitRequest(req, False)
        except Exception:
            # Put the request back at the front and try again later
            log.msg('Got an error trying to submit a new HTTP request %s' % (req.uri, ))
            log.err()
            self.request_queue.insert(0, (req, deferRequest, submissionTime))
            reactor.callLater(1, self.processQueue)
            return

        self.outstanding += 1
        self.rerank()
        deferResponse.addCallbacks(self.requestComplete, self.requestError,
                                   callbackArgs=(req, deferRequest, submissionTime),
                                   errbackArgs=(req, deferRequest))

    def requestComplete(self, resp, req, deferRequest, submissionTime):
        """Process a completed request."""
        self._processLastResponse()
        self.outstanding -= 1
        assert self.outstanding >= 0
        log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
        self._completed += 1
        now = datetime.now()
        self._responseTimes.append((now, now - submissionTime))
        # A stream of unknown length (None) is counted as 0 bytes so that
        # the download speed sums stay numeric
        self._lastResponse = (now, resp.stream.length or 0)
        self.rerank()
        deferRequest.callback(resp)

    def requestError(self, error, req, deferRequest):
        """Process a request that ended with an error."""
        self._processLastResponse()
        self.outstanding -= 1
        assert self.outstanding >= 0
        log.msg('Download of %s generated error %r' % (req.uri, error))
        self._completed += 1
        self._errors += 1
        self.rerank()
        deferRequest.errback(error)

    def hashError(self, error):
        """Log that a hash error occurred from the peer."""
        log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
        self._errors += 1
        self.rerank()

    #}
    #{ IHTTPClientManager interface
    def clientBusy(self, proto):
        """Save the busy state."""
        self.busy = True

    def clientIdle(self, proto):
        """Try to send a new request."""
        self._processLastResponse()
        self.busy = False
        reactor.callLater(0, self.processQueue)
        self.rerank()

    def clientPipelining(self, proto):
        """Try to send a new request."""
        self.pipeline = True
        reactor.callLater(0, self.processQueue)

    def clientGone(self, proto):
        """Mark sent requests as errors."""
        self._processLastResponse()
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.proto = None
        self.rerank()
        if self.request_queue:
            # Processing the queue of a closed peer reconnects to it
            reactor.callLater(0, self.processQueue)

    #}
    #{ Downloading request interface
    def setCommonHeaders(self):
        """Get the common HTTP headers for all requests.

        @return: new headers containing the 'Host' and 'User-Agent' headers
        """
        headers = http_headers.Headers()
        headers.setHeader('Host', self.host)
        headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
                          (version.short(), twisted_version.short(), web2_version.short()))
        return headers

    def get(self, path, method="GET", modtime=None):
        """Add a new request to the queue.

        @type path: C{string}
        @param path: the path to request from the peer
        @type method: C{string}
        @param method: the HTTP method to use, 'GET' or 'HEAD'
            (optional, defaults to 'GET')
        @type modtime: C{int}
        @param modtime: the modification time to use for an 'If-Modified-Since'
            header, as seconds since the epoch
            (optional, defaults to not sending that header)
        @return: deferred that will fire with the completed request
        """
        headers = self.setCommonHeaders()
        if modtime:
            headers.setHeader('If-Modified-Since', modtime)
        return self.submitRequest(ClientRequest(method, path, headers, None))

    def getRange(self, path, rangeStart, rangeEnd, method="GET"):
        """Add a new request with a Range header to the queue.

        @type path: C{string}
        @param path: the path to request from the peer
        @type rangeStart: C{int}
        @param rangeStart: the byte to begin the request at
        @type rangeEnd: C{int}
        @param rangeEnd: the byte to end the request at (inclusive)
        @type method: C{string}
        @param method: the HTTP method to use, 'GET' or 'HEAD'
            (optional, defaults to 'GET')
        @return: deferred that will fire with the completed request
        """
        headers = self.setCommonHeaders()
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return self.submitRequest(ClientRequest(method, path, headers, None))

    #}
    #{ Peer information
    def isIdle(self):
        """Check whether the peer is idle or not."""
        return not self.busy and not self.request_queue and not self.outstanding

    @staticmethod
    def _seconds(delta):
        """Convert a C{timedelta} to a floating point number of seconds."""
        return delta.days*86400.0 + delta.seconds + delta.microseconds/1000000.0

    def _expire(self, history):
        """Drop, in place, all but the newest entries of the last hour.

        @param history: list of tuples that start with the C{datetime} of
            the entry, ordered from oldest to newest
        """
        now = datetime.now()
        while history and (len(history) > self._HISTORY_SIZE or
                           now - history[0][0] > self._HISTORY_AGE):
            history.pop(0)

    def _processLastResponse(self):
        """Save the download time of the last request for speed calculations."""
        if self._lastResponse is not None:
            started, length = self._lastResponse
            now = datetime.now()
            self._downloadSpeeds.append((now, now - started, length))
            self._lastResponse = None

    def downloadSpeed(self):
        """Gets the latest average download speed for the peer.

        The average is over the last 10 responses that occurred in the last hour.

        @return: the total bytes downloaded divided by the total seconds
            taken, or 0.0 if there is nothing to average
        """
        self._expire(self._downloadSpeeds)

        # If there are none, then you get 0
        if not self._downloadSpeeds:
            return 0.0

        total_time = sum([self._seconds(download[1]) for download in self._downloadSpeeds])
        total_download = sum([download[2] for download in self._downloadSpeeds])

        # Downloads that took no measurable time give no usable speed
        if total_time <= 0.0:
            return 0.0
        return total_download / total_time

    def responseTime(self):
        """Gets the latest average response time for the peer.

        Response time is the time from receiving the request, to the time
        the download begins. The average is over the last 10 responses that
        occurred in the last hour.

        @return: the average response time in seconds
        """
        self._expire(self._responseTimes)

        # If there are none, give it the benefit of the doubt
        if not self._responseTimes:
            # NOTE(review): confirm this default response time.
            return 0.1

        total_response = sum([self._seconds(response[1]) for response in self._responseTimes])
        return total_response / len(self._responseTimes)

    def rerank(self):
        """Determine the ranking value for the peer.

        The ranking value is composed of 5 numbers, each exponentially
        decreasing from 1 to 0 based on:
         - if a connection to the peer is open
         - the number of pending requests
         - the time to download a single piece
         - the number of errors
         - the response time
        """
        from math import exp

        rank = 1.0
        if self.closed:
            # NOTE(review): confirm the penalty for a closed connection.
            rank *= 0.9
        rank *= exp(-(len(self.request_queue) + self.outstanding))
        speed = self.downloadSpeed()
        if speed > 0.0:
            # Seconds needed to download 512 KiB at the current speed
            rank *= exp(-512.0*1024 / speed)
        if self._completed:
            rank *= exp(-10.0 * self._errors / self._completed)
        rank *= exp(-self.responseTime() / 5.0)
        self.rank = rank
    #}
class TestClientManager(unittest.TestCase):
    """Unit tests for the Peer.

    All of the tests download documents from www.ietf.org, so they need a
    working network connection.
    """

    host = 'www.ietf.org'

    # (path, index into self.length, expected length) of the requests that
    # the multiple download tests submit immediately ...
    _IMMEDIATE = (("/rfc/rfc0006.txt", 1, 1776),
                  ("/rfc/rfc2362.txt", 2, 159833),
                  ("/rfc/rfc0801.txt", 3, 40824))
    # ... and of those they submit after a delay, the last one ending the test
    _DELAYED = (('/rfc/rfc0013.txt', 4, 1070),
                ('/rfc/rfc0022.txt', 5, 4606),
                ('/rfc/rfc0048.txt', 6, 41696),
                ('/rfc/rfc3261.txt', 7, 647976),
                ('/rfc/rfc0014.txt', 8, 27),
                ('/rfc/rfc0001.txt', 9, 21088),
                ('/rfc/rfc2801.txt', 0, 598794))

    def setUp(self):
        """Give every test its own client and bookkeeping lists."""
        self.client = None
        self.pending_calls = []
        self.length = []

    def gotResp(self, resp, num, expect):
        """Check a response and the amount of data its stream delivers.

        @param resp: the response to check
        @param num: the index in C{self.length} used to count the data
        @param expect: the expected length of the response stream, or None
            to not check it
        @return: deferred that fires with the response once all of the
            stream has been read and its length verified
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        length = resp.stream.length
        if expect is not None:
            self.failUnless(length == expect, "Length was incorrect, got %r, expected %r" % (length, expect))
        while len(self.length) <= num:
            self.length.append(0)
        self.length[num] = 0

        def addData(data):
            self.length[num] += len(data)

        def checkLength(resp):
            self.failUnlessEqual(self.length[num], length)
            return resp

        df = stream_mod.readStream(resp.stream, addData)
        df.addCallback(checkLength)
        return df

    def _later(self, delay, func, *args):
        """Schedule a call and remember it so that tearDown can cancel it."""
        self.pending_calls.append(reactor.callLater(delay, func, *args))

    def _startDownloads(self, delays):
        """Submit the immediate requests and schedule the delayed ones.

        @param delays: the seconds to wait before submitting each of the
            requests in C{_DELAYED}, in the same order
        @return: deferred that fires with the outcome of the last request
        """
        self.client = Peer(self.host, 80)
        lastDefer = defer.Deferred()

        def newRequest(path, num, expect, last=False):
            d = self.client.get(path)
            d.addCallback(self.gotResp, num, expect)
            if last:
                # The test ends with the last request, whatever its outcome
                d.addBoth(lastDefer.callback)

        for path, num, expect in self._IMMEDIATE:
            newRequest(path, num, expect)

        final = len(self._DELAYED) - 1
        for i, (path, num, expect) in enumerate(self._DELAYED):
            self._later(delays[i], newRequest, path, num, expect, i == final)
        return lastDefer

    def test_download(self):
        """Tests a normal download."""
        self.client = Peer(self.host, 80)

        d = self.client.get('/rfc/rfc0013.txt')
        d.addCallback(self.gotResp, 1, 1070)
        return d

    def test_head(self):
        """Tests a 'HEAD' request."""
        self.client = Peer(self.host, 80)

        d = self.client.get('/rfc/rfc0013.txt', "HEAD")
        d.addCallback(self.gotResp, 1, 0)
        return d

    def test_multiple_downloads(self):
        """Tests multiple downloads with queueing and connection closing."""
        # 6: this one will probably be queued
        # 10: connection should still be open, but idle
        # 30, 31, 32, 32: connection should be closed
        # 62: now it should definitely be closed
        return self._startDownloads((6, 10, 30, 31, 32, 32, 62))

    def test_multiple_quick_downloads(self):
        """Tests lots of multiple downloads with queueing."""
        return self._startDownloads((0, 0, 0, 0, 0, 0, 0))

    def checkInfo(self):
        """Log the current rank, download speed and response time."""
        log.msg('Rank is: %r' % self.client.rank)
        log.msg('Download speed is: %r' % self.client.downloadSpeed())
        log.msg('Response Time is: %r' % self.client.responseTime())

    def test_peer_info(self):
        """Test retrieving the peer info during a download."""
        lastDefer = self._startDownloads((1, 10, 30, 31, 32, 32, 62))

        # Log the peer info every 2 seconds while the downloads run
        for i in xrange(2, 122, 2):
            self._later(i, self.checkInfo)
        return lastDefer

    def test_range(self):
        """Test a Range request."""
        self.client = Peer(self.host, 80)

        d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
        d.addCallback(self.gotResp, 1, 100)
        return d

    def tearDown(self):
        """Cancel the calls still scheduled and close the connection."""
        for p in self.pending_calls:
            # Calls that already ran or were cancelled can't be cancelled
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client:
            self.client.close()
            self.client = None