2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
19 from apt_p2p_conf import version
class PipelineError(Exception):
    """Signals that a request sent on a pipelined connection was lost."""
    pass
24 class FixedHTTPClientChannelRequest(HTTPClientChannelRequest):
25 """Fix the broken _error function."""
27 def __init__(self, channel, request, closeAfter):
28 HTTPClientChannelRequest.__init__(self, channel, request, closeAfter)
31 def _error(self, err):
33 Abort parsing, and depending of the status of the request, either fire
34 the C{responseDefer} if no response has been sent yet, or close the
39 if hasattr(self, 'stream') and self.stream is not None:
40 self.stream.finish(err)
42 self.responseDefer.errback(err)
44 def gotInitialLine(self, initialLine):
46 HTTPClientChannelRequest.gotInitialLine(self, initialLine)
48 class LoggingHTTPClientProtocol(HTTPClientProtocol):
49 """A modified client protocol that logs the number of bytes received."""
def __init__(self, factory, stats = None, mirror = False):
    """Initialize the protocol and remember where to report traffic.

    @param factory: passed unchanged to C{HTTPClientProtocol.__init__}
    @param stats: object whose C{receivedBytes(count, mirror)} method is
        called for the data received on this connection
        (optional, defaults to C{None})
    @param mirror: value forwarded as the second argument of
        C{stats.receivedBytes} (optional, defaults to C{False})
    """
    HTTPClientProtocol.__init__(self, factory)
    # The receive hooks read both attributes, so they must live on the
    # instance rather than only in the constructor's arguments.
    self.stats = stats
    self.mirror = mirror
def lineReceived(self, line):
    """Count the bytes of a received line, then process it as usual.

    @param line: the line that was received
    """
    # The constructor's stats argument defaults to None; skip accounting
    # in that case instead of failing on every received line.
    if self.stats is not None:
        # NOTE(review): the extra 2 bytes presumably cover the CRLF that
        # was stripped from the line -- confirm.
        self.stats.receivedBytes(len(line) + 2, self.mirror)
    HTTPClientProtocol.lineReceived(self, line)
def rawDataReceived(self, data):
    """Count the bytes of a received raw chunk, then process it as usual.

    @param data: the raw data that was received
    """
    # The constructor's stats argument defaults to None; skip accounting
    # in that case instead of failing on every received chunk.
    if self.stats is not None:
        self.stats.receivedBytes(len(data), self.mirror)
    HTTPClientProtocol.rawDataReceived(self, data)
def submitRequest(self, request, closeAfter=True):
    """Send a request on this connection using the fixed channel request.

    @param request: The request to send to a remote server.
    @type request: L{ClientRequest}

    @param closeAfter: If True the 'Connection: close' header will be sent,
        otherwise 'Connection: keep-alive'
    @type closeAfter: C{bool}

    @rtype: L{twisted.internet.defer.Deferred}
    @return: A Deferred which will be called back with the
        L{twisted.web2.http.Response} from the server.
    """
    # Assert we're in a valid state to submit more
    assert self.outRequest is None
    assert ((self.readPersistent is PERSIST_NO_PIPELINE
             and not self.inRequests)
            or self.readPersistent is PERSIST_PIPELINE)

    self.manager.clientBusy(self)
    # Only give up persistence when the caller asked for the connection
    # to be closed after this request.
    if closeAfter:
        self.readPersistent = False

    self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self,
                                        request, closeAfter)
    self.inRequests.append(chanRequest)

    # NOTE(review): submit() is the twisted.web2 channel-request call that
    # actually writes the request -- confirm against the installed web2.
    chanRequest.submit()
    return chanRequest.responseDefer
def setReadPersistent(self, persist):
    """Record the new persistence state of the connection.

    When persistence is switched off, every request queued behind the
    first one is removed from C{inRequests} and failed with a
    L{PipelineError}.

    @param persist: the new value for C{readPersistent}
    """
    self.readPersistent = persist
    # Requests must only be aborted when persistence is being turned off;
    # aborting them on every call would defeat pipelining.
    if not persist:
        # Tell all requests but first to abort.
        lostRequests = self.inRequests[1:]
        del self.inRequests[1:]
        for request in lostRequests:
            request.connectionLost(PipelineError('The pipelined connection was lost'))
