X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p_Khashmir%2Fkrpc.py;h=9290cac60822ab44b5ce0663a3629ffee879d14c;hp=a2afac7db6303bd277680562649aa52e71672513;hb=96ea4dd2eaf55c5e94f1a9f13ec36fac10387158;hpb=0e0be9bfe1a9c1627caa6422202b76008701740c diff --git a/apt_p2p_Khashmir/krpc.py b/apt_p2p_Khashmir/krpc.py index a2afac7..9290cac 100644 --- a/apt_p2p_Khashmir/krpc.py +++ b/apt_p2p_Khashmir/krpc.py @@ -100,6 +100,8 @@ def verifyMessage(msg): raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified") if type(msg[RSP]) != dict: raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary") +# if 'nodes' in msg[RSP] and type(msg[RSP]['nodes']) != list: +# raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "wrong type of node, this is not bittorrent") elif msg[TYP] == ERR: if ERR not in msg: raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified") @@ -117,6 +119,8 @@ def verifyMessage(msg): raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified") if type(msg[TID]) != str: raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string") + if len(msg[TID]) != 20: + raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "wrong type of node, this is not bittorrent") class hostbroker(protocol.DatagramProtocol): """The factory for the KRPC protocol. @@ -212,8 +216,10 @@ class KrpcRequest(defer.Deferred): @ivar delay: the last timeout delay sent @type start: C{datetime} @ivar start: the time to request was started at - @type later: L{twisted.internet.interfaces.IDelayedCall} - @ivar later: the pending call to timeout the last sent request + @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall} + @ivar laterNextTimeout: the pending call to timeout the last sent request + @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall} + @ivar laterFinalTimeout: the pending call to timeout the entire request """ def __init__(self, protocol, newTID, method, data, config): @@ -237,26 +243,24 @@ class KrpcRequest(defer.Deferred): self.config = config self.delay = self.config.get('KRPC_INITIAL_DELAY', 2) self.start = datetime.now() - self.later = None + self.laterNextTimeout = None + self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout) reactor.callLater(0, self.send) def send(self): """Send the request to the remote node.""" - assert not self.later, 'There is already a pending request' - self.later = reactor.callLater(self.delay, self.timeOut) + assert not self.laterNextTimeout, 'There is already a pending request' + self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout) try: self.protocol.sendData(self.method, self.data) except: log.err() - def timeOut(self): + def nextTimeout(self): """Check for a unrecoverable timeout, otherwise resend.""" - self.later = None - delay = datetime.now() - self.start - if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)): - log.msg('%r timed out after %0.2f sec' % - (self.tid, delay.seconds + delay.microseconds/1000000.0)) - self.protocol.timeOut(self.tid, self.method) + self.laterNextTimeout = None + if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)): + self.finalTimeout() elif self.protocol.stopped: log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid) else: @@ -264,6 +268,14 @@ class KrpcRequest(defer.Deferred): log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay)) reactor.callLater(0, self.send) + def finalTimeout(self): + """Timeout the request after an unrecoverable timeout.""" + self.dropTimeOut() + delay = datetime.now() - self.start + log.msg('%r timed out after %0.2f sec' % + (self.tid, delay.seconds + delay.microseconds/1000000.0)) + self.protocol.timeOut(self.tid, self.method) + def callback(self, resp): self.dropTimeOut() defer.Deferred.callback(self, resp) @@ -274,9 +286,12 @@ class KrpcRequest(defer.Deferred): def dropTimeOut(self): """Cancel the timeout call when a response is received.""" - if self.later and self.later.active(): - self.later.cancel() - self.later = None + if self.laterFinalTimeout and self.laterFinalTimeout.active(): + self.laterFinalTimeout.cancel() + self.laterFinalTimeout = None + if self.laterNextTimeout and self.laterNextTimeout.active(): + self.laterNextTimeout.cancel() + self.laterNextTimeout = None class KRPC: """The KRPC protocol implementation. @@ -338,7 +353,7 @@ class KRPC: msg = bdecode(data) except Exception, e: if self.config.get('SPEW', False): - log.msg("krpc bdecode error: ") + log.msg("krpc bdecode error from %r: " % (addr, )) log.err(e) return @@ -346,8 +361,7 @@ class KRPC: try: verifyMessage(msg) except Exception, e: - log.msg("krpc message verification error: ") - log.err(e) + log.msg("krpc message verification error from %r: %r" % (addr, e)) return if self.config.get('SPEW', False): @@ -368,6 +382,8 @@ class KRPC: log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ]) if e[0] != KRPC_ERROR_INVALID_TOKEN: log.err(e) + else: + log.msg('Node sent us an invalid token, not storing') self.stats.errorAction(msg[REQ]) olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR, [e[0], e[1]]) @@ -416,7 +432,7 @@ class KRPC: else: # no tid, this transaction was finished already... log.msg('received an error %r from %r for completed request: %r' % - (msg[ERR], msg[RSP]['id'], msg[TID])) + (msg[ERR], addr, msg[TID])) else: # Received an unknown message type if self.config.get('SPEW', False): @@ -584,7 +600,7 @@ def make(port): from stats import StatsLogger af = Receiver() a = hostbroker(af, StatsLogger(None, None), - {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False}) + {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False}) a.protocol = KRPC p = reactor.listenUDP(port, a) return af, a, p