2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
20 class Peer(ClientFactory):
21 """A manager for all HTTP requests to a single peer.
23 Controls all requests that go to a single peer (host and port).
24 This includes buffering requests until they can be sent and reconnecting
25 in the event of the connection being closed.
29 implements(IHTTPClientManager)
31 def __init__(self, host, port=80):
39 self.connecting = False
40 self.request_queue = []
41 self.response_queue = []
46 self._downloadSpeeds = []
47 self._lastResponse = None
48 self._responseTimes = []
51 return "(%r, %r, %r)" % (self.host, self.port, self.rank)
53 #{ Manage the request queue
55 """Connect to the peer."""
56 assert self.closed and not self.connecting
57 self.connecting = True
58 d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
59 d.addCallback(self.connected)
61 def connected(self, proto):
62 """Begin processing the queued requests."""
64 self.connecting = False
69 """Close the connection to the peer."""
71 self.proto.transport.loseConnection()
73 def submitRequest(self, request):
74 """Add a new request to the queue.
76 @type request: L{twisted.web2.client.http.ClientRequest}
77 @return: deferred that will fire with the completed request
79 request.submissionTime = datetime.now()
80 request.deferRequest = defer.Deferred()
81 self.request_queue.append(request)
84 return request.deferRequest
86 def processQueue(self):
87 """Check the queue to see if new requests can be sent to the peer."""
88 if not self.request_queue:
95 if self.busy and not self.pipeline:
97 if self.response_queue and not self.pipeline:
100 req = self.request_queue.pop(0)
101 self.response_queue.append(req)
103 req.deferResponse = self.proto.submitRequest(req, False)
104 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
106 def requestComplete(self, resp):
107 """Process a completed request."""
108 self._processLastResponse()
109 req = self.response_queue.pop(0)
110 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
113 self._responseTimes.append((now, now - req.submissionTime))
114 self._lastResponse = (now, resp.stream.length)
116 req.deferRequest.callback(resp)
118 def requestError(self, error):
119 """Process a request that ended with an error."""
120 self._processLastResponse()
121 req = self.response_queue.pop(0)
122 log.msg('Download of %s generated error %r' % (req.uri, error))
126 req.deferRequest.errback(error)
128 def hashError(self, error):
129 """Log that a hash error occurred from the peer."""
130 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
134 #{ IHTTPClientManager interface
135 def clientBusy(self, proto):
136 """Save the busy state."""
139 def clientIdle(self, proto):
140 """Try to send a new request."""
141 self._processLastResponse()
146 def clientPipelining(self, proto):
147 """Try to send a new request."""
151 def clientGone(self, proto):
152 """Mark sent requests as errors."""
153 self._processLastResponse()
154 for req in self.response_queue:
155 req.deferRequest.errback(ProtocolError('lost connection'))
157 self.pipeline = False
159 self.connecting = False
160 self.response_queue = []
163 if self.request_queue:
166 #{ Downloading request interface
167 def setCommonHeaders(self):
168 """Get the common HTTP headers for all requests."""
169 headers = http_headers.Headers()
170 headers.setHeader('Host', self.host)
171 headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
172 (version.short(), twisted_version.short(), web2_version.short()))
175 def get(self, path, method="GET", modtime=None):
176 """Add a new request to the queue.
178 @type path: C{string}
179 @param path: the path to request from the peer
180 @type method: C{string}
181 @param method: the HTTP method to use, 'GET' or 'HEAD'
182 (optional, defaults to 'GET')
183 @type modtime: C{int}
184 @param modtime: the modification time to use for an 'If-Modified-Since'
185 header, as seconds since the epoch
186 (optional, defaults to not sending that header)
188 headers = self.setCommonHeaders()
190 headers.setHeader('If-Modified-Since', modtime)
191 return self.submitRequest(ClientRequest(method, path, headers, None))
193 def getRange(self, path, rangeStart, rangeEnd, method="GET"):
194 """Add a new request with a Range header to the queue.
196 @type path: C{string}
197 @param path: the path to request from the peer
198 @type rangeStart: C{int}
199 @param rangeStart: the byte to begin the request at
200 @type rangeEnd: C{int}
201 @param rangeEnd: the byte to end the request at (inclusive)
202 @type method: C{string}
203 @param method: the HTTP method to use, 'GET' or 'HEAD'
204 (optional, defaults to 'GET')
206 headers = self.setCommonHeaders()
207 headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
208 return self.submitRequest(ClientRequest(method, path, headers, None))
212 """Check whether the peer is idle or not."""
213 return not self.busy and not self.request_queue and not self.response_queue
215 def _processLastResponse(self):
216 """Save the download time of the last request for speed calculations."""
217 if self._lastResponse is not None:
219 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
220 self._lastResponse = None
222 def downloadSpeed(self):
223 """Gets the latest average download speed for the peer.
225 The average is over the last 10 responses that occurred in the last hour.
230 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
231 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
232 self._downloadSpeeds.pop(0)
234 # If there are none, then you get 0
235 if not self._downloadSpeeds:
238 for download in self._downloadSpeeds:
239 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
240 total_download += download[2]
242 return total_download / total_time
244 def responseTime(self):
245 """Gets the latest average response time for the peer.
247 Response time is the time from receiving the request, to the time
248 the download begins. The average is over the last 10 responses that
249 occurred in the last hour.
253 while self._responseTimes and (len(self._responseTimes) > 10 or
254 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
255 self._responseTimes.pop(0)
257 # If there are none, give it the benefit of the doubt
258 if not self._responseTimes:
261 for response in self._responseTimes:
262 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
264 return total_response / len(self._responseTimes)
267 """Determine the ranking value for the peer.
269 The ranking value is composed of 5 numbers, each exponentially
270 decreasing from 1 to 0 based on:
271 - if a connection to the peer is open
272 - the number of pending requests
273 - the time to download a single piece
274 - the number of errors
280 rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
281 speed = self.downloadSpeed()
283 rank *= exp(-512.0*1024 / speed)
285 rank *= exp(-float(self._errors) / self._completed)
286 rank *= exp(-self.responseTime() / 5.0)
289 class TestClientManager(unittest.TestCase):
290 """Unit tests for the Peer."""
295 def gotResp(self, resp, num, expect):
296 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
297 if expect is not None:
298 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
303 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
305 def test_download(self):
306 """Tests a normal download."""
307 host = 'www.ietf.org'
308 self.client = Peer(host, 80)
311 d = self.client.get('/rfc/rfc0013.txt')
312 d.addCallback(self.gotResp, 1, 1070)
316 """Tests a 'HEAD' request."""
317 host = 'www.ietf.org'
318 self.client = Peer(host, 80)
321 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
322 d.addCallback(self.gotResp, 1, 0)
325 def test_multiple_downloads(self):
326 """Tests multiple downloads with queueing and connection closing."""
327 host = 'www.ietf.org'
328 self.client = Peer(host, 80)
330 lastDefer = defer.Deferred()
332 def newRequest(path, num, expect, last=False):
333 d = self.client.get(path)
334 d.addCallback(self.gotResp, num, expect)
336 d.addBoth(lastDefer.callback)
339 newRequest("/rfc/rfc0006.txt", 1, 1776)
340 newRequest("/rfc/rfc2362.txt", 2, 159833)
341 newRequest("/rfc/rfc0801.txt", 3, 40824)
343 # This one will probably be queued
344 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
346 # Connection should still be open, but idle
347 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
349 #Connection should be closed
350 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
351 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
352 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
353 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
355 # Now it should definitely be closed
356 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
359 def test_multiple_quick_downloads(self):
360 """Tests lots of multiple downloads with queueing."""
361 host = 'www.ietf.org'
362 self.client = Peer(host, 80)
364 lastDefer = defer.Deferred()
366 def newRequest(path, num, expect, last=False):
367 d = self.client.get(path)
368 d.addCallback(self.gotResp, num, expect)
370 d.addBoth(lastDefer.callback)
372 newRequest("/rfc/rfc0006.txt", 1, 1776)
373 newRequest("/rfc/rfc2362.txt", 2, 159833)
374 newRequest("/rfc/rfc0801.txt", 3, 40824)
375 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
376 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
377 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
378 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
379 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
380 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
381 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
385 log.msg('Rank is: %r' % self.client.rank)
386 log.msg('Download speed is: %r' % self.client.downloadSpeed())
387 log.msg('Response Time is: %r' % self.client.responseTime())
389 def test_peer_info(self):
390 """Test retrieving the peer info during a download."""
391 host = 'www.ietf.org'
392 self.client = Peer(host, 80)
394 lastDefer = defer.Deferred()
396 def newRequest(path, num, expect, last=False):
397 d = self.client.get(path)
398 d.addCallback(self.gotResp, num, expect)
400 d.addBoth(lastDefer.callback)
402 newRequest("/rfc/rfc0006.txt", 1, 1776)
403 newRequest("/rfc/rfc2362.txt", 2, 159833)
404 newRequest("/rfc/rfc0801.txt", 3, 40824)
405 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
406 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
407 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
408 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
409 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
410 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
411 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
413 for i in xrange(2, 122, 2):
414 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
418 def test_range(self):
419 """Test a Range request."""
420 host = 'www.ietf.org'
421 self.client = Peer(host, 80)
424 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
425 d.addCallback(self.gotResp, 1, 100)
429 for p in self.pending_calls:
432 self.pending_calls = []