2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
20 class LoggingHTTPClientProtocol(HTTPClientProtocol):
21 """A modified client protocol that logs the number of bytes received."""
# NOTE(review): this listing omits original lines 22 and 25-27; the attributes
# self.stats and self.mirror read below are presumably assigned there from the
# constructor arguments -- verify against the full file.
23 def __init__(self, factory, stats = None, mirror = False):
24 HTTPClientProtocol.__init__(self, factory)
# Count each received protocol line plus 2 bytes (presumably the CRLF line
# terminator that is stripped before lineReceived is called) towards the
# received-byte statistics, then hand the line to the base class.
# NOTE(review): stats defaults to None in __init__, so this call needs a guard
# (presumably on omitted line 29) -- confirm.
28 def lineReceived(self, line):
30 self.stats.receivedBytes(len(line) + 2, self.mirror)
31 HTTPClientProtocol.lineReceived(self, line)
# Count raw (body) bytes towards the statistics, then hand them to the base
# class. Same NOTE(review) as above: a None guard is presumably on omitted
# line 34.
33 def rawDataReceived(self, data):
35 self.stats.receivedBytes(len(data), self.mirror)
36 HTTPClientProtocol.rawDataReceived(self, data)
38 class Peer(ClientFactory):
39 """A manager for all HTTP requests to a single peer.
41 Controls all requests that go to a single peer (host and port).
42 This includes buffering requests until they can be sent and reconnecting
43 in the event of the connection being closed.
47 implements(IHTTPClientManager)
# NOTE(review): this listing omits many original lines (see the gaps in the
# leading line numbers), so several statements referenced below are not
# visible; anything marked "presumably" is unverified.
# Visible request flow: submitRequest appends to request_queue, processQueue
# moves the oldest request to response_queue and sends it on self.proto, and
# requestComplete / requestError pop the oldest entry of response_queue.

# Initialise per-peer state: no connection attempt in progress, empty
# request/response queues, and empty download-speed / response-time history.
# NOTE(review): host, port, stats, mirror, closed, busy, pipeline, proto,
# _errors and _completed are read elsewhere in this class but not visibly
# assigned; presumably they are set on omitted lines 50-57 / 61-64 -- verify.
49 def __init__(self, host, port = 80, stats = None):
58 self.connecting = False
59 self.request_queue = []
60 self.response_queue = []
65 self._downloadSpeeds = []
66 self._lastResponse = None
67 self._responseTimes = []
# Presumably the body of __repr__ (its def line, 69, is omitted): formats the
# host, port and current rank value.
70 return "(%r, %r, %r)" % (self.host, self.port, self.rank)
72 #{ Manage the request queue
# Presumably the body of the connect method (def line 73 is omitted). It
# requires that no connection is open or pending, marks a connection attempt
# as in progress, and asynchronously opens a TCP connection whose protocol is
# LoggingHTTPClientProtocol(self, stats=..., mirror=...). Success calls
# connected(proto); failure calls connectionError(err).
74 """Connect to the peer."""
75 assert self.closed and not self.connecting
76 self.connecting = True
77 d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
78 stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
79 d.addCallbacks(self.connected, self.connectionError)
# Callback for a successful connection, receiving the new protocol instance.
# Only the reset of the connecting flag is visible; omitted lines 83 and 85-86
# presumably store proto and start processing the queue.
81 def connected(self, proto):
82 """Begin processing the queued requests."""
84 self.connecting = False
# Errback for a failed connection attempt. It logs the failure and fails only
# the oldest queued request with the connection error, so a retry cannot loop
# forever. It then clears the connecting flag and runs the clientGone(None)
# cleanup. Omitted lines 91-92 and 97-101 may do more -- verify.
88 def connectionError(self, err):
89 """Cancel the requests."""
90 log.msg('Failed to connect to the peer by HTTP.')
93 # Remove one request so that we don't loop indefinitely
94 if self.request_queue:
95 req = self.request_queue.pop(0)
96 req.deferRequest.errback(err)
102 self.connecting = False
103 self.clientGone(None)
# Presumably the body of close() (def line 105 is omitted): drops the TCP
# connection of the current protocol. Omitted line 107 presumably guards
# against there being no open connection -- verify.
106 """Close the connection to the peer."""
108 self.proto.transport.loseConnection()
# Stamp the request with its submission time, which requestComplete uses for
# the response-time statistics. Attach a fresh Deferred for the caller and
# queue the request. Omitted lines 119-120 presumably connect or call
# processQueue.
110 def submitRequest(self, request):
111 """Add a new request to the queue.
113 @type request: L{twisted.web2.client.http.ClientRequest}
114 @return: deferred that will fire with the completed request
116 request.submissionTime = datetime.now()
117 request.deferRequest = defer.Deferred()
118 self.request_queue.append(request)
121 return request.deferRequest
# Send the oldest queued request if the connection can accept it. The visible
# checks are: nothing queued; protocol busy and not pipelining; responses still
# outstanding and not pipelining. Their bodies (omitted) presumably return
# early or start a connection. Otherwise the request moves from request_queue
# to response_queue and is submitted on self.proto. The second argument False
# is presumably web2's closeAfter flag, keeping the connection open -- verify.
123 def processQueue(self):
124 """Check the queue to see if new requests can be sent to the peer."""
125 if not self.request_queue:
132 if self.busy and not self.pipeline:
134 if self.response_queue and not self.pipeline:
137 req = self.request_queue.pop(0)
138 self.response_queue.append(req)
140 req.deferResponse = self.proto.submitRequest(req, False)
141 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
# Callback for a finished response. Responses are matched to requests in FIFO
# order (pop(0) on response_queue). It records (now, now - submissionTime) as
# a response-time sample and remembers (now, response stream length) so the
# next _processLastResponse call can compute a download speed. It then fires
# the caller's Deferred with the response.
# `now` is not visibly assigned; presumably datetime.now() on omitted lines
# 148-149.
143 def requestComplete(self, resp):
144 """Process a completed request."""
145 self._processLastResponse()
146 req = self.response_queue.pop(0)
147 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
150 self._responseTimes.append((now, now - req.submissionTime))
151 self._lastResponse = (now, resp.stream.length)
153 req.deferRequest.callback(resp)
# Errback for a failed request. It closes out any pending speed measurement,
# pops the oldest outstanding request (FIFO), logs the error and forwards it to
# the caller's Deferred. Omitted lines 160-162 presumably update the error
# counters used by rank -- verify.
155 def requestError(self, error):
156 """Process a request that ended with an error."""
157 self._processLastResponse()
158 req = self.response_queue.pop(0)
159 log.msg('Download of %s generated error %r' % (req.uri, error))
163 req.deferRequest.errback(error)
# Log a hash-verification failure reported for data from this peer. Omitted
# lines 168-169 presumably count it as an error for ranking -- verify.
165 def hashError(self, error):
166 """Log that a hash error occurred from the peer."""
167 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
171 #{ IHTTPClientManager interface
# Protocol callbacks. clientBusy's body (omitted line 174) presumably sets
# self.busy -- verify.
172 def clientBusy(self, proto):
173 """Save the busy state."""
# The protocol has gone idle: close out any pending download-speed
# measurement. Omitted lines 179-181 presumably clear busy and call
# processQueue.
176 def clientIdle(self, proto):
177 """Try to send a new request."""
178 self._processLastResponse()
# The protocol can pipeline. Omitted lines 185-186 presumably set
# self.pipeline and call processQueue.
183 def clientPipelining(self, proto):
184 """Try to send a new request."""
# The connection was lost (connectionError also calls this with None). It
# records any pending speed measurement and fails every in-flight request with
# ProtocolError('lost connection'). It then resets the pipelining and
# connecting flags and empties response_queue. The body of the trailing
# `if self.request_queue:` (omitted lines 201-202) presumably reconnects for
# still-queued requests; omitted lines 193/195 presumably reset busy/closed
# -- verify.
188 def clientGone(self, proto):
189 """Mark sent requests as errors."""
190 self._processLastResponse()
191 for req in self.response_queue:
192 req.deferRequest.errback(ProtocolError('lost connection'))
194 self.pipeline = False
196 self.connecting = False
197 self.response_queue = []
200 if self.request_queue:
203 #{ Downloading request interface
# Build a Headers object with the Host header and an apt-p2p User-Agent that
# embeds the apt-p2p, Twisted and twisted.web2 versions. The return statement
# (omitted line 210) is presumably `return headers`.
204 def setCommonHeaders(self):
205 """Get the common HTTP headers for all requests."""
206 headers = http_headers.Headers()
207 headers.setHeader('Host', self.host)
208 headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
209 (version.short(), twisted_version.short(), web2_version.short()))
# Queue a whole-resource request with the common headers.
# NOTE(review): as listed, If-Modified-Since is set unconditionally. Omitted
# line 226 is presumably an `if modtime:` guard, matching the docstring's
# "defaults to not sending that header" -- verify.
212 def get(self, path, method="GET", modtime=None):
213 """Add a new request to the queue.
215 @type path: C{string}
216 @param path: the path to request from the peer
217 @type method: C{string}
218 @param method: the HTTP method to use, 'GET' or 'HEAD'
219 (optional, defaults to 'GET')
220 @type modtime: C{int}
221 @param modtime: the modification time to use for an 'If-Modified-Since'
222 header, as seconds since the epoch
223 (optional, defaults to not sending that header)
225 headers = self.setCommonHeaders()
227 headers.setHeader('If-Modified-Since', modtime)
228 return self.submitRequest(ClientRequest(method, path, headers, None))
# Queue a request for bytes rangeStart..rangeEnd (inclusive), passing the Range
# header to web2 as the parsed value ('bytes', [(start, end)]).
230 def getRange(self, path, rangeStart, rangeEnd, method="GET"):
231 """Add a new request with a Range header to the queue.
233 @type path: C{string}
234 @param path: the path to request from the peer
235 @type rangeStart: C{int}
236 @param rangeStart: the byte to begin the request at
237 @type rangeEnd: C{int}
238 @param rangeEnd: the byte to end the request at (inclusive)
239 @type method: C{string}
240 @param method: the HTTP method to use, 'GET' or 'HEAD'
241 (optional, defaults to 'GET')
243 headers = self.setCommonHeaders()
244 headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
245 return self.submitRequest(ClientRequest(method, path, headers, None))
# Presumably the body of an idleness check (def line 248 omitted). It is True
# only when the protocol is not busy and no requests are queued or
# outstanding.
249 """Check whether the peer is idle or not."""
250 return not self.busy and not self.request_queue and not self.response_queue
# If requestComplete recorded a response, append (now, time elapsed since it
# was recorded, its stream length) to _downloadSpeeds, then clear the record.
# `now` is presumably datetime.now() on omitted line 255.
252 def _processLastResponse(self):
253 """Save the download time of the last request for speed calculations."""
254 if self._lastResponse is not None:
256 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
257 self._lastResponse = None
# Prune the history to at most the 10 newest samples, none older than an hour.
# Then return total length / total elapsed seconds, with each timedelta
# converted to seconds by hand. Per the existing comment, the result is 0 when
# no samples remain (that return is on an omitted line). `now`, total_time and
# total_download are initialised on omitted lines.
# NOTE(review): if every remaining sample has zero elapsed time, total_time is
# 0 and the division raises ZeroDivisionError unless guarded elsewhere.
259 def downloadSpeed(self):
260 """Gets the latest average download speed for the peer.
262 The average is over the last 10 responses that occurred in the last hour.
267 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
268 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
269 self._downloadSpeeds.pop(0)
271 # If there are none, then you get 0
272 if not self._downloadSpeeds:
275 for download in self._downloadSpeeds:
276 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
277 total_download += download[2]
279 return total_download / total_time
# Apply the same pruning as downloadSpeed (at most 10 samples, none older than
# an hour), then return the mean recorded response time in seconds. The
# empty-history value (omitted line 296) gives "the benefit of the doubt", per
# the existing comment. `now` and total_response are initialised on omitted
# lines.
281 def responseTime(self):
282 """Gets the latest average response time for the peer.
284 Response time is the time from receiving the request, to the time
285 the download begins. The average is over the last 10 responses that
286 occurred in the last hour.
290 while self._responseTimes and (len(self._responseTimes) > 10 or
291 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
292 self._responseTimes.pop(0)
294 # If there are none, give it the benefit of the doubt
295 if not self._responseTimes:
298 for response in self._responseTimes:
299 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
301 return total_response / len(self._responseTimes)
# Presumably the body of the rank property: its def line(s), 302-303, are
# omitted, and callers read self.rank / self.client.rank as an attribute.
# It multiplies exponential factors for:
#   - queue length;
#   - the time to fetch 512.0*1024 units at the current download speed
#     (presumably a 512 KiB piece);
#   - the error ratio;
#   - the mean response time scaled by 5 s.
# The connection-open factor and rank's initial value are presumably on
# omitted lines 312-316.
# NOTE(review): `exp` is not among the visible imports; presumably it comes
# from `from math import exp` on an omitted line.
# NOTE(review): request_queue and response_queue are disjoint, because
# processQueue moves requests from one to the other. Subtracting their
# lengths therefore makes the queue factor exceed 1 whenever more requests
# are in flight than queued. That contradicts the docstring's "decreasing from
# 1 to 0"; `+` may have been intended -- verify.
# NOTE(review): downloadSpeed() returns 0 with no history, and _completed may
# be 0. The speed and error factors therefore need guards (presumably omitted
# lines 319 and 321) to avoid ZeroDivisionError.
304 """Determine the ranking value for the peer.
306 The ranking value is composed of 5 numbers, each exponentially
307 decreasing from 1 to 0 based on:
308 - if a connection to the peer is open
309 - the number of pending requests
310 - the time to download a single piece
311 - the number of errors
317 rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
318 speed = self.downloadSpeed()
320 rank *= exp(-512.0*1024 / speed)
322 rank *= exp(-float(self._errors) / self._completed)
323 rank *= exp(-self.responseTime() / 5.0)
326 class TestClientManager(unittest.TestCase):
327 """Unit tests for the Peer."""
# NOTE(review): these tests download real RFC files from www.ietf.org. They
# need network access and assume the server still returns the exact byte
# lengths asserted below. setUp and some helpers are on omitted lines.
# self.pending_calls collects reactor.callLater handles, which are presumably
# cancelled in tearDown.

# Shared callback. It asserts a 2xx status (a Range request legitimately gets
# a 2xx code other than 200, despite the message text) and, if expect is not
# None, the exact response length. It then reads the body through
# print_/printdone, which are defined on omitted lines 336-339.
332 def gotResp(self, resp, num, expect):
333 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
334 if expect is not None:
335 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
340 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
342 def test_download(self):
343 """Tests a normal download."""
344 host = 'www.ietf.org'
345 self.client = Peer(host, 80)
348 d = self.client.get('/rfc/rfc0013.txt')
349 d.addCallback(self.gotResp, 1, 1070)
# Presumably the body of a HEAD test (def line 352 omitted). A HEAD response
# is expected to report a stream length of 0.
353 """Tests a 'HEAD' request."""
354 host = 'www.ietf.org'
355 self.client = Peer(host, 80)
358 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
359 d.addCallback(self.gotResp, 1, 0)
362 def test_multiple_downloads(self):
363 """Tests multiple downloads with queueing and connection closing."""
364 host = 'www.ietf.org'
365 self.client = Peer(host, 80)
367 lastDefer = defer.Deferred()
# Queue one GET and check its response. The request flagged last presumably
# fires lastDefer (via an `if last:` guard on omitted line 372); the test
# presumably returns lastDefer so trial waits for it.
369 def newRequest(path, num, expect, last=False):
370 d = self.client.get(path)
371 d.addCallback(self.gotResp, num, expect)
373 d.addBoth(lastDefer.callback)
376 newRequest("/rfc/rfc0006.txt", 1, 1776)
377 newRequest("/rfc/rfc2362.txt", 2, 159833)
378 newRequest("/rfc/rfc0801.txt", 3, 40824)
380 # This one will probably be queued
381 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
383 # Connection should still be open, but idle
384 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
386 #Connection should be closed
387 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
388 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
389 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
390 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
392 # Now it should definitely be closed
393 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# Same requests as test_multiple_downloads, but all issued at once (delay 0),
# so most of them have to wait in the request queue.
396 def test_multiple_quick_downloads(self):
397 """Tests lots of multiple downloads with queueing."""
398 host = 'www.ietf.org'
399 self.client = Peer(host, 80)
401 lastDefer = defer.Deferred()
403 def newRequest(path, num, expect, last=False):
404 d = self.client.get(path)
405 d.addCallback(self.gotResp, num, expect)
407 d.addBoth(lastDefer.callback)
409 newRequest("/rfc/rfc0006.txt", 1, 1776)
410 newRequest("/rfc/rfc2362.txt", 2, 159833)
411 newRequest("/rfc/rfc0801.txt", 3, 40824)
412 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
413 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
414 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
415 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
416 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
417 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
418 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# Presumably the body of checkInfo (def line 421 omitted): logs the peer's
# rank, download speed and response time.
422 log.msg('Rank is: %r' % self.client.rank)
423 log.msg('Download speed is: %r' % self.client.downloadSpeed())
424 log.msg('Response Time is: %r' % self.client.responseTime())
426 def test_peer_info(self):
427 """Test retrieving the peer info during a download."""
428 host = 'www.ietf.org'
429 self.client = Peer(host, 80)
431 lastDefer = defer.Deferred()
433 def newRequest(path, num, expect, last=False):
434 d = self.client.get(path)
435 d.addCallback(self.gotResp, num, expect)
437 d.addBoth(lastDefer.callback)
439 newRequest("/rfc/rfc0006.txt", 1, 1776)
440 newRequest("/rfc/rfc2362.txt", 2, 159833)
441 newRequest("/rfc/rfc0801.txt", 3, 40824)
442 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
443 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
444 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
445 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
446 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
447 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
448 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# Poll the peer statistics every 2 s (t = 2..120) while the downloads run.
# xrange is Python 2 only.
450 for i in xrange(2, 122, 2):
451 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
# Bytes 100 through 199 inclusive, so a 100-byte body is expected.
455 def test_range(self):
456 """Test a Range request."""
457 host = 'www.ietf.org'
458 self.client = Peer(host, 80)
461 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
462 d.addCallback(self.gotResp, 1, 100)
# Presumably tearDown (def line 465 omitted). It walks the scheduled
# reactor.callLater handles and then resets the list. The per-call cancel
# logic is on omitted lines 467-468, and the method may continue past the end
# of this listing.
466 for p in self.pending_calls:
469 self.pending_calls = []