now we store and retrieve node information from our database...
authorburris <burris>
Tue, 3 Dec 2002 03:49:58 +0000 (03:49 +0000)
committerburris <burris>
Tue, 3 Dec 2002 03:49:58 +0000 (03:49 +0000)
const.py
khashmir.py

index 4926f008690d5a3d8e4cb953f67e765e883ed0d5..d81ecbd45f021de20eebd6c2c3faca37240a0f94 100644 (file)
--- a/const.py
+++ b/const.py
@@ -7,12 +7,15 @@ main.installReactor(reactor)
 # magic id to use before we know a peer's id
 NULL_ID =  20 * '\0'
 
-# Kademlia "K" constant
+# Kademlia "K" constant, this should be an even number
 K = 8
 
 # SHA1 is 160 bits long
 HASH_LENGTH = 160
 
+# checkpoint every this many seconds
+CHECKPOINT_INTERVAL = 60 * 15 # fifteen minutes
+
 
 ### SEARCHING/STORING
 # concurrent xmlrpc calls per find node/value request!
index a9bc5b55835bdbdadad3fd233c9b77de2bfc49e7..5e7c7171219af99385588b21fbb9d45348ecb20e 100644 (file)
@@ -30,15 +30,41 @@ class Khashmir(xmlrpc.XMLRPC):
        __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
        def __init__(self, host, port, db='khashmir.db'):
                self.setup(host, port, db)
+               
        def setup(self, host, port, db='khashmir.db'):
-               self.node = Node().init(newID(), host, port)
+               self.findDB(db)
+               self.node = self.loadSelfNode(host, port)
                self.table = KTable(self.node)
+               self.loadRoutingTable()
                self.app = Application("xmlrpc")
                self.app.listenTCP(port, server.Site(self))
-               self.findDB(db)
                self.last = time.time()
                KeyExpirer(store=self.store)
-
+               reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+               self.refreshTable(force=1)
+               
+       def loadSelfNode(self, host, port):
+               c = self.store.cursor()
+               c.execute('select id from self where num = 0;')
+               if c.rowcount > 0:
+                       id = c.fetchone()[0].decode('base64')
+               else:
+                       id = newID()
+               return Node().init(id, host, port)
+               
+       def saveSelfNode(self):
+               self.store.autocommit = 0
+               c = self.store.cursor()
+               c.execute('delete from self where num = 0;')
+               c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
+               self.store.commit()
+               self.store.autocommit = 1
+               
+       def checkpoint(self):
+               self.saveSelfNode()
+               self.dumpRoutingTable()
+               reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+               
        def findDB(self, db):
                import os
                try:
@@ -65,10 +91,37 @@ class Khashmir(xmlrpc.XMLRPC):
                        create index kv_timestamp on kv(time);
                        
                        create table nodes (id text primary key, host text, port number);
+                       
+                       create table self (num number primary key, id text);
                        """
                c = self.store.cursor()
                c.execute(s)
 
+       def dumpRoutingTable(self):
+               """
+                       save routing table nodes to the database
+               """
+               self.store.autocommit = 0;
+               c = self.store.cursor()
+               c.execute("delete from nodes where id not NULL;")
+               for bucket in self.table.buckets:
+                       for node in bucket.l:
+                               d = node.senderDict()
+                               c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
+               self.store.commit()
+               self.store.autocommit = 1;
+               
+       def loadRoutingTable(self):
+               """
+                       load routing table nodes from database
+                       it's usually a good idea to call refreshTable(force=1) after loading the table
+               """
+               c = self.store.cursor()
+               c.execute("select * from nodes;")
+               for rec in c.fetchall():
+                       n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+                       self.table.insertNode(n, contacted=0)
+                       
        def render(self, request):
                """
                        Override the built in render so we can have access to the request object!
@@ -182,14 +235,14 @@ class Khashmir(xmlrpc.XMLRPC):
                """
                df = node.ping(self.node.senderDict())
                ## these are the callbacks we use when we issue a PING
-               def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
+               def _pongHandler(args, node=node, table=self.table):
                        l, sender = args
-                       if id != const.NULL_ID and id != sender['id'].decode('base64'):
+                       if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
                                # whoah, got response from different peer than we were expecting
-                               pass
+                               self.table.invalidateNode(node)
                        else:
-                               sender['host'] = host
-                               sender['port'] = port
+                               sender['host'] = node.host
+                               sender['port'] = node.port
                                n = Node().initWithDict(sender)
                                table.insertNode(n)
                                return
@@ -207,15 +260,15 @@ class Khashmir(xmlrpc.XMLRPC):
                id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
                self.findNode(id, callback)
 
-       def refreshTable(self):
+       def refreshTable(self, force=0):
                """
-                       
+                       force=1 will refresh table regardless of last bucket access time
                """
                def callback(nodes):
                        pass
        
                for bucket in self.table.buckets:
-                       if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
+                       if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
                                id = newIDInRange(bucket.min, bucket.max)
                                self.findNode(id, callback)
 
@@ -286,12 +339,12 @@ class Khashmir(xmlrpc.XMLRPC):
                        return {'nodes' : nodes}, self.node.senderDict()
 #------ testing
 
-def test_build_net(quiet=0, peers=24, host='localhost',  pause=0):
+def test_build_net(quiet=0, peers=24, host='localhost',  pause=0, startport=2001):
        from whrandom import randrange
        import threading
        import thread
        import sys
-       port = 2001
+       port = startport
        l = []
                
        if not quiet: