X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=15ee5640696bc777b76b6a1ae4f7c900fcf9de2a;hb=d65239f77bc6c4d9f02de88a4eec71254f7e6936;hp=eb369326b7f309e40c22bc39485fe8ae1fc6c671;hpb=7b1167d8ce780312d3689c9309c7e9c64060c085;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index eb36932..15ee564 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -31,6 +31,7 @@ class Peer(ClientFactory): def __init__(self, host, port=80): self.host = host self.port = port + self.rank = 0.5 self.busy = False self.pipeline = False self.closed = True @@ -44,6 +45,9 @@ class Peer(ClientFactory): self._downloadSpeeds = [] self._lastResponse = None self._responseTimes = [] + + def __repr__(self): + return "(%s, %d, %0.5f)" % (self.host, self.port, self.rank) #{ Manage the request queue def connect(self): @@ -74,6 +78,7 @@ class Peer(ClientFactory): request.submissionTime = datetime.now() request.deferRequest = defer.Deferred() self.request_queue.append(request) + self.rerank() self.processQueue() return request.deferRequest @@ -93,6 +98,7 @@ class Peer(ClientFactory): req = self.request_queue.pop(0) self.response_queue.append(req) + self.rerank() req.deferResponse = self.proto.submitRequest(req, False) req.deferResponse.addCallbacks(self.requestComplete, self.requestError) @@ -102,11 +108,10 @@ class Peer(ClientFactory): req = self.response_queue.pop(0) log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) self._completed += 1 - if resp.code >= 400: - self._errors += 1 now = datetime.now() self._responseTimes.append((now, now - req.submissionTime)) self._lastResponse = (now, resp.stream.length) + self.rerank() req.deferRequest.callback(resp) def requestError(self, error): @@ -116,12 +121,14 @@ class Peer(ClientFactory): log.msg('Download of %s generated error %r' % (req.uri, error)) self._completed += 1 self._errors += 1 + self.rerank() req.deferRequest.errback(error) def hashError(self, error): """Log that a hash error occurred from the peer.""" log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error)) self._errors += 1 + self.rerank() #{ IHTTPClientManager interface def clientBusy(self, proto): @@ -133,6 +140,7 @@ class Peer(ClientFactory): self._processLastResponse() self.busy = False self.processQueue() + self.rerank() def clientPipelining(self, proto): """Try to send a new request.""" @@ -150,6 +158,7 @@ class Peer(ClientFactory): self.connecting = False self.response_queue = [] self.proto = None + self.rerank() if self.request_queue: self.processQueue() @@ -253,26 +262,28 @@ class Peer(ClientFactory): return total_response / len(self._responseTimes) - def rank(self, fastest): + def rerank(self): """Determine the ranking value for the peer. - The ranking value is composed of 5 numbers: - - 1 if a connection to the peer is open, 0.9 otherwise - - 1 if there are no pending requests, to 0 if there are a maximum - - 1 if the peer is the fastest of all peers, to 0 if the speed is 0 - - 1 if all requests are good, 0 if all produced errors - - an exponentially decreasing number based on the response time + The ranking value is composed of 5 numbers, each exponentially + decreasing from 1 to 0 based on: + - if a connection to the peer is open + - the number of pending requests + - the time to download a single piece + - the number of errors + - the response time """ rank = 1.0 if self.closed: rank *= 0.9 - rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0 - if fastest > 0.0: - rank *= min(1.0, self.downloadSpeed() / fastest) + rank *= exp(-(len(self.request_queue) - len(self.response_queue))) + speed = self.downloadSpeed() + if speed > 0.0: + rank *= exp(-512.0*1024 / speed) if self._completed: - rank *= max(0.0, 1.0 - float(self._errors) / self._completed) + rank *= exp(-float(self._errors) / self._completed) rank *= exp(-self.responseTime() / 5.0) - return rank + self.rank = rank class TestClientManager(unittest.TestCase): """Unit tests for the Peer.""" @@ -370,7 +381,7 @@ class TestClientManager(unittest.TestCase): return lastDefer def checkInfo(self): - log.msg('Rank is: %r' % self.client.rank(250.0*1024)) + log.msg('Rank is: %r' % self.client.rank) log.msg('Download speed is: %r' % self.client.downloadSpeed()) log.msg('Response Time is: %r' % self.client.responseTime())