From 2e90d76784ea1a297dcdb1d95a354402a881337a Mon Sep 17 00:00:00 2001 From: burris Date: Tue, 3 Dec 2002 03:49:58 +0000 Subject: [PATCH] now we store and retrieve node information from our database... --- const.py | 5 +++- khashmir.py | 79 ++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/const.py b/const.py index 4926f00..d81ecbd 100644 --- a/const.py +++ b/const.py @@ -7,12 +7,15 @@ main.installReactor(reactor) # magic id to use before we know a peer's id NULL_ID = 20 * '\0' -# Kademlia "K" constant +# Kademlia "K" constant, this should be an even number K = 8 # SHA1 is 160 bits long HASH_LENGTH = 160 +# checkpoint every this many seconds +CHECKPOINT_INTERVAL = 60 * 15 # fifteen minutes + ### SEARCHING/STORING # concurrent xmlrpc calls per find node/value request! diff --git a/khashmir.py b/khashmir.py index a9bc5b5..5e7c717 100644 --- a/khashmir.py +++ b/khashmir.py @@ -30,15 +30,41 @@ class Khashmir(xmlrpc.XMLRPC): __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last') def __init__(self, host, port, db='khashmir.db'): self.setup(host, port, db) + def setup(self, host, port, db='khashmir.db'): - self.node = Node().init(newID(), host, port) + self.findDB(db) + self.node = self.loadSelfNode(host, port) self.table = KTable(self.node) + self.loadRoutingTable() self.app = Application("xmlrpc") self.app.listenTCP(port, server.Site(self)) - self.findDB(db) self.last = time.time() KeyExpirer(store=self.store) - + reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) + self.refreshTable(force=1) + + def loadSelfNode(self, host, port): + c = self.store.cursor() + c.execute('select id from self where num = 0;') + if c.rowcount > 0: + id = c.fetchone()[0].decode('base64') + else: + id = newID() + return Node().init(id, host, port) + + def saveSelfNode(self): + self.store.autocommit = 0 + c = self.store.cursor() + c.execute('delete from self where num = 0;') + c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64')) + self.store.commit() + self.store.autocommit = 1 + + def checkpoint(self): + self.saveSelfNode() + self.dumpRoutingTable() + reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) + def findDB(self, db): import os try: @@ -65,10 +91,37 @@ class Khashmir(xmlrpc.XMLRPC): create index kv_timestamp on kv(time); create table nodes (id text primary key, host text, port number); + + create table self (num number primary key, id text); """ c = self.store.cursor() c.execute(s) + def dumpRoutingTable(self): + """ + save routing table nodes to the database + """ + self.store.autocommit = 0; + c = self.store.cursor() + c.execute("delete from nodes where id not NULL;") + for bucket in self.table.buckets: + for node in bucket.l: + d = node.senderDict() + c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port'])) + self.store.commit() + self.store.autocommit = 1; + + def loadRoutingTable(self): + """ + load routing table nodes from database + it's usually a good idea to call refreshTable(force=1) after loading the table + """ + c = self.store.cursor() + c.execute("select * from nodes;") + for rec in c.fetchall(): + n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) + self.table.insertNode(n, contacted=0) + def render(self, request): """ Override the built in render so we can have access to the request object! @@ -182,14 +235,14 @@ class Khashmir(xmlrpc.XMLRPC): """ df = node.ping(self.node.senderDict()) ## these are the callbacks we use when we issue a PING - def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table): + def _pongHandler(args, node=node, table=self.table): l, sender = args - if id != const.NULL_ID and id != sender['id'].decode('base64'): + if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'): # whoah, got response from different peer than we were expecting - pass + self.table.invalidateNode(node) else: - sender['host'] = host - sender['port'] = port + sender['host'] = node.host + sender['port'] = node.port n = Node().initWithDict(sender) table.insertNode(n) return @@ -207,15 +260,15 @@ class Khashmir(xmlrpc.XMLRPC): id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) self.findNode(id, callback) - def refreshTable(self): + def refreshTable(self, force=0): """ - + force=1 will refresh table regardless of last bucket access time """ def callback(nodes): pass for bucket in self.table.buckets: - if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS: + if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS): id = newIDInRange(bucket.min, bucket.max) self.findNode(id, callback) @@ -286,12 +339,12 @@ class Khashmir(xmlrpc.XMLRPC): return {'nodes' : nodes}, self.node.senderDict() #------ testing -def test_build_net(quiet=0, peers=24, host='localhost', pause=0): +def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001): from whrandom import randrange import threading import thread import sys - port = 2001 + port = startport l = [] if not quiet: -- 2.39.5