"""Manage all download requests to a single site."""
from datetime import datetime, timedelta
from math import exp

from twisted.internet import reactor, defer, protocol
from twisted.internet.protocol import ClientFactory
from twisted import version as twisted_version
from twisted.python import log
from twisted.web2.client.interfaces import IHTTPClientManager
from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
from twisted.web2 import stream as stream_mod, http_headers
from twisted.web2 import version as web2_version
from twisted.trial import unittest
from zope.interface import implements

from apt_p2p_conf import version
class LoggingHTTPClientProtocol(HTTPClientProtocol):
    """A modified client protocol that logs the number of bytes received.

    @ivar stats: object whose C{receivedBytes(count, mirror)} method is
        called for all received data, or C{None} to record nothing
    @ivar mirror: value passed unchanged as the second argument of
        C{stats.receivedBytes}
    """

    def __init__(self, factory, stats = None, mirror = False):
        """Initialize the protocol and remember where to report byte counts.

        @param factory: passed on to L{HTTPClientProtocol.__init__}
        @param stats: the statistics object to report received bytes to
            (optional, defaults to not reporting anything)
        @param mirror: flag handed to the statistics object with every count
            (optional, defaults to False)
        """
        HTTPClientProtocol.__init__(self, factory)
        self.stats = stats
        self.mirror = mirror

    def lineReceived(self, line):
        """Report the size of a received line, then process it normally."""
        if self.stats is not None:
            # NOTE(review): the extra 2 bytes presumably account for the
            # CRLF delimiter that was stripped from the line -- confirm.
            self.stats.receivedBytes(len(line) + 2, self.mirror)
        HTTPClientProtocol.lineReceived(self, line)

    def rawDataReceived(self, data):
        """Report the size of received raw data, then process it normally."""
        if self.stats is not None:
            self.stats.receivedBytes(len(data), self.mirror)
        HTTPClientProtocol.rawDataReceived(self, data)
