-class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
- def __init__(self, host, port, db='khashmir.db'):
- self.setup(host, port, db)
- def setup(self, host, port, db='khashmir.db'):
- self.node = Node().init(newID(), host, port)
- self.table = KTable(self.node)
- self.app = Application("xmlrpc")
- self.app.listenTCP(port, server.Site(self))
- self.findDB(db)
- self.last = time.time()
- KeyExpirer(store=self.store)
-
- def findDB(self, db):
- import os
- try:
- os.stat(db)
- except OSError:
- self.createNewDB(db)
- else:
- self.loadDB(db)
-
- def loadDB(self, db):
- try:
- self.store = sqlite.connect(db=db)
- self.store.autocommit = 1
- except:
- import traceback
- raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
-
- def createNewDB(self, db):
- self.store = sqlite.connect(db=db)
- self.store.autocommit = 1
- s = """
- create table kv (key text, value text, time timestamp, primary key (key, value));
- create index kv_key on kv(key);
- create index kv_timestamp on kv(time);
-
- create table nodes (id text primary key, host text, port number);
- """
- c = self.store.cursor()
- c.execute(s)
-
- def render(self, request):
- """
- Override the built in render so we can have access to the request object!
- note, crequest is probably only valid on the initial call (not after deferred!)
- """
- self.crequest = request
- return xmlrpc.XMLRPC.render(self, request)
-
+class Khashmir(protocol.Factory):
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
+ def __init__(self, host, port, db='khashmir.db'):
+ self.setup(host, port, db)
+
+ def setup(self, host, port, db='khashmir.db'):
+ self._findDB(db)
+ self.port = port
+ self.node = self._loadSelfNode(host, port)
+ self.table = KTable(self.node)
+ self.app = service.Application("krpc")
+ self.udp = krpc.hostbroker(self)
+ self.udp.protocol = krpc.KRPC
+ self.listenport = reactor.listenUDP(port, self.udp)
+ self.last = time.time()
+ self._loadRoutingTable()
+ KeyExpirer(store=self.store)
+ #self.refreshTable(force=1)
+ reactor.callLater(60, self.checkpoint, (1,))
+
+ def __del__(self):
+ self.listenport.stopListening()
+
+ def _loadSelfNode(self, host, port):
+ c = self.store.cursor()
+ c.execute('select id from self where num = 0;')
+ if c.rowcount > 0:
+ id = c.fetchone()[0]
+ else:
+ id = newID()
+ return Node().init(id, host, port)
+
+ def _saveSelfNode(self):
+ self.store.autocommit = 0
+ c = self.store.cursor()
+ c.execute('delete from self where num = 0;')
+ c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
+ self.store.commit()
+ self.store.autocommit = 1
+
+ def checkpoint(self, auto=0):
+ self._saveSelfNode()
+ self._dumpRoutingTable()
+ if auto:
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+
+ def _findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self._createNewDB(db)
+ else:
+ self._loadDB(db)
+
+ def _loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def _createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ s = """
+ create table kv (key binary, value binary, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id binary primary key, host text, port number);
+
+ create table self (num number primary key, id binary);
+ """
+ c = self.store.cursor()
+ c.execute(s)