X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=f1488e540481a51a75b445a2bf2fa48ca465ec4c;hb=09472169824fc47c359cdee65652a12a7ba46806;hp=eb369326b7f309e40c22bc39485fe8ae1fc6c671;hpb=7b1167d8ce780312d3689c9309c7e9c64060c085;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index eb36932..f1488e5 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -17,6 +17,24 @@ from zope.interface import implements from apt_p2p_conf import version +class LoggingHTTPClientProtocol(HTTPClientProtocol): + """A modified client protocol that logs the number of bytes received.""" + + def __init__(self, factory, stats = None, mirror = False): + HTTPClientProtocol.__init__(self, factory) + self.stats = stats + self.mirror = mirror + + def lineReceived(self, line): + if self.stats: + self.stats.receivedBytes(len(line) + 2, self.mirror) + HTTPClientProtocol.lineReceived(self, line) + + def rawDataReceived(self, data): + if self.stats: + self.stats.receivedBytes(len(data), self.mirror) + HTTPClientProtocol.rawDataReceived(self, data) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -28,9 +46,12 @@ class Peer(ClientFactory): implements(IHTTPClientManager) - def __init__(self, host, port=80): + def __init__(self, host, port = 80, stats = None): self.host = host self.port = port + self.stats = stats + self.mirror = False + self.rank = 0.5 self.busy = False self.pipeline = False self.closed = True @@ -44,14 +65,18 @@ class Peer(ClientFactory): self._downloadSpeeds = [] self._lastResponse = None self._responseTimes = [] + + def __repr__(self): + return "(%r, %r, %r)" % (self.host, self.port, self.rank) #{ Manage the request queue def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting self.connecting = True - d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) - d.addCallback(self.connected) + d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, + stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) + d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): """Begin processing the queued requests.""" @@ -60,6 +85,23 @@ class Peer(ClientFactory): self.proto = proto self.processQueue() + def connectionError(self, err): + """Cancel the requests.""" + log.msg('Failed to connect to the peer by HTTP.') + log.err(err) + + # Remove one request so that we don't loop indefinitely + if self.request_queue: + req = self.request_queue.pop(0) + req.deferRequest.errback(err) + + self._completed += 1 + self._errors += 1 + self.rerank() + if self.connecting: + self.connecting = False + self.clientGone(None) + def close(self): """Close the connection to the peer.""" if not self.closed: @@ -74,6 +116,7 @@ class Peer(ClientFactory): request.submissionTime = datetime.now() request.deferRequest = defer.Deferred() self.request_queue.append(request) + self.rerank() self.processQueue() return request.deferRequest @@ -93,6 +136,7 @@ class Peer(ClientFactory): req = self.request_queue.pop(0) self.response_queue.append(req) + self.rerank() req.deferResponse = self.proto.submitRequest(req, False) req.deferResponse.addCallbacks(self.requestComplete, self.requestError) @@ -102,11 +146,10 @@ class Peer(ClientFactory): req = self.response_queue.pop(0) log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) self._completed += 1 - if resp.code >= 400: - self._errors += 1 now = datetime.now() self._responseTimes.append((now, now - req.submissionTime)) self._lastResponse = (now, resp.stream.length) + self.rerank() req.deferRequest.callback(resp) def requestError(self, error): @@ -116,12 +159,14 @@ class Peer(ClientFactory): log.msg('Download of %s generated error %r' % (req.uri, error)) self._completed += 1 self._errors += 1 + self.rerank() req.deferRequest.errback(error) def hashError(self, error): """Log that a hash error occurred from the peer.""" log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error)) self._errors += 1 + self.rerank() #{ IHTTPClientManager interface def clientBusy(self, proto): @@ -133,6 +178,7 @@ class Peer(ClientFactory): self._processLastResponse() self.busy = False self.processQueue() + self.rerank() def clientPipelining(self, proto): """Try to send a new request.""" @@ -150,6 +196,7 @@ class Peer(ClientFactory): self.connecting = False self.response_queue = [] self.proto = None + self.rerank() if self.request_queue: self.processQueue() @@ -253,26 +300,28 @@ class Peer(ClientFactory): return total_response / len(self._responseTimes) - def rank(self, fastest): + def rerank(self): """Determine the ranking value for the peer. - The ranking value is composed of 5 numbers: - - 1 if a connection to the peer is open, 0.9 otherwise - - 1 if there are no pending requests, to 0 if there are a maximum - - 1 if the peer is the fastest of all peers, to 0 if the speed is 0 - - 1 if all requests are good, 0 if all produced errors - - an exponentially decreasing number based on the response time + The ranking value is composed of 5 numbers, each exponentially + decreasing from 1 to 0 based on: + - if a connection to the peer is open + - the number of pending requests + - the time to download a single piece + - the number of errors + - the response time """ rank = 1.0 if self.closed: rank *= 0.9 - rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0 - if fastest > 0.0: - rank *= min(1.0, self.downloadSpeed() / fastest) + rank *= exp(-(len(self.request_queue) - len(self.response_queue))) + speed = self.downloadSpeed() + if speed > 0.0: + rank *= exp(-512.0*1024 / speed) if self._completed: - rank *= max(0.0, 1.0 - float(self._errors) / self._completed) + rank *= exp(-float(self._errors) / self._completed) rank *= exp(-self.responseTime() / 5.0) - return rank + self.rank = rank class TestClientManager(unittest.TestCase): """Unit tests for the Peer.""" @@ -370,7 +419,7 @@ class TestClientManager(unittest.TestCase): return lastDefer def checkInfo(self): - log.msg('Rank is: %r' % self.client.rank(250.0*1024)) + log.msg('Rank is: %r' % self.client.rank) log.msg('Download speed is: %r' % self.client.downloadSpeed()) log.msg('Response Time is: %r' % self.client.responseTime())