2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
20 class LoggingHTTPClientProtocol(HTTPClientProtocol):
21 """A modified client protocol that logs the number of bytes received."""
23 def __init__(self, factory, stats = None, mirror = False):
24 HTTPClientProtocol.__init__(self, factory)
28 def lineReceived(self, line):
30 self.stats.receivedBytes(len(line) + 2, self.mirror)
31 HTTPClientProtocol.lineReceived(self, line)
33 def rawDataReceived(self, data):
35 self.stats.receivedBytes(len(data), self.mirror)
36 HTTPClientProtocol.rawDataReceived(self, data)
38 class Peer(ClientFactory):
39 """A manager for all HTTP requests to a single peer.
41 Controls all requests that go to a single peer (host and port).
42 This includes buffering requests until they can be sent and reconnecting
43 in the event of the connection being closed.
47 implements(IHTTPClientManager)
49 def __init__(self, host, port = 80, stats = None):
58 self.connecting = False
59 self.request_queue = []
65 self._downloadSpeeds = []
66 self._lastResponse = None
67 self._responseTimes = []
70 return "(%r, %r, %r)" % (self.host, self.port, self.rank)
72 #{ Manage the request queue
74 """Connect to the peer."""
75 assert self.closed and not self.connecting
76 self.connecting = True
77 d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
78 stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
79 d.addCallbacks(self.connected, self.connectionError)
81 def connected(self, proto):
82 """Begin processing the queued requests."""
84 self.connecting = False
86 reactor.callLater(0, self.processQueue)
88 def connectionError(self, err):
89 """Cancel the requests."""
90 log.msg('Failed to connect to the peer by HTTP.')
93 # Remove one request so that we don't loop indefinitely
94 if self.request_queue:
95 req = self.request_queue.pop(0)
96 req.deferRequest.errback(err)
102 self.connecting = False
103 self.clientGone(None)
106 """Close the connection to the peer."""
108 self.proto.transport.loseConnection()
110 def submitRequest(self, request):
111 """Add a new request to the queue.
113 @type request: L{twisted.web2.client.http.ClientRequest}
114 @return: deferred that will fire with the completed request
116 request.submissionTime = datetime.now()
117 request.deferRequest = defer.Deferred()
118 self.request_queue.append(request)
120 reactor.callLater(0, self.processQueue)
121 return request.deferRequest
123 def processQueue(self):
124 """Check the queue to see if new requests can be sent to the peer."""
125 if not self.request_queue:
132 if self.busy and not self.pipeline:
134 if self.outstanding and not self.pipeline:
137 req = self.request_queue.pop(0)
138 self.outstanding += 1
140 req.deferResponse = self.proto.submitRequest(req, False)
141 req.deferResponse.addCallbacks(self.requestComplete, self.requestError,
142 callbackArgs = (req, ), errbackArgs = (req, ))
144 def requestComplete(self, resp, req):
145 """Process a completed request."""
146 self._processLastResponse()
147 self.outstanding -= 1
148 assert self.outstanding >= 0
149 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
152 self._responseTimes.append((now, now - req.submissionTime))
153 self._lastResponse = (now, resp.stream.length)
155 req.deferRequest.callback(resp)
157 def requestError(self, error, req):
158 """Process a request that ended with an error."""
159 self._processLastResponse()
160 self.outstanding -= 1
161 assert self.outstanding >= 0
162 log.msg('Download of %s generated error %r' % (req.uri, error))
166 req.deferRequest.errback(error)
168 def hashError(self, error):
169 """Log that a hash error occurred from the peer."""
170 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
174 #{ IHTTPClientManager interface
175 def clientBusy(self, proto):
176 """Save the busy state."""
179 def clientIdle(self, proto):
180 """Try to send a new request."""
181 self._processLastResponse()
183 reactor.callLater(0, self.processQueue)
186 def clientPipelining(self, proto):
187 """Try to send a new request."""
189 reactor.callLater(0, self.processQueue)
191 def clientGone(self, proto):
192 """Mark sent requests as errors."""
193 self._processLastResponse()
195 self.pipeline = False
197 self.connecting = False
200 if self.request_queue:
201 reactor.callLater(0, self.processQueue)
203 #{ Downloading request interface
204 def setCommonHeaders(self):
205 """Get the common HTTP headers for all requests."""
206 headers = http_headers.Headers()
207 headers.setHeader('Host', self.host)
208 headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
209 (version.short(), twisted_version.short(), web2_version.short()))
212 def get(self, path, method="GET", modtime=None):
213 """Add a new request to the queue.
215 @type path: C{string}
216 @param path: the path to request from the peer
217 @type method: C{string}
218 @param method: the HTTP method to use, 'GET' or 'HEAD'
219 (optional, defaults to 'GET')
220 @type modtime: C{int}
221 @param modtime: the modification time to use for an 'If-Modified-Since'
222 header, as seconds since the epoch
223 (optional, defaults to not sending that header)
225 headers = self.setCommonHeaders()
227 headers.setHeader('If-Modified-Since', modtime)
228 return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a request for a byte range of a file on the peer.

    The request carries the common headers produced by
    L{setCommonHeaders}, plus a Range header that covers bytes
    C{rangeStart} through C{rangeEnd}, with both ends inclusive.

    @type path: C{string}
    @param path: the path to request from the peer
    @type rangeStart: C{int}
    @param rangeStart: the byte to begin the request at
    @type rangeEnd: C{int}
    @param rangeEnd: the byte to end the request at (inclusive)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @return: the deferred returned by L{submitRequest} for the new request
    """
    requestHeaders = self.setCommonHeaders()
    # A single (first, last) pair under the 'bytes' unit.
    byteRanges = [(rangeStart, rangeEnd)]
    requestHeaders.setHeader('Range', ('bytes', byteRanges))
    rangeRequest = ClientRequest(method, path, requestHeaders, None)
    return self.submitRequest(rangeRequest)
249 """Check whether the peer is idle or not."""
250 return not self.busy and not self.request_queue and not self.outstanding
252 def _processLastResponse(self):
253 """Save the download time of the last request for speed calculations."""
254 if self._lastResponse is not None:
256 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
257 self._lastResponse = None
259 def downloadSpeed(self):
260 """Gets the latest average download speed for the peer.
262 The average is over the last 10 responses that occurred in the last hour.
267 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
268 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
269 self._downloadSpeeds.pop(0)
271 # If there are none, then you get 0
272 if not self._downloadSpeeds:
275 for download in self._downloadSpeeds:
276 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
277 total_download += download[2]
279 return total_download / total_time
281 def responseTime(self):
282 """Gets the latest average response time for the peer.
284 Response time is the time from receiving the request, to the time
285 the download begins. The average is over the last 10 responses that
286 occurred in the last hour.
290 while self._responseTimes and (len(self._responseTimes) > 10 or
291 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
292 self._responseTimes.pop(0)
294 # If there are none, give it the benefit of the doubt
295 if not self._responseTimes:
298 for response in self._responseTimes:
299 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
301 return total_response / len(self._responseTimes)
304 """Determine the ranking value for the peer.
306 The ranking value is composed of 5 numbers, each exponentially
307 decreasing from 1 to 0 based on:
308 - if a connection to the peer is open
309 - the number of pending requests
310 - the time to download a single piece
311 - the number of errors
317 rank *= exp(-(len(self.request_queue) + self.outstanding))
318 speed = self.downloadSpeed()
320 rank *= exp(-512.0*1024 / speed)
322 rank *= exp(-10.0 * self._errors / self._completed)
323 rank *= exp(-self.responseTime() / 5.0)
326 class TestClientManager(unittest.TestCase):
327 """Unit tests for the Peer."""
333 def gotResp(self, resp, num, expect):
334 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
335 if expect is not None:
336 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
337 while len(self.length) <= num:
338 self.length.append(0)
340 def addData(data, self = self, num = num):
341 self.length[num] += len(data)
342 def checkLength(resp, self = self, num = num, length = resp.stream.length):
343 self.failUnlessEqual(self.length[num], length)
345 df = stream_mod.readStream(resp.stream, addData)
346 df.addCallback(checkLength)
349 def test_download(self):
350 """Tests a normal download."""
351 host = 'www.ietf.org'
352 self.client = Peer(host, 80)
355 d = self.client.get('/rfc/rfc0013.txt')
356 d.addCallback(self.gotResp, 1, 1070)
360 """Tests a 'HEAD' request."""
361 host = 'www.ietf.org'
362 self.client = Peer(host, 80)
365 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
366 d.addCallback(self.gotResp, 1, 0)
369 def test_multiple_downloads(self):
370 """Tests multiple downloads with queueing and connection closing."""
371 host = 'www.ietf.org'
372 self.client = Peer(host, 80)
374 lastDefer = defer.Deferred()
376 def newRequest(path, num, expect, last=False):
377 d = self.client.get(path)
378 d.addCallback(self.gotResp, num, expect)
380 d.addBoth(lastDefer.callback)
383 newRequest("/rfc/rfc0006.txt", 1, 1776)
384 newRequest("/rfc/rfc2362.txt", 2, 159833)
385 newRequest("/rfc/rfc0801.txt", 3, 40824)
387 # This one will probably be queued
388 self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
390 # Connection should still be open, but idle
391 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
393 #Connection should be closed
394 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
395 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
396 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
397 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
399 # Now it should definitely be closed
400 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
403 def test_multiple_quick_downloads(self):
404 """Tests lots of multiple downloads with queueing."""
405 host = 'www.ietf.org'
406 self.client = Peer(host, 80)
408 lastDefer = defer.Deferred()
410 def newRequest(path, num, expect, last=False):
411 d = self.client.get(path)
412 d.addCallback(self.gotResp, num, expect)
414 d.addBoth(lastDefer.callback)
416 newRequest("/rfc/rfc0006.txt", 1, 1776)
417 newRequest("/rfc/rfc2362.txt", 2, 159833)
418 newRequest("/rfc/rfc0801.txt", 3, 40824)
419 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
420 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
421 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
422 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
423 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
424 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
425 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
429 log.msg('Rank is: %r' % self.client.rank)
430 log.msg('Download speed is: %r' % self.client.downloadSpeed())
431 log.msg('Response Time is: %r' % self.client.responseTime())
433 def test_peer_info(self):
434 """Test retrieving the peer info during a download."""
435 host = 'www.ietf.org'
436 self.client = Peer(host, 80)
438 lastDefer = defer.Deferred()
440 def newRequest(path, num, expect, last=False):
441 d = self.client.get(path)
442 d.addCallback(self.gotResp, num, expect)
444 d.addBoth(lastDefer.callback)
446 newRequest("/rfc/rfc0006.txt", 1, 1776)
447 newRequest("/rfc/rfc2362.txt", 2, 159833)
448 newRequest("/rfc/rfc0801.txt", 3, 40824)
449 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
450 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
451 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
452 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
453 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
454 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
455 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
457 for i in xrange(2, 122, 2):
458 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
462 def test_range(self):
463 """Test a Range request."""
464 host = 'www.ietf.org'
465 self.client = Peer(host, 80)
468 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
469 d.addCallback(self.gotResp, 1, 100)
473 for p in self.pending_calls:
476 self.pending_calls = []