2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE
14 from twisted.web2 import stream as stream_mod, http_headers
15 from twisted.web2 import version as web2_version
16 from twisted.trial import unittest
17 from zope.interface import implements
19 from apt_p2p_conf import version
class LoggingHTTPClientProtocol(HTTPClientProtocol):
    """A modified client protocol that logs the number of bytes received.

    @ivar stats: optional statistics collector; when set, its
        C{receivedBytes(count, mirror)} method is called for all incoming data
    @ivar mirror: flag handed to C{stats.receivedBytes} with every count
    """

    def __init__(self, factory, stats = None, mirror = False):
        """Initialize the protocol and remember where to report byte counts.

        @param factory: passed unchanged to C{HTTPClientProtocol.__init__}
        @param stats: the statistics collector to notify
            (optional, defaults to not reporting anything)
        @param mirror: passed to the collector with each count
            (optional, defaults to False)
        """
        HTTPClientProtocol.__init__(self, factory)
        self.stats = stats
        self.mirror = mirror

    def lineReceived(self, line):
        """Report the size of a received line, then process it normally."""
        # stats defaults to None, so reporting has to be optional.
        if self.stats is not None:
            # NOTE(review): the extra 2 bytes presumably account for the
            # line delimiter that was stripped before delivery -- confirm.
            self.stats.receivedBytes(len(line) + 2, self.mirror)
        HTTPClientProtocol.lineReceived(self, line)

    def rawDataReceived(self, data):
        """Report the size of received raw data, then process it normally."""
        if self.stats is not None:
            self.stats.receivedBytes(len(data), self.mirror)
        HTTPClientProtocol.rawDataReceived(self, data)
