X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=d1ea0a93cebe1ee9095355d3f6e9eb2edc4eab27;hb=9fc63fde5b46f6afcfb957d685f910d36535c67b;hp=eb369326b7f309e40c22bc39485fe8ae1fc6c671;hpb=7b1167d8ce780312d3689c9309c7e9c64060c085;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index eb36932..d1ea0a9 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -9,7 +9,8 @@ from twisted.internet.protocol import ClientFactory from twisted import version as twisted_version from twisted.python import log from twisted.web2.client.interfaces import IHTTPClientManager -from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol +from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest +from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE from twisted.web2 import stream as stream_mod, http_headers from twisted.web2 import version as web2_version from twisted.trial import unittest @@ -17,6 +18,110 @@ from zope.interface import implements from apt_p2p_conf import version +class PipelineError(Exception): + """An error has occurred in pipelining requests.""" + +class FixedHTTPClientChannelRequest(HTTPClientChannelRequest): + """Fix the broken _error function.""" + + def __init__(self, channel, request, closeAfter): + HTTPClientChannelRequest.__init__(self, channel, request, closeAfter) + self.started = False + + def _error(self, err): + """ + Abort parsing, and depending of the status of the request, either fire + the C{responseDefer} if no response has been sent yet, or close the + stream. + """ + if self.started: + self.abortParse() + if hasattr(self, 'stream') and self.stream is not None: + self.stream.finish(err) + else: + self.responseDefer.errback(err) + + def gotInitialLine(self, initialLine): + self.started = True + HTTPClientChannelRequest.gotInitialLine(self, initialLine) + +class LoggingHTTPClientProtocol(HTTPClientProtocol): + """A modified client protocol that logs the number of bytes received.""" + + def __init__(self, factory, stats = None, mirror = False): + HTTPClientProtocol.__init__(self, factory) + self.stats = stats + self.mirror = mirror + + def lineReceived(self, line): + if self.stats: + self.stats.receivedBytes(len(line) + 2, self.mirror) + HTTPClientProtocol.lineReceived(self, line) + + def rawDataReceived(self, data): + if self.stats: + self.stats.receivedBytes(len(data), self.mirror) + HTTPClientProtocol.rawDataReceived(self, data) + + def submitRequest(self, request, closeAfter=True): + """ + @param request: The request to send to a remote server. + @type request: L{ClientRequest} + + @param closeAfter: If True the 'Connection: close' header will be sent, + otherwise 'Connection: keep-alive' + @type closeAfter: C{bool} + + @rtype: L{twisted.internet.defer.Deferred} + @return: A Deferred which will be called back with the + L{twisted.web2.http.Response} from the server. 
+ """ + + # Assert we're in a valid state to submit more + assert self.outRequest is None + assert ((self.readPersistent is PERSIST_NO_PIPELINE + and not self.inRequests) + or self.readPersistent is PERSIST_PIPELINE) + + self.manager.clientBusy(self) + if closeAfter: + self.readPersistent = False + + self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self, + request, closeAfter) + self.inRequests.append(chanRequest) + + chanRequest.submit() + return chanRequest.responseDefer + + def setReadPersistent(self, persist): + oldPersist = self.readPersistent + self.readPersistent = persist + if not persist: + # Tell all requests but first to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + request.connectionLost(PipelineError('Pipelined connection was closed.')) + elif (oldPersist is PERSIST_NO_PIPELINE and + persist is PERSIST_PIPELINE and + self.outRequest is None): + self.manager.clientPipelining(self) + + def connectionLost(self, reason): + self.readPersistent = False + self.setTimeout(None) + self.manager.clientGone(self) + # Cancel the current request + if self.inRequests and self.inRequests[0] is not None: + self.inRequests[0].connectionLost(reason) + # Tell all remaining requests to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + if request is not None: + request.connectionLost(PipelineError('Pipelined connection was closed.')) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -28,15 +133,18 @@ class Peer(ClientFactory): implements(IHTTPClientManager) - def __init__(self, host, port=80): + def __init__(self, host, port = 80, stats = None): self.host = host self.port = port + self.stats = stats + self.mirror = False + self.rank = 0.01 self.busy = False self.pipeline = False self.closed = True self.connecting = False self.request_queue = [] - self.response_queue = [] + self.outstanding = 0 self.proto = None self.connector = None self._errors = 0 @@ -44,21 +152,44 @@ class Peer(ClientFactory): self._downloadSpeeds = [] self._lastResponse = None self._responseTimes = [] + + def __repr__(self): + return "(%r, %r, %r)" % (self.host, self.port, self.rank) #{ Manage the request queue def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting + log.msg('Connecting to (%s, %d)' % (self.host, self.port)) self.connecting = True - d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) - d.addCallback(self.connected) + d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, + stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) + d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): """Begin processing the queued requests.""" + log.msg('Connected to (%s, %d)' % (self.host, self.port)) self.closed = False self.connecting = False self.proto = proto - self.processQueue() + reactor.callLater(0, self.processQueue) + + def connectionError(self, err): + """Cancel the requests.""" + log.msg('Failed to connect to the peer by HTTP.') + log.err(err) + + # Remove one request so that we don't loop indefinitely + if self.request_queue: + req, deferRequest, submissionTime = self.request_queue.pop(0) + deferRequest.errback(err) + + self._completed += 1 + self._errors += 1 + self.rerank() + if self.connecting: + self.connecting = False + self.clientGone(None) def close(self): """Close the connection to the peer.""" @@ -71,11 +202,12 @@ 
class Peer(ClientFactory):
         @type request: L{twisted.web2.client.http.ClientRequest}
         @return: deferred that will fire with the completed request
         """
-        request.submissionTime = datetime.now()
-        request.deferRequest = defer.Deferred()
-        self.request_queue.append(request)
-        self.processQueue()
-        return request.deferRequest
+        submissionTime = datetime.now()
+        deferRequest = defer.Deferred()
+        self.request_queue.append((request, deferRequest, submissionTime))
+        self.rerank()
+        reactor.callLater(0, self.processQueue)
+        return deferRequest
     
     def processQueue(self):
         """Check the queue to see if new requests can be sent to the peer."""
@@ -88,40 +220,61 @@ class Peer(ClientFactory):
             return
         if self.busy and not self.pipeline:
            return
-        if self.response_queue and not self.pipeline:
+        if self.outstanding and not self.pipeline:
+            return
+        if not ((self.proto.readPersistent is PERSIST_NO_PIPELINE
+                 and not self.proto.inRequests)
+                or self.proto.readPersistent is PERSIST_PIPELINE):
+            log.msg('HTTP protocol is not ready though we were told to pipeline: %r, %r' %
+                    (self.proto.readPersistent, self.proto.inRequests))
             return
 
-        req = self.request_queue.pop(0)
-        self.response_queue.append(req)
-        req.deferResponse = self.proto.submitRequest(req, False)
-        req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
+        req, deferRequest, submissionTime = self.request_queue.pop(0)
+        try:
+            deferResponse = self.proto.submitRequest(req, False)
+        except:
+            # Try again later
+            log.msg('Got an error trying to submit a new HTTP request %s' % (req.uri, ))
+            log.err()
+            self.request_queue.insert(0, (req, deferRequest, submissionTime))
+            reactor.callLater(1, self.processQueue)
+            return
+            
+        self.outstanding += 1
+        self.rerank()
+        deferResponse.addCallbacks(self.requestComplete, self.requestError,
+                                   callbackArgs = (req, deferRequest, submissionTime),
+                                   errbackArgs = (req, deferRequest))
 
-    def requestComplete(self, resp):
+    def requestComplete(self, resp, req, deferRequest, submissionTime):
         """Process a completed request."""
         self._processLastResponse()
-        req = self.response_queue.pop(0)
-        log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
+        self.outstanding -= 1
+        assert self.outstanding >= 0
+        log.msg('%s of %s completed with code %d (%r)' % (req.method, req.uri, resp.code, resp.headers))
         self._completed += 1
-        if resp.code >= 400:
-            self._errors += 1
         now = datetime.now()
-        self._responseTimes.append((now, now - req.submissionTime))
+        self._responseTimes.append((now, now - submissionTime))
         self._lastResponse = (now, resp.stream.length)
-        req.deferRequest.callback(resp)
+        self.rerank()
+        deferRequest.callback(resp)
         
-    def requestError(self, error):
+    def requestError(self, error, req, deferRequest):
         """Process a request that ended with an error."""
         self._processLastResponse()
-        req = self.response_queue.pop(0)
+        self.outstanding -= 1
+        assert self.outstanding >= 0
         log.msg('Download of %s generated error %r' % (req.uri, error))
         self._completed += 1
         self._errors += 1
-        req.deferRequest.errback(error)
+        self.rerank()
+        deferRequest.errback(error)
         
     def hashError(self, error):
         """Log that a hash error occurred from the peer."""
         log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
         self._errors += 1
+        self.rerank()
 
     #{ IHTTPClientManager interface
     def clientBusy(self, proto):
@@ -132,26 +285,26 @@ class Peer(ClientFactory):
         """Try to send a new request."""
         self._processLastResponse()
         self.busy = False
-        self.processQueue()
+        reactor.callLater(0, self.processQueue)
+        
self.rerank() def clientPipelining(self, proto): """Try to send a new request.""" self.pipeline = True - self.processQueue() + reactor.callLater(0, self.processQueue) def clientGone(self, proto): """Mark sent requests as errors.""" self._processLastResponse() - for req in self.response_queue: - req.deferRequest.errback(ProtocolError('lost connection')) + log.msg('Lost the connection to (%s, %d)' % (self.host, self.port)) self.busy = False self.pipeline = False self.closed = True self.connecting = False - self.response_queue = [] self.proto = None + self.rerank() if self.request_queue: - self.processQueue() + reactor.callLater(0, self.processQueue) #{ Downloading request interface def setCommonHeaders(self): @@ -200,7 +353,7 @@ class Peer(ClientFactory): #{ Peer information def isIdle(self): """Check whether the peer is idle or not.""" - return not self.busy and not self.request_queue and not self.response_queue + return not self.busy and not self.request_queue and not self.outstanding def _processLastResponse(self): """Save the download time of the last request for speed calculations.""" @@ -253,42 +406,51 @@ class Peer(ClientFactory): return total_response / len(self._responseTimes) - def rank(self, fastest): + def rerank(self): """Determine the ranking value for the peer. - The ranking value is composed of 5 numbers: - - 1 if a connection to the peer is open, 0.9 otherwise - - 1 if there are no pending requests, to 0 if there are a maximum - - 1 if the peer is the fastest of all peers, to 0 if the speed is 0 - - 1 if all requests are good, 0 if all produced errors - - an exponentially decreasing number based on the response time + The ranking value is composed of 5 numbers, each exponentially + decreasing from 1 to 0 based on: + - if a connection to the peer is open + - the number of pending requests + - the time to download a single piece + - the number of errors + - the response time """ rank = 1.0 if self.closed: rank *= 0.9 - rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0 - if fastest > 0.0: - rank *= min(1.0, self.downloadSpeed() / fastest) + rank *= exp(-(len(self.request_queue) + self.outstanding)) + speed = self.downloadSpeed() + if speed > 0.0: + rank *= exp(-512.0*1024 / speed) if self._completed: - rank *= max(0.0, 1.0 - float(self._errors) / self._completed) + rank *= exp(-10.0 * self._errors / self._completed) rank *= exp(-self.responseTime() / 5.0) - return rank + self.rank = rank class TestClientManager(unittest.TestCase): """Unit tests for the Peer.""" client = None pending_calls = [] + length = [] def gotResp(self, resp, num, expect): self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) if expect is not None: self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) - def print_(n): - pass - def printdone(n): - pass - stream_mod.readStream(resp.stream, print_).addCallback(printdone) + while len(self.length) <= num: + self.length.append(0) + self.length[num] = 0 + def addData(data, self = self, num = num): + self.length[num] += len(data) + def checkLength(resp, self = self, num = num, length = resp.stream.length): + self.failUnlessEqual(self.length[num], length) + return resp + df = stream_mod.readStream(resp.stream, addData) + df.addCallback(checkLength) + return df def test_download(self): """Tests a normal download.""" @@ -329,7 +491,7 @@ class TestClientManager(unittest.TestCase): newRequest("/rfc/rfc0801.txt", 3, 40824) # 
This one will probably be queued - self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070)) + self.pending_calls.append(reactor.callLater(6, newRequest, '/rfc/rfc0013.txt', 4, 1070)) # Connection should still be open, but idle self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606)) @@ -370,7 +532,7 @@ class TestClientManager(unittest.TestCase): return lastDefer def checkInfo(self): - log.msg('Rank is: %r' % self.client.rank(250.0*1024)) + log.msg('Rank is: %r' % self.client.rank) log.msg('Download speed is: %r' % self.client.downloadSpeed()) log.msg('Response Time is: %r' % self.client.responseTime())
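
As a rough illustration of the new peer ranking introduced in this diff, the following standalone sketch mirrors the arithmetic of Peer.rerank() as a pure function. The peer statistics passed in at the bottom are hypothetical values chosen only to show how the factors combine; they are not taken from the code above.

from math import exp

def rank_peer(closed, queued, outstanding, download_speed, errors, completed, response_time):
    # Each factor decays from 1.0 toward 0.0, mirroring Peer.rerank() in the diff above.
    rank = 1.0
    if closed:
        rank *= 0.9                                  # no connection currently open
    rank *= exp(-(queued + outstanding))             # pending and outstanding requests
    if download_speed > 0.0:
        rank *= exp(-512.0 * 1024 / download_speed)  # time to download a 512 KiB piece
    if completed:
        rank *= exp(-10.0 * errors / completed)      # error ratio over completed requests
    rank *= exp(-response_time / 5.0)                # average response time in seconds
    return rank

# Hypothetical examples: an idle, fast, error-free peer scores about 0.55,
# while a loaded, slow, error-prone peer decays to roughly 3e-7.
print rank_peer(False, 0, 0, 1024 * 1024, 0, 10, 0.5)
print rank_peer(True, 3, 1, 64 * 1024, 2, 10, 4.0)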