def __init__(self, host, port=80):
self.host = host
self.port = port
+ self.rank = 0.5
self.busy = False
self.pipeline = False
self.closed = True
self._downloadSpeeds = []
self._lastResponse = None
self._responseTimes = []
+
+ def __repr__(self):
+ return "(%s, %d, %0.5f)" % (self.host, self.port, self.rank)
#{ Manage the request queue
def connect(self):
request.submissionTime = datetime.now()
request.deferRequest = defer.Deferred()
self.request_queue.append(request)
+ self.rerank()
self.processQueue()
return request.deferRequest
req = self.request_queue.pop(0)
self.response_queue.append(req)
+ self.rerank()
req.deferResponse = self.proto.submitRequest(req, False)
req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
req = self.response_queue.pop(0)
log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
self._completed += 1
- if resp.code >= 400:
- self._errors += 1
now = datetime.now()
self._responseTimes.append((now, now - req.submissionTime))
self._lastResponse = (now, resp.stream.length)
+ self.rerank()
req.deferRequest.callback(resp)
def requestError(self, error):
log.msg('Download of %s generated error %r' % (req.uri, error))
self._completed += 1
self._errors += 1
+ self.rerank()
req.deferRequest.errback(error)
def hashError(self, error):
"""Log that a hash error occurred from the peer."""
log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
self._errors += 1
+ self.rerank()
#{ IHTTPClientManager interface
def clientBusy(self, proto):
self._processLastResponse()
self.busy = False
self.processQueue()
+ self.rerank()
def clientPipelining(self, proto):
"""Try to send a new request."""
self.connecting = False
self.response_queue = []
self.proto = None
+ self.rerank()
if self.request_queue:
self.processQueue()
return total_response / len(self._responseTimes)
- def rank(self, fastest):
+ def rerank(self):
"""Determine the ranking value for the peer.
- The ranking value is composed of 5 numbers:
- - 1 if a connection to the peer is open, 0.9 otherwise
- - 1 if there are no pending requests, to 0 if there are a maximum
- - 1 if the peer is the fastest of all peers, to 0 if the speed is 0
- - 1 if all requests are good, 0 if all produced errors
- - an exponentially decreasing number based on the response time
+ The ranking value is composed of 5 numbers, each exponentially
+ decreasing from 1 to 0 based on:
+ - if a connection to the peer is open
+ - the number of pending requests
+ - the time to download a single piece
+ - the number of errors
+ - the response time
"""
rank = 1.0
if self.closed:
rank *= 0.9
- rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0
- if fastest > 0.0:
- rank *= min(1.0, self.downloadSpeed() / fastest)
+ rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
+ speed = self.downloadSpeed()
+ if speed > 0.0:
+ rank *= exp(-512.0*1024 / speed)
if self._completed:
- rank *= max(0.0, 1.0 - float(self._errors) / self._completed)
+ rank *= exp(-float(self._errors) / self._completed)
rank *= exp(-self.responseTime() / 5.0)
- return rank
+ self.rank = rank
class TestClientManager(unittest.TestCase):
"""Unit tests for the Peer."""
return lastDefer
def checkInfo(self):
- log.msg('Rank is: %r' % self.client.rank(250.0*1024))
+ log.msg('Rank is: %r' % self.client.rank)
log.msg('Download speed is: %r' % self.client.downloadSpeed())
log.msg('Response Time is: %r' % self.client.responseTime())