3 from datetime import datetime, timedelta
5 from twisted.internet import reactor, defer, protocol
6 from twisted.internet.protocol import ClientFactory
7 from twisted import version as twisted_version
8 from twisted.python import log
9 from twisted.web2.client.interfaces import IHTTPClientManager
10 from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol
11 from twisted.web2 import stream as stream_mod, http_headers
12 from twisted.web2 import version as web2_version
13 from twisted.trial import unittest
14 from zope.interface import implements
16 from apt_dht_conf import version
18 class Peer(ClientFactory):
19 """A manager for all HTTP requests to a single peer.
21 Controls all requests that go to a single peer (host and port).
22 This includes buffering requests until they can be sent and reconnecting
23 in the event of the connection being closed.
27 implements(IHTTPClientManager)
# Initialise the per-peer bookkeeping for the peer at (host, port); port
# defaults to 80.
# NOTE(review): other attributes read elsewhere in this class (self.host,
# self.port, self.busy, self.pipeline, self.closed, self.proto,
# self._errors, self._completed) are not assigned in the lines visible
# here -- confirm they are initialised.
29 def __init__(self, host, port=80):
# True while a TCP connection attempt is outstanding.
35 self.connecting = False
# Requests accepted by submitRequest() but not yet handed to the protocol.
36 self.request_queue = []
# Requests handed to the protocol whose responses are still outstanding,
# kept in submission order.
37 self.response_queue = []
# Tuples of (timestamp, elapsed timedelta, stream length) appended by
# _processLastResponse() and consumed by downloadSpeed().
42 self._downloadSpeeds = []
# (timestamp, stream length) of the most recently completed response, or
# None when there is nothing left to account for.
43 self._lastResponse = None
# Tuples of (timestamp, timedelta since submission) appended by
# requestComplete() and consumed by responseTime().
44 self._responseTimes = []
47 assert self.closed and not self.connecting
48 self.connecting = True
49 d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
50 d.addCallback(self.connected)
# Callback added to the connectTCP Deferred (d.addCallback(self.connected));
# receives the connected protocol instance as `proto`.
# NOTE(review): only the reset of the `connecting` flag is visible here;
# presumably the protocol is also stored (self.proto is used elsewhere) and
# the queue is processed -- confirm.
52 def connected(self, proto):
54 self.connecting = False
60 self.proto.transport.loseConnection()
# Queue `request` for this peer and return a Deferred for its response.
# The request object is annotated in place with two attributes:
#   submissionTime -- when it was queued (used by requestComplete() to
#                     measure response time)
#   deferRequest   -- the Deferred handed back to the caller; fired with the
#                     response by requestComplete() or with the failure by
#                     requestError()/clientGone().
# NOTE(review): nothing in the visible lines starts processing of the queue
# after the append -- confirm that processQueue() is triggered.
62 def submitRequest(self, request):
63 request.submissionTime = datetime.now()
64 request.deferRequest = defer.Deferred()
65 self.request_queue.append(request)
67 return request.deferRequest
# Hand the oldest queued request to the protocol, if conditions allow.
# The visible guards test for: an empty request_queue, a busy connection
# that is not pipelining, and outstanding responses while not pipelining.
# NOTE(review): the bodies of these three guards are not visible here;
# presumably each returns early (and a closed connection is handled
# somewhere in between) -- confirm.
69 def processQueue(self):
70 if not self.request_queue:
77 if self.busy and not self.pipeline:
79 if self.response_queue and not self.pipeline:
# FIFO: the oldest request is sent first and moved to response_queue so that
# requestComplete()/requestError() can pair it with its response by position.
82 req = self.request_queue.pop(0)
83 self.response_queue.append(req)
# NOTE(review): the meaning of the second argument (False) to the protocol's
# submitRequest is not visible here -- check HTTPClientProtocol.
84 req.deferResponse = self.proto.submitRequest(req, False)
85 req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
# Callback for a response delivered by the protocol (see processQueue()).
# Closes out the timing of the previous response, pairs this response with
# the oldest outstanding request (pop(0), i.e. responses are taken to arrive
# in submission order), logs the result, records the response time, remembers
# this response for download-speed accounting, and finally fires the
# caller's Deferred with `resp`.
# NOTE(review): `now` is not assigned in the visible lines; presumably
# datetime.now() -- confirm.
87 def requestComplete(self, resp):
88 self._processLastResponse()
89 req = self.response_queue.pop(0)
90 log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
# Response time is measured from submitRequest() to this callback.
95 self._responseTimes.append((now, now - req.submissionTime))
# Stored so that the next call to _processLastResponse() can compute how long
# this response's stream took.
96 self._lastResponse = (now, resp.stream.length)
97 req.deferRequest.callback(resp)
# Errback for a request that failed in the protocol (see processQueue()).
# Closes out the timing of the previous response, pairs the failure with the
# oldest outstanding request, logs it, and passes `error` on to the caller's
# Deferred.
# NOTE(review): no update of the error/completed counters used by rank() is
# visible here -- confirm they are maintained.
99 def requestError(self, error):
100 self._processLastResponse()
101 req = self.response_queue.pop(0)
102 log.msg('Download of %s generated error %r' % (req.uri, error))
105 req.deferRequest.errback(error)
# `error` is only formatted with %r into the log message; self.port is
# formatted with %d and so must be numeric.
# NOTE(review): only the logging is visible here; if hash errors are meant to
# affect rank(), confirm the error counter is updated.
107 def hashError(self, error):
108 """Log that a hash error occurred from the peer."""
109 log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
112 # The IHTTPClientManager interface functions
113 def clientBusy(self, proto):
# IHTTPClientManager callback. Closes out the download timing of the most
# recent response.
# NOTE(review): any further handling (e.g. clearing a busy flag or sending
# the next queued request) is not visible here -- confirm.
116 def clientIdle(self, proto):
117 self._processLastResponse()
121 def clientPipelining(self, proto):
# IHTTPClientManager callback for a connection that has gone away.
# Closes out the timing of the last response, fails every request still
# awaiting a response with ProtocolError('lost connection'), then resets the
# pipeline/connecting flags and empties response_queue.
# NOTE(review): the body of the final `if self.request_queue:` is not
# visible; presumably it reconnects so that still-queued requests are
# served -- confirm.
125 def clientGone(self, proto):
126 self._processLastResponse()
127 for req in self.response_queue:
128 req.deferRequest.errback(ProtocolError('lost connection'))
130 self.pipeline = False
132 self.connecting = False
133 self.response_queue = []
135 if self.request_queue:
138 # The downloading request interface functions
# Build the headers shared by every request to this peer: Host (the peer's
# host name) and a User-Agent naming apt-dht, twisted and twisted.web2 with
# their short version strings.
# NOTE(review): get() and getRange() use this method's return value, but no
# `return headers` is visible in these lines -- confirm.
139 def setCommonHeaders(self):
140 headers = http_headers.Headers()
141 headers.setHeader('Host', self.host)
142 headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' %
143 (version.short(), twisted_version.short(), web2_version.short()))
# Queue a request for `path` using `method` (default "GET") and return the
# Deferred from submitRequest(). The request is built with no body stream
# (last ClientRequest argument is None).
# NOTE(review): the If-Modified-Since header is presumably only set when
# `modtime` is given; the guard is not visible in these lines -- confirm.
146 def get(self, path, method="GET", modtime=None):
147 headers = self.setCommonHeaders()
149 headers.setHeader('If-Modified-Since', modtime)
150 return self.submitRequest(ClientRequest(method, path, headers, None))
def getRange(self, path, rangeStart, rangeEnd, method="GET"):
    """Queue a request for a byte range of a resource on this peer.

    The common headers are built first, then a Range header covering
    bytes rangeStart through rangeEnd is added and the request is
    submitted with no body stream.

    @param path: the path of the resource to request
    @param rangeStart: offset of the first byte wanted
    @param rangeEnd: offset of the last byte wanted
    @param method: the HTTP method to use (default "GET")
    @return: the Deferred returned by submitRequest()
    """
    request_headers = self.setCommonHeaders()
    byte_range = (rangeStart, rangeEnd)
    request_headers.setHeader('Range', ('bytes', [byte_range]))
    range_request = ClientRequest(method, path, request_headers, None)
    return self.submitRequest(range_request)
157 # Functions that return information about the peer
159 return not self.busy and not self.request_queue and not self.response_queue
# Close out the download timing of the most recently completed response.
# If one is pending, append (now, time since that response completed, its
# stream length) to _downloadSpeeds and clear the pending marker so it is
# only counted once.
# NOTE(review): `now` is not assigned in the visible lines; presumably
# datetime.now() -- confirm.
161 def _processLastResponse(self):
162 if self._lastResponse is not None:
164 self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
165 self._lastResponse = None
# Returns total stream length divided by total elapsed seconds over the
# retained _downloadSpeeds samples.
# NOTE(review): the assignment of `now`, the early return for the empty case
# and the initialisation of total_time/total_download are not visible in
# these lines -- confirm.
167 def downloadSpeed(self):
168 """Gets the latest average download speed for the peer.
170 The average is over the last 10 responses that occurred in the last hour.
# Drop the oldest samples while more than 10 remain or while the oldest is
# more than 3600 seconds older than `now`.
175 while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
176 now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
177 self._downloadSpeeds.pop(0)
179 # If there are none, then you get 0
180 if not self._downloadSpeeds:
# download[1] is a timedelta, converted here to fractional seconds;
# download[2] is the stream length recorded for that response.
183 for download in self._downloadSpeeds:
184 total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
185 total_download += download[2]
# NOTE(review): raises ZeroDivisionError if every retained sample has a zero
# elapsed time -- confirm whether that can occur.
187 return total_download / total_time
# Returns the mean, in seconds, of the retained _responseTimes samples.
# NOTE(review): the assignment of `now`, the value returned for the empty
# case and the initialisation of total_response are not visible in these
# lines -- confirm.
189 def responseTime(self):
190 """Gets the latest average response time for the peer.
192 Response time is the time from receiving the request, to the time
193 the download begins. The average is over the last 10 responses that
194 occurred in the last hour.
# Drop the oldest samples while more than 10 remain or while the oldest is
# more than 3600 seconds older than `now`.
198 while self._responseTimes and (len(self._responseTimes) > 10 or
199 now - self._responseTimes[0][0] > timedelta(seconds=3600)):
200 self._responseTimes.pop(0)
202 # If there are none, give it the benefit of the doubt
203 if not self._responseTimes:
# response[1] is a timedelta, converted here to fractional seconds.
206 for response in self._responseTimes:
207 total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
209 return total_response / len(self._responseTimes)
# `fastest` is the download speed of the fastest peer, in the same units as
# downloadSpeed(); it is used as the divisor for the speed factor.
# NOTE(review): the initial value of `rank`, the connection-open factor, any
# guards around the divisions, the final return, and an import of `exp` are
# not visible in these lines -- confirm.
211 def rank(self, fastest):
212 """Determine the ranking value for the peer.
214 The ranking value is composed of 5 numbers:
215 - 1 if a connection to the peer is open, 0.9 otherwise
216 - 1 if there are no pending requests, to 0 if there are a maximum
217 - 1 if the peer is the fastest of all peers, to 0 if the speed is 0
218 - 1 if all requests are good, 0 if all produced errors
219 - an exponentially decreasing number based on the response time
# Falls linearly from 1.0 with nothing queued or outstanding to 0.0 at ten
# or more requests.
224 rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0
# Speed relative to the fastest peer, capped at 1.0. A `fastest` of 0 would
# raise ZeroDivisionError unless guarded in a line not shown.
226 rank *= min(1.0, self.downloadSpeed() / fastest)
# One minus the fraction of completed requests that produced errors. A
# _completed of 0 would raise ZeroDivisionError unless guarded in a line not
# shown.
228 rank *= max(0.0, 1.0 - float(self._errors) / self._completed)
# Exponential decay of the average response time (seconds) with a constant
# of 5.0.
229 rank *= exp(-self.responseTime() / 5.0)
232 class TestClientManager(unittest.TestCase):
233 """Unit tests for the Peer."""
# Shared callback for the download tests: fails unless the response code is
# in the 2xx range and, when `expect` is not None, unless the response
# stream's length equals `expect`. `num` is not used in the visible lines.
# NOTE(review): `print_` and `printdone` are not defined in the visible
# lines; presumably local helpers that consume and report the stream --
# confirm.
238 def gotResp(self, resp, num, expect):
239 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
240 if expect is not None:
241 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
246 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
# Requests /rfc/rfc0013.txt from www.ietf.org:80 through a Peer and checks,
# via gotResp, for a 2xx response with a stream length of 1070.
# NOTE(review): this targets a real external host, so it presumably needs
# network access; the return of the Deferred to trial is not visible in
# these lines -- confirm.
248 def test_download(self):
249 host = 'www.ietf.org'
250 self.client = Peer(host, 80)
253 d = self.client.get('/rfc/rfc0013.txt')
254 d.addCallback(self.gotResp, 1, 1070)
258 host = 'www.ietf.org'
259 self.client = Peer(host, 80)
262 d = self.client.get('/rfc/rfc0013.txt', "HEAD")
263 d.addCallback(self.gotResp, 1, 0)
# Issues ten requests to www.ietf.org through one Peer: three immediately and
# seven scheduled with reactor.callLater at 1 to 62 seconds, each checked by
# gotResp against an expected stream length.
# NOTE(review): the return of lastDefer to trial is not visible in these
# lines -- confirm.
266 def test_multiple_downloads(self):
267 host = 'www.ietf.org'
268 self.client = Peer(host, 80)
# Fired when the request flagged last=True finishes.
270 lastDefer = defer.Deferred()
# Helper: request `path` and verify the response with gotResp(num, expect).
# NOTE(review): the addBoth below is presumably guarded by `if last:` in a
# line not shown; unguarded it would fire lastDefer once per request.
272 def newRequest(path, num, expect, last=False):
273 d = self.client.get(path)
274 d.addCallback(self.gotResp, num, expect)
276 d.addBoth(lastDefer.callback)
278 newRequest("/rfc/rfc0006.txt", 1, 1776)
279 newRequest("/rfc/rfc2362.txt", 2, 159833)
280 newRequest("/rfc/rfc0801.txt", 3, 40824)
# Delayed calls are kept in self.pending_calls (initialised outside the
# visible lines), presumably so they can be cancelled on teardown.
281 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
282 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
283 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
284 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
285 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
286 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
287 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# Same ten requests as test_multiple_downloads, but the seven scheduled ones
# all use a delay of 0, so every request is queued on the Peer almost at
# once.
# NOTE(review): the return of lastDefer to trial is not visible in these
# lines -- confirm.
290 def test_multiple_quick_downloads(self):
291 host = 'www.ietf.org'
292 self.client = Peer(host, 80)
# Fired when the request flagged last=True finishes.
294 lastDefer = defer.Deferred()
# Helper: request `path` and verify the response with gotResp(num, expect).
# NOTE(review): the addBoth below is presumably guarded by `if last:` in a
# line not shown; unguarded it would fire lastDefer once per request.
296 def newRequest(path, num, expect, last=False):
297 d = self.client.get(path)
298 d.addCallback(self.gotResp, num, expect)
300 d.addBoth(lastDefer.callback)
302 newRequest("/rfc/rfc0006.txt", 1, 1776)
303 newRequest("/rfc/rfc2362.txt", 2, 159833)
304 newRequest("/rfc/rfc0801.txt", 3, 40824)
305 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0013.txt', 4, 1070))
306 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0022.txt', 5, 4606))
307 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0048.txt', 6, 41696))
308 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc3261.txt', 7, 647976))
309 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0014.txt', 8, 27))
310 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc0001.txt', 9, 21088))
311 self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
315 log.msg('Rank is: %r' % self.client.rank(250.0*1024))
316 log.msg('Download speed is: %r' % self.client.downloadSpeed())
317 log.msg('Response Time is: %r' % self.client.responseTime())
# Runs the same request schedule as test_multiple_downloads while calling
# self.checkInfo every 2 seconds from t=2 to t=120, to exercise the peer
# statistics during the downloads.
# NOTE(review): the return of lastDefer to trial is not visible in these
# lines -- confirm.
319 def test_peer_info(self):
320 host = 'www.ietf.org'
321 self.client = Peer(host, 80)
# Fired when the request flagged last=True finishes.
323 lastDefer = defer.Deferred()
# Helper: request `path` and verify the response with gotResp(num, expect).
# NOTE(review): the addBoth below is presumably guarded by `if last:` in a
# line not shown; unguarded it would fire lastDefer once per request.
325 def newRequest(path, num, expect, last=False):
326 d = self.client.get(path)
327 d.addCallback(self.gotResp, num, expect)
329 d.addBoth(lastDefer.callback)
331 newRequest("/rfc/rfc0006.txt", 1, 1776)
332 newRequest("/rfc/rfc2362.txt", 2, 159833)
333 newRequest("/rfc/rfc0801.txt", 3, 40824)
334 self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
335 self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
336 self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
337 self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
338 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
339 self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
340 self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
# xrange(2, 122, 2) yields 2, 4, ..., 120: sixty scheduled checkInfo calls.
342 for i in xrange(2, 122, 2):
343 self.pending_calls.append(reactor.callLater(i, self.checkInfo))
# Requests bytes 100 through 199 of /rfc/rfc0013.txt and checks, via gotResp,
# for a 2xx response whose stream length is 100 (i.e. the range end is
# inclusive).
# NOTE(review): the return of the Deferred to trial is not visible in these
# lines -- confirm.
347 def test_range(self):
348 host = 'www.ietf.org'
349 self.client = Peer(host, 80)
352 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)
353 d.addCallback(self.gotResp, 1, 100)
357 for p in self.pending_calls:
360 self.pending_calls = []