]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/HTTPDownloader.py
Conf time parser works with floats, and lengthened DHT hash expiry to 3 hours.
[quix0rs-apt-p2p.git] / apt_p2p / HTTPDownloader.py
index 2a53897ee64c0b8ca4a03bf19eb6d40773e25f32..1831ceabd19f51363cb1c32fb8a6abc11db0ead8 100644 (file)
@@ -17,6 +17,24 @@ from zope.interface import implements
 
 from apt_p2p_conf import version
 
+class LoggingHTTPClientProtocol(HTTPClientProtocol):
+    """A modified client protocol that logs the number of bytes received."""
+    
+    def __init__(self, factory, stats = None, mirror = False):
+        HTTPClientProtocol.__init__(self, factory)
+        self.stats = stats
+        self.mirror = mirror
+    
+    def lineReceived(self, line):
+        if self.stats:
+            self.stats.receivedBytes(len(line) + 2, self.mirror)
+        HTTPClientProtocol.lineReceived(self, line)
+
+    def rawDataReceived(self, data):
+        if self.stats:
+            self.stats.receivedBytes(len(data), self.mirror)
+        HTTPClientProtocol.rawDataReceived(self, data)
+
 class Peer(ClientFactory):
     """A manager for all HTTP requests to a single peer.
     
@@ -28,10 +46,12 @@ class Peer(ClientFactory):
 
     implements(IHTTPClientManager)
     
-    def __init__(self, host, port=80):
+    def __init__(self, host, port = 80, stats = None):
         self.host = host
         self.port = port
-        self.rank = 0.5
+        self.stats = stats
+        self.mirror = False
+        self.rank = 0.1
         self.busy = False
         self.pipeline = False
         self.closed = True
@@ -45,21 +65,42 @@ class Peer(ClientFactory):
         self._downloadSpeeds = []
         self._lastResponse = None
         self._responseTimes = []
+    
+    def __repr__(self):
+        return "(%r, %r, %r)" % (self.host, self.port, self.rank)
         
     #{ Manage the request queue
     def connect(self):
         """Connect to the peer."""
         assert self.closed and not self.connecting
         self.connecting = True
-        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
-        d.addCallback(self.connected)
+        d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
+                                   stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
+        d.addCallbacks(self.connected, self.connectionError)
 
     def connected(self, proto):
         """Begin processing the queued requests."""
         self.closed = False
         self.connecting = False
         self.proto = proto
-        self.processQueue()
+        reactor.callLater(0, self.processQueue)
+        
+    def connectionError(self, err):
+        """Cancel the requests."""
+        log.msg('Failed to connect to the peer by HTTP.')
+        log.err(err)
+
+        # Remove one request so that we don't loop indefinitely
+        if self.request_queue:
+            req = self.request_queue.pop(0)
+            req.deferRequest.errback(err)
+            
+        self._completed += 1
+        self._errors += 1
+        self.rerank()
+        if self.connecting:
+            self.connecting = False
+            self.clientGone(None)
         
     def close(self):
         """Close the connection to the peer."""
@@ -76,7 +117,7 @@ class Peer(ClientFactory):
         request.deferRequest = defer.Deferred()
         self.request_queue.append(request)
         self.rerank()
-        self.processQueue()
+        reactor.callLater(0, self.processQueue)
         return request.deferRequest
 
     def processQueue(self):
@@ -105,8 +146,6 @@ class Peer(ClientFactory):
         req = self.response_queue.pop(0)
         log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
         self._completed += 1
-        if resp.code >= 400:
-            self._errors += 1
         now = datetime.now()
         self._responseTimes.append((now, now - req.submissionTime))
         self._lastResponse = (now, resp.stream.length)
@@ -138,19 +177,20 @@ class Peer(ClientFactory):
         """Try to send a new request."""
         self._processLastResponse()
         self.busy = False
-        self.processQueue()
+        reactor.callLater(0, self.processQueue)
         self.rerank()
 
     def clientPipelining(self, proto):
         """Try to send a new request."""
         self.pipeline = True
-        self.processQueue()
+        reactor.callLater(0, self.processQueue)
 
     def clientGone(self, proto):
         """Mark sent requests as errors."""
         self._processLastResponse()
         for req in self.response_queue:
-            req.deferRequest.errback(ProtocolError('lost connection'))
+            reactor.callLater(0, req.deferRequest.errback,
+                                 ProtocolError('lost connection'))
         self.busy = False
         self.pipeline = False
         self.closed = True
@@ -159,7 +199,7 @@ class Peer(ClientFactory):
         self.proto = None
         self.rerank()
         if self.request_queue:
-            self.processQueue()
+            reactor.callLater(0, self.processQueue)
             
     #{ Downloading request interface
     def setCommonHeaders(self):
@@ -275,12 +315,12 @@ class Peer(ClientFactory):
         rank = 1.0
         if self.closed:
             rank *= 0.9
-        rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
+        rank *= exp(-(len(self.request_queue) + len(self.response_queue)))
         speed = self.downloadSpeed()
         if speed > 0.0:
             rank *= exp(-512.0*1024 / speed)
         if self._completed:
-            rank *= exp(-float(self._errors) / self._completed)
+            rank *= exp(-10.0 * self._errors / self._completed)
         rank *= exp(-self.responseTime() / 5.0)
         self.rank = rank
         
@@ -289,16 +329,23 @@ class TestClientManager(unittest.TestCase):
     
     client = None
     pending_calls = []
+    length = []
     
     def gotResp(self, resp, num, expect):
         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
         if expect is not None:
             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
-        def print_(n):
-            pass
-        def printdone(n):
-            pass
-        stream_mod.readStream(resp.stream, print_).addCallback(printdone)
+        while len(self.length) <= num:
+            self.length.append(0)
+        self.length[num] = 0
+        def addData(data, self = self, num = num):
+            self.length[num] += len(data)
+        def checkLength(resp, self = self, num = num, length = resp.stream.length):
+            self.failUnlessEqual(self.length[num], length)
+            return resp
+        df = stream_mod.readStream(resp.stream, addData)
+        df.addCallback(checkLength)
+        return df
     
     def test_download(self):
         """Tests a normal download."""
@@ -339,7 +386,7 @@ class TestClientManager(unittest.TestCase):
         newRequest("/rfc/rfc0801.txt", 3, 40824)
         
         # This one will probably be queued
-        self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
+        self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
         
         # Connection should still be open, but idle
         self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))