2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
20 class Peer(ClientFactory):
21 """A manager for all HTTP requests to a single peer.
23 Controls all requests that go to a single peer (host and port).
24 This includes buffering requests until they can be sent and reconnecting
25 in the event of the connection being closed.
29 implements(IHTTPClientManager)
31 def __init__(self, host, port=80):
38 self.connecting = False
39 self.request_queue = []
40 self.response_queue = []
45 self._downloadSpeeds = []
46 self._lastResponse = None
47 self._responseTimes = []
50 return "(%s, %d, %0.5f)" % (self.host, self.port, self.rank)
52 #{ Manage the request queue
54 """Connect to the peer."""
55 assert self.closed and not self.connecting
56 self.connecting = True
57 d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
58 d.addCallback(self.connected)
60 def connected(self, proto):
61 """Begin processing the queued requests."""
63 self.connecting = False
68 """Close the connection to the peer."""
70 self.proto.transport.loseConnection()
72 def submitRequest(self, request):
73 """Add a new request to the queue.
75 @type request: L{twisted.web2.client.http.ClientRequest}
76 @return: deferred that will fire with the completed request
78 request.submissionTime = datetime.now()
79 request.deferRequest = defer.Deferred()
80 self.request_queue.append(request)
83 return request.deferRequest
85 def processQueue(self):
86 """Check the queue to see if new requests can be sent to the peer."""
87 if not self.request_queue:
94 if self.busy and not self.pipeline:
96 if self.response_queue and not self.pipeline:
99 req = self.request_queue.pop(0)
100 self.response_queue.append(req)
102 req.deferResponse = self.proto.submitRequest(req, False)
103 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
105 def requestComplete(self, resp):
106 """Process a completed request."""
107 self._processLastResponse()
108 req = self.response_queue.pop(0)
109 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
112 self._responseTimes.append((now, now - req.submissionTime))
113 self._lastResponse = (now, resp.stream.length)
115 req.deferRequest.callback(resp)
117 def requestError(self, error):
118 """Process a request that ended with an error."""
119 self._processLastResponse()
120 req = self.response_queue.pop(0)
121 log.msg('Download of %s generated error %r' % (req.uri, error))
125 req.deferRequest.errback(error)
127 def hashError(self, error):
128 """Log that a hash error occurred from the peer."""
129 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
133 #{ IHTTPClientManager interface
134 def clientBusy(self, proto):
135 """Save the busy state."""
138 def clientIdle(self, proto):
139 """Try to send a new request."""
140 self._processLastResponse()
145 def clientPipelining(self, proto):
146 """Try to send a new request."""
150 def clientGone(self, proto):
151 """Mark sent requests as errors."""
152 self._processLastResponse()
153 for req in self.response_queue:
154 req.deferRequest.errback(ProtocolError('lost connection'))
156 self.pipeline = False
158 self.connecting = False
159 self.response_queue = []
162 if self.request_queue:
165 #{ Downloading request interface
def setCommonHeaders(self):
    """Get the common HTTP headers for all requests.

    @rtype: L{twisted.web2.http_headers.Headers}
    @return: a new headers object with the C{Host} and C{User-Agent}
        headers set, to which request-specific headers can be added
    """
    headers = http_headers.Headers()
    # RFC 7230 section 5.4: the Host field must include the port whenever it
    # is not the default for the scheme (80 for http). Peers listen on
    # arbitrary ports, so sending the bare host name would name the wrong
    # authority.
    if self.port == 80:
        host = self.host
    else:
        host = '%s:%d' % (self.host, self.port)
    headers.setHeader('Host', host)
    headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
                      (version.short(), twisted_version.short(), web2_version.short()))
    return headers
174 def get(self, path, method="GET", modtime=None):
175 """Add a new request to the queue.
177 @type path: C{string}
178 @param path: the path to request from the peer
179 @type method: C{string}
180 @param method: the HTTP method to use, 'GET' or 'HEAD'
181 (optional, defaults to 'GET')
182 @type modtime: C{int}
183 @param modtime: the modification time to use for an 'If-Modified-Since'
184 header, as seconds since the epoch
185 (optional, defaults to not sending that header)
187 headers = self.setCommonHeaders()
189 headers.setHeader('If-Modified-Since', modtime)
190 return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a request for an inclusive byte range of a path.

    @type path: C{string}
    @param path: the path on the peer to request
    @type rangeStart: C{int}
    @param rangeStart: offset of the first byte wanted
    @type rangeEnd: C{int}
    @param rangeEnd: offset of the last byte wanted (inclusive)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @return: the deferred from L{submitRequest}, which fires with the response
    """
    byte_ranges = [(rangeStart, rangeEnd)]
    hdrs = self.setCommonHeaders()
    hdrs.setHeader('Range', ('bytes', byte_ranges))
    request = ClientRequest(method, path, hdrs, None)
    return self.submitRequest(request)
