-class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
- def __init__(self, host, port):
- self.node = Node().init(newID(), host, port)
- self.table = KTable(self.node)
- self.app = Application("xmlrpc")
- self.app.listenTCP(port, server.Site(self))
-
- ## these databases may be more suited to on-disk rather than in-memory
- # h((key, value)) -> (key, value, time) mappings
- self.store = db.DB()
- self.store.open(None, None, db.DB_BTREE)
-
- # <insert time> -> h((key, value))
- self.itime = db.DB()
- self.itime.set_flags(db.DB_DUP)
- self.itime.open(None, None, db.DB_BTREE)
-
- # key -> h((key, value))
- self.kw = db.DB()
- self.kw.set_flags(db.DB_DUP)
- self.kw.open(None, None, db.DB_BTREE)
-
- KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
-
- def render(self, request):
- """
- Override the built in render so we can have access to the request object!
- note, crequest is probably only valid on the initial call (not after deferred!)
- """
- self.crequest = request
- return xmlrpc.XMLRPC.render(self, request)
+class Khashmir(protocol.Factory):
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
+ def __init__(self, host, port, db='khashmir.db'):
+ self.setup(host, port, db)
+
+ def setup(self, host, port, db='khashmir.db'):
+ self._findDB(db)
+ self.port = port
+ self.node = self._loadSelfNode(host, port)
+ self.table = KTable(self.node)
+ self.app = service.Application("krpc")
+ self.udp = krpc.hostbroker(self)
+ self.udp.protocol = krpc.KRPC
+ self.listenport = reactor.listenUDP(port, self.udp)
+ self.last = time.time()
+ self._loadRoutingTable()
+ KeyExpirer(store=self.store)
+ #self.refreshTable(force=1)
+ reactor.callLater(60, self.checkpoint, (1,))
+
+ def __del__(self):
+ self.listenport.stopListening()
+
+ def _loadSelfNode(self, host, port):
+ c = self.store.cursor()
+ c.execute('select id from self where num = 0;')
+ if c.rowcount > 0:
+ id = c.fetchone()[0]
+ else:
+ id = newID()
+ return Node().init(id, host, port)
+
+ def _saveSelfNode(self):
+ self.store.autocommit = 0
+ c = self.store.cursor()
+ c.execute('delete from self where num = 0;')
+ c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
+ self.store.commit()
+ self.store.autocommit = 1
+
+ def checkpoint(self, auto=0):
+ self._saveSelfNode()
+ self._dumpRoutingTable()
+ if auto:
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+
+ def _findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self._createNewDB(db)
+ else:
+ self._loadDB(db)
+
+ def _loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def _createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ s = """
+ create table kv (key binary, value binary, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id binary primary key, host text, port number);
+
+ create table self (num number primary key, id binary);
+ """
+ c = self.store.cursor()
+ c.execute(s)
+
+ def _dumpRoutingTable(self):
+ """
+ save routing table nodes to the database
+ """
+ self.store.autocommit = 0;
+ c = self.store.cursor()
+ c.execute("delete from nodes where id not NULL;")
+ for bucket in self.table.buckets:
+ for node in bucket.l:
+ c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
+ self.store.commit()
+ self.store.autocommit = 1;
+
+ def _loadRoutingTable(self):
+ """
+ load routing table nodes from database
+ it's usually a good idea to call refreshTable(force=1) after loading the table
+ """
+ c = self.store.cursor()
+ c.execute("select * from nodes;")
+ for rec in c.fetchall():
+ n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
+ self.table.insertNode(n, contacted=0)
+