- __slots__ = ['listener', 'node', 'table', 'store', 'app']
- def __init__(self, host, port):
- self.node = Node(newID(), host, port)
- self.table = KTable(self.node)
- from twisted.internet.app import Application
- from twisted.web import server
- self.app = Application("xmlrpc")
- self.app.listenTCP(port, server.Site(self))
- self.store = db.DB()
- self.store.open(None, None, db.DB_BTREE)
-
-
- def render(self, request):
- """
- Override the built in render so we can have access to the request object!
- note, crequest is probably only valid on the initial call (not after deferred!)
- """
- self.crequest = request
- return xmlrpc.XMLRPC.render(self, request)
-
-
- #######
- ####### LOCAL INTERFACE - use these methods!
- def addContact(self, host, port):
- """
- ping this node and add the contact info to the table on pong!
- """
- n =Node(" "*20, host, port) # note, we
- self.sendPing(n)
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
+ def __init__(self, host, port, db='khashmir.db'):
+ self.setup(host, port, db)
+
+ def setup(self, host, port, db='khashmir.db'):
+ self._findDB(db)
+ self.node = self._loadSelfNode(host, port)
+ self.table = KTable(self.node)
+ self._loadRoutingTable()
+ self.app = Application("xmlrpc")
+ self.app.listenTCP(port, server.Site(self))
+ self.last = time.time()
+ KeyExpirer(store=self.store)
+ #self.refreshTable(force=1)
+ reactor.callLater(60, self.checkpoint, (1,))
+
+ def _loadSelfNode(self, host, port):
+ c = self.store.cursor()
+ c.execute('select id from self where num = 0;')
+ if c.rowcount > 0:
+ id = c.fetchone()[0].decode('base64')
+ else:
+ id = newID()
+ return Node().init(id, host, port)
+
+ def _saveSelfNode(self):
+ self.store.autocommit = 0
+ c = self.store.cursor()
+ c.execute('delete from self where num = 0;')
+ c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
+ self.store.commit()
+ self.store.autocommit = 1
+
+ def checkpoint(self, auto=0):
+ self._saveSelfNode()
+ self._dumpRoutingTable()
+ if auto:
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+
+ def _findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self._createNewDB(db)
+ else:
+ self._loadDB(db)
+
+ def _loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def _createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ s = """
+ create table kv (key text, value text, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id text primary key, host text, port number);
+
+ create table self (num number primary key, id text);
+ """
+ c = self.store.cursor()
+ c.execute(s)