211 """Check whether the peer is idle or not."""
212 return not self.busy and not self.request_queue and not self.response_queue
def _processLastResponse(self):
    """Save the download time of the last request for speed calculations.

    C{self._lastResponse} is either C{None} or a C{(received, length)} tuple,
    which is stored when a request completes. If a tuple is pending, append a
    C{(now, now - received, length)} sample to C{self._downloadSpeeds}, then
    clear C{self._lastResponse}.

    A response whose stream length is unknown (C{None}) is discarded instead
    of recorded. The download speed calculation sums these lengths and cannot
    use a missing one.
    """
    if self._lastResponse is None:
        return
    received, length = self._lastResponse
    self._lastResponse = None
    if length is None:
        # Unknown-length streams give no usable byte count. Recording one
        # would make the speed calculation fail when summing the sizes.
        return
    now = datetime.now()
    self._downloadSpeeds.append((now, now - received, length))
221 def downloadSpeed(self):
222 """Gets the latest average download speed for the peer.
224 The average is over the last 10 responses that occurred in the last hour.
229 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
230 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
231 self._downloadSpeeds.pop(0)
233 # If there are none, then you get 0
234 if not self._downloadSpeeds:
237 for download in self._downloadSpeeds:
238 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
239 total_download += download[2]
241 return total_download / total_time
243 def responseTime(self):
244 """Gets the latest average response time for the peer.
246 Response time is the time from receiving the request, to the time
247 the download begins. The average is over the last 10 responses that
248 occurred in the last hour.
252 while self._responseTimes and (len(self._responseTimes) > 10 or
253 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
254 self._responseTimes.pop(0)
256 # If there are none, give it the benefit of the doubt
257 if not self._responseTimes:
260 for response in self._responseTimes:
261 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
263 return total_response / len(self._responseTimes)
266 """Determine the ranking value for the peer.
268 The ranking value is composed of 5 numbers, each exponentially
269 decreasing from 1 to 0 based on:
270 - if a connection to the peer is open
271 - the number of pending requests
272 - the time to download a single piece
273 - the number of errors
279 rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
280 speed = self.downloadSpeed()
282 rank *= exp(-512.0*1024 / speed)
284 rank *= exp(-float(self._errors) / self._completed)
285 rank *= exp(-self.responseTime() / 5.0)
288 class TestClientManager(unittest.TestCase):
289 """Unit tests for the Peer."""
294 def gotResp(self, resp, num, expect):
295 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
296 if expect is not None:
297 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
302 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
304 def test_download(self):
305 """Tests a normal download."""
306 host = 'www.ietf.org'
307 self.client = Peer(host, 80)
310 d = self.client.get('/rfc/rfc0013.txt')
311 d.addCallback(self.gotResp, 1, 1070)
315 """Tests a 'HEAD' request."""
316 host = 'www.ietf.org'
317 self.client = Peer(host, 80)
320 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
321 d.addCallback(self.gotResp, 1, 0)
324 def test_multiple_downloads(self):
325 """Tests multiple downloads with queueing and connection closing."""
326 host = 'www.ietf.org'
327 self.client = Peer(host, 80)
329 lastDefer = defer.Deferred()
331 def newRequest(path, num, expect, last=False):
332 d = self.client.get(path)
333 d.addCallback(self.gotResp, num, expect)
335 d.addBoth(lastDefer.callback)
338 newRequest("/rfc/rfc0006.txt", 1, 1776)
339 newRequest("/rfc/rfc2362.txt", 2, 159833)
340 newRequest("/rfc/rfc0801.txt", 3, 40824)
342 # This one will probably be queued
343 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
345 # Connection should still be open, but idle
346 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
348 #Connection should be closed
349 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
350 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
351 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
352 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
354 # Now it should definitely be closed
355 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
358 def test_multiple_quick_downloads(self):
359 """Tests lots of multiple downloads with queueing."""
360 host = 'www.ietf.org'
361 self.client = Peer(host, 80)
363 lastDefer = defer.Deferred()
365 def newRequest(path, num, expect, last=False):
366 d = self.client.get(path)
367 d.addCallback(self.gotResp, num, expect)
369 d.addBoth(lastDefer.callback)
371 newRequest("/rfc/rfc0006.txt", 1, 1776)
372 newRequest("/rfc/rfc2362.txt", 2, 159833)
373 newRequest("/rfc/rfc0801.txt", 3, 40824)
374 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
375 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
376 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
377 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
378 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
379 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
380 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
384 log.msg('Rank is: %r' % self.client.rank)
385 log.msg('Download speed is: %r' % self.client.downloadSpeed())
386 log.msg('Response Time is: %r' % self.client.responseTime())
388 def test_peer_info(self):
389 """Test retrieving the peer info during a download."""
390 host = 'www.ietf.org'
391 self.client = Peer(host, 80)
393 lastDefer = defer.Deferred()
395 def newRequest(path, num, expect, last=False):
396 d = self.client.get(path)
397 d.addCallback(self.gotResp, num, expect)
399 d.addBoth(lastDefer.callback)
401 newRequest("/rfc/rfc0006.txt", 1, 1776)
402 newRequest("/rfc/rfc2362.txt", 2, 159833)
403 newRequest("/rfc/rfc0801.txt", 3, 40824)
404 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
405 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
406 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
407 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
408 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
409 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
410 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
412 for i in xrange(2, 122, 2):
413 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
417 def test_range(self):
418 """Test a Range request."""
419 host = 'www.ietf.org'
420 self.client = Peer(host, 80)
423 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
424 d.addCallback(self.gotResp, 1, 100)
428 for p in self.pending_calls:
431 self.pending_calls = []