def connectionLost(self, reason):
    """Fail every outstanding request when the connection goes away.

    @param reason: why the connection was lost; handed to each request
    """
    self.readPersistent = False
    self.setTimeout(None)
    self.manager.clientGone(self)

    # The request at the head of the list is notified but stays listed.
    if self.inRequests:
        current = self.inRequests[0]
        if current is not None:
            current.connectionLost(reason)

    # Everything queued behind it is dropped from the list and notified.
    abandoned = self.inRequests[1:]
    self.inRequests[1:] = []
    for queued in abandoned:
        if queued is None:
            continue
        queued.connectionLost(reason)
120 class Peer(ClientFactory):
121 """A manager for all HTTP requests to a single peer.
123 Controls all requests that go to a single peer (host and port).
124 This includes buffering requests until they can be sent and reconnecting
125 in the event of the connection being closed.
129 implements(IHTTPClientManager)
131 def __init__(self, host, port = 80, stats = None):
138 self.pipeline = False
140 self.connecting = False
141 self.request_queue = []
144 self.connector = None
147 self._downloadSpeeds = []
148 self._lastResponse = None
149 self._responseTimes = []
152 return "(%r, %r, %r)" % (self.host, self.port, self.rank)
154 #{ Manage the request queue
156 """Connect to the peer."""
157 assert self.closed and not self.connecting
158 log.msg('Connecting to (%s, %d)' % (self.host, self.port))
159 self.connecting = True
160 d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
161 stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
162 d.addCallbacks(self.connected, self.connectionError)
def connected(self, proto):
    """Begin processing the queued requests.

    @param proto: the protocol instance for the newly opened connection
    """
    log.msg('Connected to (%s, %d)' % (self.host, self.port))
    # The connection is open now; the connecting code asserts C{closed}
    # before dialling, so the flag has to be cleared here.
    self.closed = False
    self.connecting = False
    # Requests are submitted through C{self.proto}, so keep the protocol.
    self.proto = proto
    reactor.callLater(0, self.processQueue)
172 def connectionError(self, err):
173 """Cancel the requests."""
174 log.msg('Failed to connect to the peer by HTTP.')
177 # Remove one request so that we don't loop indefinitely
178 if self.request_queue:
179 req, deferRequest, submissionTime = self.request_queue.pop(0)
180 deferRequest.errback(err)
186 self.connecting = False
187 self.clientGone(None)
190 """Close the connection to the peer."""
192 self.proto.transport.loseConnection()
def submitRequest(self, request):
    """Add a new request to the queue.

    @type request: L{twisted.web2.client.http.ClientRequest}
    @param request: the request to queue for sending to the peer
    @rtype: L{twisted.internet.defer.Deferred}
    @return: deferred that will fire with the completed request
    """
    # The submission time is queued with the request so the response
    # time can be measured once the request completes.
    submissionTime = datetime.now()
    deferRequest = defer.Deferred()
    self.request_queue.append((request, deferRequest, submissionTime))

    reactor.callLater(0, self.processQueue)
    # get() and getRange() hand this result straight to their callers.
    return deferRequest
def processQueue(self):
    """Check the queue to see if new requests can be sent to the peer.

    At most one queued request is submitted per call.  If submitting
    raises, the request is put back at the head of the queue and the
    queue is processed again one second later.
    """
    if not self.request_queue:
        return
    if self.connecting:
        return
    if self.closed:
        # NOTE(review): assumes the "Connect to the peer." method is
        # named connect() -- its def line is not visible; confirm.
        self.connect()
        return
    if self.busy and not self.pipeline:
        return
    if self.outstanding and not self.pipeline:
        return
    if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
             and not self.proto.inRequests)
            or self.proto.readPersistent is PERSIST_PIPELINE):
        log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
                (self.proto.readPersistent, self.proto.inRequests))
        return

    req, deferRequest, submissionTime = self.request_queue.pop(0)
    try:
        deferResponse = self.proto.submitRequest(req, False)
    except Exception:
        # Try again later.  The handler must use the local name 'req'
        # and the real 'reactor'; a misspelling here would turn the
        # retry into a NameError and lose the request.
        log.msg('Got an error trying to submit a new HTTP request %s' % (req.uri, ))
        log.err()
        self.request_queue.insert(0, (req, deferRequest, submissionTime))
        reactor.callLater(1, self.processQueue)
        return

    self.outstanding += 1

    deferResponse.addCallbacks(self.requestComplete, self.requestError,
                               callbackArgs = (req, deferRequest, submissionTime),
                               errbackArgs = (req, deferRequest))
def requestComplete(self, resp, req, deferRequest, submissionTime):
    """Process a completed request.

    @param resp: the response received for the request
    @param req: the request that was sent
    @param deferRequest: the deferred to fire with the response
    @param submissionTime: when the request was added to the queue
    """
    self._processLastResponse()
    self.outstanding -= 1
    assert self.outstanding >= 0
    log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))

    # NOTE(review): the ranking code divides by self._completed, but
    # nothing visible here updates that counter -- confirm where the
    # completed/error counters are maintained.
    now = datetime.now()
    self._responseTimes.append((now, now - submissionTime))
    # Remembered so the download duration can be computed later.
    self._lastResponse = (now, resp.stream.length)

    deferRequest.callback(resp)
def requestError(self, error, req, deferRequest):
    """Handle a request whose response deferred failed.

    @param error: the failure that occurred
    @param req: the request that was sent
    @param deferRequest: the deferred to errback with the failure
    """
    self._processLastResponse()
    remaining = self.outstanding - 1
    self.outstanding = remaining
    assert remaining >= 0
    log.msg('Download of %s generated error %r' % (req.uri, error))
    deferRequest.errback(error)
def hashError(self, error):
    """Write a log entry recording a hash error reported for this peer.

    @param error: the hash error to log
    """
    details = (self.host, self.port, error)
    log.msg('Hash error from peer (%s, %d): %r' % details)
274 #{ IHTTPClientManager interface
def clientBusy(self, proto):
    """Save the busy state.

    @param proto: the protocol reporting that it is busy (unused)
    """
    # processQueue() refuses to send while busy unless pipelining.
    self.busy = True
def clientIdle(self, proto):
    """Try to send a new request.

    @param proto: the protocol reporting that it is idle (unused)
    """
    self._processLastResponse()
    # Clear the flag set by clientBusy(), otherwise the queue run
    # scheduled below would bail out immediately.
    self.busy = False
    reactor.callLater(0, self.processQueue)
def clientPipelining(self, proto):
    """Try to send a new request.

    @param proto: the protocol reporting that it can pipeline (unused)
    """
    # Lets processQueue() submit requests while others are outstanding.
    self.pipeline = True
    reactor.callLater(0, self.processQueue)
def clientGone(self, proto):
    """Reset the connection state after the connection has been lost.

    If requests are still queued, another run of the queue is scheduled.

    @param proto: the protocol whose connection was lost, or C{None}
        when the connection attempt itself failed (unused)
    """
    self._processLastResponse()
    log.msg('Lost the connection to (%s, %d)' % (self.host, self.port))
    # Return to the closed, not-busy state; otherwise the queue run
    # scheduled below could neither reconnect nor send.
    self.busy = False
    self.pipeline = False
    self.closed = True
    self.connecting = False
    if self.request_queue:
        reactor.callLater(0, self.processQueue)
304 #{ Downloading request interface
def setCommonHeaders(self):
    """Get the common HTTP headers for all requests.

    @return: a new header set containing the 'Host' and 'User-Agent'
        headers
    """
    headers = http_headers.Headers()
    headers.setHeader('Host', self.host)
    headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
                      (version.short(), twisted_version.short(), web2_version.short()))
    # get() and getRange() add their own headers to the returned object.
    return headers
def get(self, path, method="GET", modtime=None):
    """Add a new request to the queue.

    @type path: C{string}
    @param path: the path to request from the peer
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @type modtime: C{int}
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @return: the result of L{submitRequest} for the new request
    """
    headers = self.setCommonHeaders()
    # Only send the conditional header when a time was actually given.
    if modtime is not None:
        headers.setHeader('If-Modified-Since', modtime)
    return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a request for a byte range of a path.

    @type path: C{string}
    @param path: the path to request from the peer
    @type rangeStart: C{int}
    @param rangeStart: the byte to begin the request at
    @type rangeEnd: C{int}
    @param rangeEnd: the byte to end the request at (inclusive)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @return: the result of L{submitRequest} for the new request
    """
    byteRange = ('bytes', [(rangeStart, rangeEnd)])
    headers = self.setCommonHeaders()
    headers.setHeader('Range', byteRange)
    rangeRequest = ClientRequest(method, path, headers, None)
    return self.submitRequest(rangeRequest)
