2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
20 class Peer(ClientFactory):
21 """A manager for all HTTP requests to a single peer.
23 Controls all requests that go to a single peer (host and port).
24 This includes buffering requests until they can be sent and reconnecting
25 in the event of the connection being closed.
29 implements(IHTTPClientManager)
31 def __init__(self, host, port=80):
38 self.connecting = False
39 self.request_queue = []
40 self.response_queue = []
45 self._downloadSpeeds = []
46 self._lastResponse = None
47 self._responseTimes = []
49 #{ Manage the request queue
51 """Connect to the peer."""
52 assert self.closed and not self.connecting
53 self.connecting = True
54 d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
55 d.addCallback(self.connected)
57 def connected(self, proto):
58 """Begin processing the queued requests."""
60 self.connecting = False
65 """Close the connection to the peer."""
67 self.proto.transport.loseConnection()
69 def submitRequest(self, request):
70 """Add a new request to the queue.
72 @type request: L{twisted.web2.client.http.ClientRequest}
73 @return: deferred that will fire with the completed request
75 request.submissionTime = datetime.now()
76 request.deferRequest = defer.Deferred()
77 self.request_queue.append(request)
80 return request.deferRequest
82 def processQueue(self):
83 """Check the queue to see if new requests can be sent to the peer."""
84 if not self.request_queue:
91 if self.busy and not self.pipeline:
93 if self.response_queue and not self.pipeline:
96 req = self.request_queue.pop(0)
97 self.response_queue.append(req)
99 req.deferResponse = self.proto.submitRequest(req, False)
100 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
102 def requestComplete(self, resp):
103 """Process a completed request."""
104 self._processLastResponse()
105 req = self.response_queue.pop(0)
106 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
111 self._responseTimes.append((now, now - req.submissionTime))
112 self._lastResponse = (now, resp.stream.length)
114 req.deferRequest.callback(resp)
116 def requestError(self, error):
117 """Process a request that ended with an error."""
118 self._processLastResponse()
119 req = self.response_queue.pop(0)
120 log.msg('Download of %s generated error %r' % (req.uri, error))
124 req.deferRequest.errback(error)
126 def hashError(self, error):
127 """Log that a hash error occurred from the peer."""
128 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
132 #{ IHTTPClientManager interface
133 def clientBusy(self, proto):
134 """Save the busy state."""
137 def clientIdle(self, proto):
138 """Try to send a new request."""
139 self._processLastResponse()
144 def clientPipelining(self, proto):
145 """Try to send a new request."""
149 def clientGone(self, proto):
150 """Mark sent requests as errors."""
151 self._processLastResponse()
152 for req in self.response_queue:
153 req.deferRequest.errback(ProtocolError('lost connection'))
155 self.pipeline = False
157 self.connecting = False
158 self.response_queue = []
161 if self.request_queue:
164 #{ Downloading request interface
165 def setCommonHeaders(self):
166 """Get the common HTTP headers for all requests."""
167 headers = http_headers.Headers()
168 headers.setHeader('Host', self.host)
169 headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
170 (version.short(), twisted_version.short(), web2_version.short()))
173 def get(self, path, method="GET", modtime=None):
174 """Add a new request to the queue.
176 @type path: C{string}
177 @param path: the path to request from the peer
178 @type method: C{string}
179 @param method: the HTTP method to use, 'GET' or 'HEAD'
180 (optional, defaults to 'GET')
181 @type modtime: C{int}
182 @param modtime: the modification time to use for an 'If-Modified-Since'
183 header, as seconds since the epoch
184 (optional, defaults to not sending that header)
186 headers = self.setCommonHeaders()
188 headers.setHeader('If-Modified-Since', modtime)
189 return self.submitRequest(ClientRequest(method, path, headers, None))
191 def getRange(self, path, rangeStart, rangeEnd, method="GET"):
192 """Add a new request with a Range header to the queue.
194 @type path: C{string}
195 @param path: the path to request from the peer
196 @type rangeStart: C{int}
197 @param rangeStart: the byte to begin the request at
198 @type rangeEnd: C{int}
199 @param rangeEnd: the byte to end the request at (inclusive)
200 @type method: C{string}
201 @param method: the HTTP method to use, 'GET' or 'HEAD'
202 (optional, defaults to 'GET')
204 headers = self.setCommonHeaders()
205 headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
206 return self.submitRequest(ClientRequest(method, path, headers, None))
210 """Check whether the peer is idle or not."""
211 return not self.busy and not self.request_queue and not self.response_queue
213 def _processLastResponse(self):
214 """Save the download time of the last request for speed calculations."""
215 if self._lastResponse is not None:
217 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
218 self._lastResponse = None
220 def downloadSpeed(self):
221 """Gets the latest average download speed for the peer.
223 The average is over the last 10 responses that occurred in the last hour.
228 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
229 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
230 self._downloadSpeeds.pop(0)
232 # If there are none, then you get 0
233 if not self._downloadSpeeds:
236 for download in self._downloadSpeeds:
237 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
238 total_download += download[2]
240 return total_download / total_time
242 def responseTime(self):
243 """Gets the latest average response time for the peer.
245 Response time is the time from receiving the request, to the time
246 the download begins. The average is over the last 10 responses that
247 occurred in the last hour.
251 while self._responseTimes and (len(self._responseTimes) > 10 or
252 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
253 self._responseTimes.pop(0)
255 # If there are none, give it the benefit of the doubt
256 if not self._responseTimes:
259 for response in self._responseTimes:
260 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
262 return total_response / len(self._responseTimes)
265 """Determine the ranking value for the peer.
267 The ranking value is composed of 5 numbers, each exponentially
268 decreasing from 1 to 0 based on:
269 - if a connection to the peer is open
270 - the number of pending requests
271 - the time to download a single piece
272 - the number of errors
278 rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
279 speed = self.downloadSpeed()
281 rank *= exp(-512.0*1024 / speed)
283 rank *= exp(-float(self._errors) / self._completed)
284 rank *= exp(-self.responseTime() / 5.0)
287 class TestClientManager(unittest.TestCase):
288 """Unit tests for the Peer."""
293 def gotResp(self, resp, num, expect):
294 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
295 if expect is not None:
296 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
301 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
303 def test_download(self):
304 """Tests a normal download."""
305 host = 'www.ietf.org'
306 self.client = Peer(host, 80)
309 d = self.client.get('/rfc/rfc0013.txt')
310 d.addCallback(self.gotResp, 1, 1070)
314 """Tests a 'HEAD' request."""
315 host = 'www.ietf.org'
316 self.client = Peer(host, 80)
319 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
320 d.addCallback(self.gotResp, 1, 0)
323 def test_multiple_downloads(self):
324 """Tests multiple downloads with queueing and connection closing."""
325 host = 'www.ietf.org'
326 self.client = Peer(host, 80)
328 lastDefer = defer.Deferred()
330 def newRequest(path, num, expect, last=False):
331 d = self.client.get(path)
332 d.addCallback(self.gotResp, num, expect)
334 d.addBoth(lastDefer.callback)
337 newRequest("/rfc/rfc0006.txt", 1, 1776)
338 newRequest("/rfc/rfc2362.txt", 2, 159833)
339 newRequest("/rfc/rfc0801.txt", 3, 40824)
341 # This one will probably be queued
342 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
344 # Connection should still be open, but idle
345 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
347 #Connection should be closed
348 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
349 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
350 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
351 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
353 # Now it should definitely be closed
354 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
357 def test_multiple_quick_downloads(self):
358 """Tests lots of multiple downloads with queueing."""
359 host = 'www.ietf.org'
360 self.client = Peer(host, 80)
362 lastDefer = defer.Deferred()
364 def newRequest(path, num, expect, last=False):
365 d = self.client.get(path)
366 d.addCallback(self.gotResp, num, expect)
368 d.addBoth(lastDefer.callback)
370 newRequest("/rfc/rfc0006.txt", 1, 1776)
371 newRequest("/rfc/rfc2362.txt", 2, 159833)
372 newRequest("/rfc/rfc0801.txt", 3, 40824)
373 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
374 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
375 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
376 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
377 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
378 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
379 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
383 log.msg('Rank is: %r' % self.client.rank)
384 log.msg('Download speed is: %r' % self.client.downloadSpeed())
385 log.msg('Response Time is: %r' % self.client.responseTime())
387 def test_peer_info(self):
388 """Test retrieving the peer info during a download."""
389 host = 'www.ietf.org'
390 self.client = Peer(host, 80)
392 lastDefer = defer.Deferred()
394 def newRequest(path, num, expect, last=False):
395 d = self.client.get(path)
396 d.addCallback(self.gotResp, num, expect)
398 d.addBoth(lastDefer.callback)
400 newRequest("/rfc/rfc0006.txt", 1, 1776)
401 newRequest("/rfc/rfc2362.txt", 2, 159833)
402 newRequest("/rfc/rfc0801.txt", 3, 40824)
403 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
404 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
405 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
406 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
407 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
408 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
409 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
411 for i in xrange(2, 122, 2):
412 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
416 def test_range(self):
417 """Test a Range request."""
418 host = 'www.ietf.org'
419 self.client = Peer(host, 80)
422 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
423 d.addCallback(self.gotResp, 1, 100)
427 for p in self.pending_calls:
430 self.pending_calls = []