]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/HTTPDownloader.py
Save the files downloaded from peers in a peers subdirectory.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
index eb369326b7f309e40c22bc39485fe8ae1fc6c671..f1488e540481a51a75b445a2bf2fa48ca465ec4c 100644 (file)
@@ -17,6 +17,24 @@ from zope.interface import implements
 
 from apt_p2p_conf import version
 
+class LoggingHTTPClientProtocol(HTTPClientProtocol):
+    """A modified client protocol that logs the number of bytes received."""
+    
+    def __init__(self, factory, stats = None, mirror = False):
+        HTTPClientProtocol.__init__(self, factory)
+        self.stats = stats
+        self.mirror = mirror
+    
+    def lineReceived(self, line):
+        if self.stats:
+            self.stats.receivedBytes(len(line) + 2, self.mirror)
+        HTTPClientProtocol.lineReceived(self, line)
+
+    def rawDataReceived(self, data):
+        if self.stats:
+            self.stats.receivedBytes(len(data), self.mirror)
+        HTTPClientProtocol.rawDataReceived(self, data)
+
 class Peer(ClientFactory):
     """A manager for all HTTP requests to a single peer.
     
@@ -28,9 +46,12 @@ class Peer(ClientFactory):
 
     implements(IHTTPClientManager)
     
-    def __init__(self, host, port=80):
+    def __init__(self, host, port = 80, stats = None):
         self.host = host
         self.port = port
+        self.stats = stats
+        self.mirror = False
+        self.rank = 0.5
         self.busy = False
         self.pipeline = False
         self.closed = True
@@ -44,14 +65,18 @@ class Peer(ClientFactory):
         self._downloadSpeeds = []
         self._lastResponse = None
         self._responseTimes = []
+    
+    def __repr__(self):
+        return "(%r, %r, %r)" % (self.host, self.port, self.rank)
         
     #{ Manage the request queue
     def connect(self):
         """Connect to the peer."""
         assert self.closed and not self.connecting
         self.connecting = True
-        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
-        d.addCallback(self.connected)
+        d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
+                                   stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
+        d.addCallbacks(self.connected, self.connectionError)
 
     def connected(self, proto):
         """Begin processing the queued requests."""
@@ -60,6 +85,23 @@ class Peer(ClientFactory):
         self.proto = proto
         self.processQueue()
         
+    def connectionError(self, err):
+        """Cancel the requests."""
+        log.msg('Failed to connect to the peer by HTTP.')
+        log.err(err)
+
+        # Remove one request so that we don't loop indefinitely
+        if self.request_queue:
+            req = self.request_queue.pop(0)
+            req.deferRequest.errback(err)
+            
+        self._completed += 1
+        self._errors += 1
+        self.rerank()
+        if self.connecting:
+            self.connecting = False
+            self.clientGone(None)
+        
     def close(self):
         """Close the connection to the peer."""
         if not self.closed:
@@ -74,6 +116,7 @@ class Peer(ClientFactory):
         request.submissionTime = datetime.now()
         request.deferRequest = defer.Deferred()
         self.request_queue.append(request)
+        self.rerank()
         self.processQueue()
         return request.deferRequest
 
@@ -93,6 +136,7 @@ class Peer(ClientFactory):
 
         req = self.request_queue.pop(0)
         self.response_queue.append(req)
+        self.rerank()
         req.deferResponse = self.proto.submitRequest(req, False)
         req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
 
@@ -102,11 +146,10 @@ class Peer(ClientFactory):
         req = self.response_queue.pop(0)
         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
         self._completed += 1
-        if resp.code >= 400:
-            self._errors += 1
         now = datetime.now()
         self._responseTimes.append((now, now - req.submissionTime))
         self._lastResponse = (now, resp.stream.length)
+        self.rerank()
         req.deferRequest.callback(resp)
 
     def requestError(self, error):
@@ -116,12 +159,14 @@ class Peer(ClientFactory):
         log.msg('Download of %s generated error %r' % (req.uri, error))
         self._completed += 1
         self._errors += 1
+        self.rerank()
         req.deferRequest.errback(error)
         
     def hashError(self, error):
         """Log that a hash error occurred from the peer."""
         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
         self._errors += 1
+        self.rerank()
 
     #{ IHTTPClientManager interface
     def clientBusy(self, proto):
@@ -133,6 +178,7 @@ class Peer(ClientFactory):
         self._processLastResponse()
         self.busy = False
         self.processQueue()
+        self.rerank()
 
     def clientPipelining(self, proto):
         """Try to send a new request."""
@@ -150,6 +196,7 @@ class Peer(ClientFactory):
         self.connecting = False
         self.response_queue = []
         self.proto = None
+        self.rerank()
         if self.request_queue:
             self.processQueue()
             
@@ -253,26 +300,28 @@ class Peer(ClientFactory):
 
         return total_response / len(self._responseTimes)
     
-    def rank(self, fastest):
+    def rerank(self):
         """Determine the ranking value for the peer.
         
-        The ranking value is composed of 5 numbers:
-         - 1 if a connection to the peer is open, 0.9 otherwise
-         - 1 if there are no pending requests, to 0 if there are a maximum
-         - 1 if the peer is the fastest of all peers, to 0 if the speed is 0
-         - 1 if all requests are good, 0 if all produced errors
-         - an exponentially decreasing number based on the response time
+        The ranking value is composed of 5 numbers, each exponentially
+        decreasing from 1 to 0 based on:
+         - if a connection to the peer is open
+         - the number of pending requests
+         - the time to download a single piece
+         - the number of errors
+         - the response time
         """
         rank = 1.0
         if self.closed:
             rank *= 0.9
-        rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0
-        if fastest > 0.0:
-            rank *= min(1.0, self.downloadSpeed() / fastest)
+        rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
+        speed = self.downloadSpeed()
+        if speed > 0.0:
+            rank *= exp(-512.0*1024 / speed)
         if self._completed:
-            rank *= max(0.0, 1.0 - float(self._errors) / self._completed)
+            rank *= exp(-float(self._errors) / self._completed)
         rank *= exp(-self.responseTime() / 5.0)
-        return rank
+        self.rank = rank
         
 class TestClientManager(unittest.TestCase):
     """Unit tests for the Peer."""
@@ -370,7 +419,7 @@ class TestClientManager(unittest.TestCase):
         return lastDefer
         
     def checkInfo(self):
-        log.msg('Rank is: %r' % self.client.rank(250.0*1024))
+        log.msg('Rank is: %r' % self.client.rank)
         log.msg('Download speed is: %r' % self.client.downloadSpeed())
         log.msg('Response Time is: %r' % self.client.responseTime())