39 class Peer(ClientFactory):
40 """A manager for all HTTP requests to a single peer.
42 Controls all requests that go to a single peer (host and port).
43 This includes buffering requests until they can be sent and reconnecting
44 in the event of the connection being closed.
48 implements(IHTTPClientManager)
50 def __init__(self, host, port = 80, stats = None):
59 self.connecting = False
60 self.request_queue = []
66 self._downloadSpeeds = []
67 self._lastResponse = None
68 self._responseTimes = []
71 return "(%r, %r, %r)" % (self.host, self.port, self.rank)
73 #{ Manage the request queue
75 """Connect to the peer."""
76 assert self.closed and not self.connecting
77 self.connecting = True
78 d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
79 stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
80 d.addCallbacks(self.connected, self.connectionError)
def connected(self, proto):
    """Record the new connection and begin processing the queued requests.

    @param proto: the protocol instance for the new connection; requests
        are later submitted through it by L{processQueue}
    """
    # processQueue and close both go through self.proto, so keep it.
    self.proto = proto
    # connect asserts self.closed before dialing, so an established
    # connection has to clear the flag.
    self.closed = False
    self.connecting = False
    reactor.callLater(0, self.processQueue)
def connectionError(self, err):
    """Fail the oldest queued request after a connection attempt failed.

    @param err: the connection failure, passed to the request's errback
    """
    log.msg('Failed to connect to the peer by HTTP.')

    # Fail exactly one request per failed attempt so that we don't loop
    # indefinitely retrying the same queue.
    queue = self.request_queue
    if queue:
        _req, pending, _submitted = queue.pop(0)
        pending.errback(err)

    self.connecting = False
    self.clientGone(None)
107 """Close the connection to the peer."""
109 self.proto.transport.loseConnection()
def submitRequest(self, request):
    """Add a new request to the queue.

    @type request: L{twisted.web2.client.http.ClientRequest}
    @return: deferred that will fire with the completed request
    """
    submissionTime = datetime.now()
    deferRequest = defer.Deferred()
    # The submission time travels with the request so that the response
    # time can be measured when it completes.
    self.request_queue.append((request, deferRequest, submissionTime))
    reactor.callLater(0, self.processQueue)
    # Callers (get, getRange) hand this deferred back to their own callers.
    return deferRequest
def processQueue(self):
    """Check the queue to see if new requests can be sent to the peer.

    Sends at most one queued request per call.  Returns without sending
    when there is nothing queued, when no usable connection exists yet, or
    when the connection cannot accept another request right now.
    """
    if not self.request_queue:
        return
    if self.connecting:
        # connected() or connectionError() will get things moving again.
        return
    if self.closed:
        # connect() asserts closed-and-not-connecting, which holds here.
        self.connect()
        return
    if self.busy and not self.pipeline:
        return
    if self.outstanding and not self.pipeline:
        return
    if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
             and not self.proto.inRequests)
            or self.proto.readPersistent is PERSIST_PIPELINE):
        log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
                (self.proto.readPersistent, self.proto.inRequests))
        return

    req, deferRequest, submissionTime = self.request_queue.pop(0)
    try:
        deferResponse = self.proto.submitRequest(req, False)
    except Exception:
        # Try again later: put the request back at the head of the queue
        # so that ordering is preserved.
        log.msg('Got an error trying to submit a new HTTP request %s' % (req.uri, ))
        log.err()
        self.request_queue.insert(0, (req, deferRequest, submissionTime))
        reactor.callLater(1, self.processQueue)
        return

    self.outstanding += 1
    deferResponse.addCallbacks(self.requestComplete, self.requestError,
                               callbackArgs = (req, deferRequest, submissionTime),
                               errbackArgs = (req, deferRequest))
def requestComplete(self, resp, req, deferRequest, submissionTime):
    """Process a completed request.

    @param resp: the response that arrived for the request
    @param req: the request that was sent
    @param deferRequest: the deferred returned by L{submitRequest} for
        this request; it is fired with the response
    @param submissionTime: when the request was queued, as recorded by
        L{submitRequest}
    """
    self._processLastResponse()
    self.outstanding -= 1
    assert self.outstanding >= 0
    log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))

    now = datetime.now()
    self._responseTimes.append((now, now - submissionTime))
    # Remember when the body started arriving and how long it is; the
    # download duration is closed out later by _processLastResponse.
    self._lastResponse = (now, resp.stream.length)

    deferRequest.callback(resp)
def requestError(self, error, req, deferRequest):
    """Handle a request whose response ended in an error.

    @param error: the failure produced for the request
    @param req: the request that was sent
    @param deferRequest: the deferred handed out for this request; the
        error is passed on through it
    """
    self._processLastResponse()

    self.outstanding -= 1
    assert self.outstanding >= 0

    message = 'Download of %s generated error %r' % (req.uri, error)
    log.msg(message)

    deferRequest.errback(error)
def hashError(self, error):
    """Write a log entry for a hash error attributed to this peer.

    @param error: the hash error to report
    """
    details = (self.host, self.port, error)
    log.msg('Hash error from peer (%s, %d): %r' % details)
191 #{ IHTTPClientManager interface
def clientBusy(self, proto):
    """Save the busy state.

    @param proto: the protocol reporting that it is busy (unused)
    """
    # processQueue holds back new requests while this is set, unless
    # pipelining has been enabled.
    self.busy = True
def clientIdle(self, proto):
    """Clear the busy state and try to send a new request.

    @param proto: the protocol reporting that it is idle (unused)
    """
    self._processLastResponse()
    # Without clearing this, the processQueue call scheduled below would
    # hit its busy guard and never send anything.
    self.busy = False
    reactor.callLater(0, self.processQueue)
def clientPipelining(self, proto):
    """Note that pipelining is possible and try to send a new request.

    @param proto: the protocol reporting that it can pipeline (unused)
    """
    # processQueue only sends while busy or with requests outstanding
    # when this flag is set; clientGone resets it.
    self.pipeline = True
    reactor.callLater(0, self.processQueue)
def clientGone(self, proto):
    """Reset the connection state after the connection has gone away.

    Any requests still queued cause L{processQueue} to be scheduled again.

    @param proto: the protocol that went away, or None when called from
        L{connectionError}
    """
    self._processLastResponse()
    self.busy = False
    self.pipeline = False
    # connect asserts self.closed, so it must be set before any reconnect.
    self.closed = True
    self.connecting = False

    if self.request_queue:
        reactor.callLater(0, self.processQueue)
220 #{ Downloading request interface
def setCommonHeaders(self):
    """Get the common HTTP headers for all requests.

    @return: a new headers object with the 'Host' and 'User-Agent'
        headers already set
    """
    headers = http_headers.Headers()
    headers.setHeader('Host', self.host)
    headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
                      (version.short(), twisted_version.short(), web2_version.short()))
    # get and getRange add their own headers to the returned object.
    return headers
def get(self, path, method="GET", modtime=None):
    """Add a new request to the queue.

    @type path: C{string}
    @param path: the path to request from the peer
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @type modtime: C{int}
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @return: the deferred returned by L{submitRequest}
    """
    headers = self.setCommonHeaders()
    # Only make the request conditional when a time was actually given.
    if modtime is not None:
        headers.setHeader('If-Modified-Since', modtime)
    return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a request for part of a file using a Range header.

    @type path: C{string}
    @param path: the path to request from the peer
    @type rangeStart: C{int}
    @param rangeStart: the first byte of the requested range
    @type rangeEnd: C{int}
    @param rangeEnd: the last byte of the requested range (inclusive)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @return: whatever L{submitRequest} returns for the new request
    """
    byteRange = ('bytes', [(rangeStart, rangeEnd)])

    headers = self.setCommonHeaders()
    headers.setHeader('Range', byteRange)

    request = ClientRequest(method, path, headers, None)
    return self.submitRequest(request)
