+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+ def __init__(self, host, port, db='khashmir.db'):
+ self.setup(host, port, db)
+
+ def setup(self, host, port, db='khashmir.db'):
+ self._findDB(db)
+ self.node = self._loadSelfNode(host, port)
+ self.table = KTable(self.node)
+ self._loadRoutingTable()
+ self.app = Application("xmlrpc")
+ self.app.listenTCP(port, server.Site(self))
+ self.last = time.time()
+ KeyExpirer(store=self.store)
+ #self.refreshTable(force=1)
+ reactor.callLater(60, self.checkpoint, (1,))
+
+ def _loadSelfNode(self, host, port):
+ c = self.store.cursor()
+ c.execute('select id from self where num = 0;')
+ if c.rowcount > 0:
+ id = c.fetchone()[0].decode('base64')
+ else:
+ id = newID()
+ return Node().init(id, host, port)
+
+ def _saveSelfNode(self):
+ self.store.autocommit = 0
+ c = self.store.cursor()
+ c.execute('delete from self where num = 0;')
+ c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
+ self.store.commit()
+ self.store.autocommit = 1
+
+ def checkpoint(self, auto=0):
+ self._saveSelfNode()
+ self._dumpRoutingTable()
+ if auto:
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+
+ def _findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self._createNewDB(db)
+ else:
+ self._loadDB(db)
+
+ def _loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def _createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ s = """
+ create table kv (key text, value text, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id text primary key, host text, port number);
+
+ create table self (num number primary key, id text);
+ """
+ c = self.store.cursor()
+ c.execute(s)
+
+ def _dumpRoutingTable(self):
+ """
+ save routing table nodes to the database
+ """
+ self.store.autocommit = 0;
+ c = self.store.cursor()
+ c.execute("delete from nodes where id not NULL;")
+ for bucket in self.table.buckets:
+ for node in bucket.l:
+ d = node.senderDict()
+ c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
+ self.store.commit()
+ self.store.autocommit = 1;
+
+ def _loadRoutingTable(self):
+ """
+ load routing table nodes from database
+ it's usually a good idea to call refreshTable(force=1) after loading the table
+ """
+ c = self.store.cursor()
+ c.execute("select * from nodes;")
+ for rec in c.fetchall():
+ n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+ self.table.insertNode(n, contacted=0)
+
+ def render(self, request):
+ """
+ Override the built in render so we can have access to the request object!
+ note, crequest is probably only valid on the initial call (not after deferred!)
+ """
+ self.crequest = request
+ return xmlrpc.XMLRPC.render(self, request)
+
+
+ #######
+ ####### LOCAL INTERFACE - use these methods!
+ def addContact(self, host, port):
+ """
+ ping this node and add the contact info to the table on pong!
+ """
+ n =Node().init(const.NULL_ID, host, port) # note, we
+ self.sendPing(n)
+
+ ## this call is async!
+ def findNode(self, id, callback, errback=None):
+ """ returns the contact info for node, or the k closest nodes, from the global table """
+ # get K nodes out of local table/cache, or the node we want
+ nodes = self.table.findNodes(id)
+ d = Deferred()
+ if errback:
+ d.addCallbacks(callback, errback)
+ else:
+ d.addCallback(callback)
+ if len(nodes) == 1 and nodes[0].id == id :
+ d.callback(nodes)
+ else:
+ # create our search state
+ state = FindNode(self, id, d.callback)
+ reactor.callFromThread(state.goWithNodes, nodes)