]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/krpc.py
Rename all apt-dht files to apt-p2p.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / krpc.py
diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py
deleted file mode 100644 (file)
index a4fbacc..0000000
+++ /dev/null
@@ -1,561 +0,0 @@
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-"""The KRPC communication protocol implementation.
-
-@var KRPC_TIMEOUT: the number of seconds after which requests timeout
-@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
-    UDP packet without fragmentation
-
-@var KRPC_ERROR: the code for a generic error
-@var KRPC_ERROR_SERVER_ERROR: the code for a server error
-@var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
-@var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
-@var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
-@var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
-@var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
-
-@var KRPC_ERROR_INTERNAL: the code for an internal error
-@var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
-@var KRPC_ERROR_TIMEOUT: the code for a timeout error
-@var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
-
-@var TID: the identifier for the transaction ID
-@var REQ: the identifier for a request packet
-@var RSP: the identifier for a response packet
-@var TYP: the identifier for the type of packet
-@var ARG: the identifier for the argument to the request
-@var ERR: the identifier for an error packet
-
-@group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
-    KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
-    KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
-    KRPC_ERROR_RESPONSE_TOO_LONG
-@group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
-    KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
-@group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
-
-"""
-
-from bencode import bencode, bdecode
-from time import asctime
-from math import ceil
-
-from twisted.internet.defer import Deferred
-from twisted.internet import protocol, reactor
-from twisted.python import log
-from twisted.trial import unittest
-
-from khash import newID
-
-KRPC_TIMEOUT = 20
-UDP_PACKET_LIMIT = 1472
-
-# Remote node errors
-KRPC_ERROR = 200
-KRPC_ERROR_SERVER_ERROR = 201
-KRPC_ERROR_MALFORMED_PACKET = 202
-KRPC_ERROR_METHOD_UNKNOWN = 203
-KRPC_ERROR_MALFORMED_REQUEST = 204
-KRPC_ERROR_INVALID_TOKEN = 205
-KRPC_ERROR_RESPONSE_TOO_LONG = 206
-
-# Local errors
-KRPC_ERROR_INTERNAL = 100
-KRPC_ERROR_RECEIVED_UNKNOWN = 101
-KRPC_ERROR_TIMEOUT = 102
-KRPC_ERROR_PROTOCOL_STOPPED = 103
-
-# commands
-TID = 't'
-REQ = 'q'
-RSP = 'r'
-TYP = 'y'
-ARG = 'a'
-ERR = 'e'
-
-class KrpcError(Exception):
-    """An error occurred in the KRPC protocol."""
-    pass
-
-def verifyMessage(msg):
-    """Check received message for corruption and errors.
-    
-    @type msg: C{dictionary}
-    @param msg: the dictionary of information received on the connection
-    @raise KrpcError: if the message is corrupt
-    """
-    
-    if type(msg) != dict:
-        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
-    if TYP not in msg:
-        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
-    if msg[TYP] == REQ:
-        if REQ not in msg:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
-        if type(msg[REQ]) != str:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
-        if ARG not in msg:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
-        if type(msg[ARG]) != dict:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
-    elif msg[TYP] == RSP:
-        if RSP not in msg:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
-        if type(msg[RSP]) != dict:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
-    elif msg[TYP] == ERR:
-        if ERR not in msg:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
-        if type(msg[ERR]) != list:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
-        if len(msg[ERR]) != 2:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
-        if type(msg[ERR][0]) not in (int, long):
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
-        if type(msg[ERR][1]) != str:
-            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
-#    else:
-#        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
-    if TID not in msg:
-        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
-    if type(msg[TID]) != str:
-        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
-
-class hostbroker(protocol.DatagramProtocol):
-    """The factory for the KRPC protocol.
-    
-    @type server: L{khashmir.Khashmir}
-    @ivar server: the main Khashmir program
-    @type config: C{dictionary}
-    @ivar config: the configuration parameters for the DHT
-    @type connections: C{dictionary}
-    @ivar connections: all the connections that have ever been made to the
-        protocol, keys are IP address and port pairs, values are L{KRPC}
-        protocols for the addresses
-    @ivar protocol: the protocol to use to handle incoming connections
-        (added externally)
-    @type addr: (C{string}, C{int})
-    @ivar addr: the IP address and port of this node
-    """
-    
-    def __init__(self, server, config):
-        """Initialize the factory.
-        
-        @type server: L{khashmir.Khashmir}
-        @param server: the main DHT program
-        @type config: C{dictionary}
-        @param config: the configuration parameters for the DHT
-        """
-        self.server = server
-        self.config = config
-        # this should be changed to storage that drops old entries
-        self.connections = {}
-        
-    def datagramReceived(self, datagram, addr):
-        """Optionally create a new protocol object, and handle the new datagram.
-        
-        @type datagram: C{string}
-        @param datagram: the data received from the transport.
-        @type addr: (C{string}, C{int})
-        @param addr: source IP address and port of datagram.
-        """
-        c = self.connectionForAddr(addr)
-        c.datagramReceived(datagram, addr)
-        #if c.idle():
-        #    del self.connections[addr]
-
-    def connectionForAddr(self, addr):
-        """Get a protocol object for the source.
-        
-        @type addr: (C{string}, C{int})
-        @param addr: source IP address and port of datagram.
-        """
-        # Don't connect to ourself
-        if addr == self.addr:
-            raise KrcpError
-        
-        # Create a new protocol object if necessary
-        if not self.connections.has_key(addr):
-            conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
-            self.connections[addr] = conn
-        else:
-            conn = self.connections[addr]
-        return conn
-
-    def makeConnection(self, transport):
-        """Make a connection to a transport and save our address."""
-        protocol.DatagramProtocol.makeConnection(self, transport)
-        tup = transport.getHost()
-        self.addr = (tup.host, tup.port)
-        
-    def stopProtocol(self):
-        """Stop all the open connections."""
-        for conn in self.connections.values():
-            conn.stop()
-        protocol.DatagramProtocol.stopProtocol(self)
-
-class KRPC:
-    """The KRPC protocol implementation.
-    
-    @ivar transport: the transport to use for the protocol
-    @type factory: L{khashmir.Khashmir}
-    @ivar factory: the main Khashmir program
-    @type addr: (C{string}, C{int})
-    @ivar addr: the IP address and port of the source node
-    @type noisy: C{boolean}
-    @ivar noisy: whether to log additional details of the protocol
-    @type tids: C{dictionary}
-    @ivar tids: the transaction IDs outstanding for requests, keys are the
-        transaction ID of the request, values are the deferreds to call with
-        the results
-    @type stopped: C{boolean}
-    @ivar stopped: whether the protocol has been stopped
-    """
-    
-    def __init__(self, addr, server, transport, spew = False):
-        """Initialize the protocol.
-        
-        @type addr: (C{string}, C{int})
-        @param addr: the IP address and port of the source node
-        @type server: L{khashmir.Khashmir}
-        @param server: the main Khashmir program
-        @param transport: the transport to use for the protocol
-        @type spew: C{boolean}
-        @param spew: whether to log additional details of the protocol
-            (optional, defaults to False)
-        """
-        self.transport = transport
-        self.factory = server
-        self.addr = addr
-        self.noisy = spew
-        self.tids = {}
-        self.stopped = False
-
-    def datagramReceived(self, data, addr):
-        """Process the new datagram.
-        
-        @type data: C{string}
-        @param data: the data received from the transport.
-        @type addr: (C{string}, C{int})
-        @param addr: source IP address and port of datagram.
-        """
-        if self.stopped:
-            if self.noisy:
-                log.msg("stopped, dropping message from %r: %s" % (addr, data))
-
-        # Bdecode the message
-        try:
-            msg = bdecode(data)
-        except Exception, e:
-            if self.noisy:
-                log.msg("krpc bdecode error: ")
-                log.err(e)
-            return
-
-        # Make sure the remote node isn't trying anything funny
-        try:
-            verifyMessage(msg)
-        except Exception, e:
-            log.msg("krpc message verification error: ")
-            log.err(e)
-            return
-
-        if self.noisy:
-            log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
-
-        # Process it based on its type
-        if msg[TYP]  == REQ:
-            ilen = len(data)
-            
-            # Requests are handled by the factory
-            f = getattr(self.factory ,"krpc_" + msg[REQ], None)
-            msg[ARG]['_krpc_sender'] =  self.addr
-            if f and callable(f):
-                try:
-                    ret = f(*(), **msg[ARG])
-                except KrpcError, e:
-                    log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
-                    log.err(e)
-                    olen = self._sendResponse(addr, msg[TID], ERR, [e[0], e[1]])
-                except TypeError, e:
-                    log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
-                    log.err(e)
-                    olen = self._sendResponse(addr, msg[TID], ERR,
-                                              [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
-                except Exception, e:
-                    log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
-                    log.err(e)
-                    olen = self._sendResponse(addr, msg[TID], ERR,
-                                              [KRPC_ERROR_SERVER_ERROR, str(e)])
-                else:
-                    olen = self._sendResponse(addr, msg[TID], RSP, ret)
-            else:
-                # Request for unknown method
-                log.msg("ERROR: don't know about method %s" % msg[REQ])
-                olen = self._sendResponse(addr, msg[TID], ERR,
-                                          [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
-            if self.noisy:
-                log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
-                                                  ilen, msg[REQ], olen))
-        elif msg[TYP] == RSP:
-            # Responses get processed by their TID's deferred
-            if self.tids.has_key(msg[TID]):
-                df = self.tids[msg[TID]]
-                #      callback
-                del(self.tids[msg[TID]])
-                df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
-            else:
-                # no tid, this transaction timed out already...
-                if self.noisy:
-                    log.msg('timeout: %r' % msg[RSP]['id'])
-        elif msg[TYP] == ERR:
-            # Errors get processed by their TID's deferred's errback
-            if self.tids.has_key(msg[TID]):
-                df = self.tids[msg[TID]]
-                del(self.tids[msg[TID]])
-                # callback
-                df.errback(KrpcError(*msg[ERR]))
-            else:
-                # day late and dollar short, just log it
-                log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
-                pass
-        else:
-            # Received an unknown message type
-            if self.noisy:
-                log.msg("unknown message type: %r" % msg)
-            if msg[TID] in self.tids:
-                df = self.tids[msg[TID]]
-                del(self.tids[msg[TID]])
-                # callback
-                df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
-                                     "Received an unknown message type: %r" % msg[TYP]))
-                
-    def _sendResponse(self, addr, tid, msgType, response):
-        """Helper function for sending responses to nodes.
-        
-        @type addr: (C{string}, C{int})
-        @param addr: source IP address and port of datagram.
-        @param tid: the transaction ID of the request
-        @param msgType: the type of message to respond with
-        @param response: the arguments for the response
-        """
-        if not response:
-            response = {}
-        
-        try:
-            # Create the response message
-            msg = {TID : tid, TYP : msgType, msgType : response}
-    
-            if self.noisy:
-                log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
-    
-            out = bencode(msg)
-            
-            # Make sure its not too long
-            if len(out) > UDP_PACKET_LIMIT:
-                # Can we remove some values to shorten it?
-                if 'values' in response:
-                    # Save the original list of values
-                    orig_values = response['values']
-                    len_orig_values = len(bencode(orig_values))
-                    
-                    # Caclulate the maximum value length possible
-                    max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
-                    assert max_len_values > 0
-                    
-                    # Start with a calculation of how many values should be included
-                    # (assumes all values are the same length)
-                    per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
-                    num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
-    
-                    # Do a linear search for the actual maximum number possible
-                    bencoded_values = len(bencode(orig_values[:num_values]))
-                    while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
-                        bencoded_values += len(bencode(orig_values[num_values]))
-                        num_values += 1
-                    while bencoded_values > max_len_values and num_values > 0:
-                        num_values -= 1
-                        bencoded_values -= len(bencode(orig_values[num_values]))
-                    assert num_values > 0
-    
-                    # Encode the result
-                    response['values'] = orig_values[:num_values]
-                    out = bencode(msg)
-                    assert len(out) < UDP_PACKET_LIMIT
-                    log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
-                            (len(orig_values), num_values, len(out)))
-                else:
-                    # Too long a response, send an error
-                    log.msg('Could not send response, too long: %d bytes' % len(out))
-                    msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
-                    out = bencode(msg)
-
-        except Exception, e:
-            # Unknown error, send an error message
-            msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
-            out = bencode(msg)
-                    
-        self.transport.write(out, addr)
-        return len(out)
-    
-    def sendRequest(self, method, args):
-        """Send a request to the remote node.
-        
-        @type method: C{string}
-        @param method: the methiod name to call on the remote node
-        @param args: the arguments to send to the remote node's method
-        """
-        if self.stopped:
-            raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
-
-        # Create the request message
-        msg = {TID : newID(), TYP : REQ,  REQ : method, ARG : args}
-        if self.noisy:
-            log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
-        data = bencode(msg)
-        
-        # Create the deferred and save it with the TID
-        d = Deferred()
-        self.tids[msg[TID]] = d
-
-        # Schedule a later timeout call
-        def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
-            """Call the deferred's errback if a timeout occurs."""
-            if tids.has_key(id):
-                df = tids[id]
-                del(tids[id])
-                df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
-        later = reactor.callLater(KRPC_TIMEOUT, timeOut)
-        
-        # Cancel the timeout call if a response is received
-        def dropTimeOut(dict, later_call = later):
-            """Cancel the timeout call when a response is received."""
-            if later_call.active():
-                later_call.cancel()
-            return dict
-        d.addBoth(dropTimeOut)
-        
-        self.transport.write(data, self.addr)
-        return d
-    
-    def stop(self):
-        """Timeout all pending requests."""
-        for df in self.tids.values():
-            df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response'))
-        self.tids = {}
-        self.stopped = True
-
-#{ For testing the KRPC protocol
-def connectionForAddr(host, port):
-    return host
-    
-class Receiver(protocol.Factory):
-    protocol = KRPC
-    def __init__(self):
-        self.buf = []
-    def krpc_store(self, msg, _krpc_sender):
-        self.buf += [msg]
-        return {}
-    def krpc_echo(self, msg, _krpc_sender):
-        return {'msg': msg}
-    def krpc_values(self, length, num, _krpc_sender):
-        return {'values': ['1'*length]*num}
-
-def make(port):
-    af = Receiver()
-    a = hostbroker(af, {'SPEW': False})
-    a.protocol = KRPC
-    p = reactor.listenUDP(port, a)
-    return af, a, p
-    
-class KRPCTests(unittest.TestCase):
-    timeout = 2
-    
-    def setUp(self):
-        self.af, self.a, self.ap = make(1180)
-        self.bf, self.b, self.bp = make(1181)
-
-    def tearDown(self):
-        self.ap.stopListening()
-        self.bp.stopListening()
-
-    def bufEquals(self, result, value):
-        self.failUnlessEqual(self.bf.buf, value)
-
-    def testSimpleMessage(self):
-        d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
-        d.addCallback(self.bufEquals, ["This is a test."])
-        return d
-
-    def testMessageBlast(self):
-        for i in range(100):
-            d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
-        d.addCallback(self.bufEquals, ["This is a test."] * 100)
-        return d
-
-    def testEcho(self):
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
-        df.addCallback(self.gotMsg, "This is a test.")
-        return df
-
-    def gotMsg(self, dict, should_be):
-        _krpc_sender = dict['_krpc_sender']
-        msg = dict['rsp']
-        self.failUnlessEqual(msg['msg'], should_be)
-
-    def testManyEcho(self):
-        for i in xrange(100):
-            df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
-            df.addCallback(self.gotMsg, "This is a test.")
-        return df
-
-    def testMultiEcho(self):
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
-        df.addCallback(self.gotMsg, "This is a test.")
-
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
-        df.addCallback(self.gotMsg, "This is another test.")
-
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
-        df.addCallback(self.gotMsg, "This is yet another test.")
-        
-        return df
-
-    def testEchoReset(self):
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
-        df.addCallback(self.gotMsg, "This is a test.")
-
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
-        df.addCallback(self.gotMsg, "This is another test.")
-        df.addCallback(self.echoReset)
-        return df
-    
-    def echoReset(self, dict):
-        del(self.a.connections[('127.0.0.1', 1181)])
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
-        df.addCallback(self.gotMsg, "This is yet another test.")
-        return df
-
-    def testUnknownMeth(self):
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
-        df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
-        return df
-
-    def testMalformedRequest(self):
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
-        df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST)
-        return df
-
-    def gotErr(self, err, should_be):
-        self.failUnlessEqual(err.value[0], should_be)
-        
-    def testLongPackets(self):
-        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
-        df.addCallback(self.gotLongRsp)
-        return df
-
-    def gotLongRsp(self, dict):
-        # Not quite accurate, but good enough
-        self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
-        
\ No newline at end of file