266 """Check whether the peer is idle or not."""
267 return not self.busy and not self.request_queue and not self.outstanding
def _processLastResponse(self):
    """Save the download time of the last request for speed calculations.

    Does nothing when no response is waiting to be accounted for.
    """
    if self._lastResponse is not None:
        now = datetime.now()
        # requestComplete stored (time the response arrived, stream length).
        started, length = self._lastResponse
        self._downloadSpeeds.append((now, now - started, length))
        self._lastResponse = None
def downloadSpeed(self):
    """Gets the latest average download speed for the peer.

    The average is over the last 10 responses that occurred in the last hour.
    Older entries are discarded from the history as a side effect.

    @rtype: C{float}
    @return: the total length downloaded divided by the total number of
        seconds taken (presumably bytes per second), or 0.0 if there is
        nothing recent to average
    """
    now = datetime.now()
    while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
                                    now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
        self._downloadSpeeds.pop(0)

    # If there are none, then you get 0
    if not self._downloadSpeeds:
        return 0.0

    total_time = 0.0
    total_download = 0
    for download in self._downloadSpeeds:
        total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
        total_download += download[2]

    # Downloads recorded with no measurable duration cannot give a speed.
    if total_time <= 0:
        return 0.0
    return total_download / total_time
def responseTime(self):
    """Gets the latest average response time for the peer.

    Response time is the time from receiving the request, to the time
    the download begins. The average is over the last 10 responses that
    occurred in the last hour.  Older entries are discarded from the
    history as a side effect.

    @rtype: C{float}
    @return: the average response time in seconds, or 0.0 if there is
        nothing recent to average
    """
    now = datetime.now()
    while self._responseTimes and (len(self._responseTimes) > 10 or
                                   now - self._responseTimes[0][0] > timedelta(seconds=3600)):
        self._responseTimes.pop(0)

    # If there are none, give it the benefit of the doubt
    if not self._responseTimes:
        return 0.0

    total_response = 0.0
    for response in self._responseTimes:
        total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0

    return total_response / len(self._responseTimes)
