__slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
def __init__(self, host, port, db='khashmir.db'):
self.setup(host, port, db)
+
def setup(self, host, port, db='khashmir.db'):
- self.node = Node().init(newID(), host, port)
+ self.findDB(db)
+ self.node = self.loadSelfNode(host, port)
self.table = KTable(self.node)
+ self.loadRoutingTable()
self.app = Application("xmlrpc")
self.app.listenTCP(port, server.Site(self))
- self.findDB(db)
self.last = time.time()
KeyExpirer(store=self.store)
-
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+ self.refreshTable(force=1)
+
+ def loadSelfNode(self, host, port):
+ c = self.store.cursor()
+ c.execute('select id from self where num = 0;')
+ if c.rowcount > 0:
+ id = c.fetchone()[0].decode('base64')
+ else:
+ id = newID()
+ return Node().init(id, host, port)
+
+ def saveSelfNode(self):
+ self.store.autocommit = 0
+ c = self.store.cursor()
+ c.execute('delete from self where num = 0;')
+ c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
+ self.store.commit()
+ self.store.autocommit = 1
+
+ def checkpoint(self):
+ self.saveSelfNode()
+ self.dumpRoutingTable()
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+
def findDB(self, db):
import os
try:
create index kv_timestamp on kv(time);
create table nodes (id text primary key, host text, port number);
+
+ create table self (num number primary key, id text);
"""
c = self.store.cursor()
c.execute(s)
+ def dumpRoutingTable(self):
+ """
+ save routing table nodes to the database
+ """
+ self.store.autocommit = 0;
+ c = self.store.cursor()
+ c.execute("delete from nodes where id not NULL;")
+ for bucket in self.table.buckets:
+ for node in bucket.l:
+ d = node.senderDict()
+ c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
+ self.store.commit()
+ self.store.autocommit = 1;
+
+ def loadRoutingTable(self):
+ """
+ load routing table nodes from database
+ it's usually a good idea to call refreshTable(force=1) after loading the table
+ """
+ c = self.store.cursor()
+ c.execute("select * from nodes;")
+ for rec in c.fetchall():
+ n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+ self.table.insertNode(n, contacted=0)
+
def render(self, request):
"""
Override the built in render so we can have access to the request object!
"""
df = node.ping(self.node.senderDict())
## these are the callbacks we use when we issue a PING
- def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+ def _pongHandler(args, node=node, table=self.table):
l, sender = args
- if id != const.NULL_ID and id != sender['id'].decode('base64'):
+ if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
# whoah, got response from different peer than we were expecting
- pass
+ self.table.invalidateNode(node)
else:
- sender['host'] = host
- sender['port'] = port
+ sender['host'] = node.host
+ sender['port'] = node.port
n = Node().initWithDict(sender)
table.insertNode(n)
return
id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
self.findNode(id, callback)
- def refreshTable(self):
+ def refreshTable(self, force=0):
"""
-
+ force=1 will refresh table regardless of last bucket access time
"""
def callback(nodes):
pass
for bucket in self.table.buckets:
- if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
+ if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
return {'nodes' : nodes}, self.node.senderDict()
#------ testing
-def test_build_net(quiet=0, peers=24, host='localhost', pause=0):
+def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001):
from whrandom import randrange
import threading
import thread
import sys
- port = 2001
+ port = startport
l = []
if not quiet: