2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
20 class Peer(ClientFactory):
21 """A manager for all HTTP requests to a single peer.
23 Controls all requests that go to a single peer (host and port).
24 This includes buffering requests until they can be sent and reconnecting
25 in the event of the connection being closed.
29 implements(IHTTPClientManager)
31 def __init__(self, host, port=80):
37 self.connecting = False
38 self.request_queue = []
39 self.response_queue = []
44 self._downloadSpeeds = []
45 self._lastResponse = None
46 self._responseTimes = []
48 #{ Manage the request queue
50 """Connect to the peer."""
51 assert self.closed and not self.connecting
52 self.connecting = True
53 d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
54 d.addCallback(self.connected)
56 def connected(self, proto):
57 """Begin processing the queued requests."""
59 self.connecting = False
64 """Close the connection to the peer."""
66 self.proto.transport.loseConnection()
68 def submitRequest(self, request):
69 """Add a new request to the queue.
71 @type request: L{twisted.web2.client.http.ClientRequest}
72 @return: deferred that will fire with the completed request
74 request.submissionTime = datetime.now()
75 request.deferRequest = defer.Deferred()
76 self.request_queue.append(request)
78 return request.deferRequest
80 def processQueue(self):
81 """Check the queue to see if new requests can be sent to the peer."""
82 if not self.request_queue:
89 if self.busy and not self.pipeline:
91 if self.response_queue and not self.pipeline:
94 req = self.request_queue.pop(0)
95 self.response_queue.append(req)
96 req.deferResponse = self.proto.submitRequest(req, False)
97 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
99 def requestComplete(self, resp):
100 """Process a completed request."""
101 self._processLastResponse()
102 req = self.response_queue.pop(0)
103 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
108 self._responseTimes.append((now, now - req.submissionTime))
109 self._lastResponse = (now, resp.stream.length)
110 req.deferRequest.callback(resp)
112 def requestError(self, error):
113 """Process a request that ended with an error."""
114 self._processLastResponse()
115 req = self.response_queue.pop(0)
116 log.msg('Download of %s generated error %r' % (req.uri, error))
119 req.deferRequest.errback(error)
121 def hashError(self, error):
122 """Log that a hash error occurred from the peer."""
123 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
126 #{ IHTTPClientManager interface
127 def clientBusy(self, proto):
128 """Save the busy state."""
131 def clientIdle(self, proto):
132 """Try to send a new request."""
133 self._processLastResponse()
137 def clientPipelining(self, proto):
138 """Try to send a new request."""
142 def clientGone(self, proto):
143 """Mark sent requests as errors."""
144 self._processLastResponse()
145 for req in self.response_queue:
146 req.deferRequest.errback(ProtocolError('lost connection'))
148 self.pipeline = False
150 self.connecting = False
151 self.response_queue = []
153 if self.request_queue:
156 #{ Downloading request interface
def setCommonHeaders(self):
    """Get the common HTTP headers for all requests.

    Builds a fresh header set containing the 'Host' header (the peer's
    host name) and a 'User-Agent' header identifying apt-p2p, twisted and
    twisted.web2 by their short version strings.

    @rtype: L{twisted.web2.http_headers.Headers}
    @return: a new headers object that the caller may add further
        request-specific headers to
    """
    headers = http_headers.Headers()
    headers.setHeader('Host', self.host)
    user_agent = 'apt-p2p/%s (twisted/%s twisted.web2/%s)' % (
        version.short(), twisted_version.short(), web2_version.short())
    headers.setHeader('User-Agent', user_agent)
    # get() and getRange() call setHeader() on the result of this method,
    # so the headers object has to be handed back to them.
    return headers
def get(self, path, method="GET", modtime=None):
    """Add a new request to the queue.

    @type path: C{string}
    @param path: the path to request from the peer
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @type modtime: C{int}
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @return: the result of L{submitRequest} for the new request (a
        deferred that will fire with the completed request)
    """
    headers = self.setCommonHeaders()
    # Only send the conditional header when a modification time was
    # actually supplied; the default of None means "no such header".
    if modtime is not None:
        headers.setHeader('If-Modified-Since', modtime)
    return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a request for part of a file, using a Range header.

    @type path: C{string}
    @param path: the path to request from the peer
    @type rangeStart: C{int}
    @param rangeStart: the byte to begin the request at
    @type rangeEnd: C{int}
    @param rangeEnd: the byte to end the request at (inclusive)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @return: the result of L{submitRequest} for the new request
    """
    request_headers = self.setCommonHeaders()
    # A single (start, end) pair in the 'bytes' unit
    byte_range = ('bytes', [(rangeStart, rangeEnd)])
    request_headers.setHeader('Range', byte_range)
    range_request = ClientRequest(method, path, request_headers, None)
    return self.submitRequest(range_request)
202 """Check whether the peer is idle or not."""
203 return not self.busy and not self.request_queue and not self.response_queue
def _processLastResponse(self):
    """Save the download time of the last request for speed calculations.

    C{self._lastResponse}, when set, is a tuple of the time the response
    was received and the length of its stream.  This appends a
    (current time, elapsed time, length) tuple to C{self._downloadSpeeds}
    and clears C{self._lastResponse} so the same response is never
    counted twice.  Does nothing if no response is pending.
    """
    if self._lastResponse is not None:
        received, length = self._lastResponse
        # The elapsed time is measured up to the moment this is called
        now = datetime.now()
        self._downloadSpeeds.append((now, now - received, length))
        self._lastResponse = None
def downloadSpeed(self):
    """Gets the latest average download speed for the peer.

    The average is over the last 10 responses that occurred in the last hour.
    Entries of C{self._downloadSpeeds} that are older than that, or in
    excess of 10, are discarded from the front of the list as a side
    effect of calling this.

    @rtype: C{float}
    @return: the total recorded stream length divided by the total
        elapsed seconds (presumably bytes/second), or 0 if there are no
        recent downloads or no measurable time elapsed
    """
    now = datetime.now()
    max_age = timedelta(seconds=3600)
    speeds = self._downloadSpeeds
    while speeds and (len(speeds) > 10 or now - speeds[0][0] > max_age):
        speeds.pop(0)

    # If there are none, then you get 0
    if not speeds:
        return 0.0

    total_time = 0.0
    total_download = 0
    for _when, elapsed, length in speeds:
        total_time += (elapsed.days*86400.0 + elapsed.seconds +
                       elapsed.microseconds/1000000.0)
        total_download += length

    # Downloads recorded with no measurable duration carry no speed
    # information; report 0 rather than dividing by zero.
    if total_time <= 0.0:
        return 0.0

    return total_download / total_time
234 def responseTime(self):
235 """Gets the latest average response time for the peer.
237 Response time is the time from receiving the request, to the time
238 the download begins. The average is over the last 10 responses that
239 occurred in the last hour.
243 while self._responseTimes and (len(self._responseTimes) > 10 or
244 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
245 self._responseTimes.pop(0)
247 # If there are none, give it the benefit of the doubt
248 if not self._responseTimes:
251 for response in self._responseTimes:
252 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
254 return total_response / len(self._responseTimes)
257 """Determine the ranking value for the peer.
259 The ranking value is composed of 5 numbers, each exponentially
260 decreasing from 1 to 0 based on:
261 - if a connection to the peer is open
262 - the number of pending requests
263 - the time to download a single piece
264 - the number of errors
270 rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
271 speed = self.downloadSpeed()
273 rank *= exp(-512.0*1024 / speed)
275 rank *= exp(-float(self._errors) / self._completed)
276 rank *= exp(-self.responseTime() / 5.0)
279 class TestClientManager(unittest.TestCase):
280 """Unit tests for the Peer."""
285 def gotResp(self, resp, num, expect):
286 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
287 if expect is not None:
288 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
293 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
295 def test_download(self):
296 """Tests a normal download."""
297 host = 'www.ietf.org'
298 self.client = Peer(host, 80)
301 d = self.client.get('/rfc/rfc0013.txt')
302 d.addCallback(self.gotResp, 1, 1070)
306 """Tests a 'HEAD' request."""
307 host = 'www.ietf.org'
308 self.client = Peer(host, 80)
311 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
312 d.addCallback(self.gotResp, 1, 0)
315 def test_multiple_downloads(self):
316 """Tests multiple downloads with queueing and connection closing."""
317 host = 'www.ietf.org'
318 self.client = Peer(host, 80)
320 lastDefer = defer.Deferred()
322 def newRequest(path, num, expect, last=False):
323 d = self.client.get(path)
324 d.addCallback(self.gotResp, num, expect)
326 d.addBoth(lastDefer.callback)
329 newRequest("/rfc/rfc0006.txt", 1, 1776)
330 newRequest("/rfc/rfc2362.txt", 2, 159833)
331 newRequest("/rfc/rfc0801.txt", 3, 40824)
333 # This one will probably be queued
334 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
336 # Connection should still be open, but idle
337 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
339 #Connection should be closed
340 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
341 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
342 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
343 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
345 # Now it should definitely be closed
346 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
349 def test_multiple_quick_downloads(self):
350 """Tests lots of multiple downloads with queueing."""
351 host = 'www.ietf.org'
352 self.client = Peer(host, 80)
354 lastDefer = defer.Deferred()
356 def newRequest(path, num, expect, last=False):
357 d = self.client.get(path)
358 d.addCallback(self.gotResp, num, expect)
360 d.addBoth(lastDefer.callback)
362 newRequest("/rfc/rfc0006.txt", 1, 1776)
363 newRequest("/rfc/rfc2362.txt", 2, 159833)
364 newRequest("/rfc/rfc0801.txt", 3, 40824)
365 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
366 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
367 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
368 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
369 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
370 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
371 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
375 log.msg('Rank is: %r' % self.client.rank(250.0*1024))
376 log.msg('Download speed is: %r' % self.client.downloadSpeed())
377 log.msg('Response Time is: %r' % self.client.responseTime())
379 def test_peer_info(self):
380 """Test retrieving the peer info during a download."""
381 host = 'www.ietf.org'
382 self.client = Peer(host, 80)
384 lastDefer = defer.Deferred()
386 def newRequest(path, num, expect, last=False):
387 d = self.client.get(path)
388 d.addCallback(self.gotResp, num, expect)
390 d.addBoth(lastDefer.callback)
392 newRequest("/rfc/rfc0006.txt", 1, 1776)
393 newRequest("/rfc/rfc2362.txt", 2, 159833)
394 newRequest("/rfc/rfc0801.txt", 3, 40824)
395 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
396 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
397 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
398 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
399 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
400 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
401 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
403 for i in xrange(2, 122, 2):
404 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
408 def test_range(self):
409 """Test a Range request."""
410 host = 'www.ietf.org'
411 self.client = Peer(host, 80)
414 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
415 d.addCallback(self.gotResp, 1, 100)
419 for p in self.pending_calls:
422 self.pending_calls = []