from apt_p2p_conf import version
+class LoggingHTTPClientProtocol(HTTPClientProtocol):
+ """A modified client protocol that logs the number of bytes received."""
+
+ def __init__(self, factory, stats = None, mirror = False):
+ HTTPClientProtocol.__init__(self, factory)
+ self.stats = stats
+ self.mirror = mirror
+
+ def lineReceived(self, line):
+ if self.stats:
+ self.stats.receivedBytes(len(line) + 2, self.mirror)
+ HTTPClientProtocol.lineReceived(self, line)
+
+ def rawDataReceived(self, data):
+ if self.stats:
+ self.stats.receivedBytes(len(data), self.mirror)
+ HTTPClientProtocol.rawDataReceived(self, data)
+
class Peer(ClientFactory):
"""A manager for all HTTP requests to a single peer.
implements(IHTTPClientManager)
- def __init__(self, host, port=80):
+ def __init__(self, host, port = 80, stats = None):
self.host = host
self.port = port
+ self.stats = stats
self.mirror = False
- self.rank = 0.5
+ self.rank = 0.1
self.busy = False
self.pipeline = False
self.closed = True
"""Connect to the peer."""
assert self.closed and not self.connecting
self.connecting = True
- d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
- d.addCallback(self.connected)
+ d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
+ stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
+ d.addCallbacks(self.connected, self.connectionError)
def connected(self, proto):
"""Begin processing the queued requests."""
self.closed = False
self.connecting = False
self.proto = proto
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
+
+ def connectionError(self, err):
+ """Cancel the requests."""
+ log.msg('Failed to connect to the peer by HTTP.')
+ log.err(err)
+
+ # Remove one request so that we don't loop indefinitely
+ if self.request_queue:
+ req = self.request_queue.pop(0)
+ req.deferRequest.errback(err)
+
+ self._completed += 1
+ self._errors += 1
+ self.rerank()
+ if self.connecting:
+ self.connecting = False
+ self.clientGone(None)
def close(self):
"""Close the connection to the peer."""
request.deferRequest = defer.Deferred()
self.request_queue.append(request)
self.rerank()
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
return request.deferRequest
def processQueue(self):
"""Try to send a new request."""
self._processLastResponse()
self.busy = False
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
self.rerank()
def clientPipelining(self, proto):
"""Try to send a new request."""
self.pipeline = True
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
def clientGone(self, proto):
"""Mark sent requests as errors."""
self._processLastResponse()
for req in self.response_queue:
- req.deferRequest.errback(ProtocolError('lost connection'))
+ reactor.callLater(0, req.deferRequest.errback,
+ ProtocolError('lost connection'))
self.busy = False
self.pipeline = False
self.closed = True
self.proto = None
self.rerank()
if self.request_queue:
- self.processQueue()
+ reactor.callLater(0, self.processQueue)
#{ Downloading request interface
def setCommonHeaders(self):
rank = 1.0
if self.closed:
rank *= 0.9
- rank *= exp(-(len(self.request_queue) - len(self.response_queue)))
+ rank *= exp(-(len(self.request_queue) + len(self.response_queue)))
speed = self.downloadSpeed()
if speed > 0.0:
rank *= exp(-512.0*1024 / speed)
if self._completed:
- rank *= exp(-float(self._errors) / self._completed)
+ rank *= exp(-10.0 * self._errors / self._completed)
rank *= exp(-self.responseTime() / 5.0)
self.rank = rank
client = None
pending_calls = []
+ length = []
def gotResp(self, resp, num, expect):
self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
if expect is not None:
self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
- def print_(n):
- pass
- def printdone(n):
- pass
- stream_mod.readStream(resp.stream, print_).addCallback(printdone)
+ while len(self.length) <= num:
+ self.length.append(0)
+ self.length[num] = 0
+ def addData(data, self = self, num = num):
+ self.length[num] += len(data)
+ def checkLength(resp, self = self, num = num, length = resp.stream.length):
+ self.failUnlessEqual(self.length[num], length)
+ return resp
+ df = stream_mod.readStream(resp.stream, addData)
+ df.addCallback(checkLength)
+ return df
def test_download(self):
"""Tests a normal download."""
newRequest("/rfc/rfc0801.txt", 3, 40824)
# This one will probably be queued
- self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
+ self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070))
# Connection should still be open, but idle
self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))