-class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
- def __init__(self, host, port, db='khashmir.db'):
- self.setup(host, port, db)
-
- def setup(self, host, port, db='khashmir.db'):
- self._findDB(db)
- self.node = self._loadSelfNode(host, port)
- self.table = KTable(self.node)
- self._loadRoutingTable()
- self.app = Application("xmlrpc")
- self.app.listenTCP(port, server.Site(self))
- self.last = time.time()
- KeyExpirer(store=self.store)
- #self.refreshTable(force=1)
- reactor.callLater(60, self.checkpoint, (1,))
-
- def _loadSelfNode(self, host, port):
- c = self.store.cursor()
- c.execute('select id from self where num = 0;')
- if c.rowcount > 0:
- id = c.fetchone()[0].decode('base64')
- else:
- id = newID()
- return Node().init(id, host, port)
-
- def _saveSelfNode(self):
- self.store.autocommit = 0
- c = self.store.cursor()
- c.execute('delete from self where num = 0;')
- c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
- self.store.commit()
- self.store.autocommit = 1
-
- def checkpoint(self, auto=0):
- self._saveSelfNode()
- self._dumpRoutingTable()
- if auto:
- reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
-
- def _findDB(self, db):
- import os
- try:
- os.stat(db)
- except OSError:
- self._createNewDB(db)
- else:
- self._loadDB(db)
-
- def _loadDB(self, db):
- try:
- self.store = sqlite.connect(db=db)
- self.store.autocommit = 1
- except:
- import traceback
- raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
-
- def _createNewDB(self, db):
- self.store = sqlite.connect(db=db)
- self.store.autocommit = 1
- s = """
- create table kv (key text, value text, time timestamp, primary key (key, value));
- create index kv_key on kv(key);
- create index kv_timestamp on kv(time);
-
- create table nodes (id text primary key, host text, port number);
-
- create table self (num number primary key, id text);
- """
- c = self.store.cursor()
- c.execute(s)
-
- def _dumpRoutingTable(self):
- """
- save routing table nodes to the database
- """
- self.store.autocommit = 0;
- c = self.store.cursor()
- c.execute("delete from nodes where id not NULL;")
- for bucket in self.table.buckets:
- for node in bucket.l:
- d = node.senderDict()
- c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
- self.store.commit()
- self.store.autocommit = 1;
-
- def _loadRoutingTable(self):
- """
- load routing table nodes from database
- it's usually a good idea to call refreshTable(force=1) after loading the table
- """
- c = self.store.cursor()
- c.execute("select * from nodes;")
- for rec in c.fetchall():
- n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
- self.table.insertNode(n, contacted=0)
-
- def render(self, request):
- """
- Override the built in render so we can have access to the request object!
- note, crequest is probably only valid on the initial call (not after deferred!)
- """
- self.crequest = request
- return xmlrpc.XMLRPC.render(self, request)
-
-
- #######
- ####### LOCAL INTERFACE - use these methods!
- def addContact(self, host, port):
- """
- ping this node and add the contact info to the table on pong!
- """
- n =Node().init(const.NULL_ID, host, port) # note, we
- self.sendPing(n)
-
- ## this call is async!
- def findNode(self, id, callback, errback=None):
- """ returns the contact info for node, or the k closest nodes, from the global table """
- # get K nodes out of local table/cache, or the node we want
- nodes = self.table.findNodes(id)
- d = Deferred()
- if errback:
- d.addCallbacks(callback, errback)
- else:
- d.addCallback(callback)
- if len(nodes) == 1 and nodes[0].id == id :
- d.callback(nodes)
- else:
- # create our search state
- state = FindNode(self, id, d.callback)
- reactor.callFromThread(state.goWithNodes, nodes)
-
-
- ## also async
- def valueForKey(self, key, callback):
- """ returns the values found for key in global table
- callback will be called with a list of values for each peer that returns unique values
- final callback will be an empty list - probably should change to 'more coming' arg
- """
- nodes = self.table.findNodes(key)
-
- # get locals
- l = self.retrieveValues(key)
- if len(l) > 0:
- reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
-
- # create our search state
- state = GetValue(self, key, callback)
- reactor.callFromThread(state.goWithNodes, nodes, l)
+class Khashmir(protocol.Factory):
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
+ def __init__(self, host, port, db='khashmir.db'):
+ self.setup(host, port, db)
+
+ def setup(self, host, port, db='khashmir.db'):
+ self._findDB(db)
+ self.port = port
+ self.node = self._loadSelfNode(host, port)
+ self.table = KTable(self.node)
+ self.app = service.Application("krpc")
+ self.udp = krpc.hostbroker(self)
+ self.udp.protocol = krpc.KRPC
+ self.listenport = reactor.listenUDP(port, self.udp)
+ self.last = time.time()
+ self._loadRoutingTable()
+ KeyExpirer(store=self.store)
+ #self.refreshTable(force=1)
+ reactor.callLater(60, self.checkpoint, (1,))
+
+ def __del__(self):
+ self.listenport.stopListening()
+
+ def _loadSelfNode(self, host, port):
+ c = self.store.cursor()
+ c.execute('select id from self where num = 0;')
+ if c.rowcount > 0:
+ id = c.fetchone()[0]
+ else:
+ id = newID()
+ return Node().init(id, host, port)
+
+ def _saveSelfNode(self):
+ self.store.autocommit = 0
+ c = self.store.cursor()
+ c.execute('delete from self where num = 0;')
+ c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
+ self.store.commit()
+ self.store.autocommit = 1
+
+ def checkpoint(self, auto=0):
+ self._saveSelfNode()
+ self._dumpRoutingTable()
+ if auto:
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+
+ def _findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self._createNewDB(db)
+ else:
+ self._loadDB(db)
+
+ def _loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def _createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ s = """
+ create table kv (key binary, value binary, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id binary primary key, host text, port number);
+
+ create table self (num number primary key, id binary);
+ """
+ c = self.store.cursor()
+ c.execute(s)