self.setup(host, port, db)
def setup(self, host, port, db='khashmir.db'):
- self.findDB(db)
- self.node = self.loadSelfNode(host, port)
+ self._findDB(db)
+ self.node = self._loadSelfNode(host, port)
self.table = KTable(self.node)
- self.loadRoutingTable()
+ self._loadRoutingTable()
self.app = Application("xmlrpc")
self.app.listenTCP(port, server.Site(self))
self.last = time.time()
KeyExpirer(store=self.store)
- reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
- self.refreshTable(force=1)
+ #self.refreshTable(force=1)
+ reactor.callLater(60, self.checkpoint, (1,))
- def loadSelfNode(self, host, port):
+ def _loadSelfNode(self, host, port):
c = self.store.cursor()
c.execute('select id from self where num = 0;')
if c.rowcount > 0:
id = newID()
return Node().init(id, host, port)
- def saveSelfNode(self):
+ def _saveSelfNode(self):
self.store.autocommit = 0
c = self.store.cursor()
c.execute('delete from self where num = 0;')
self.store.commit()
self.store.autocommit = 1
- def checkpoint(self):
- self.saveSelfNode()
- self.dumpRoutingTable()
- reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+ def checkpoint(self, auto=0):
+ self._saveSelfNode()
+ self._dumpRoutingTable()
+ if auto:
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
- def findDB(self, db):
+ def _findDB(self, db):
import os
try:
os.stat(db)
except OSError:
- self.createNewDB(db)
+ self._createNewDB(db)
else:
- self.loadDB(db)
+ self._loadDB(db)
- def loadDB(self, db):
+ def _loadDB(self, db):
try:
self.store = sqlite.connect(db=db)
self.store.autocommit = 1
import traceback
raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
- def createNewDB(self, db):
+ def _createNewDB(self, db):
self.store = sqlite.connect(db=db)
self.store.autocommit = 1
s = """
c = self.store.cursor()
c.execute(s)
- def dumpRoutingTable(self):
+ def _dumpRoutingTable(self):
"""
save routing table nodes to the database
"""
self.store.commit()
self.store.autocommit = 1;
- def loadRoutingTable(self):
+ def _loadRoutingTable(self):
"""
load routing table nodes from database
it's usually a good idea to call refreshTable(force=1) after loading the table
from hash import newID
-def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001):
+def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
+ import thread
+ l = []
+ for i in xrange(peers):
+ a = Khashmir('localhost', startport + i, db = dbprefix+`i`)
+ l.append(a)
+ thread.start_new_thread(l[0].app.run, ())
+ for peer in l[1:]:
+ peer.app.run()
+ return l
+
+def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001, dbprefix='/tmp/test'):
+ from whrandom import randrange
+ import threading
+ import thread
+ import sys
port = startport
l = []
-
if not quiet:
print "Building %s peer table." % peers
for i in xrange(peers):
- a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
+ a = Khashmir(host, port + i, db = dbprefix +`i`)
l.append(a)
from const import reactor, NULL_ID
class KNode(Node):
- def makeResponse(self, df):
- def _callback(args, d=df):
- try:
- l, sender = args
- except:
- d.callback(args)
- else:
- if self.id != NULL_ID and sender['id'] != self._senderDict['id']:
- d.errback()
- else:
- d.callback(args)
- return _callback
- def ping(self, sender):
- df = Deferred()
- f = factory('ping', (sender,), self.makeResponse(df), df.errback)
- reactor.connectTCP(self.host, self.port, f)
- return df
- def findNode(self, target, sender):
- df = Deferred()
- f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback)
- reactor.connectTCP(self.host, self.port, f)
- return df
- def storeValue(self, key, value, sender):
- df = Deferred()
- f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback)
- reactor.connectTCP(self.host, self.port, f)
- return df
- def findValue(self, key, sender):
- df = Deferred()
- f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback)
- reactor.connectTCP(self.host, self.port, f)
- return df
+ def makeResponse(self, df):
+ """ Make our callback cover that checks to make sure the id of the response is the same as what we are expecting """
+ def _callback(args, d=df):
+ try:
+ l, sender = args
+ except:
+ d.callback(args)
+ else:
+ if self.id != NULL_ID and sender['id'] != self._senderDict['id']:
+ d.errback()
+ else:
+ d.callback(args)
+ return _callback
+ def ping(self, sender):
+ df = Deferred()
+ f = factory('ping', (sender,), self.makeResponse(df), df.errback)
+ reactor.connectTCP(self.host, self.port, f)
+ return df
+ def findNode(self, target, sender):
+ df = Deferred()
+ f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback)
+ reactor.connectTCP(self.host, self.port, f)
+ return df
+ def storeValue(self, key, value, sender):
+ df = Deferred()
+ f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback)
+ reactor.connectTCP(self.host, self.port, f)
+ return df
+ def findValue(self, key, sender):
+ df = Deferred()
+ f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback)
+ reactor.connectTCP(self.host, self.port, f)
+ return df