2 """Manage all download requests to a single site."""
5 from datetime import datetime, timedelta
7 from twisted.internet import reactor, defer, protocol
8 from twisted.internet.protocol import ClientFactory
9 from twisted import version as twisted_version
10 from twisted.python import log
11 from twisted.web2.client.interfaces import IHTTPClientManager
12 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
13 from twisted.web2 import stream as stream_mod, http_headers
14 from twisted.web2 import version as web2_version
15 from twisted.trial import unittest
16 from zope.interface import implements
18 from apt_p2p_conf import version
# NOTE(review): this listing keeps the original file's line numbers as a prefix
# and has dropped some lines; originals 26-28 (presumably including the
# docstring terminator) are absent here.
# The implements() call below declares IHTTPClientManager; the methods named
# clientBusy/clientIdle/clientPipelining/clientGone further down are grouped
# under an "IHTTPClientManager interface" marker.
20 class Peer(ClientFactory):
21 """A manager for all HTTP requests to a single peer.
23 Controls all requests that go to a single peer (host and port).
24 This includes buffering requests until they can be sent and reconnecting
25 in the event of the connection being closed.
29 implements(IHTTPClientManager)
# Set up the per-peer state. Visible here:
#   connecting      - True while a connection attempt is in progress
#   request_queue   - requests accepted but not yet sent to the peer
#   response_queue  - requests sent and still awaiting their response
#   _downloadSpeeds - (timestamp, elapsed, length) samples for downloadSpeed()
#   _lastResponse   - (timestamp, length) of the response still downloading
#   _responseTimes  - (timestamp, latency) samples for responseTime()
# NOTE(review): original lines 32-37 and 41-44 are missing from this listing;
# the other attributes read elsewhere in the class (host, port, busy, pipeline,
# closed, proto, _errors, _completed) are presumably initialised there.
31 def __init__(self, host, port=80):
38 self.connecting = False
39 self.request_queue = []
40 self.response_queue = []
45 self._downloadSpeeds = []
46 self._lastResponse = None
47 self._responseTimes = []
49 #{ Manage the request queue
# NOTE(review): the `def` line for this method (original line 50) is missing
# from this listing.
# Starts a TCP connection to (self.host, self.port) using HTTPClientProtocol,
# passing this object to the protocol constructor, and arranges for
# self.connected to be called with the protocol once connected. It must only
# be called while closed and not already connecting (enforced by the assert).
# No errback is attached to the connection deferred in the lines shown.
51 """Connect to the peer."""
52 assert self.closed and not self.connecting
53 self.connecting = True
54 d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
55 d.addCallback(self.connected)
# Callback attached by the connect code to the connectTCP deferred; `proto` is
# the connected protocol instance. The only visible effect is clearing the
# `connecting` flag.
# NOTE(review): original lines 59 and 61-63 are missing from this listing, so
# the remaining bookkeeping (and the queue processing the docstring mentions)
# is not visible.
57 def connected(self, proto):
58 """Begin processing the queued requests."""
60 self.connecting = False
# NOTE(review): the `def` line (original line 64) and original line 66 are
# missing from this listing; line 66 is presumably a guard around the call
# below - confirm against the full file.
# Drops the transport of the current protocol.
65 """Close the connection to the peer."""
67 self.proto.transport.loseConnection()
# Stamps the request with its submission time and a fresh Deferred, appends it
# to request_queue and returns that Deferred. requestComplete() later fires it
# with the response, requestError()/clientGone() with a failure.
# NOTE(review): original lines 74 (presumably the docstring terminator) and
# 78-79 are missing from this listing, so whatever triggers queue processing
# after the append is not visible.
69 def submitRequest(self, request):
70 """Add a new request to the queue.
72 @type request: L{twisted.web2.client.http.ClientRequest}
73 @return: deferred that will fire with the completed request
75 request.submissionTime = datetime.now()
76 request.deferRequest = defer.Deferred()
77 self.request_queue.append(request)
80 return request.deferRequest
# Moves the request at the head of request_queue onto response_queue, submits
# it on the current protocol and routes the outcome to requestComplete /
# requestError.
# The visible guards test, in order: nothing queued; peer busy without
# pipelining; a response still outstanding without pipelining.
# NOTE(review): the bodies of those guards and original lines 85-90, 92, 94
# and 98 are missing from this listing. The second argument (False) to
# proto.submitRequest is presumably web2's "close after" flag - confirm.
82 def processQueue(self):
83 """Check the queue to see if new requests can be sent to the peer."""
84 if not self.request_queue:
91 if self.busy and not self.pipeline:
93 if self.response_queue and not self.pipeline:
96 req = self.request_queue.pop(0)
97 self.response_queue.append(req)
99 req.deferResponse = self.proto.submitRequest(req, False)
100 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
# Success callback for a submitted request. Closes the timing of any previous
# response, pops the oldest entry of response_queue (responses are matched to
# requests in FIFO order), logs it, records (timestamp, time since submission)
# in _responseTimes, remembers (timestamp, stream length) in _lastResponse for
# the speed calculation, and fires the caller's Deferred with the response.
# NOTE(review): `now` is bound on a line missing from this listing (originals
# 107-108 are absent, as is 111).
102 def requestComplete(self, resp):
103 """Process a completed request."""
104 self._processLastResponse()
105 req = self.response_queue.pop(0)
106 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
109 self._responseTimes.append((now, now - req.submissionTime))
110 self._lastResponse = (now, resp.stream.length)
112 req.deferRequest.callback(resp)
# Failure callback for a submitted request. Closes the timing of any previous
# response, pops the oldest entry of response_queue, logs the error and passes
# it on to the caller's Deferred through errback.
# NOTE(review): original lines 119-121 are missing from this listing, so any
# error accounting done here is not visible.
114 def requestError(self, error):
115 """Process a request that ended with an error."""
116 self._processLastResponse()
117 req = self.response_queue.pop(0)
118 log.msg('Download of %s generated error %r' % (req.uri, error))
122 req.deferRequest.errback(error)
# Logs a hash failure attributed to this peer, identified by (host, port).
# NOTE(review): original lines 127-128 are missing from this listing; whether
# the error is also counted against the peer cannot be seen here.
124 def hashError(self, error):
125 """Log that a hash error occurred from the peer."""
126 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
130 #{ IHTTPClientManager interface
# Callback taking the protocol instance as `proto`.
# NOTE(review): the body (original line 133) is missing from this listing;
# going by the docstring it records the busy state that processQueue() and the
# idle check read as self.busy - confirm against the full file.
131 def clientBusy(self, proto):
132 """Save the busy state."""
# Callback taking the protocol instance as `proto`. The visible part closes the
# timing of the last response so its download duration is recorded.
# NOTE(review): original lines 138-140 are missing from this listing; the
# request sending promised by the docstring is not visible.
135 def clientIdle(self, proto):
136 """Try to send a new request."""
137 self._processLastResponse()
# Callback taking the protocol instance as `proto`.
# NOTE(review): the body (original lines 144-145) is missing from this
# listing; presumably it sets the self.pipeline flag read by processQueue()
# and then processes the queue - confirm against the full file.
142 def clientPipelining(self, proto):
143 """Try to send a new request."""
# Connection-lost handler. Closes the timing of the last response, fails every
# request that was sent but not yet answered with
# ProtocolError('lost connection'), clears the pipeline and connecting flags
# and empties response_queue. Requests still in request_queue are kept.
# NOTE(review): original lines 152, 154, 157-158 and the body of the final
# `if self.request_queue:` (160-161) are missing from this listing; that
# branch presumably resumes processing of the remaining requests.
147 def clientGone(self, proto):
148 """Mark sent requests as errors."""
149 self._processLastResponse()
150 for req in self.response_queue:
151 req.deferRequest.errback(ProtocolError('lost connection'))
153 self.pipeline = False
155 self.connecting = False
156 self.response_queue = []
159 if self.request_queue:
162 #{ Downloading request interface
def setCommonHeaders(self):
    """Get the common HTTP headers for all requests.

    @rtype: L{twisted.web2.http_headers.Headers}
    @return: a new headers object with the 'Host' and 'User-Agent'
        headers already set
    """
    headers = http_headers.Headers()
    headers.setHeader('Host', self.host)
    headers.setHeader('User-Agent', 'apt-p2p/%s (twisted/%s twisted.web2/%s)' %
                      (version.short(), twisted_version.short(), web2_version.short()))
    # get() and getRange() add their own headers to the result, so the
    # object has to be handed back to the caller.
    return headers
def get(self, path, method="GET", modtime=None):
    """Add a new request to the queue.

    @type path: C{string}
    @param path: the path to request from the peer
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @type modtime: C{int}
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @rtype: L{twisted.internet.defer.Deferred}
    @return: the deferred returned by L{submitRequest}
    """
    headers = self.setCommonHeaders()
    # Only make the request conditional when the caller supplied a time;
    # the default of None must not produce an 'If-Modified-Since' header.
    if modtime is not None:
        headers.setHeader('If-Modified-Since', modtime)
    return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a request for a byte range of a file on the peer.

    @type path: C{string}
    @param path: the path to request from the peer
    @type rangeStart: C{int}
    @param rangeStart: the first byte of the range to request
    @type rangeEnd: C{int}
    @param rangeEnd: the last byte of the range to request (inclusive)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @return: the deferred returned by L{submitRequest}
    """
    range_headers = self.setCommonHeaders()
    # A single (start, end) pair in the 'bytes' unit
    byte_range = ('bytes', [(rangeStart, rangeEnd)])
    range_headers.setHeader('Range', byte_range)
    ranged_request = ClientRequest(method, path, range_headers, None)
    return self.submitRequest(ranged_request)
# NOTE(review): the `def` line for this method (original line 207) is missing
# from this listing.
# True only when the peer is not busy, nothing is waiting to be sent and no
# response is outstanding.
208 """Check whether the peer is idle or not."""
209 return not self.busy and not self.request_queue and not self.response_queue
211 def _processLastResponse(self):
212 """Save the download time of the last request for speed calculations."""
213 if self._lastResponse is not None:
215 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
216 self._lastResponse = None
218 def downloadSpeed(self):
219 """Gets the latest average download speed for the peer.
221 The average is over the last 10 responses that occurred in the last hour.
226 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
227 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
228 self._downloadSpeeds.pop(0)
230 # If there are none, then you get 0
231 if not self._downloadSpeeds:
234 for download in self._downloadSpeeds:
235 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
236 total_download += download[2]
238 return total_download / total_time
240 def responseTime(self):
241 """Gets the latest average response time for the peer.
243 Response time is the time from receiving the request, to the time
244 the download begins. The average is over the last 10 responses that
245 occurred in the last hour.
249 while self._responseTimes and (len(self._responseTimes) > 10 or
250 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
251 self._responseTimes.pop(0)
253 # If there are none, give it the benefit of the doubt
254 if not self._responseTimes:
257 for response in self._responseTimes:
258 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
260 return total_response / len(self._responseTimes)
# NOTE(review): the `def` line for this method (original line 262) and
# original lines 271-275, 278, 280 and 283-284 are missing from this listing,
# including the initialisation of `rank`, any guards around the divisions
# below and the place where the result is stored.
# Visible factors, each multiplied into `rank` as an exp(-x) term:
#   - len(request_queue) - len(response_queue)
#   - 512*1024 divided by the download speed
#   - _errors divided by _completed
#   - the response time divided by 5.0
# NOTE(review): the docstring announces 5 numbers but only 4 bullets are
# visible. The first visible factor SUBTRACTS the number of in-flight requests
# although the docstring speaks of "the number of pending requests"; the term
# goes above 1 when more requests are in flight than queued - confirm intent.
# NOTE(review): `exp` is not imported on any visible line; presumably
# `from math import exp` sits on a dropped import line.
263 """Determine the ranking value for the peer.
265 The ranking value is composed of 5 numbers, each exponentially
266 decreasing from 1 to 0 based on:
267 - if a connection to the peer is open
268 - the number of pending requests
269 - the time to download a single piece
270 - the number of errors
276 rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
277 speed = self.downloadSpeed()
279 rank *= exp(-512.0*1024 / speed)
281 rank *= exp(-float(self._errors) / self._completed)
282 rank *= exp(-self.responseTime() / 5.0)
# Trial test case exercising Peer against a live HTTP server
# (the tests below use www.ietf.org, so they need network access).
# NOTE(review): original lines 287-290 are missing from this listing; the
# `client` and `pending_calls` attributes used by the tests are presumably
# declared there.
285 class TestClientManager(unittest.TestCase):
286 """Unit tests for the Peer."""
# Shared response check used as a Deferred callback by the tests.
#   resp   - the response object delivered by Peer
#   num    - request number passed by the caller (not used in the visible lines)
#   expect - expected stream length, or None to skip the length check
# Fails unless the status code is 2xx and, when `expect` is given, the stream
# length matches; then reads the response stream to its end.
# NOTE(review): `print_` and `printdone` are defined on lines missing from
# this listing (originals 295-298).
291 def gotResp(self, resp, num, expect):
292 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
293 if expect is not None:
294 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
299 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
# Fetches /rfc/rfc0013.txt from www.ietf.org:80 and checks for a 2xx response
# with a stream length of 1070. Requires network access.
# NOTE(review): original lines 305-306 and 309 are missing from this listing.
# No `return d` is visible; trial only waits for the Deferred when the test
# method returns it, so that return is presumably on the missing line 309.
301 def test_download(self):
302 """Tests a normal download."""
303 host = 'www.ietf.org'
304 self.client = Peer(host, 80)
307 d = self.client.get('/rfc/rfc0013.txt')
308 d.addCallback(self.gotResp, 1, 1070)
# NOTE(review): the `def` line for this test (original line 311) and original
# lines 315-316 and 319 are missing from this listing.
# Issues a HEAD request for /rfc/rfc0013.txt on www.ietf.org:80 and expects a
# 2xx response whose stream length is 0. Requires network access.
312 """Tests a 'HEAD' request."""
313 host = 'www.ietf.org'
314 self.client = Peer(host, 80)
317 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
318 d.addCallback(self.gotResp, 1, 0)
# Issues ten GET requests to www.ietf.org:80 spread over about a minute
# (immediately, then after 1, 10, 30, 31, 32, 32 and 62 seconds) so that the
# Peer is exercised while busy, while idle and after the server has closed the
# connection. Each response is checked by gotResp against the expected stream
# length. The delayed calls are kept in self.pending_calls. Requires network
# access.
# NOTE(review): original lines 325, 327, 331, 333-334 and 353 are missing from
# this listing. In the visible lines lastDefer.callback is attached to every
# request and the `last` flag is never read; the missing line 331 presumably
# guards it with `if last:`, and line 353 presumably returns lastDefer.
321 def test_multiple_downloads(self):
322 """Tests multiple downloads with queueing and connection closing."""
323 host = 'www.ietf.org'
324 self.client = Peer(host, 80)
326 lastDefer = defer.Deferred()
328 def newRequest(path, num, expect, last=False):
329 d = self.client.get(path)
330 d.addCallback(self.gotResp, num, expect)
332 d.addBoth(lastDefer.callback)
335 newRequest("/rfc/rfc0006.txt", 1, 1776)
336 newRequest("/rfc/rfc2362.txt", 2, 159833)
337 newRequest("/rfc/rfc0801.txt", 3, 40824)
339 # This one will probably be queued
340 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
342 # Connection should still be open, but idle
343 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
345 #Connection should be closed
346 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
347 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
348 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
349 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
351 # Now it should definitely be closed
352 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# Issues three GET requests immediately and seven more through
# reactor.callLater(0, ...), so that all ten are queued on the Peer at nearly
# the same time. Each response is checked by gotResp against the expected
# stream length. Requires network access to www.ietf.org:80.
# NOTE(review): original lines 359, 361, 365, 367 and 378 are missing from
# this listing. In the visible lines lastDefer.callback is attached to every
# request and the `last` flag is never read; the missing line 365 presumably
# guards it with `if last:`, and line 378 presumably returns lastDefer.
355 def test_multiple_quick_downloads(self):
356 """Tests lots of multiple downloads with queueing."""
357 host = 'www.ietf.org'
358 self.client = Peer(host, 80)
360 lastDefer = defer.Deferred()
362 def newRequest(path, num, expect, last=False):
363 d = self.client.get(path)
364 d.addCallback(self.gotResp, num, expect)
366 d.addBoth(lastDefer.callback)
368 newRequest("/rfc/rfc0006.txt", 1, 1776)
369 newRequest("/rfc/rfc2362.txt", 2, 159833)
370 newRequest("/rfc/rfc0801.txt", 3, 40824)
371 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
372 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
373 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
374 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
375 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
376 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
377 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# NOTE(review): the `def` line for this helper (original line 380) is missing
# from this listing; test_peer_info schedules it as self.checkInfo.
# Logs the peer's current rank attribute, download speed and response time.
381 log.msg('Rank is: %r' % self.client.rank)
382 log.msg('Download speed is: %r' % self.client.downloadSpeed())
383 log.msg('Response Time is: %r' % self.client.responseTime())
# Runs the same request schedule as test_multiple_downloads (immediately, then
# after 1, 10, 30, 31, 32, 32 and 62 seconds) while also scheduling
# self.checkInfo every 2 seconds from t=2 to t=120, so the peer statistics are
# read during and after the downloads. Requires network access to
# www.ietf.org:80.
# NOTE(review): original lines 389, 391, 395, 397, 408 and 411-412 are missing
# from this listing. In the visible lines lastDefer.callback is attached to
# every request and the `last` flag is never read; the missing line 395
# presumably guards it with `if last:`, and the test presumably returns
# lastDefer on a missing line.
# NOTE(review): xrange is Python 2 only.
385 def test_peer_info(self):
386 """Test retrieving the peer info during a download."""
387 host = 'www.ietf.org'
388 self.client = Peer(host, 80)
390 lastDefer = defer.Deferred()
392 def newRequest(path, num, expect, last=False):
393 d = self.client.get(path)
394 d.addCallback(self.gotResp, num, expect)
396 d.addBoth(lastDefer.callback)
398 newRequest("/rfc/rfc0006.txt", 1, 1776)
399 newRequest("/rfc/rfc2362.txt", 2, 159833)
400 newRequest("/rfc/rfc0801.txt", 3, 40824)
401 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
402 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
403 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
404 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
405 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
406 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
407 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
409 for i in xrange(2, 122, 2):
410 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
# Requests bytes 100-199 (inclusive) of /rfc/rfc0013.txt from www.ietf.org:80
# and expects a 2xx response whose stream length is 100. Requires network
# access.
# NOTE(review): original lines 418-419 and 422 are missing from this listing.
# No `return d` is visible; trial only waits for the Deferred when the test
# method returns it, so that return is presumably on the missing line 422.
414 def test_range(self):
415 """Test a Range request."""
416 host = 'www.ietf.org'
417 self.client = Peer(host, 80)
420 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
421 d.addCallback(self.gotResp, 1, 100)
425 for p in self.pending_calls:
428 self.pending_calls = []