X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2FHTTPDownloader.py;h=8ecac193f6cba22c71ac1747d5df89d327a4ea4f;hb=a5dd904be839e2b2896483724d6238b5a970b5de;hp=35847f701a433f0f3ea6a505b4ef396d75d3a2ce;hpb=c5ae5c9ad8eb68a8ec01cfa1b19d81cbad49617a;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 35847f7..8ecac19 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -9,7 +9,7 @@ from twisted.internet.protocol import ClientFactory from twisted import version as twisted_version from twisted.python import log from twisted.web2.client.interfaces import IHTTPClientManager -from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol +from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol, HTTPClientChannelRequest from twisted.web2.channel.http import PERSIST_NO_PIPELINE, PERSIST_PIPELINE from twisted.web2 import stream as stream_mod, http_headers from twisted.web2 import version as web2_version @@ -20,6 +20,30 @@ from apt_p2p_conf import version class PipelineError(Exception): """An error has occurred in pipelining requests.""" + +class FixedHTTPClientChannelRequest(HTTPClientChannelRequest): + """Fix the broken _error function.""" + + def __init__(self, channel, request, closeAfter): + HTTPClientChannelRequest.__init__(self, channel, request, closeAfter) + self.started = False + + def _error(self, err): + """ + Abort parsing, and depending of the status of the request, either fire + the C{responseDefer} if no response has been sent yet, or close the + stream. + """ + if self.started: + self.abortParse() + if hasattr(self, 'stream') and self.stream is not None: + self.stream.finish(err) + else: + self.responseDefer.errback(err) + + def gotInitialLine(self, initialLine): + self.started = True + HTTPClientChannelRequest.gotInitialLine(self, initialLine) class LoggingHTTPClientProtocol(HTTPClientProtocol): """A modified client protocol that logs the number of bytes received.""" @@ -39,15 +63,65 @@ class LoggingHTTPClientProtocol(HTTPClientProtocol): self.stats.receivedBytes(len(data), self.mirror) HTTPClientProtocol.rawDataReceived(self, data) + def submitRequest(self, request, closeAfter=True): + """ + @param request: The request to send to a remote server. + @type request: L{ClientRequest} + + @param closeAfter: If True the 'Connection: close' header will be sent, + otherwise 'Connection: keep-alive' + @type closeAfter: C{bool} + + @rtype: L{twisted.internet.defer.Deferred} + @return: A Deferred which will be called back with the + L{twisted.web2.http.Response} from the server. + """ + + # Assert we're in a valid state to submit more + assert self.outRequest is None + assert ((self.readPersistent is PERSIST_NO_PIPELINE + and not self.inRequests) + or self.readPersistent is PERSIST_PIPELINE) + + self.manager.clientBusy(self) + if closeAfter: + self.readPersistent = False + + self.outRequest = chanRequest = FixedHTTPClientChannelRequest(self, + request, closeAfter) + self.inRequests.append(chanRequest) + + chanRequest.submit() + return chanRequest.responseDefer + def setReadPersistent(self, persist): + oldPersist = self.readPersistent self.readPersistent = persist if not persist: # Tell all requests but first to abort. lostRequests = self.inRequests[1:] del self.inRequests[1:] for request in lostRequests: - request.connectionLost(PipelineError('The pipelined connection was lost')) + request.connectionLost(PipelineError('Pipelined connection was closed.')) + elif (oldPersist is PERSIST_NO_PIPELINE and + persist is PERSIST_PIPELINE and + self.outRequest is None): + self.manager.clientPipelining(self) + def connectionLost(self, reason): + self.readPersistent = False + self.setTimeout(None) + self.manager.clientGone(self) + # Cancel the current request + if self.inRequests and self.inRequests[0] is not None: + self.inRequests[0].connectionLost(reason) + # Tell all remaining requests to abort. + lostRequests = self.inRequests[1:] + del self.inRequests[1:] + for request in lostRequests: + if request is not None: + request.connectionLost(PipelineError('Pipelined connection was closed.')) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -64,7 +138,7 @@ class Peer(ClientFactory): self.port = port self.stats = stats self.mirror = False - self.rank = 0.1 + self.rank = 0.01 self.busy = False self.pipeline = False self.closed = True @@ -86,13 +160,15 @@ class Peer(ClientFactory): def connect(self): """Connect to the peer.""" assert self.closed and not self.connecting + log.msg('Connecting to (%s, %d)' % (self.host, self.port)) self.connecting = True d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, - stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) + stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port, timeout = 10) d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): """Begin processing the queued requests.""" + log.msg('Connected to (%s, %d)' % (self.host, self.port)) self.closed = False self.connecting = False self.proto = proto @@ -220,6 +296,7 @@ class Peer(ClientFactory): def clientGone(self, proto): """Mark sent requests as errors.""" self._processLastResponse() + log.msg('Lost the connection to (%s, %d)' % (self.host, self.port)) self.busy = False self.pipeline = False self.closed = True @@ -281,8 +358,9 @@ class Peer(ClientFactory): def _processLastResponse(self): """Save the download time of the last request for speed calculations.""" if self._lastResponse is not None: - now = datetime.now() - self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1])) + if self._lastResponse[1] is not None: + now = datetime.now() + self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1])) self._lastResponse = None def downloadSpeed(self): @@ -299,7 +377,7 @@ class Peer(ClientFactory): # If there are none, then you get 0 if not self._downloadSpeeds: - return 0.0 + return 150000.0 for download in self._downloadSpeeds: total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0 @@ -322,7 +400,7 @@ class Peer(ClientFactory): # If there are none, give it the benefit of the doubt if not self._responseTimes: - return 0.0 + return 0.1 for response in self._responseTimes: total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0 @@ -498,6 +576,45 @@ class TestClientManager(unittest.TestCase): d.addCallback(self.gotResp, 1, 100) return d + def test_timeout(self): + """Tests a connection timeout.""" + from twisted.internet.error import TimeoutError + host = 'steveholt.hopto.org' + self.client = Peer(host, 80) + self.timeout = 60 + + d = self.client.get('/rfc/rfc0013.txt') + d.addCallback(self.gotResp, 1, 1070) + d = self.failUnlessFailure(d, TimeoutError) + d.addCallback(lambda a: self.flushLoggedErrors(TimeoutError)) + return d + + def test_dnserror(self): + """Tests a connection timeout.""" + from twisted.internet.error import DNSLookupError + host = 'hureyfnvbfha.debian.net' + self.client = Peer(host, 80) + self.timeout = 5 + + d = self.client.get('/rfc/rfc0013.txt') + d.addCallback(self.gotResp, 1, 1070) + d = self.failUnlessFailure(d, DNSLookupError) + d.addCallback(lambda a: self.flushLoggedErrors(DNSLookupError)) + return d + + def test_noroute(self): + """Tests a connection timeout.""" + from twisted.internet.error import NoRouteError + host = '1.2.3.4' + self.client = Peer(host, 80) + self.timeout = 5 + + d = self.client.get('/rfc/rfc0013.txt') + d.addCallback(self.gotResp, 1, 1070) + d = self.failUnlessFailure(d, NoRouteError) + d.addCallback(lambda a: self.flushLoggedErrors(NoRouteError)) + return d + def tearDown(self): for p in self.pending_calls: if p.active():