350 """Check whether the peer is idle or not."""
351 return not self.busy and not self.request_queue and not self.outstanding
def _processLastResponse(self):
    """Save the download time of the last request for speed calculations.

    If a response is pending in C{_lastResponse}, a tuple of (current
    time, time elapsed since the response, response length) is appended
    to C{_downloadSpeeds} and C{_lastResponse} is cleared.
    """
    if self._lastResponse is not None:
        now = datetime.now()
        self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
        self._lastResponse = None
def downloadSpeed(self):
    """Gets the latest average download speed for the peer.

    The average is over the last 10 responses that occurred in the last hour.
    Older entries are removed from C{_downloadSpeeds} as a side effect.

    @rtype: C{float}
    @return: the total length downloaded divided by the total download
        time in seconds, or 0 if there is nothing to average
    """
    total_time = 0.0
    total_download = 0
    now = datetime.now()
    while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
                                    now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
        self._downloadSpeeds.pop(0)

    # If there are none, then you get 0
    if not self._downloadSpeeds:
        return 0.0

    for download in self._downloadSpeeds:
        total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
        total_download += download[2]

    # Downloads recorded with no measurable duration cannot give a speed.
    if total_time <= 0.0:
        return 0.0
    return total_download / total_time
def responseTime(self):
    """Gets the latest average response time for the peer.

    Response time is the time from receiving the request, to the time
    the download begins. The average is over the last 10 responses that
    occurred in the last hour.  Older entries are removed from
    C{_responseTimes} as a side effect.

    @rtype: C{float}
    @return: the average response time in seconds
    """
    total_response = 0.0
    now = datetime.now()
    while self._responseTimes and (len(self._responseTimes) > 10 or
                                   now - self._responseTimes[0][0] > timedelta(seconds=3600)):
        self._responseTimes.pop(0)

    # If there are none, give it the benefit of the doubt
    if not self._responseTimes:
        # NOTE(review): the intended fallback value is not visible; 0.0
        # (no penalty at all) is assumed here -- confirm.
        return 0.0

    for response in self._responseTimes:
        total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0

    return total_response / len(self._responseTimes)