class Peer(ClientFactory):
    """A manager for all HTTP requests to a single peer.

    Controls all requests that go to a single peer (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.

    @ivar host: the host name of the peer
    @ivar port: the port of the peer
    @ivar stats: statistics object handed to every new protocol (may be None)
    @ivar mirror: flag handed to every new protocol together with the stats
    @ivar rank: the ranking value last computed by L{rerank}
    @ivar busy: whether the protocol last reported itself as busy
    @ivar pipeline: whether the protocol has reported that pipelining is possible
    @ivar closed: whether there is currently no open connection
    @ivar connecting: whether a connection attempt is in progress
    @ivar request_queue: the (request, deferred, submission time) tuples
        that are waiting to be sent
    @ivar outstanding: the number of requests sent but not yet completed
    @ivar proto: the connected protocol, or None when not connected
    """

    implements(IHTTPClientManager)

    # Only the most recent samples are used for the speed and response
    # time averages: at most this many, and none older than this age.
    _MAX_SAMPLES = 10
    _MAX_SAMPLE_AGE = timedelta(seconds=3600)

    def __init__(self, host, port = 80, stats = None):
        """Initialize the peer without opening a connection.

        @type host: C{string}
        @param host: the host name of the peer
        @type port: C{int}
        @param port: the port to connect to (optional, defaults to 80)
        @param stats: the statistics object to give to new protocols
            (optional, defaults to not recording statistics)
        """
        self.host = host
        self.port = port
        self.stats = stats
        self.mirror = False
        # NOTE(review): starting rank used until rerank() is first called;
        # the value is an assumption -- confirm against the ranking policy.
        self.rank = 0.01
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.request_queue = []
        self.outstanding = 0
        self.proto = None
        self._errors = 0
        self._completed = 0
        # Entries are (time recorded, download duration, stream length)
        self._downloadSpeeds = []
        # Either None or (time the response arrived, stream length)
        self._lastResponse = None
        # Entries are (time recorded, time from submission to response)
        self._responseTimes = []

    def __repr__(self):
        return "(%r, %r, %r)" % (self.host, self.port, self.rank)

    #{ Manage the request queue
    def connect(self):
        """Connect to the peer."""
        assert self.closed and not self.connecting
        self.connecting = True
        d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
                                   stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """Begin processing the queued requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        reactor.callLater(0, self.processQueue)

    def connectionError(self, err):
        """Cancel the requests."""
        log.msg('Failed to connect to the peer by HTTP.')
        log.err(err)

        # Remove one request so that we don't loop indefinitely
        if self.request_queue:
            req, deferRequest, submissionTime = self.request_queue.pop(0)
            deferRequest.errback(err)

        # A failed connection counts against the peer's error ratio
        self._completed += 1
        self._errors += 1
        self.rerank()
        self.connecting = False
        self.clientGone(None)

    def close(self):
        """Close the connection to the peer."""
        if not self.closed and self.proto is not None:
            self.proto.transport.loseConnection()

    def submitRequest(self, request):
        """Add a new request to the queue.

        @type request: L{twisted.web2.client.http.ClientRequest}
        @return: deferred that will fire with the completed request
        """
        submissionTime = datetime.now()
        deferRequest = defer.Deferred()
        self.request_queue.append((request, deferRequest, submissionTime))
        self.rerank()
        reactor.callLater(0, self.processQueue)
        return deferRequest

    def processQueue(self):
        """Check the queue to see if new requests can be sent to the peer."""
        if not self.request_queue:
            return
        if self.connecting:
            # connected() or connectionError() will process the queue again
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.outstanding and not self.pipeline:
            return

        req, deferRequest, submissionTime = self.request_queue.pop(0)
        self.outstanding += 1
        self.rerank()
        deferResponse = self.proto.submitRequest(req, False)
        deferResponse.addCallbacks(self.requestComplete, self.requestError,
                                   callbackArgs = (req, deferRequest, submissionTime),
                                   errbackArgs = (req, deferRequest))

    def requestComplete(self, resp, req, deferRequest, submissionTime):
        """Process a completed request."""
        self._processLastResponse()
        self.outstanding -= 1
        assert self.outstanding >= 0
        log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
        self._completed += 1
        now = datetime.now()
        self._responseTimes.append((now, now - submissionTime))
        length = resp.stream.length
        if length is not None:
            # A stream of unknown length can not be used to measure speed
            self._lastResponse = (now, length)
        self.rerank()
        deferRequest.callback(resp)

    def requestError(self, error, req, deferRequest):
        """Process a request that ended with an error."""
        self._processLastResponse()
        self.outstanding -= 1
        assert self.outstanding >= 0
        log.msg('Download of %s generated error %r' % (req.uri, error))
        self._completed += 1
        self._errors += 1
        self.rerank()
        deferRequest.errback(error)

    def hashError(self, error):
        """Log that a hash error occurred from the peer."""
        log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
        self._errors += 1
        self.rerank()

    #{ IHTTPClientManager interface
    def clientBusy(self, proto):
        """Save the busy state."""
        self.busy = True

    def clientIdle(self, proto):
        """Try to send a new request."""
        self._processLastResponse()
        self.busy = False
        reactor.callLater(0, self.processQueue)
        self.rerank()

    def clientPipelining(self, proto):
        """Try to send a new request."""
        self.pipeline = True
        reactor.callLater(0, self.processQueue)

    def clientGone(self, proto):
        """Reset the connection state, and reconnect if requests are waiting."""
        self._processLastResponse()
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.proto = None
        self.rerank()
        if self.request_queue:
            reactor.callLater(0, self.processQueue)

    #{ Downloading request interface
    def setCommonHeaders(self):
        """Get the common HTTP headers for all requests.

        @return: new headers containing the 'Host' and 'User-Agent' headers
        """
        headers = http_headers.Headers()
        headers.setHeader('Host', self.host)
        headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
                          (version.short(), twisted_version.short(), web2_version.short()))
        return headers

    def get(self, path, method="GET", modtime=None):
        """Add a new request to the queue.

        @type path: C{string}
        @param path: the path to request from the peer
        @type method: C{string}
        @param method: the HTTP method to use, 'GET' or 'HEAD'
            (optional, defaults to 'GET')
        @type modtime: C{int}
        @param modtime: the modification time to use for an 'If-Modified-Since'
            header, as seconds since the epoch
            (optional, defaults to not sending that header)
        @return: deferred that will fire with the completed request
        """
        headers = self.setCommonHeaders()
        if modtime is not None:
            headers.setHeader('If-Modified-Since', modtime)
        return self.submitRequest(ClientRequest(method, path, headers, None))

    def getRange(self, path, rangeStart, rangeEnd, method="GET"):
        """Add a new request with a Range header to the queue.

        @type path: C{string}
        @param path: the path to request from the peer
        @type rangeStart: C{int}
        @param rangeStart: the byte to begin the request at
        @type rangeEnd: C{int}
        @param rangeEnd: the byte to end the request at (inclusive)
        @type method: C{string}
        @param method: the HTTP method to use, 'GET' or 'HEAD'
            (optional, defaults to 'GET')
        @return: deferred that will fire with the completed request
        """
        headers = self.setCommonHeaders()
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return self.submitRequest(ClientRequest(method, path, headers, None))

    #{ Peer information
    def isIdle(self):
        """Check whether the peer is idle or not."""
        return not self.busy and not self.request_queue and not self.outstanding

    def _processLastResponse(self):
        """Save the download time of the last request for speed calculations."""
        if self._lastResponse is not None:
            now = datetime.now()
            self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
            self._lastResponse = None

    def _pruneSamples(self, samples, now):
        """Remove, in place, the oldest samples that are no longer averaged."""
        while samples and (len(samples) > self._MAX_SAMPLES or
                           now - samples[0][0] > self._MAX_SAMPLE_AGE):
            samples.pop(0)

    @staticmethod
    def _seconds(delta):
        """Convert a C{timedelta} to a (float) number of seconds."""
        return delta.days*86400.0 + delta.seconds + delta.microseconds/1000000.0

    def downloadSpeed(self):
        """Gets the latest average download speed for the peer.

        The average is over the last 10 responses that occurred in the last hour.

        @rtype: C{float}
        @return: the total recorded stream length divided by the total
            recorded download time in seconds, or 0.0 if there is no
            usable sample
        """
        self._pruneSamples(self._downloadSpeeds, datetime.now())

        # If there are none, then you get 0
        if not self._downloadSpeeds:
            return 0.0

        total_time = 0.0
        total_download = 0
        for download in self._downloadSpeeds:
            total_time += self._seconds(download[1])
            total_download += download[2]

        # Samples that took no measurable time can not yield a speed
        if total_time <= 0.0:
            return 0.0
        return total_download / total_time

    def responseTime(self):
        """Gets the latest average response time for the peer.

        Response time is the time from the request being submitted to this
        peer, to the time the response is received. The average is over the
        last 10 responses that occurred in the last hour.

        @rtype: C{float}
        @return: the average response time in seconds
        """
        self._pruneSamples(self._responseTimes, datetime.now())

        # If there are none, give it the benefit of the doubt
        if not self._responseTimes:
            # NOTE(review): the optimistic default is an assumption -- confirm
            return 0.1

        total_response = 0.0
        for response in self._responseTimes:
            total_response += self._seconds(response[1])

        return total_response / len(self._responseTimes)

    def rerank(self):
        """Determine the ranking value for the peer.

        The ranking value is composed of 5 numbers, each exponentially
        decreasing from 1 to 0 based on:
         - if a connection to the peer is open
         - the number of pending requests
         - the time to download a single piece
         - the number of errors
         - the response time

        The result is saved in the C{rank} attribute.
        """
        rank = 1.0
        if self.closed:
            # NOTE(review): the penalty for having no open connection is an
            # assumption -- confirm the intended factor.
            rank *= 0.9
        rank *= exp(-(len(self.request_queue) + self.outstanding))
        speed = self.downloadSpeed()
        if speed > 0.0:
            # Seconds needed to download 512 KiB at the measured speed
            rank *= exp(-512.0*1024 / speed)
        if self._completed:
            rank *= exp(-10.0 * self._errors / self._completed)
        rank *= exp(-self.responseTime() / 5.0)
        self.rank = rank
class TestClientManager(unittest.TestCase):
    """Unit tests for the Peer.

    The tests request documents from www.ietf.org and compare the lengths
    of the responses with the expected values.
    """

    def setUp(self):
        """Give every test its own client and bookkeeping lists."""
        self.client = None
        # Delayed calls that tearDown must cancel if they have not yet run
        self.pending_calls = []
        # Number of bytes read so far, indexed by request number
        self.length = []

    def gotResp(self, resp, num, expect):
        """Check a response and read its stream to verify the length.

        @param resp: the response to check
        @type num: C{int}
        @param num: the request number, used to index the byte counters
        @type expect: C{int}
        @param expect: the expected stream length, or None to not check it
        @return: deferred that fires once the whole stream has been read
            and the number of bytes read has been checked
        """
        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
        if expect is not None:
            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
        while len(self.length) <= num:
            self.length.append(0)
        self.length[num] = 0

        def addData(data, self = self, num = num):
            self.length[num] += len(data)

        def checkLength(resp, self = self, num = num, length = resp.stream.length):
            self.failUnlessEqual(self.length[num], length)
            return resp

        df = stream_mod.readStream(resp.stream, addData)
        df.addCallback(checkLength)
        return df

    def _request(self, lastDefer, path, num, expect, last=False):
        """Request a path from the client and check the response.

        @param lastDefer: the deferred to fire with the result of the
            request, but only when C{last} is true
        """
        d = self.client.get(path)
        d.addCallback(self.gotResp, num, expect)
        if last:
            # Only the final request may fire it, a deferred fires only once
            d.addBoth(lastDefer.callback)
        return d

    def _requestLater(self, delay, *args):
        """Schedule a call to L{_request} and remember it for tearDown."""
        self.pending_calls.append(reactor.callLater(delay, self._request, *args))

    def test_download(self):
        """Tests a normal download."""
        host = 'www.ietf.org'
        self.client = Peer(host, 80)

        d = self.client.get('/rfc/rfc0013.txt')
        d.addCallback(self.gotResp, 1, 1070)
        return d

    def test_head(self):
        """Tests a 'HEAD' request."""
        host = 'www.ietf.org'
        self.client = Peer(host, 80)

        d = self.client.get('/rfc/rfc0013.txt', "HEAD")
        d.addCallback(self.gotResp, 1, 0)
        return d

    def test_multiple_downloads(self):
        """Tests multiple downloads with queueing and connection closing."""
        host = 'www.ietf.org'
        self.client = Peer(host, 80)

        lastDefer = defer.Deferred()

        self._request(lastDefer, "/rfc/rfc0006.txt", 1, 1776)
        self._request(lastDefer, "/rfc/rfc2362.txt", 2, 159833)
        self._request(lastDefer, "/rfc/rfc0801.txt", 3, 40824)

        # This one will probably be queued
        self._requestLater(6, lastDefer, '/rfc/rfc0013.txt', 4, 1070)

        # Connection should still be open, but idle
        self._requestLater(10, lastDefer, '/rfc/rfc0022.txt', 5, 4606)

        #Connection should be closed
        self._requestLater(30, lastDefer, '/rfc/rfc0048.txt', 6, 41696)
        self._requestLater(31, lastDefer, '/rfc/rfc3261.txt', 7, 647976)
        self._requestLater(32, lastDefer, '/rfc/rfc0014.txt', 8, 27)
        self._requestLater(32, lastDefer, '/rfc/rfc0001.txt', 9, 21088)

        # Now it should definitely be closed
        self._requestLater(62, lastDefer, '/rfc/rfc2801.txt', 0, 598794, True)
        return lastDefer

    def test_multiple_quick_downloads(self):
        """Tests lots of multiple downloads with queueing."""
        host = 'www.ietf.org'
        self.client = Peer(host, 80)

        lastDefer = defer.Deferred()

        self._request(lastDefer, "/rfc/rfc0006.txt", 1, 1776)
        self._request(lastDefer, "/rfc/rfc2362.txt", 2, 159833)
        self._request(lastDefer, "/rfc/rfc0801.txt", 3, 40824)
        self._requestLater(0, lastDefer, '/rfc/rfc0013.txt', 4, 1070)
        self._requestLater(0, lastDefer, '/rfc/rfc0022.txt', 5, 4606)
        self._requestLater(0, lastDefer, '/rfc/rfc0048.txt', 6, 41696)
        self._requestLater(0, lastDefer, '/rfc/rfc3261.txt', 7, 647976)
        self._requestLater(0, lastDefer, '/rfc/rfc0014.txt', 8, 27)
        self._requestLater(0, lastDefer, '/rfc/rfc0001.txt', 9, 21088)
        self._requestLater(0, lastDefer, '/rfc/rfc2801.txt', 0, 598794, True)
        return lastDefer

    def checkInfo(self):
        """Log the current ranking information of the client."""
        log.msg('Rank is: %r' % self.client.rank)
        log.msg('Download speed is: %r' % self.client.downloadSpeed())
        log.msg('Response Time is: %r' % self.client.responseTime())

    def test_peer_info(self):
        """Test retrieving the peer info during a download."""
        host = 'www.ietf.org'
        self.client = Peer(host, 80)

        lastDefer = defer.Deferred()

        self._request(lastDefer, "/rfc/rfc0006.txt", 1, 1776)
        self._request(lastDefer, "/rfc/rfc2362.txt", 2, 159833)
        self._request(lastDefer, "/rfc/rfc0801.txt", 3, 40824)
        self._requestLater(1, lastDefer, '/rfc/rfc0013.txt', 4, 1070)
        self._requestLater(10, lastDefer, '/rfc/rfc0022.txt', 5, 4606)
        self._requestLater(30, lastDefer, '/rfc/rfc0048.txt', 6, 41696)
        self._requestLater(31, lastDefer, '/rfc/rfc3261.txt', 7, 647976)
        self._requestLater(32, lastDefer, '/rfc/rfc0014.txt', 8, 27)
        self._requestLater(32, lastDefer, '/rfc/rfc0001.txt', 9, 21088)
        self._requestLater(62, lastDefer, '/rfc/rfc2801.txt', 0, 598794, True)

        # Log the peer info every 2 seconds while the downloads are running
        for i in range(2, 122, 2):
            self.pending_calls.append(reactor.callLater(i, self.checkInfo))
        return lastDefer

    def test_range(self):
        """Test a Range request."""
        host = 'www.ietf.org'
        self.client = Peer(host, 80)

        d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
        d.addCallback(self.gotResp, 1, 100)
        return d

    def tearDown(self):
        """Cancel any delayed calls that have not run yet."""
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []