2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
class LoggingHTTPClientProtocol(HTTPClientProtocol):
    """A modified client protocol that logs the number of bytes received.

    Every line and every chunk of raw data that arrives is reported to the
    optional stats object before being handed on to the unmodified
    L{HTTPClientProtocol} implementation.
    """

    def __init__(self, factory, stats = None, mirror = False):
        """Initialize the protocol.

        @param factory: passed unchanged to C{HTTPClientProtocol.__init__}
        @param stats: object whose C{receivedBytes(count, mirror)} method is
            called for all received data
            (optional, defaults to not recording anything)
        @param mirror: value passed as the second argument of every
            C{receivedBytes} call
            (optional, defaults to False)
        """
        HTTPClientProtocol.__init__(self, factory)
        # Both values are read by the receive hooks below on every call.
        self.stats = stats
        self.mirror = mirror

    def lineReceived(self, line):
        """Count a received line, then process it as usual."""
        if self.stats is not None:
            # NOTE(review): the 2 presumably accounts for the line delimiter
            # that is no longer part of C{line} -- confirm against the parent.
            self.stats.receivedBytes(len(line) + 2, self.mirror)
        HTTPClientProtocol.lineReceived(self, line)

    def rawDataReceived(self, data):
        """Count a chunk of raw data, then process it as usual."""
        if self.stats is not None:
            self.stats.receivedBytes(len(data), self.mirror)
        HTTPClientProtocol.rawDataReceived(self, data)
38 class Peer(ClientFactory):
39 """A manager for all HTTP requests to a single peer.
41 Controls all requests that go to a single peer (host and port).
42 This includes buffering requests until they can be sent and reconnecting
43 in the event of the connection being closed.
47 implements(IHTTPClientManager)
49 def __init__(self, host, port = 80, stats = None):
58 self.connecting = False
59 self.request_queue = []
60 self.response_queue = []
65 self._downloadSpeeds = []
66 self._lastResponse = None
67 self._responseTimes = []
70 return "(%r, %r, %r)" % (self.host, self.port, self.rank)
72 #{ Manage the request queue
74 """Connect to the peer."""
75 assert self.closed and not self.connecting
76 self.connecting = True
77 d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
78 stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
79 d.addCallbacks(self.connected, self.connectionError)
81 def connected(self, proto):
82 """Begin processing the queued requests."""
84 self.connecting = False
88 def connectionError(self, err):
89 """Cancel the requests."""
90 log.msg('Failed to connect to the peer by HTTP.')
93 # Remove one request so that we don't loop indefinitely
94 if self.request_queue:
95 req = self.request_queue.pop(0)
96 req.deferRequest.errback(err)
102 self.connecting = False
103 self.clientGone(None)
106 """Close the connection to the peer."""
108 self.proto.transport.loseConnection()
110 def submitRequest(self, request):
111 """Add a new request to the queue.
113 @type request: L{twisted.web2.client.http.ClientRequest}
114 @return: deferred that will fire with the completed request
116 request.submissionTime = datetime.now()
117 request.deferRequest = defer.Deferred()
118 self.request_queue.append(request)
121 return request.deferRequest
123 def processQueue(self):
124 """Check the queue to see if new requests can be sent to the peer."""
125 if not self.request_queue:
132 if self.busy and not self.pipeline:
134 if self.response_queue and not self.pipeline:
137 req = self.request_queue.pop(0)
138 self.response_queue.append(req)
140 req.deferResponse = self.proto.submitRequest(req, False)
141 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
143 def requestComplete(self, resp):
144 """Process a completed request."""
145 self._processLastResponse()
146 req = self.response_queue.pop(0)
147 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
150 self._responseTimes.append((now, now - req.submissionTime))
151 self._lastResponse = (now, resp.stream.length)
153 req.deferRequest.callback(resp)
155 def requestError(self, error):
156 """Process a request that ended with an error."""
157 self._processLastResponse()
158 req = self.response_queue.pop(0)
159 log.msg('Download of %s generated error %r' % (req.uri, error))
163 req.deferRequest.errback(error)
165 def hashError(self, error):
166 """Log that a hash error occurred from the peer."""
167 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
171 #{ IHTTPClientManager interface
172 def clientBusy(self, proto):
173 """Save the busy state."""
176 def clientIdle(self, proto):
177 """Try to send a new request."""
178 self._processLastResponse()
183 def clientPipelining(self, proto):
184 """Try to send a new request."""
188 def clientGone(self, proto):
189 """Mark sent requests as errors."""
190 self._processLastResponse()
191 for req in self.response_queue:
192 reactor.callLater(0, req.deferRequest.errback,
193 ProtocolError('lost connection'))
195 self.pipeline = False
197 self.connecting = False
198 self.response_queue = []
201 if self.request_queue:
204 #{ Downloading request interface
def setCommonHeaders(self):
    """Get the common HTTP headers for all requests.

    @rtype: L{twisted.web2.http_headers.Headers}
    @return: a new set of headers containing only the 'Host' and
        'User-Agent' headers
    """
    headers = http_headers.Headers()
    headers.setHeader('Host', self.host)
    headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
                      (version.short(), twisted_version.short(), web2_version.short()))
    # The request builders add their own headers to the returned object, so
    # it has to be handed back rather than dropped.
    return headers
def get(self, path, method="GET", modtime=None):
    """Add a new request to the queue.

    @type path: C{string}
    @param path: the path to request from the peer
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @type modtime: C{int}
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @return: whatever L{submitRequest} returns for the new request
    """
    headers = self.setCommonHeaders()
    # Only make the request conditional when the caller supplied a time;
    # the default of None must not end up in the header.
    if modtime is not None:
        headers.setHeader('If-Modified-Since', modtime)
    return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a request for only part of a file, using a Range header.

    @type path: C{string}
    @param path: the path to request from the peer
    @type rangeStart: C{int}
    @param rangeStart: the first byte wanted
    @type rangeEnd: C{int}
    @param rangeEnd: the last byte wanted (inclusive)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @return: whatever L{submitRequest} returns for the new request
    """
    wanted = (rangeStart, rangeEnd)
    headers = self.setCommonHeaders()
    # A single byte range, in the (unit, list of ranges) form given to setHeader
    headers.setHeader('Range', ('bytes', [wanted]))
    request = ClientRequest(method, path, headers, None)
    return self.submitRequest(request)
250 """Check whether the peer is idle or not."""
251 return not self.busy and not self.request_queue and not self.response_queue
def _processLastResponse(self):
    """Save the download time of the last request for speed calculations.

    If a response is still being tracked in C{_lastResponse}, a
    C{(finish time, elapsed time, length)} entry is appended to
    C{_downloadSpeeds} and the tracked response is cleared. Does nothing
    when no response is being tracked.
    """
    if self._lastResponse is None:
        return

    # Take a single timestamp so that the stored finish time and the
    # elapsed time agree with each other.
    now = datetime.now()
    started = self._lastResponse[0]
    length = self._lastResponse[1]
    self._downloadSpeeds.append((now, now - started, length))
    self._lastResponse = None
def downloadSpeed(self):
    """Gets the latest average download speed for the peer.

    The average is over the last 10 responses that occurred in the last hour.
    Older entries are removed from C{_downloadSpeeds} as a side effect.

    @rtype: C{float}
    @return: the total recorded length divided by the total elapsed seconds
        of the remaining entries, or 0.0 if there is nothing to measure
    """
    now = datetime.now()
    max_age = timedelta(seconds=3600)
    while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
                                    now - self._downloadSpeeds[0][0] > max_age):
        self._downloadSpeeds.pop(0)

    # If there are none, then you get 0
    if not self._downloadSpeeds:
        return 0.0

    total_time = 0.0
    total_download = 0
    for download in self._downloadSpeeds:
        # A response whose length was not known cannot contribute to a
        # speed, so leave both its time and its size out of the average.
        if download[2] is None:
            continue
        total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
        total_download += download[2]

    # No measurable time has passed, so no speed can be worked out
    if total_time <= 0.0:
        return 0.0

    return total_download / total_time
def responseTime(self):
    """Gets the latest average response time for the peer.

    Response time is the time from a request being submitted to this
    peer, to the time its response is received (when the download
    begins). The average is over the last 10 responses that occurred in
    the last hour. Older entries are removed from C{_responseTimes} as a
    side effect.

    @rtype: C{float}
    @return: the average response time in seconds, or 0.0 if no responses
        have been recorded
    """
    now = datetime.now()
    max_age = timedelta(seconds=3600)
    while self._responseTimes and (len(self._responseTimes) > 10 or
                                   now - self._responseTimes[0][0] > max_age):
        self._responseTimes.pop(0)

    # If there are none, give it the benefit of the doubt
    # (a time of 0 is the best possible response time)
    if not self._responseTimes:
        return 0.0

    total_response = 0.0
    for response in self._responseTimes:
        total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0

    return total_response / len(self._responseTimes)
305 """Determine the ranking value for the peer.
307 The ranking value is composed of 5 numbers, each exponentially
308 decreasing from 1 to 0 based on:
309 - if a connection to the peer is open
310 - the number of pending requests
311 - the time to download a single piece
312 - the number of errors
318 rank *= exp(-(len(self.request_queue) + len(self.response_queue)))
319 speed = self.downloadSpeed()
321 rank *= exp(-512.0*1024 / speed)
323 rank *= exp(-10.0 * self._errors / self._completed)
324 rank *= exp(-self.responseTime() / 5.0)
327 class TestClientManager(unittest.TestCase):
328 """Unit tests for the Peer."""
333 def gotResp(self, resp, num, expect):
334 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
335 if expect is not None:
336 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
341 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
343 def test_download(self):
344 """Tests a normal download."""
345 host = 'www.ietf.org'
346 self.client = Peer(host, 80)
349 d = self.client.get('/rfc/rfc0013.txt')
350 d.addCallback(self.gotResp, 1, 1070)
354 """Tests a 'HEAD' request."""
355 host = 'www.ietf.org'
356 self.client = Peer(host, 80)
359 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
360 d.addCallback(self.gotResp, 1, 0)
363 def test_multiple_downloads(self):
364 """Tests multiple downloads with queueing and connection closing."""
365 host = 'www.ietf.org'
366 self.client = Peer(host, 80)
368 lastDefer = defer.Deferred()
370 def newRequest(path, num, expect, last=False):
371 d = self.client.get(path)
372 d.addCallback(self.gotResp, num, expect)
374 d.addBoth(lastDefer.callback)
377 newRequest("/rfc/rfc0006.txt", 1, 1776)
378 newRequest("/rfc/rfc2362.txt", 2, 159833)
379 newRequest("/rfc/rfc0801.txt", 3, 40824)
381 # This one will probably be queued
382 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
384 # Connection should still be open, but idle
385 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
387 #Connection should be closed
388 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
389 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
390 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
391 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
393 # Now it should definitely be closed
394 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
397 def test_multiple_quick_downloads(self):
398 """Tests lots of multiple downloads with queueing."""
399 host = 'www.ietf.org'
400 self.client = Peer(host, 80)
402 lastDefer = defer.Deferred()
404 def newRequest(path, num, expect, last=False):
405 d = self.client.get(path)
406 d.addCallback(self.gotResp, num, expect)
408 d.addBoth(lastDefer.callback)
410 newRequest("/rfc/rfc0006.txt", 1, 1776)
411 newRequest("/rfc/rfc2362.txt", 2, 159833)
412 newRequest("/rfc/rfc0801.txt", 3, 40824)
413 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
414 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
415 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
416 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
417 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
418 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
419 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
423 log.msg('Rank is: %r' % self.client.rank)
424 log.msg('Download speed is: %r' % self.client.downloadSpeed())
425 log.msg('Response Time is: %r' % self.client.responseTime())
427 def test_peer_info(self):
428 """Test retrieving the peer info during a download."""
429 host = 'www.ietf.org'
430 self.client = Peer(host, 80)
432 lastDefer = defer.Deferred()
434 def newRequest(path, num, expect, last=False):
435 d = self.client.get(path)
436 d.addCallback(self.gotResp, num, expect)
438 d.addBoth(lastDefer.callback)
440 newRequest("/rfc/rfc0006.txt", 1, 1776)
441 newRequest("/rfc/rfc2362.txt", 2, 159833)
442 newRequest("/rfc/rfc0801.txt", 3, 40824)
443 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
444 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
445 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
446 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
447 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
448 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
449 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
451 for i in xrange(2, 122, 2):
452 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
456 def test_range(self):
457 """Test a Range request."""
458 host = 'www.ietf.org'
459 self.client = Peer(host, 80)
462 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
463 d.addCallback(self.gotResp, 1, 100)
467 for p in self.pending_calls:
470 self.pending_calls = []