405 """Determine the ranking value for the peer.
407 The ranking value is composed of 5 numbers, each exponentially
408 decreasing from 1 to 0 based on:
409 - if a connection to the peer is open
410 - the number of pending requests
411 - the time to download a single piece
412 - the number of errors
418 rank *= exp(-(len(self.request_queue) + self.outstanding))
419 speed = self.downloadSpeed()
421 rank *= exp(-512.0*1024 / speed)
423 rank *= exp(-10.0 * self._errors / self._completed)
424 rank *= exp(-self.responseTime() / 5.0)
427 class TestClientManager(unittest.TestCase):
428 """Unit tests for the Peer."""
def gotResp(self, resp, num, expect):
    """Check a response and read its stream, verifying the length read.

    @param resp: the response that was received
    @param num: index into C{self.length} used to count this response's
        data
    @param expect: the expected stream length, or C{None} to skip that
        check
    @return: the deferred of the stream read, so that callers chained on
        this callback wait for the length check
    """
    self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
    if expect is not None:
        self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
    # Grow the counter list until there is a slot for this response.
    while len(self.length) <= num:
        self.length.append(0)

    def addData(data, self = self, num = num):
        self.length[num] += len(data)

    def checkLength(resp, self = self, num = num, length = resp.stream.length):
        self.failUnlessEqual(self.length[num], length)

    df = stream_mod.readStream(resp.stream, addData)
    df.addCallback(checkLength)
    # Returning the deferred chains it, so a failed length check is not
    # lost in an unobserved deferred.
    return df
450 def test_download(self):
451 """Tests a normal download."""
452 host = 'www.ietf.org'
453 self.client = Peer(host, 80)
456 d = self.client.get('/rfc/rfc0013.txt')
457 d.addCallback(self.gotResp, 1, 1070)
461 """Tests a 'HEAD' request."""
462 host = 'www.ietf.org'
463 self.client = Peer(host, 80)
466 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
467 d.addCallback(self.gotResp, 1, 0)
470 def test_multiple_downloads(self):
471 """Tests multiple downloads with queueing and connection closing."""
472 host = 'www.ietf.org'
473 self.client = Peer(host, 80)
475 lastDefer = defer.Deferred()
477 def newRequest(path, num, expect, last=False):
478 d = self.client.get(path)
479 d.addCallback(self.gotResp, num, expect)
481 d.addBoth(lastDefer.callback)
484 newRequest("/rfc/rfc0006.txt", 1, 1776)
485 newRequest("/rfc/rfc2362.txt", 2, 159833)
486 newRequest("/rfc/rfc0801.txt", 3, 40824)
488 # This one will probably be queued
489 self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
491 # Connection should still be open, but idle
492 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
494 #Connection should be closed
495 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
496 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
497 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
498 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
500 # Now it should definitely be closed
501 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
504 def test_multiple_quick_downloads(self):
505 """Tests lots of multiple downloads with queueing."""
506 host = 'www.ietf.org'
507 self.client = Peer(host, 80)
509 lastDefer = defer.Deferred()
511 def newRequest(path, num, expect, last=False):
512 d = self.client.get(path)
513 d.addCallback(self.gotResp, num, expect)
515 d.addBoth(lastDefer.callback)
517 newRequest("/rfc/rfc0006.txt", 1, 1776)
518 newRequest("/rfc/rfc2362.txt", 2, 159833)
519 newRequest("/rfc/rfc0801.txt", 3, 40824)
520 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
521 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
522 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
523 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
524 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
525 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
526 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
530 log.msg('Rank is: %r' % self.client.rank)
531 log.msg('Download speed is: %r' % self.client.downloadSpeed())
532 log.msg('Response Time is: %r' % self.client.responseTime())
534 def test_peer_info(self):
535 """Test retrieving the peer info during a download."""
536 host = 'www.ietf.org'
537 self.client = Peer(host, 80)
539 lastDefer = defer.Deferred()
541 def newRequest(path, num, expect, last=False):
542 d = self.client.get(path)
543 d.addCallback(self.gotResp, num, expect)
545 d.addBoth(lastDefer.callback)
547 newRequest("/rfc/rfc0006.txt", 1, 1776)
548 newRequest("/rfc/rfc2362.txt", 2, 159833)
549 newRequest("/rfc/rfc0801.txt", 3, 40824)
550 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
551 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
552 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
553 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
554 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
555 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
556 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
558 for i in xrange(2, 122, 2):
559 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
563 def test_range(self):
564 """Test a Range request."""
565 host = 'www.ietf.org'
566 self.client = Peer(host, 80)
569 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
570 d.addCallback(self.gotResp, 1, 100)
574 for p in self.pending_calls:
577 self.pending_calls = []