321 """Determine the ranking value for the peer.
323 The ranking value is composed of 5 numbers, each exponentially
324 decreasing from 1 to 0 based on:
325 - if a connection to the peer is open
326 - the number of pending requests
327 - the time to download a single piece
328 - the number of errors
334 rank *= exp(-(len(self.request_queue) + self.outstanding))
335 speed = self.downloadSpeed()
337 rank *= exp(-512.0*1024 / speed)
339 rank *= exp(-10.0 * self._errors / self._completed)
340 rank *= exp(-self.responseTime() / 5.0)
343 class TestClientManager(unittest.TestCase):
344 """Unit tests for the Peer."""
def gotResp(self, resp, num, expect):
    """Check a response and count the bytes actually received for it.

    @param resp: the response to check
    @param num: the slot in C{self.length} used to total the body bytes
    @param expect: the expected stream length, or None to skip that check
    @return: deferred from reading the response stream, which fails if the
        number of bytes read differs from the advertised stream length
    """
    self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
    if expect is not None:
        self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))

    # Grow the per-request counters so that slot num exists.
    while len(self.length) <= num:
        self.length.append(0)

    def addData(data, self = self, num = num):
        self.length[num] += len(data)

    def checkLength(resp, self = self, num = num, length = resp.stream.length):
        self.failUnlessEqual(self.length[num], length)

    df = stream_mod.readStream(resp.stream, addData)
    df.addCallback(checkLength)
    # Returned so that the caller's callback chain waits for the body and
    # sees any failure raised by checkLength.
    return df
def test_download(self):
    """Tests a normal download."""
    host = 'www.ietf.org'
    self.client = Peer(host, 80)

    d = self.client.get('/rfc/rfc0013.txt')
    d.addCallback(self.gotResp, 1, 1070)
    # Returned so that trial waits for the download to finish.
    return d
377 """Tests a 'HEAD' request."""
378 host = 'www.ietf.org'
379 self.client = Peer(host, 80)
382 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
383 d.addCallback(self.gotResp, 1, 0)
def test_multiple_downloads(self):
    """Tests multiple downloads with queueing and connection closing."""
    host = 'www.ietf.org'
    self.client = Peer(host, 80)

    lastDefer = defer.Deferred()

    def newRequest(path, num, expect, last=False):
        d = self.client.get(path)
        d.addCallback(self.gotResp, num, expect)
        if last:
            # A Deferred can only fire once, so only the final request
            # is allowed to finish the test.
            d.addBoth(lastDefer.callback)

    newRequest("/rfc/rfc0006.txt", 1, 1776)
    newRequest("/rfc/rfc2362.txt", 2, 159833)
    newRequest("/rfc/rfc0801.txt", 3, 40824)

    # This one will probably be queued
    self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))

    # Connection should still be open, but idle
    self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))

    #Connection should be closed
    self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
    self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
    self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
    self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))

    # Now it should definitely be closed
    self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))

    # Returned so that trial waits for the final request.
    return lastDefer
def test_multiple_quick_downloads(self):
    """Tests lots of multiple downloads with queueing."""
    host = 'www.ietf.org'
    self.client = Peer(host, 80)

    lastDefer = defer.Deferred()

    def newRequest(path, num, expect, last=False):
        d = self.client.get(path)
        d.addCallback(self.gotResp, num, expect)
        if last:
            # A Deferred can only fire once, so only the final request
            # is allowed to finish the test.
            d.addBoth(lastDefer.callback)

    newRequest("/rfc/rfc0006.txt", 1, 1776)
    newRequest("/rfc/rfc2362.txt", 2, 159833)
    newRequest("/rfc/rfc0801.txt", 3, 40824)
    self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
    self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
    self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
    self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
    self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
    self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
    self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))

    # Returned so that trial waits for the final request.
    return lastDefer
446 log.msg('Rank is: %r' % self.client.rank)
447 log.msg('Download speed is: %r' % self.client.downloadSpeed())
448 log.msg('Response Time is: %r' % self.client.responseTime())
def test_peer_info(self):
    """Test retrieving the peer info during a download."""
    host = 'www.ietf.org'
    self.client = Peer(host, 80)

    lastDefer = defer.Deferred()

    def newRequest(path, num, expect, last=False):
        d = self.client.get(path)
        d.addCallback(self.gotResp, num, expect)
        if last:
            # A Deferred can only fire once, so only the final request
            # is allowed to finish the test.
            d.addBoth(lastDefer.callback)

    newRequest("/rfc/rfc0006.txt", 1, 1776)
    newRequest("/rfc/rfc2362.txt", 2, 159833)
    newRequest("/rfc/rfc0801.txt", 3, 40824)
    self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
    self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
    self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
    self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
    self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
    self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
    self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))

    # Log the peer's info every 2 seconds while the downloads run.
    for i in range(2, 122, 2):
        self.pending_calls.append(reactor.callLater(i, self.checkInfo))

    # Returned so that trial waits for the final request.
    return lastDefer
def test_range(self):
    """Test a Range request."""
    host = 'www.ietf.org'
    self.client = Peer(host, 80)

    # Bytes 100 through 199 inclusive, so 100 bytes are expected.
    d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
    d.addCallback(self.gotResp, 1, 100)
    # Returned so that trial waits for the download to finish.
    return d
490 for p in self.pending_calls:
493 self.pending_calls = []