2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
19 from apt_p2p_conf import version
class PipelineError(Exception):
    """Signals that a pipelined request was lost with its connection.

    Requests queued behind the active one on a connection are failed with
    this error when the connection stops being reusable or is closed.
    """
    pass
class FixedHTTPClientChannelRequest(HTTPClientChannelRequest):
    """Fix the broken _error function.

    Remembers whether the response's initial line has been received, so that
    C{_error} only aborts parsing once parsing has started, and reports a
    failure either to the response stream or to the response Deferred --
    never to both.
    """

    def __init__(self, channel, request, closeAfter):
        HTTPClientChannelRequest.__init__(self, channel, request, closeAfter)
        # Becomes True in gotInitialLine; consulted by _error.
        self.started = False

    def _error(self, err):
        """
        Abort parsing, and depending on the status of the request, either fire
        the C{responseDefer} if no response has been sent yet, or close the
        stream.

        @param err: the failure to report.
        """
        if self.started:
            # NOTE(review): presumably aborting the parser before any
            # response line arrived is what breaks the inherited _error --
            # confirm against the web2 version in use.
            self.abortParse()
        stream = getattr(self, 'stream', None)
        if stream is not None:
            # A response stream exists: the failure goes to the stream only.
            stream.finish(err)
        else:
            # No response yet: fail the Deferred the caller is waiting on.
            self.responseDefer.errback(err)

    def gotInitialLine(self, initialLine):
        """Mark the response as started, then let the base class parse the line."""
        self.started = True
        HTTPClientChannelRequest.gotInitialLine(self, initialLine)
class LoggingHTTPClientProtocol(HTTPClientProtocol):
    """A modified client protocol that logs the number of bytes received.

    Every received line and raw data chunk is reported to an optional
    statistics object before normal HTTP processing. Requests are sent as
    L{FixedHTTPClientChannelRequest}s, and pipelined requests are failed with
    L{PipelineError} when the connection can no longer be reused.
    """

    def __init__(self, factory, stats = None, mirror = False):
        """
        @param factory: the manager for this connection (L{Peer} passes itself)
        @param stats: optional object with a C{receivedBytes(count, mirror)}
            method; when None, received bytes are not counted
        @param mirror: flag passed through unchanged to C{stats.receivedBytes}
        """
        HTTPClientProtocol.__init__(self, factory)
        self.stats = stats
        self.mirror = mirror

    def lineReceived(self, line):
        """Count a received line, then hand it to the HTTP parser."""
        if self.stats is not None:
            # + 2 presumably accounts for the CRLF delimiter stripped from line.
            self.stats.receivedBytes(len(line) + 2, self.mirror)
        HTTPClientProtocol.lineReceived(self, line)

    def rawDataReceived(self, data):
        """Count a received chunk of raw (body) data, then parse it."""
        if self.stats is not None:
            self.stats.receivedBytes(len(data), self.mirror)
        HTTPClientProtocol.rawDataReceived(self, data)

    def submitRequest(self, request, closeAfter=True):
        """
        @param request: The request to send to a remote server.
        @type request: L{ClientRequest}

        @param closeAfter: If True the 'Connection: close' header will be sent,
            otherwise 'Connection: keep-alive'
        @type closeAfter: C{bool}

        @rtype: L{twisted.internet.defer.Deferred}
        @return: A Deferred which will be called back with the
            L{twisted.web2.http.Response} from the server.
        """

        # Assert we're in a valid state to submit more
        assert self.outRequest is None
        assert ((self.readPersistent is PERSIST_NO_PIPELINE
                 and not self.inRequests)
                or self.readPersistent is PERSIST_PIPELINE)

        self.manager.clientBusy(self)
        if closeAfter:
            # The connection closes after this request, so nothing may follow.
            self.readPersistent = False

        self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self,
                                            request, closeAfter)
        self.inRequests.append(chanRequest)

        chanRequest.submit()
        return chanRequest.responseDefer

    def setReadPersistent(self, persist):
        """Update C{readPersistent}, failing or resuming pipelined requests.

        @param persist: the new persistence state; a false value means the
            connection will not be reused, so every request after the
            current one is aborted.
        """
        oldPersist = self.readPersistent
        self.readPersistent = persist
        if not persist:
            # Tell all requests but first to abort.
            self._abortPipelined()
        elif (oldPersist is PERSIST_NO_PIPELINE and
              persist is PERSIST_PIPELINE and
              self.outRequest is None):
            # Pipelining just became possible: let the manager send more.
            self.manager.clientPipelining(self)

    def connectionLost(self, reason):
        """Fail the current request with C{reason} and abort pipelined ones.

        Also clears the timeout and notifies the manager via C{clientGone}.
        """
        self.readPersistent = False
        self.setTimeout(None)
        self.manager.clientGone(self)
        # Cancel the current request
        if self.inRequests and self.inRequests[0] is not None:
            self.inRequests[0].connectionLost(reason)
        # Tell all remaining requests to abort.
        self._abortPipelined()

    def _abortPipelined(self):
        """Remove every request after the first and fail each with PipelineError."""
        lostRequests = self.inRequests[1:]
        del self.inRequests[1:]
        for request in lostRequests:
            if request is not None:
                request.connectionLost(PipelineError('Pipelined connection was closed.'))
class Peer(ClientFactory):
    """A manager for all HTTP requests to a single peer.

    Controls all requests that go to a single peer (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    Queued requests are kept as (request, Deferred, submission time) tuples.
    Recent response times and download speeds feed L{rerank}, which stores
    the result in C{self.rank}.
    """

    implements(IHTTPClientManager)

    # Speed/response statistics only use the most recent samples...
    _MAX_SAMPLES = 10
    # ...that are no older than this.
    _MAX_SAMPLE_AGE = timedelta(seconds=3600)

    def __init__(self, host, port = 80, stats = None):
        """
        @param host: the peer's host name or address
        @param port: the peer's HTTP port (optional, defaults to 80)
        @param stats: optional statistics object handed to each
            L{LoggingHTTPClientProtocol} connection
        """
        self.host = host
        self.port = port
        self.stats = stats
        self.mirror = False
        # NOTE(review): starting rank used before the first rerank(); confirm.
        self.rank = 0.01
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        # Pending (request, deferred, submission time) tuples, oldest first.
        self.request_queue = []
        # Requests handed to the connection that have not completed or failed.
        self.outstanding = 0
        self.proto = None
        self.connector = None
        self._errors = 0
        self._completed = 0
        # (time recorded, elapsed timedelta, byte length) per finished download.
        self._downloadSpeeds = []
        # (time the response arrived, its stream length) still being timed.
        self._lastResponse = None
        # (time the response arrived, delay since submission) per response.
        self._responseTimes = []

    def __repr__(self):
        return "(%r, %r, %r)" % (self.host, self.port, self.rank)

    #{ Manage the request queue
    def connect(self):
        """Connect to the peer."""
        assert self.closed and not self.connecting
        log.msg('Connecting to (%s, %d)' % (self.host, self.port))
        self.connecting = True
        d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
                                   stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port, timeout = 10)
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Begin processing the queued requests."""
        log.msg('Connected to (%s, %d)' % (self.host, self.port))
        self.closed = False
        self.connecting = False
        self.proto = proto
        reactor.callLater(0, self.processQueue)

    def connectionError(self, err):
        """Fail the oldest queued request after a failed connection attempt.

        The failure also counts as a completed-with-error request for ranking.
        """
        log.msg('Failed to connect to the peer by HTTP.')
        log.err(err)

        # Remove one request so that we don't loop indefinitely
        if self.request_queue:
            req, deferRequest, submissionTime = self.request_queue.pop(0)
            deferRequest.errback(err)

        self._completed += 1
        self._errors += 1
        self.rerank()
        if self.connecting:
            self.connecting = False
            self.clientGone(None)

    def close(self):
        """Close the connection to the peer."""
        if not self.closed:
            self.proto.transport.loseConnection()

    def submitRequest(self, request):
        """Add a new request to the queue.

        @type request: L{twisted.web2.client.http.ClientRequest}
        @return: deferred that will fire with the completed request
        """
        submissionTime = datetime.now()
        deferRequest = defer.Deferred()
        self.request_queue.append((request, deferRequest, submissionTime))
        self.rerank()
        reactor.callLater(0, self.processQueue)
        return deferRequest

    def processQueue(self):
        """Check the queue to see if new requests can be sent to the peer.

        Connects first if necessary and sends at most one request per call;
        later sends are scheduled by the connection callbacks.
        """
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.outstanding and not self.pipeline:
            return
        if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
                 and not self.proto.inRequests)
                or self.proto.readPersistent is PERSIST_PIPELINE):
            log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
                    (self.proto.readPersistent, self.proto.inRequests))
            return

        req, deferRequest, submissionTime = self.request_queue.pop(0)
        try:
            deferResponse = self.proto.submitRequest(req, False)
        except Exception:
            # Put the request back at the head of the queue and retry later.
            # (The popped local is ``req``; referring to any other name here
            # would raise inside the handler and silently lose the request.)
            log.msg('Got an error trying to submit a new HTTP request %s' % (req.uri, ))
            log.err()
            self.request_queue.insert(0, (req, deferRequest, submissionTime))
            reactor.callLater(1, self.processQueue)
            return

        self.outstanding += 1
        self.rerank()
        deferResponse.addCallbacks(self.requestComplete, self.requestError,
                                   callbackArgs = (req, deferRequest, submissionTime),
                                   errbackArgs = (req, deferRequest))

    def requestComplete(self, resp, req, deferRequest, submissionTime):
        """Process a completed request."""
        self._processLastResponse()
        self.outstanding -= 1
        assert self.outstanding >= 0
        log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
        self._completed += 1
        now = datetime.now()
        self._responseTimes.append((now, now - submissionTime))
        # Timed until the next event; see _processLastResponse.
        self._lastResponse = (now, resp.stream.length)
        self.rerank()
        deferRequest.callback(resp)

    def requestError(self, error, req, deferRequest):
        """Process a request that ended with an error."""
        self._processLastResponse()
        self.outstanding -= 1
        assert self.outstanding >= 0
        log.msg('Download of %s generated error %r' % (req.uri, error))
        self._completed += 1
        self._errors += 1
        self.rerank()
        deferRequest.errback(error)

    def hashError(self, error):
        """Log that a hash error occurred from the peer."""
        log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
        self._errors += 1
        self.rerank()

    #{ IHTTPClientManager interface
    def clientBusy(self, proto):
        """Save the busy state."""
        self.busy = True

    def clientIdle(self, proto):
        """Try to send a new request."""
        self._processLastResponse()
        self.busy = False
        reactor.callLater(0, self.processQueue)
        self.rerank()

    def clientPipelining(self, proto):
        """Try to send a new request."""
        self.pipeline = True
        reactor.callLater(0, self.processQueue)

    def clientGone(self, proto):
        """Reset the connection state after losing the connection to the peer.

        Requests still queued trigger a reconnect via processQueue.
        """
        self._processLastResponse()
        log.msg('Lost the connection to (%s, %d)' % (self.host, self.port))
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.proto = None
        self.rerank()
        if self.request_queue:
            reactor.callLater(0, self.processQueue)

    #{ Downloading request interface
    def setCommonHeaders(self):
        """Get the common HTTP headers for all requests."""
        headers = http_headers.Headers()
        headers.setHeader('Host', self.host)
        headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
                          (version.short(), twisted_version.short(), web2_version.short()))
        return headers

    def get(self, path, method="GET", modtime=None):
        """Add a new request to the queue.

        @type path: C{string}
        @param path: the path to request from the peer
        @type method: C{string}
        @param method: the HTTP method to use, 'GET' or 'HEAD'
            (optional, defaults to 'GET')
        @type modtime: C{int}
        @param modtime: the modification time to use for an 'If-Modified-Since'
            header, as seconds since the epoch
            (optional, defaults to not sending that header)
        """
        headers = self.setCommonHeaders()
        # Compare against None so that a modtime of 0 (the epoch) is still sent.
        if modtime is not None:
            headers.setHeader('If-Modified-Since', modtime)
        return self.submitRequest(ClientRequest(method, path, headers, None))

    def getRange(self, path, rangeStart, rangeEnd, method="GET"):
        """Add a new request with a Range header to the queue.

        @type path: C{string}
        @param path: the path to request from the peer
        @type rangeStart: C{int}
        @param rangeStart: the byte to begin the request at
        @type rangeEnd: C{int}
        @param rangeEnd: the byte to end the request at (inclusive)
        @type method: C{string}
        @param method: the HTTP method to use, 'GET' or 'HEAD'
            (optional, defaults to 'GET')
        """
        headers = self.setCommonHeaders()
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return self.submitRequest(ClientRequest(method, path, headers, None))

    #{ Peer information
    def isIdle(self):
        """Check whether the peer is idle or not."""
        return not self.busy and not self.request_queue and not self.outstanding

    def _processLastResponse(self):
        """Save the download time of the last request for speed calculations.

        Called on the next request/connection event, so the recorded elapsed
        time only approximates the body download time.
        """
        if self._lastResponse is not None:
            if self._lastResponse[1] is not None:
                now = datetime.now()
                self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
            self._lastResponse = None

    @staticmethod
    def _seconds(delta):
        """Convert a timedelta to a float number of seconds."""
        return delta.days*86400.0 + delta.seconds + delta.microseconds/1000000.0

    def _trimSamples(self, samples):
        """Drop the oldest (time, ...) samples beyond the 10 newest or over an hour old."""
        now = datetime.now()
        while samples and (len(samples) > self._MAX_SAMPLES or
                           now - samples[0][0] > self._MAX_SAMPLE_AGE):
            samples.pop(0)

    def downloadSpeed(self):
        """Gets the latest average download speed for the peer.

        The average is over the last 10 responses that occurred in the last hour.

        @return: bytes per second, or 0.0 when there is no usable data
        """
        self._trimSamples(self._downloadSpeeds)

        # If there are none, then you get 0
        if not self._downloadSpeeds:
            return 0.0

        total_time = 0.0
        total_download = 0
        for _, elapsed, length in self._downloadSpeeds:
            total_time += self._seconds(elapsed)
            total_download += length

        if total_time <= 0.0:
            # Every sample finished within the clock's resolution; report
            # "unknown" (0.0) instead of raising ZeroDivisionError. rerank()
            # applies no speed penalty for 0.0, just as for a huge speed.
            return 0.0
        return total_download / total_time

    def responseTime(self):
        """Gets the latest average response time for the peer.

        Response time is the time from receiving the request, to the time
        the download begins. The average is over the last 10 responses that
        occurred in the last hour.

        @return: the average in seconds
        """
        self._trimSamples(self._responseTimes)

        # If there are none, give it the benefit of the doubt
        if not self._responseTimes:
            # NOTE(review): best-case default for "no data"; confirm value.
            return 0.0

        total_response = 0.0
        for _, delay in self._responseTimes:
            total_response += self._seconds(delay)
        return total_response / len(self._responseTimes)

    def rerank(self):
        """Determine the ranking value for the peer.

        The ranking value is composed of 5 numbers, each exponentially
        decreasing from 1 to 0 based on:
         - if a connection to the peer is open
         - the number of pending requests
         - the time to download a single piece
         - the number of errors
         - the response time
        The product is stored in C{self.rank}.
        """
        rank = 1.0
        if self.closed:
            # NOTE(review): flat penalty for needing a new connection; confirm.
            rank *= 0.9
        rank *= exp(-(len(self.request_queue) + self.outstanding))
        speed = self.downloadSpeed()
        if speed > 0.0:
            # Seconds needed to fetch a 512 KiB piece at the measured speed.
            rank *= exp(-512.0*1024 / speed)
        if self._completed:
            rank *= exp(-10.0 * self._errors / self._completed)
        rank *= exp(-self.responseTime() / 5.0)
        self.rank = rank
433 class TestClientManager(unittest.TestCase):
434 """Unit tests for the Peer."""
440 def gotResp(self, resp, num, expect):
441 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
442 if expect is not None:
443 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
444 while len(self.length) <= num:
445 self.length.append(0)
447 def addData(data, self = self, num = num):
448 self.length[num] += len(data)
449 def checkLength(resp, self = self, num = num, length = resp.stream.length):
450 self.failUnlessEqual(self.length[num], length)
452 df = stream_mod.readStream(resp.stream, addData)
453 df.addCallback(checkLength)
456 def test_download(self):
457 """Tests a normal download."""
458 host = 'www.ietf.org'
459 self.client = Peer(host, 80)
462 d = self.client.get('/rfc/rfc0013.txt')
463 d.addCallback(self.gotResp, 1, 1070)
467 """Tests a 'HEAD' request."""
468 host = 'www.ietf.org'
469 self.client = Peer(host, 80)
472 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
473 d.addCallback(self.gotResp, 1, 0)
476 def test_multiple_downloads(self):
477 """Tests multiple downloads with queueing and connection closing."""
478 host = 'www.ietf.org'
479 self.client = Peer(host, 80)
481 lastDefer = defer.Deferred()
483 def newRequest(path, num, expect, last=False):
484 d = self.client.get(path)
485 d.addCallback(self.gotResp, num, expect)
487 d.addBoth(lastDefer.callback)
490 newRequest("/rfc/rfc0006.txt", 1, 1776)
491 newRequest("/rfc/rfc2362.txt", 2, 159833)
492 newRequest("/rfc/rfc0801.txt", 3, 40824)
494 # This one will probably be queued
495 self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
497 # Connection should still be open, but idle
498 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
500 #Connection should be closed
501 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
502 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
503 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
504 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
506 # Now it should definitely be closed
507 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
510 def test_multiple_quick_downloads(self):
511 """Tests lots of multiple downloads with queueing."""
512 host = 'www.ietf.org'
513 self.client = Peer(host, 80)
515 lastDefer = defer.Deferred()
517 def newRequest(path, num, expect, last=False):
518 d = self.client.get(path)
519 d.addCallback(self.gotResp, num, expect)
521 d.addBoth(lastDefer.callback)
523 newRequest("/rfc/rfc0006.txt", 1, 1776)
524 newRequest("/rfc/rfc2362.txt", 2, 159833)
525 newRequest("/rfc/rfc0801.txt", 3, 40824)
526 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
527 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
528 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
529 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
530 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
531 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
532 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
536 log.msg('Rank is: %r' % self.client.rank)
537 log.msg('Download speed is: %r' % self.client.downloadSpeed())
538 log.msg('Response Time is: %r' % self.client.responseTime())
540 def test_peer_info(self):
541 """Test retrieving the peer info during a download."""
542 host = 'www.ietf.org'
543 self.client = Peer(host, 80)
545 lastDefer = defer.Deferred()
547 def newRequest(path, num, expect, last=False):
548 d = self.client.get(path)
549 d.addCallback(self.gotResp, num, expect)
551 d.addBoth(lastDefer.callback)
553 newRequest("/rfc/rfc0006.txt", 1, 1776)
554 newRequest("/rfc/rfc2362.txt", 2, 159833)
555 newRequest("/rfc/rfc0801.txt", 3, 40824)
556 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
557 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
558 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
559 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
560 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
561 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
562 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
564 for i in xrange(2, 122, 2):
565 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
569 def test_range(self):
570 """Test a Range request."""
571 host = 'www.ietf.org'
572 self.client = Peer(host, 80)
575 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
576 d.addCallback(self.gotResp, 1, 100)
579 def test_timeout(self):
580 """Tests a connection timeout."""
581 from twisted.internet.error import TimeoutError
582 host = 'steveholt.hopto.org'
583 self.client = Peer(host, 80)
586 d = self.client.get('/rfc/rfc0013.txt')
587 d.addCallback(self.gotResp, 1, 1070)
588 d = self.failUnlessFailure(d, TimeoutError)
589 d.addCallback(lambda a: self.flushLoggedErrors(TimeoutError))
592 def test_dnserror(self):
593 """Tests a connection timeout."""
594 from twisted.internet.error import DNSLookupError
595 host = 'hureyfnvbfha.debian.net'
596 self.client = Peer(host, 80)
599 d = self.client.get('/rfc/rfc0013.txt')
600 d.addCallback(self.gotResp, 1, 1070)
601 d = self.failUnlessFailure(d, DNSLookupError)
602 d.addCallback(lambda a: self.flushLoggedErrors(DNSLookupError))
605 def test_noroute(self):
606 """Tests a connection timeout."""
607 from twisted.internet.error import NoRouteError
609 self.client = Peer(host, 80)
612 d = self.client.get('/rfc/rfc0013.txt')
613 d.addCallback(self.gotResp, 1, 1070)
614 d = self.failUnlessFailure(d, NoRouteError)
615 d.addCallback(lambda a: self.flushLoggedErrors(NoRouteError))
619 for p in self.pending_calls:
622 self.pending_calls = []