From: burris Date: Mon, 2 Sep 2002 08:04:29 +0000 (+0000) Subject: this is the new khashmir, now based on Twisted and XML-RPC X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=b142a7b2b4b8a8ddb037377b90b4fe3be9f79a47;p=quix0rs-apt-p2p.git this is the new khashmir, now based on Twisted and XML-RPC store/find value doesn't seem to work yet --- diff --git a/const.py b/const.py new file mode 100644 index 0000000..0aad77d --- /dev/null +++ b/const.py @@ -0,0 +1,5 @@ + +from twisted.internet.default import SelectReactor +reactor = SelectReactor(installSignalHandlers=0) +from twisted.internet import main +main.installReactor(reactor) \ No newline at end of file diff --git a/dispatcher.py b/dispatcher.py deleted file mode 100644 index 08c62db..0000000 --- a/dispatcher.py +++ /dev/null @@ -1,216 +0,0 @@ -## Copyright 2002 Andrew Loewenstern, All Rights Reserved - -from bsddb3 import db ## find this at http://pybsddb.sf.net/ -from bsddb3._db import DBNotFoundError -import time -import hash -from node import Node -from bencode import bencode, bdecode -#from threading import RLock - -# max number of incoming or outgoing messages to process at a time -NUM_EVENTS = 5 - -class Transaction: - __slots__ = ['responseTemplate', 'id', 'dispatcher', 'target', 'payload', 'response', 'default', 'timeout'] - def __init__(self, dispatcher, node, response_handler, default_handler, id = None, payload = None, timeout=60): - if id == None: - id = hash.newID() - self.id = id - self.dispatcher = dispatcher - self.target = node - self.payload = payload - self.response = response_handler - self.default = default_handler - self.timeout = time.time() + timeout - - def setPayload(self, payload): - self.payload = payload - - def setResponseTemplate(self, t): - self.responseTemplate = t - - def responseHandler(self, msg): - if self.responseTemplate and callable(self.responseTemplate): - try: - self.responseTemplate(msg) - except ValueError, reason: - print "response %s" % (reason) - print `msg['id'], self.target.id` - return - self.response(self, msg) - - def defaultHandler(self): - self.default(self) - - def dispatch(self): - if callable(self.response) and callable(self.default): - self.dispatcher.initiate(self) - else: - self.dispatchNoResponse() - def dispatchNoResponse(self): - self.dispatcher.initiateNoResponse(self) - - - -class Dispatcher: - def __init__(self, listener, base_template, id): - self.id = id - self.listener = listener - self.transactions = {} - self.handlers = {} - self.timeout = db.DB() - self.timeout.set_flags(db.DB_DUP) - self.timeout.open(None, None, db.DB_BTREE) - self.BASE = base_template - self.stopped = 0 - #self.tlock = RLock() - - def registerHandler(self, key, handler, template): - assert(callable(handler)) - assert(callable(template)) - self.handlers[key] = (handler, template) - - def initiate(self, transaction): - #self.tlock.acquire() - #ignore messages to ourself - if transaction.target.id == self.id: - return - self.transactions[transaction.id] = transaction - self.timeout.put(`transaction.timeout`, transaction.id) - ## queue the message! - self.listener.qMsg(transaction.payload, transaction.target.host, transaction.target.port) - #self.tlock.release() - - def initiateNoResponse(self, transaction): - #ignore messages to ourself - if transaction.target.id == self.id: - return - #self.tlock.acquire() - self.listener.qMsg(transaction.payload, transaction.target.host, transaction.target.port) - #self.tlock.release() - - def postEvent(self, callback, delay, extras=None): - #self.tlock.acquire() - t = Transaction(self, None, None, callback, timeout=delay) - t.extras = extras - self.transactions[t.id] = t - self.timeout.put(`t.timeout`, t.id) - #self.tlock.release() - - def flushExpiredEvents(self): - events = 0 - tstamp = `time.time()` - #self.tlock.acquire() - c = self.timeout.cursor() - e = c.first() - while e and e[0] < tstamp: - events = events + 1 - try: - t = self.transactions[e[1]] - del(self.transactions[e[1]]) - except KeyError: - # transaction must have completed or was otherwise cancelled - pass - ## default callback! - else: - t.defaultHandler() - tmp = c.next() - # handle duplicates in a silly way - if tmp and e != tmp: - self.timeout.delete(e[0]) - e = tmp - #self.tlock.release() - return events - - def flushOutgoing(self): - events = 0 - n = self.listener.qLen() - if n > NUM_EVENTS: - n = NUM_EVENTS - for i in range(n): - self.listener.dispatchMsg() - events = events + 1 - return events - - def handleIncoming(self): - events = 0 - #self.tlock.acquire() - for i in range(NUM_EVENTS): - try: - msg, addr = self.listener.receiveMsg() - except ValueError: - break - - ## decode message, handle message! - try: - msg = bdecode(msg) - except ValueError: - # wrongly encoded message? - print "Bogus message received: %s" % msg - continue - try: - # check base template for correctness - self.BASE(msg) - except ValueError, reason: - # bad message! - print "Incoming message: %s" % reason - continue - try: - # check to see if we already know about this transaction - t = self.transactions[msg['tid']] - if msg['id'] != t.target.id and t.target.id != " "*20: - # we're expecting a response from someone else - if msg['id'] == self.id: - print "received our own response! " + `self.id` - else: - print "response from wrong peer! "+ `msg['id'],t.target.id` - else: - del(self.transactions[msg['tid']]) - self.timeout.delete(`t.timeout`) - t.addr = addr - # call transaction response handler - t.responseHandler(msg) - except KeyError: - # we don't know about it, must be unsolicited - n = Node(msg['id'], addr[0], addr[1]) - t = Transaction(self, n, None, None, msg['tid']) - if self.handlers.has_key(msg['type']): - ## handle this transaction - try: - # check handler template - self.handlers[msg['type']][1](msg) - except ValueError, reason: - print "BAD MESSAGE: %s" % reason - else: - self.handlers[msg['type']][0](t, msg) - else: - ## no transaction, no handler, drop it on the floor! - pass - events = events + 1 - #self.tlock.release() - return events - - def stop(self): - self.stopped = 1 - - def run(self): - self.stopped = 0 - while(not self.stopped): - events = self.runOnce() - ## sleep - if events == 0: - time.sleep(0.1) - - def runOnce(self): - events = 0 - ## handle some incoming messages - events = events + self.handleIncoming() - ## process some outstanding events - events = events + self.flushExpiredEvents() - ## send outgoing messages - events = events + self.flushOutgoing() - return events - - - diff --git a/khashmir.py b/khashmir.py index d624c97..32dab3e 100644 --- a/khashmir.py +++ b/khashmir.py @@ -1,14 +1,17 @@ ## Copyright 2002 Andrew Loewenstern, All Rights Reserved -from listener import Listener +from const import reactor +import time + from ktable import KTable, K -from node import Node -from dispatcher import Dispatcher +from knode import KNode as Node + from hash import newID, intify -import messages -import transactions -import time +from twisted.web import xmlrpc +from twisted.internet.defer import Deferred +from twisted.python import threadable +threadable.init() from bsddb3 import db ## find this at http://pybsddb.sf.net/ from bsddb3._db import DBNotFoundError @@ -20,28 +23,29 @@ MAX_PING_INTERVAL = 60 * 15 # fifteen minutes N = 3 + # this is the main class! -class Khashmir: - __slots__ = ['listener', 'node', 'table', 'dispatcher', 'tf', 'store'] +class Khashmir(xmlrpc.XMLRPC): + __slots__ = ['listener', 'node', 'table', 'store', 'app'] def __init__(self, host, port): - self.listener = Listener(host, port) self.node = Node(newID(), host, port) self.table = KTable(self.node) - self.dispatcher = Dispatcher(self.listener, messages.BASE, self.node.id) - self.tf = transactions.TransactionFactory(self.node.id, self.dispatcher) - + from twisted.internet.app import Application + from twisted.web import server + self.app = Application("xmlrpc") + self.app.listenTCP(port, server.Site(self)) self.store = db.DB() self.store.open(None, None, db.DB_BTREE) + - #### register unsolicited incoming message handlers - self.dispatcher.registerHandler('ping', self._pingHandler, messages.PING) - - self.dispatcher.registerHandler('find node', self._findNodeHandler, messages.FIND_NODE) + def render(self, request): + """ + Override the built in render so we can have access to the request object! + note, crequest is probably only valid on the initial call (not after deferred!) + """ + self.crequest = request + return xmlrpc.XMLRPC.render(self, request) - self.dispatcher.registerHandler('get value', self._findValueHandler, messages.GET_VALUE) - - self.dispatcher.registerHandler('store value', self._storeValueHandler, messages.STORE_VALUE) - ####### ####### LOCAL INTERFACE - use these methods! @@ -54,20 +58,18 @@ class Khashmir: ## this call is async! - def findNode(self, id, callback): + def findNode(self, id, callback, errback=None): """ returns the contact info for node, or the k closest nodes, from the global table """ # get K nodes out of local table/cache, or the node we want nodes = self.table.findNodes(id) + d = Deferred() + d.addCallbacks(callback, errback) if len(nodes) == 1 and nodes[0].id == id : - # we got it in our table! - def tcall(t, callback=callback): - callback(t.extras) - self.dispatcher.postEvent(tcall, 0, extras=nodes) + d.callback(nodes) else: # create our search state - state = FindNode(self, self.dispatcher, id, callback) - # handle this in our own thread - self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes) + state = FindNode(self, id, d.callback) + reactor.callFromThread(state.goWithNodes, nodes) ## also async @@ -75,9 +77,8 @@ class Khashmir: """ returns the values found for key in global table """ nodes = self.table.findNodes(key) # create our search state - state = GetValue(self, self.dispatcher, key, callback) - # handle this in our own thread - self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes) + state = GetValue(self, key, callback) + reactor.callFromThread(state.goWithNodes, nodes) ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now @@ -87,11 +88,11 @@ class Khashmir: values are stored in peers on a first-come first-served basis this will probably change so more than one value can be stored under a key """ - def _storeValueForKey(nodes, tf=self.tf, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"): + def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"): for node in nodes: if node.id != self.node.id: - t = tf.StoreValue(node, key, value, response, default) - t.dispatch() + df = node.storeValue(key, value, self.node.senderDict()) + df.addCallbacks(response, default) # this call is asynch self.findNode(key, _storeValueForKey) @@ -108,17 +109,41 @@ class Khashmir: old = self.table.insertNode(n) if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id: # the bucket is full, check to see if old node is still around and if so, replace it - t = self.tf.Ping(old, self._notStaleNodeHandler, self._staleNodeHandler) - t.newnode = n - t.dispatch() + + ## these are the callbacks used when we ping the oldest node in a bucket + def _staleNodeHandler(oldnode=old, newnode = n): + """ called if the pinged node never responds """ + self.table.replaceStaleNode(old, newnode) + + def _notStaleNodeHandler(sender, old=old): + """ called when we get a ping from the remote node """ + if sender['id'] == old.id: + self.table.insertNode(old) + + df = old.ping() + df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler) def sendPing(self, node): """ ping a node """ - t = self.tf.Ping(node, self._pongHandler, self._defaultPong) - t.dispatch() + df = node.ping(self.node.senderDict()) + ## these are the callbacks we use when we issue a PING + def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table): + if id != 20 * ' ' and id != sender['id']: + # whoah, got response from different peer than we were expecting + pass + else: + #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port) + n = Node(sender['id'], host, port) + table.insertNode(n) + return + def _defaultPong(err): + # this should probably increment a failed message counter and dump the node if it gets over a threshold + return + + df.addCallbacks(_pongHandler,_defaultPong) def findCloseNodes(self): @@ -146,71 +171,59 @@ class Khashmir: ##### - ##### UNSOLICITED INCOMING MESSAGE HANDLERS + ##### INCOMING MESSAGE HANDLERS - def _pingHandler(self, t, msg): - #print "Got PING from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port) - self.insertNode(t.target) - # respond, no callbacks, we don't care if they get it or not - nt = self.tf.Pong(t) - nt.dispatch() - - def _findNodeHandler(self, t, msg): - #print "Got FIND_NODES from %s:%s at %s:%s" % (t.target.host, t.target.port, self.node.host, self.node.port) - nodes = self.table.findNodes(msg['target']) - # respond, no callbacks, we don't care if they get it or not - nt = self.tf.GotNodes(t, nodes) - nt.dispatch() - - def _storeValueHandler(self, t, msg): - if not self.store.has_key(msg['key']): - self.store.put(msg['key'], msg['value']) - nt = self.tf.StoredValue(t) - nt.dispatch() + def xmlrpc_ping(self, sender): + """ + takes sender dict = {'id', , 'port', port} optional keys = 'ip' + returns sender dict + """ + ip = self.crequest.getClientIP() + n = Node(sender['id'], ip, sender['port']) + self.insertNode(n) + return self.node.senderDict() + + def xmlrpc_find_node(self, target, sender): + nodes = self.table.findNodes(target) + nodes = map(lambda node: node.senderDict(), nodes) + ip = self.crequest.getClientIP() + n = Node(sender['id'], ip, sender['port']) + self.insertNode(n) + return nodes, self.node.senderDict() - def _findValueHandler(self, t, msg): - if self.store.has_key(msg['key']): - t = self.tf.GotValues(t, [(msg['key'], self.store[msg['key']])]) + def xmlrpc_store_value(self, key, value, sender): + if not self.store.has_key(key): + self.store.put(key, value) + ip = self.crequest.getClientIP() + n = Node(sender['id'], ip, sender['port']) + self.insertNode(n) + return self.node.senderDict() + + def xmlrpc_find_value(self, key, sender): + ip = self.crequest.getClientIP() + n = Node(sender['id'], ip, sender['port']) + self.insertNode(n) + if self.store.has_key(key): + return {'values' : self.store[key]}, self.node.senderDict() else: nodes = self.table.findNodes(msg['key']) - t = self.tf.GotNodes(t, nodes) - t.dispatch() - + nodes = map(lambda node: node.senderDict(), nodes) + return {'nodes' : nodes}, self.node.senderDict() ### ### message response callbacks # called when we get a response to store value - def _storedValueHandler(self, t, msg): - self.table.insertNode(t.target) - - - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(self, t): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(t.target, t.newnode) + def _storedValueHandler(self, sender): + pass - def _notStaleNodeHandler(self, t, msg): - """ called when we get a ping from the remote node """ - self.table.insertNode(t.target) - ## these are the callbacks we use when we issue a PING - def _pongHandler(self, t, msg): - #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port) - n = Node(msg['id'], t.addr[0], t.addr[1]) - self.table.insertNode(n) - - def _defaultPong(self, t): - # this should probably increment a failed message counter and dump the node if it gets over a threshold - print "Never got PONG from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port) - class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, dispatcher, target, callback): + def __init__(self, table, target, callback): self.table = table - self.dispatcher = dispatcher self.target = target self.int = intify(target) self.found = {} @@ -233,15 +246,20 @@ class ActionBase: def goWithNodes(self, t): pass + + +FIND_NODE_TIMEOUT = 15 + class FindNode(ActionBase): """ find node action merits it's own class as it is a long running stateful process """ - def handleGotNodes(self, t, msg): - if self.finished or self.answered.has_key(t.id): + def handleGotNodes(self, args): + l, sender = args + if self.finished or self.answered.has_key(sender['id']): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[t.id] = 1 - for node in msg['nodes']: + self.answered[sender['id']] = 1 + for node in l: if not self.found.has_key(node['id']): n = Node(node['id'], node['host'], node['port']) self.found[n.id] = n @@ -262,18 +280,18 @@ class FindNode(ActionBase): self.finished=1 return self.callback([node]) if not self.queried.has_key(node.id) and node.id != self.table.node.id: - t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes) + #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.senderDict()) + df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - t.timeout = time.time() + 15 - t.dispatch() if self.outstanding >= N: break assert(self.outstanding) >=0 if self.outstanding == 0: ## all done!! self.finished=1 - self.callback(l[:K]) + reactor.callFromThread(self.callback, l[:K]) def defaultGotNodes(self, t): if self.finished: @@ -282,46 +300,47 @@ class FindNode(ActionBase): self.schedule() - def goWithNodes(self, t): + def goWithNodes(self, nodes): """ this starts the process, our argument is a transaction with t.extras being our list of nodes it's a transaction since we got called from the dispatcher """ - nodes = t.extras for node in nodes: if node.id == self.table.node.id: continue self.found[node.id] = node - t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes) - t.timeout = time.time() + 15 - t.dispatch() + #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.senderDict()) + df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding == 0: self.callback(nodes) - +GET_VALUE_TIMEOUT = 15 class GetValue(FindNode): """ get value task """ - def handleGotNodes(self, t, msg): - if self.finished or self.answered.has_key(t.id): + def handleGotNodes(self, args): + l, sender = args + l = l[0] + if self.finished or self.answered.has_key(sender['id']): # a day late and a dollar short return self.outstanding = self.outstanding - 1 - self.answered[t.id] = 1 + self.answered[sender['id']] = 1 # go through nodes # if we have any closer than what we already got, query them - if msg['type'] == 'got nodes': - for node in msg['nodes']: + if l.has_key('nodes'): + for node in l['nodes']: if not self.found.has_key(node['id']): n = Node(node['id'], node['host'], node['port']) self.found[n.id] = n self.table.insertNode(n) - elif msg['type'] == 'got values': + elif l.has_key('values'): ## done self.finished = 1 - return self.callback(msg['values']) + return self.callback(l['values']) self.schedule() ## get value @@ -333,42 +352,43 @@ class GetValue(FindNode): for node in l[:K]: if not self.queried.has_key(node.id) and node.id != self.table.node.id: - t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes) + #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT + df = node.getValue(node, self.target) + df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - t.timeout = time.time() + 15 - t.dispatch() if self.outstanding >= N: break assert(self.outstanding) >=0 if self.outstanding == 0: ## all done, didn't find it!! self.finished=1 - self.callback([]) + reactor.callFromThread(self.callback,[]) ## get value - def goWithNodes(self, t): - nodes = t.extras + def goWithNodes(self, nodes): for node in nodes: if node.id == self.table.node.id: continue self.found[node.id] = node - t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes) - t.timeout = time.time() + 15 - t.dispatch() + #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.senderDict()) + df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding == 0: - self.callback([]) + reactor.callFromThread(self.callback, []) + #------ + def test_build_net(quiet=0): from whrandom import randrange import thread port = 2001 l = [] - peers = 100 + peers = 16 if not quiet: print "Building %s peer table." % peers @@ -385,10 +405,10 @@ def test_build_net(quiet=0): if events == 0: time.sleep(.25) - for i in range(10): - thread.start_new_thread(run, (l[i*10:(i+1)*10],)) - #thread.start_new_thread(l[i].dispatcher.run, ()) - + thread.start_new_thread(l[0].app.run, ()) + for peer in l[1:]: + peer.app.run() + for peer in l[1:]: n = l[randrange(0, len(l))].node peer.addContact(n.host, n.port) @@ -484,5 +504,3 @@ if __name__ == "__main__": test_find_value(l) test_find_value(l) test_find_value(l) - for i in l: - i.dispatcher.stop() diff --git a/knode.py b/knode.py new file mode 100644 index 0000000..66b8b53 --- /dev/null +++ b/knode.py @@ -0,0 +1,26 @@ +from node import Node +from twisted.internet.defer import Deferred +from xmlrpcclient import XMLRPCClientFactory as factory +from const import reactor + +class KNode(Node): + def ping(self, sender): + df = Deferred() + f = factory('ping', (sender,), df.callback, df.errback) + reactor.connectTCP(self.host, self.port, f) + return df + def findNode(self, target, sender): + df = Deferred() + f = factory('find_node', (target, sender), df.callback, df.errback) + reactor.connectTCP(self.host, self.port, f) + return df + def storeValue(self, key, value, sender): + df = Deferred() + f = factory('store_value', (key, value, sender), df.callback, df.errback) + reactor.connectTCP(self.host, self.port, f) + return df + def findValue(self, key, sender): + f = factory('find_value', (key, sender), df.callback, df.errback) + reactor.connectTCP(self.host, self.port, f) + df = Deferred() + return df diff --git a/ktable.py b/ktable.py index 9cd3732..3bff407 100644 --- a/ktable.py +++ b/ktable.py @@ -8,7 +8,7 @@ from types import * from node import Node # The all-powerful, magical Kademlia "k" constant, bucket depth -K = 20 +K = 8 # how many bits wide is our hash? HASH_LENGTH = 160 @@ -111,6 +111,7 @@ class KTable: this insert the node, returning None if successful, returns the oldest node in the bucket if it's full the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!! """ + assert(node.id != " "*20) # get the bucket for this node i = self. _bucketIndexForInt(node.int) ## check to see if node is in the bucket already diff --git a/listener.py b/listener.py deleted file mode 100644 index 3999b32..0000000 --- a/listener.py +++ /dev/null @@ -1,100 +0,0 @@ -## Copyright 2002 Andrew Loewenstern, All Rights Reserved - -from socket import * - -# simple UDP communicator - -class Listener: - def __init__(self, host, port): - self.msgq = [] - self.sock = socket(AF_INET, SOCK_DGRAM) - self.sock.setblocking(0) - self.sock.bind((host, port)) - - def qMsg(self, msg, host, port): - self.msgq.append((msg, host, port)) - - def qLen(self): - return len(self.msgq) - - def dispatchMsg(self): - if self.qLen() > 0: - msg, host, port = self.msgq[0] - del self.msgq[0] - self.sock.sendto(msg, 0, (host, port)) - - def receiveMsg(self): - msg = () - try: - msg = self.sock.recvfrom(65536) - except error, tup: - if tup[1] == "Resource temporarily unavailable": - # no message - return msg - print error, tup - else: - return msg - - def __del__(self): - self.sock.close() - - - -########################### -import unittest - -class ListenerTest(unittest.TestCase): - def setUp(self): - self.a = Listener('localhost', 8080) - self.b = Listener('localhost', 8081) - def tearDown(self): - del(self.a) - del(self.b) - - def testQueue(self): - assert self.a.qLen() == 0, "expected queue to be empty" - self.a.qMsg('hello', 'localhost', 8081) - assert self.a.qLen() == 1, "expected one message to be in queue" - self.a.qMsg('hello', 'localhost', 8081) - assert self.a.qLen() == 2, "expected two messages to be in queue" - self.a.dispatchMsg() - assert self.a.qLen() == 1, "expected one message to be in queue" - self.a.dispatchMsg() - assert self.a.qLen() == 0, "expected all messages to be flushed from queue" - - def testSendReceiveOne(self): - self.a.qMsg('hello', 'localhost', 8081) - self.a.dispatchMsg() - - assert self.b.receiveMsg()[0] == "hello", "did not receive expected message" - assert self.b.receiveMsg() == (), "received unexpected message" - - self.b.qMsg('hello', 'localhost', 8080) - self.b.dispatchMsg() - - assert self.a.receiveMsg()[0] == "hello", "did not receive expected message" - - assert self.a.receiveMsg() == (), "received unexpected message" - - def testSendReceiveInterleaved(self): - self.a.qMsg('hello', 'localhost', 8081) - self.a.qMsg('hello', 'localhost', 8081) - self.a.dispatchMsg() - self.a.dispatchMsg() - - assert self.b.receiveMsg()[0] == "hello", "did not receive expected message" - assert self.b.receiveMsg()[0] == "hello", "did not receive expected message" - assert self.b.receiveMsg() == (), "received unexpected message" - - self.b.qMsg('hello', 'localhost', 8080) - self.b.qMsg('hello', 'localhost', 8080) - self.b.dispatchMsg() - self.b.dispatchMsg() - - assert self.a.receiveMsg()[0] == "hello", "did not receive expected message" - assert self.a.receiveMsg()[0] == "hello", "did not receive expected message" - assert self.a.receiveMsg() == (), "received unexpected message" - - -if __name__ == '__main__': - unittest.main() diff --git a/messages.py b/messages.py deleted file mode 100644 index 542fe23..0000000 --- a/messages.py +++ /dev/null @@ -1,186 +0,0 @@ -## Copyright 2002 Andrew Loewenstern, All Rights Reserved - -from bencode import bencode, bdecode -from btemplate import * -from node import Node - - -# template checker for hash id -def hashid(thing, verbose): - if type(thing) != type(''): - raise ValueError, 'must be a string' - if len(thing) != 20: - raise ValueError, 'must be 20 characters long' - -## our messages -BASE = compile_template({'id' : hashid, 'tid' : hashid, 'type' : string_template}) - -PING = compile_template({'id' : hashid, 'tid' : hashid, 'type' : 'ping'}) -PONG = compile_template({'id' : hashid, 'tid' : hashid, 'type' : 'pong'}) - -FIND_NODE = compile_template({'id' : hashid, 'tid' : hashid, 'type' : 'find node', "target" : hashid}) -GOT_NODES = compile_template({'id' : hashid, 'tid' : hashid, 'type' : 'got nodes', "nodes" : ListMarker({'id': hashid, 'host': string_template, 'port': 1})}) - -STORE_VALUE = compile_template({'id' : hashid, 'tid' : hashid, 'type' : 'store value', "key" : hashid, "value" : string_template}) -STORED_VALUE = compile_template({'id' : hashid, 'tid' : hashid, 'type' : 'stored value'}) - -GET_VALUE = compile_template({'id' : hashid, 'tid' : hashid, 'type' : 'get value', "key" : hashid}) -GOT_VALUES = compile_template({'id' : hashid, 'tid' : hashid, 'type' : 'got values', "values" : ListMarker({'key': hashid, 'value': string_template})}) - -GOT_NODES_OR_VALUES = compile_template([GOT_NODES, GOT_VALUES]) - - -class MessageFactory: - def __init__(self, id): - self.id = id - - def encodePing(self, tid): - return bencode({'id' : self.id, 'tid' : tid, 'type' : 'ping'}) - def decodePing(self, msg): - msg = bdecode(msg) - PING(msg) - return msg - - def encodePong(self, tid): - msg = {'id' : self.id, 'tid' : tid, 'type' : 'pong'} - PONG(msg) - return bencode(msg) - def decodePong(self, msg): - msg = bdecode(msg) - PONG(msg) - return msg - - def encodeFindNode(self, tid, target): - return bencode({'id' : self.id, 'tid' : tid, 'type' : 'find node', 'target' : target}) - def decodeFindNode(self, msg): - msg = bdecode(msg) - FIND_NODE(msg) - return msg - - def encodeStoreValue(self, tid, key, value): - return bencode({'id' : self.id, 'tid' : tid, 'key' : key, 'type' : 'store value', 'value' : value}) - def decodeStoreValue(self, msg): - msg = bdecode(msg) - STORE_VALUE(msg) - return msg - - - def encodeStoredValue(self, tid): - return bencode({'id' : self.id, 'tid' : tid, 'type' : 'stored value'}) - def decodeStoredValue(self, msg): - msg = bdecode(msg) - STORED_VALUE(msg) - return msg - - - def encodeGetValue(self, tid, key): - return bencode({'id' : self.id, 'tid' : tid, 'key' : key, 'type' : 'get value'}) - def decodeGetValue(self, msg): - msg = bdecode(msg) - GET_VALUE(msg) - return msg - - def encodeGotNodes(self, tid, nodes): - n = [] - for node in nodes: - n.append({'id' : node.id, 'host' : node.host, 'port' : node.port}) - return bencode({'id' : self.id, 'tid' : tid, 'type' : 'got nodes', 'nodes' : n}) - def decodeGotNodes(self, msg): - msg = bdecode(msg) - GOT_NODES(msg) - return msg - - def encodeGotValues(self, tid, values): - n = [] - for value in values: - n.append({'key' : value[0], 'value' : value[1]}) - return bencode({'id' : self.id, 'tid' : tid, 'type' : 'got values', 'values' : n}) - def decodeGotValues(self, msg): - msg = bdecode(msg) - GOT_VALUES(msg) - return msg - - - -###### -import unittest - -class TestMessageEncoding(unittest.TestCase): - def setUp(self): - from sha import sha - self.a = sha('a').digest() - self.b = sha('b').digest() - - - def test_ping(self): - m = MessageFactory(self.a) - s = m.encodePing(self.b) - msg = m.decodePing(s) - PING(msg) - assert(msg['id'] == self.a) - assert(msg['tid'] == self.b) - - def test_pong(self): - m = MessageFactory(self.a) - s = m.encodePong(self.b) - msg = m.decodePong(s) - PONG(msg) - assert(msg['id'] == self.a) - assert(msg['tid'] == self.b) - - def test_find_node(self): - m = MessageFactory(self.a) - s = m.encodeFindNode(self.a, self.b) - msg = m.decodeFindNode(s) - FIND_NODE(msg) - assert(msg['id'] == self.a) - assert(msg['tid'] == self.a) - assert(msg['target'] == self.b) - - def test_store_value(self): - m = MessageFactory(self.a) - s = m.encodeStoreValue(self.a, self.b, 'foo') - msg = m.decodeStoreValue(s) - STORE_VALUE(msg) - assert(msg['id'] == self.a) - assert(msg['tid'] == self.a) - assert(msg['key'] == self.b) - assert(msg['value'] == 'foo') - - def test_stored_value(self): - m = MessageFactory(self.a) - s = m.encodeStoredValue(self.b) - msg = m.decodeStoredValue(s) - STORED_VALUE(msg) - assert(msg['id'] == self.a) - assert(msg['tid'] == self.b) - - def test_get_value(self): - m = MessageFactory(self.a) - s = m.encodeGetValue(self.a, self.b) - msg = m.decodeGetValue(s) - GET_VALUE(msg) - assert(msg['id'] == self.a) - assert(msg['tid'] == self.a) - assert(msg['key'] == self.b) - - def test_got_nodes(self): - m = MessageFactory(self.a) - s = m.encodeGotNodes(self.a, [Node(self.b, 'localhost', 2002), Node(self.a, 'localhost', 2003)]) - msg = m.decodeGotNodes(s) - GOT_NODES(msg) - assert(msg['id'] == self.a) - assert(msg['tid'] == self.a) - assert(msg['nodes'][0]['id'] == self.b) - - def test_got_values(self): - m = MessageFactory(self.a) - s = m.encodeGotValues(self.a, [(self.b, 'localhost')]) - msg = m.decodeGotValues(s) - GOT_VALUES(msg) - assert(msg['id'] == self.a) - assert(msg['tid'] == self.a) - - -if __name__ == "__main__": - unittest.main() diff --git a/node.py b/node.py index cb1940e..6bc2ef2 100644 --- a/node.py +++ b/node.py @@ -14,6 +14,9 @@ class Node: def updateLastSeen(self): self.lastSeen = time.time() + def senderDict(self): + return {'id': self.id, 'port' : self.port, 'host' : self.host} + def __repr__(self): return `(self.id, self.host, self.port)` diff --git a/test.py b/test.py index 044aeff..2d785ea 100644 --- a/test.py +++ b/test.py @@ -1,10 +1,9 @@ import unittest import hash, node, messages -import listener, dispatcher import ktable, transactions, khashmir - +import protocol import bencode, btemplate -tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'bencode', 'btemplate', 'listener', 'messages', 'dispatcher', 'transactions', 'ktable']) +tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'bencode', 'btemplate', 'messages', 'transactions', 'ktable', 'protocol']) result = unittest.TextTestRunner().run(tests) diff --git a/transactions.py b/transactions.py deleted file mode 100644 index b862aaa..0000000 --- a/transactions.py +++ /dev/null @@ -1,71 +0,0 @@ -import messages -from dispatcher import Transaction - -class TransactionFactory: - def __init__(self, id, dispatcher): - self.id = id - self.dispatcher = dispatcher - self.mf = messages.MessageFactory(self.id) - - def Ping(self, node, response, default): - """ create a ping transaction """ - t = Transaction(self.dispatcher, node, response, default) - str = self.mf.encodePing(t.id) - t.setPayload(str) - t.setResponseTemplate(messages.PONG) - return t - - def FindNode(self, target, key, response, default): - """ find node query """ - t = Transaction(self.dispatcher, target, response, default) - str = self.mf.encodeFindNode(t.id, key) - t.setPayload(str) - t.setResponseTemplate(messages.GOT_NODES) - return t - - def StoreValue(self, target, key, value, response, default): - """ find node query """ - t = Transaction(self.dispatcher, target, response, default) - str = self.mf.encodeStoreValue(t.id, key, value) - t.setPayload(str) - t.setResponseTemplate(messages.STORED_VALUE) - return t - - def GetValue(self, target, key, response, default): - """ find value query, response is GOT_VALUES or GOT_NODES! """ - t = Transaction(self.dispatcher, target, response, default) - str = self.mf.encodeGetValue(t.id, key) - t.setPayload(str) - t.setResponseTemplate(messages.GOT_NODES_OR_VALUES) - return t - - def Pong(self, ping_t): - """ create a pong response to ping transaction """ - t = Transaction(self.dispatcher, ping_t.target, None, None, ping_t.id) - str = self.mf.encodePong(t.id) - t.setPayload(str) - return t - - def GotNodes(self, findNode_t, nodes): - """ respond with gotNodes """ - t = Transaction(self.dispatcher, findNode_t.target, None, None, findNode_t.id) - str = self.mf.encodeGotNodes(t.id, nodes) - t.setPayload(str) - return t - - def GotValues(self, findNode_t, values): - """ respond with gotNodes """ - t = Transaction(self.dispatcher, findNode_t.target, None, None, findNode_t.id) - str = self.mf.encodeGotValues(t.id, values) - t.setPayload(str) - return t - - def StoredValue(self, tr): - """ store value response, really just a pong """ - t = Transaction(self.dispatcher, tr.target, None, None, id = tr.id) - str = self.mf.encodeStoredValue(t.id) - t.setPayload(str) - return t - - -########### diff --git a/xmlrpcclient.py b/xmlrpcclient.py new file mode 100644 index 0000000..866039f --- /dev/null +++ b/xmlrpcclient.py @@ -0,0 +1,38 @@ +from twisted.internet.protocol import ClientFactory +from twisted.protocols.http import HTTPClient +from twisted.internet.defer import Deferred + +from xmlrpclib import loads, dumps + +class XMLRPCClient(HTTPClient): + def connectionMade(self): + payload = dumps(self.args, self.method) + self.sendCommand('POST', '/RPC2') + self.sendHeader('User-Agent', 'Python/Twisted XMLRPC 0.1') + self.sendHeader('Content-Type', 'text/xml') + self.sendHeader('Content-Length', len(payload)) + self.endHeaders() + self.transport.write(payload) + self.transport.write('\r\n') + + def handleResponse(self, buf): + args, name = loads(buf) + apply(self.d.callback, args) + + +class XMLRPCClientFactory(ClientFactory): + def __init__(self, method, args, callback=None, errback=None): + self.method = method + self.args = args + self.d = Deferred() + if callback: + self.d.addCallback(callback) + if errback: + self.d.addErrback(errback) + + def buildProtocol(self, addr): + prot = XMLRPCClient() + prot.method = self.method + prot.args = self.args + prot.d = self.d + return prot