From 2ce2ba6146457d6c321e8ece8116d422cda39a70 Mon Sep 17 00:00:00 2001 From: burris Date: Tue, 10 Dec 2002 04:29:09 +0000 Subject: [PATCH] reformatting plus changed some method names, added a new function for "testing" --- khashmir.py | 57 +++++++++++++++++++++++++++++----------------- knode.py | 65 +++++++++++++++++++++++++++-------------------------- 2 files changed, 69 insertions(+), 53 deletions(-) diff --git a/khashmir.py b/khashmir.py index 8a1bfe2..3ca5d72 100644 --- a/khashmir.py +++ b/khashmir.py @@ -32,18 +32,18 @@ class Khashmir(xmlrpc.XMLRPC): self.setup(host, port, db) def setup(self, host, port, db='khashmir.db'): - self.findDB(db) - self.node = self.loadSelfNode(host, port) + self._findDB(db) + self.node = self._loadSelfNode(host, port) self.table = KTable(self.node) - self.loadRoutingTable() + self._loadRoutingTable() self.app = Application("xmlrpc") self.app.listenTCP(port, server.Site(self)) self.last = time.time() KeyExpirer(store=self.store) - reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) - self.refreshTable(force=1) + #self.refreshTable(force=1) + reactor.callLater(60, self.checkpoint, (1,)) - def loadSelfNode(self, host, port): + def _loadSelfNode(self, host, port): c = self.store.cursor() c.execute('select id from self where num = 0;') if c.rowcount > 0: @@ -52,7 +52,7 @@ class Khashmir(xmlrpc.XMLRPC): id = newID() return Node().init(id, host, port) - def saveSelfNode(self): + def _saveSelfNode(self): self.store.autocommit = 0 c = self.store.cursor() c.execute('delete from self where num = 0;') @@ -60,21 +60,22 @@ class Khashmir(xmlrpc.XMLRPC): self.store.commit() self.store.autocommit = 1 - def checkpoint(self): - self.saveSelfNode() - self.dumpRoutingTable() - reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) + def checkpoint(self, auto=0): + self._saveSelfNode() + self._dumpRoutingTable() + if auto: + reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) - def findDB(self, db): + def _findDB(self, db): import os try: os.stat(db) except OSError: - self.createNewDB(db) + self._createNewDB(db) else: - self.loadDB(db) + self._loadDB(db) - def loadDB(self, db): + def _loadDB(self, db): try: self.store = sqlite.connect(db=db) self.store.autocommit = 1 @@ -82,7 +83,7 @@ class Khashmir(xmlrpc.XMLRPC): import traceback raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback - def createNewDB(self, db): + def _createNewDB(self, db): self.store = sqlite.connect(db=db) self.store.autocommit = 1 s = """ @@ -97,7 +98,7 @@ class Khashmir(xmlrpc.XMLRPC): c = self.store.cursor() c.execute(s) - def dumpRoutingTable(self): + def _dumpRoutingTable(self): """ save routing table nodes to the database """ @@ -111,7 +112,7 @@ class Khashmir(xmlrpc.XMLRPC): self.store.commit() self.store.autocommit = 1; - def loadRoutingTable(self): + def _loadRoutingTable(self): """ load routing table nodes from database it's usually a good idea to call refreshTable(force=1) after loading the table @@ -345,15 +346,29 @@ from sha import sha from hash import newID -def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001): +def test_net(peers=24, startport=2001, dbprefix='/tmp/test'): + import thread + l = [] + for i in xrange(peers): + a = Khashmir('localhost', startport + i, db = dbprefix+`i`) + l.append(a) + thread.start_new_thread(l[0].app.run, ()) + for peer in l[1:]: + peer.app.run() + return l + +def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001, dbprefix='/tmp/test'): + from whrandom import randrange + import threading + import thread + import sys port = startport l = [] - if not quiet: print "Building %s peer table." % peers for i in xrange(peers): - a = Khashmir(host, port + i, db = '/tmp/test'+`i`) + a = Khashmir(host, port + i, db = dbprefix +`i`) l.append(a) diff --git a/knode.py b/knode.py index a15c568..ef82260 100644 --- a/knode.py +++ b/knode.py @@ -4,35 +4,36 @@ from xmlrpcclient import XMLRPCClientFactory as factory from const import reactor, NULL_ID class KNode(Node): - def makeResponse(self, df): - def _callback(args, d=df): - try: - l, sender = args - except: - d.callback(args) - else: - if self.id != NULL_ID and sender['id'] != self._senderDict['id']: - d.errback() - else: - d.callback(args) - return _callback - def ping(self, sender): - df = Deferred() - f = factory('ping', (sender,), self.makeResponse(df), df.errback) - reactor.connectTCP(self.host, self.port, f) - return df - def findNode(self, target, sender): - df = Deferred() - f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback) - reactor.connectTCP(self.host, self.port, f) - return df - def storeValue(self, key, value, sender): - df = Deferred() - f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback) - reactor.connectTCP(self.host, self.port, f) - return df - def findValue(self, key, sender): - df = Deferred() - f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback) - reactor.connectTCP(self.host, self.port, f) - return df + def makeResponse(self, df): + """ Make our callback cover that checks to make sure the id of the response is the same as what we are expecting """ + def _callback(args, d=df): + try: + l, sender = args + except: + d.callback(args) + else: + if self.id != NULL_ID and sender['id'] != self._senderDict['id']: + d.errback() + else: + d.callback(args) + return _callback + def ping(self, sender): + df = Deferred() + f = factory('ping', (sender,), self.makeResponse(df), df.errback) + reactor.connectTCP(self.host, self.port, f) + return df + def findNode(self, target, sender): + df = Deferred() + f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback) + reactor.connectTCP(self.host, self.port, f) + return df + def storeValue(self, key, value, sender): + df = Deferred() + f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback) + reactor.connectTCP(self.host, self.port, f) + return df + def findValue(self, key, sender): + df = Deferred() + f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback) + reactor.connectTCP(self.host, self.port, f) + return df -- 2.39.5