git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
broke up khashmir class into base, read, and write classes, since
author: burris <burris>
Thu, 8 Jul 2004 04:02:59 +0000 (04:02 +0000)
committer: burris <burris>
Thu, 8 Jul 2004 04:02:59 +0000 (04:02 +0000)
having generic read/write methods on deployed DHTs is not a great idea

actions now take an "action" method, since it's not always
"storeValueForKey"

buckets now get refreshed

nodes that change their ID are now handled properly

actions.py
const.py
khashmir.py
knode.py
test_khashmir.py

index 6527d2ceb53c120bb60c47ada7552e3959804d9f..e25c82b75ebe5f32d38fd258b7df0fce40de220b 100644 (file)
@@ -7,7 +7,6 @@ from const import reactor
 import const
 
 from khash import intify
-from knode import KNode as Node
 from ktable import KTable, K
 
 class ActionBase:
@@ -49,7 +48,7 @@ class FindNode(ActionBase):
         sender = {'id' : dict["id"]}
         sender['port'] = _krpc_sender[1]        
         sender['host'] = _krpc_sender[0]        
-        sender = Node().initWithDict(sender)
+        sender = self.table.Node().initWithDict(sender)
         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         if self.finished or self.answered.has_key(sender.id):
@@ -58,7 +57,7 @@ class FindNode(ActionBase):
         self.outstanding = self.outstanding - 1
         self.answered[sender.id] = 1
         for node in l:
-            n = Node().initWithDict(node)
+            n = self.table.Node().initWithDict(node)
             n.conn = self.table.udp.connectionForAddr((n.host, n.port))
             if not self.found.has_key(n.id):
                 self.found[n.id] = n
@@ -92,7 +91,7 @@ class FindNode(ActionBase):
     
     def makeMsgFailed(self, node):
         def defaultGotNodes(err, self=self, node=node):
-            print ">>> find failed %s/%s" % (node.host, node.port)
+            print ">>> find failed %s/%s" % (node.host, node.port), err
             self.table.table.nodeFailed(node)
             self.outstanding = self.outstanding - 1
             self.schedule()
@@ -112,8 +111,12 @@ class FindNode(ActionBase):
         self.schedule()
     
 
-GET_VALUE_TIMEOUT = 15
+get_value_timeout = 15
 class GetValue(FindNode):
+    def __init__(self, table, target, callback, find="findValue"):
+        FindNode.__init__(self, table, target, callback)
+        self.findValue = find
+            
     """ get value task """
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
@@ -121,7 +124,7 @@ class GetValue(FindNode):
         sender = {'id' : dict["id"]}
         sender['port'] = _krpc_sender[1]
         sender['host'] = _krpc_sender[0]                
-        sender = Node().initWithDict(sender)
+        sender = self.table.Node().initWithDict(sender)
         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
         self.table.table.insertNode(sender)
         if self.finished or self.answered.has_key(sender.id):
@@ -133,7 +136,7 @@ class GetValue(FindNode):
         # if we have any closer than what we already got, query them
         if dict.has_key('nodes'):
             for node in dict['nodes']:
-                n = Node().initWithDict(node)
+                n = self.table.Node().initWithDict(node)
                 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
                 if not self.found.has_key(n.id):
                     self.found[n.id] = n
@@ -160,11 +163,16 @@ class GetValue(FindNode):
         for node in l[:K]:
             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-                df = node.findValue(self.target, self.table.node.id)
-                df.addCallback(self.handleGotNodes)
-                df.addErrback(self.makeMsgFailed(node))
-                self.outstanding = self.outstanding + 1
-                self.queried[node.id] = 1
+                try:
+                    f = getattr(node, self.findValue)
+                except AttributeError:
+                    print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
+                else:
+                    df = f(self.target, self.table.node.id)
+                    df.addCallback(self.handleGotNodes)
+                    df.addErrback(self.makeMsgFailed(node))
+                    self.outstanding = self.outstanding + 1
+                    self.queried[node.id] = 1
             if self.outstanding >= const.CONCURRENT_REQS:
                 break
         assert(self.outstanding) >=0
@@ -189,11 +197,12 @@ class GetValue(FindNode):
 
 
 class StoreValue(ActionBase):
-    def __init__(self, table, target, value, callback):
+    def __init__(self, table, target, value, callback, store="storeValue"):
         ActionBase.__init__(self, table, target, callback)
         self.value = value
         self.stored = []
-    
+        self.store = store
+        
     def storedValue(self, t, node):
         self.outstanding -= 1
         self.table.insertNode(node)
@@ -206,15 +215,17 @@ class StoreValue(ActionBase):
         else:
             if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
                 self.schedule()
-            
+        return t
+    
     def storeFailed(self, t, node):
         print ">>> store failed %s/%s" % (node.host, node.port)
         self.table.nodeFailed(node)
         self.outstanding -= 1
         if self.finished:
-            return
+            return t
         self.schedule()
-        
+        return t
+    
     def schedule(self):
         if self.finished:
             return
@@ -231,13 +242,14 @@ class StoreValue(ActionBase):
             else:
                 if not node.id == self.table.node.id:
                     self.outstanding += 1
-                    if type(self.value) == type([]):
-                        df = node.storeValues(self.target, self.value, self.table.node.id)                        
+                    try:
+                        f = getattr(node, self.store)
+                    except AttributeError:
+                        print ">>> %s doesn't have a %s method!" % (node, self.store)
                     else:
-                        df = node.storeValue(self.target, self.value, self.table.node.id)
-                    
-                    df.addCallback(self.storedValue, node=node)
-                    df.addErrback(self.storeFailed, node=node)
+                        df = f(self.target, self.value, self.table.node.id)
+                        df.addCallback(self.storedValue, node=node)
+                        df.addErrback(self.storeFailed, node=node)
                     
     def goWithNodes(self, nodes):
         self.nodes = nodes
index 42d060ba32bdb5664e875f823316a4d30f94faf1..fc5d1c0a49110071ab1825fd8b0f5b1e4654c128 100644 (file)
--- a/const.py
+++ b/const.py
@@ -47,7 +47,7 @@ MAX_FAILURES = 3
 MIN_PING_INTERVAL = 60 * 15 # fifteen minutes
 
 # refresh buckets that haven't been touched in this long
-BUCKET_STALENESS = 60 # one hour
+BUCKET_STALENESS = 60 * 60 # one hour
 
 
 ###  KEY EXPIRER
index d773a45f8a8cd70e7dc2e7136a67d88ae5f76e1a..46946ebafab4863aa2c83fc0424a94efcf809be4 100644 (file)
@@ -9,7 +9,7 @@ import time
 from sha import sha
 
 from ktable import KTable, K
-from knode import KNode as Node
+from knode import *
 
 from khash import newID, newIDInRange
 
@@ -24,14 +24,17 @@ from twisted.web import server
 threadable.init()
 import sys
 
+from random import randrange
+
 import sqlite  ## find this at http://pysqlite.sourceforge.net/
 
 class KhashmirDBExcept(Exception):
     pass
 
-# this is the main class!
-class Khashmir(protocol.Factory):
+# this is the base class, has base functionality and find node, no key-value mappings
+class KhashmirBase(protocol.Factory):
     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
+    _Node = KNodeBase
     def __init__(self, host, port, db='khashmir.db'):
         self.setup(host, port, db)
         
@@ -40,16 +43,21 @@ class Khashmir(protocol.Factory):
         self.port = port
         self.node = self._loadSelfNode(host, port)
         self.table = KTable(self.node)
-        self.app = service.Application("krpc")
+        #self.app = service.Application("krpc")
         self.udp = krpc.hostbroker(self)
         self.udp.protocol = krpc.KRPC
         self.listenport = reactor.listenUDP(port, self.udp)
         self.last = time.time()
         self._loadRoutingTable()
         KeyExpirer(store=self.store)
-        #self.refreshTable(force=1)
+        self.refreshTable(force=1)
         reactor.callLater(60, self.checkpoint, (1,))
-        
+
+    def Node(self):
+        n = self._Node()
+        n.table = self.table
+        return n
+    
     def __del__(self):
         self.listenport.stopListening()
         
@@ -60,21 +68,20 @@ class Khashmir(protocol.Factory):
             id = c.fetchone()[0]
         else:
             id = newID()
-        return Node().init(id, host, port)
+        return self._Node().init(id, host, port)
         
     def _saveSelfNode(self):
-        self.store.autocommit = 0
         c = self.store.cursor()
         c.execute('delete from self where num = 0;')
         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
         self.store.commit()
-        self.store.autocommit = 1
         
     def checkpoint(self, auto=0):
         self._saveSelfNode()
         self._dumpRoutingTable()
+        self.refreshTable()
         if auto:
-            reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+            reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
         
     def _findDB(self, db):
         import os
@@ -88,14 +95,13 @@ class Khashmir(protocol.Factory):
     def _loadDB(self, db):
         try:
             self.store = sqlite.connect(db=db)
-            self.store.autocommit = 1
+            #self.store.autocommit = 0
         except:
             import traceback
             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
         
     def _createNewDB(self, db):
         self.store = sqlite.connect(db=db)
-        self.store.autocommit = 1
         s = """
             create table kv (key binary, value binary, time timestamp, primary key (key, value));
             create index kv_key on kv(key);
@@ -107,19 +113,18 @@ class Khashmir(protocol.Factory):
             """
         c = self.store.cursor()
         c.execute(s)
+        self.store.commit()
 
     def _dumpRoutingTable(self):
         """
             save routing table nodes to the database
         """
-        self.store.autocommit = 0;
         c = self.store.cursor()
         c.execute("delete from nodes where id not NULL;")
         for bucket in self.table.buckets:
             for node in bucket.l:
                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
         self.store.commit()
-        self.store.autocommit = 1;
         
     def _loadRoutingTable(self):
         """
@@ -129,7 +134,7 @@ class Khashmir(protocol.Factory):
         c = self.store.cursor()
         c.execute("select * from nodes;")
         for rec in c.fetchall():
-            n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+            n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
             n.conn = self.udp.connectionForAddr((n.host, n.port))
             self.table.insertNode(n, contacted=0)
             
@@ -140,7 +145,7 @@ class Khashmir(protocol.Factory):
         """
             ping this node and add the contact info to the table on pong!
         """
-        n =Node().init(const.NULL_ID, host, port) 
+        n =self.Node().init(const.NULL_ID, host, port) 
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.sendPing(n, callback=callback)
 
@@ -161,46 +166,6 @@ class Khashmir(protocol.Factory):
             state = FindNode(self, id, d.callback)
             reactor.callFromThread(state.goWithNodes, nodes)
     
-    
-    ## also async
-    def valueForKey(self, key, callback, searchlocal = 1):
-        """ returns the values found for key in global table
-            callback will be called with a list of values for each peer that returns unique values
-            final callback will be an empty list - probably should change to 'more coming' arg
-        """
-        nodes = self.table.findNodes(key)
-        
-        # get locals
-        if searchlocal:
-            l = self.retrieveValues(key)
-            if len(l) > 0:
-                reactor.callLater(0, callback, (l))
-        else:
-            l = []
-        
-        # create our search state
-        state = GetValue(self, key, callback)
-        reactor.callFromThread(state.goWithNodes, nodes, l)
-
-    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-    def storeValueForKey(self, key, value, callback=None):
-        """ stores the value for key in the global table, returns immediately, no status 
-            in this implementation, peers respond but don't indicate status to storing values
-            a key can have many values
-        """
-        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
-            if not response:
-                # default callback
-                def _storedValueHandler(sender):
-                    pass
-                response=_storedValueHandler
-            action = StoreValue(self.table, key, value, response)
-            reactor.callFromThread(action.goWithNodes, nodes)
-            
-        # this call is asynch
-        self.findNode(key, _storeValueForKey)
-        
-    
     def insertNode(self, n, contacted=1):
         """
         insert a node in our local table, pinging oldest contact in bucket, if necessary
@@ -238,17 +203,13 @@ class Khashmir(protocol.Factory):
             _krpc_sender = dict['_krpc_sender']
             dict = dict['rsp']
             sender = {'id' : dict['id']}
-            if node.id != const.NULL_ID and node.id != sender['id']:
-                # whoah, got response from different peer than we were expecting
-                self.table.invalidateNode(node)
-            else:
-                sender['host'] = _krpc_sender[0]
-                sender['port'] = _krpc_sender[1]
-                n = Node().initWithDict(sender)
-                n.conn = self.udp.connectionForAddr((n.host, n.port))
-                table.insertNode(n)
-                if callback:
-                    callback()
+            sender['host'] = _krpc_sender[0]
+            sender['port'] = _krpc_sender[1]
+            n = self.Node().initWithDict(sender)
+            n.conn = self.udp.connectionForAddr((n.host, n.port))
+            table.insertNode(n)
+            if callback:
+                callback()
         def _defaultPong(err, node=node, table=self.table, callback=callback):
             table.nodeFailed(node)
             if callback:
@@ -277,25 +238,21 @@ class Khashmir(protocol.Factory):
                 id = newIDInRange(bucket.min, bucket.max)
                 self.findNode(id, callback)
 
+    def stats(self):
+        """
+        Returns (num_contacts, num_nodes)
+        num_contacts: number contacts in our routing table
+        num_nodes: number of nodes estimated in the entire dht
+        """
+        num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
+        num_nodes = const.K * (2**(len(self.table.buckets) - 1))
+        return (num_contacts, num_nodes)
 
-    def retrieveValues(self, key):
-        c = self.store.cursor()
-        c.execute("select value from kv where key = %s;", sqlite.encode(key))
-        t = c.fetchone()
-        l = []
-        while t:
-            l.append(t['value'])
-            t = c.fetchone()
-        return l
-    
-    #####
-    ##### INCOMING MESSAGE HANDLERS
-    
     def krpc_ping(self, id, _krpc_sender):
         sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
+        n = self.Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         return {"id" : self.node.id}
@@ -306,52 +263,50 @@ class Khashmir(protocol.Factory):
         sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
+        n = self.Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
         return {"nodes" : nodes, "id" : self.node.id}
-            
-    def krpc_store_value(self, key, value, id, _krpc_sender):
-        t = "%0.6f" % time.time()
-        c = self.store.cursor()
-        try:
-            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
-        except sqlite.IntegrityError, reason:
-            # update last insert time
-            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
-        sender = {'id' : id}
-        sender['host'] = _krpc_sender[0]
-        sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
-        n.conn = self.udp.connectionForAddr((n.host, n.port))
-        self.insertNode(n, contacted=0)
-        return {"id" : self.node.id}
 
-    ## multiple values per key
-    def krpc_store_values(self, key, values, id, _krpc_sender):
-        t = "%0.6f" % time.time()
+
+## This class provides read-only access to the DHT, valueForKey
+## you probably want to use this mixin and provide your own write methods
+class KhashmirRead(KhashmirBase):
+    _Node = KNodeRead
+    def retrieveValues(self, key):
         c = self.store.cursor()
-        key = sqlite.encode(key)
-        for value in values:
-            value = sqlite.encode(value)
-            try:
-                c.execute("insert into kv values (%s, %s, %s);", key, value, t)
-            except sqlite.IntegrityError, reason:
-                # update last insert time
-                c.execute("update kv set time = %s where key = %s and value = %s;", (t, key, value))
-        sender = {'id' : id}
-        sender['host'] = _krpc_sender[0]
-        sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
-        n.conn = self.udp.connectionForAddr((n.host, n.port))
-        self.insertNode(n, contacted=0)
-        return {"id" : self.node.id}
+        c.execute("select value from kv where key = %s;", sqlite.encode(key))
+        t = c.fetchone()
+        l = []
+        while t:
+            l.append(t['value'])
+            t = c.fetchone()
+        return l
+    ## also async
+    def valueForKey(self, key, callback, searchlocal = 1):
+        """ returns the values found for key in global table
+            callback will be called with a list of values for each peer that returns unique values
+            final callback will be an empty list - probably should change to 'more coming' arg
+        """
+        nodes = self.table.findNodes(key)
+        
+        # get locals
+        if searchlocal:
+            l = self.retrieveValues(key)
+            if len(l) > 0:
+                reactor.callLater(0, callback, (l))
+        else:
+            l = []
+        
+        # create our search state
+        state = GetValue(self, key, callback)
+        reactor.callFromThread(state.goWithNodes, nodes, l)
 
     def krpc_find_value(self, key, id, _krpc_sender):
         sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
-        n = Node().initWithDict(sender)
+        n = self.Node().initWithDict(sender)
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
     
@@ -363,3 +318,45 @@ class Khashmir(protocol.Factory):
             nodes = map(lambda node: node.senderDict(), nodes)
             return {'nodes' : nodes, "id": self.node.id}
 
+###  provides a generic write method, you probably don't want to deploy something that allows
+###  arbitrary value storage
+class KhashmirWrite(KhashmirRead):
+    _Node = KNodeWrite
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+    def storeValueForKey(self, key, value, callback=None):
+        """ stores the value for key in the global table, returns immediately, no status 
+            in this implementation, peers respond but don't indicate status to storing values
+            a key can have many values
+        """
+        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+            if not response:
+                # default callback
+                def _storedValueHandler(sender):
+                    pass
+                response=_storedValueHandler
+            action = StoreValue(self.table, key, value, response)
+            reactor.callFromThread(action.goWithNodes, nodes)
+            
+        # this call is asynch
+        self.findNode(key, _storeValueForKey)
+                    
+    def krpc_store_value(self, key, value, id, _krpc_sender):
+        t = "%0.6f" % time.time()
+        c = self.store.cursor()
+        try:
+            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+        except sqlite.IntegrityError, reason:
+            # update last insert time
+            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+        self.store.commit()
+        sender = {'id' : id}
+        sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"id" : self.node.id}
+
+# the whole shebang, for testing
+class Khashmir(KhashmirWrite):
+    _Node = KNodeWrite
index 44341ae6324386a1e8782b82f584266eaa2ebc6e..46ab780f5cf6cebdf37c96860ce74f1f0d968b89 100644 (file)
--- a/knode.py
+++ b/knode.py
@@ -1,4 +1,4 @@
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
 # see LICENSE.txt for license information
 
 from node import Node
@@ -10,7 +10,7 @@ class IDChecker:
     def __init__(id):
         self.id = id
 
-class KNode(Node):
+class KNodeBase(Node):
     def checkSender(self, dict):
         try:
             senderid = dict['rsp']['id']
@@ -20,7 +20,8 @@ class KNode(Node):
         else:
             if self.id != NULL_ID and senderid != self.id:
                 print "Got response from different node than expected."
-                raise Exception, "Got response from different node than expected."
+                self.table.invalidateNode(self)
+                
         return dict
 
     def errBack(self, err):
@@ -37,6 +38,15 @@ class KNode(Node):
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
+
+class KNodeRead(KNodeBase):
+    def findValue(self, key, id):
+        df =  self.conn.sendRequest('find_value', {"key" : key, "id" : id})
+        df.addErrback(self.errBack)
+        df.addCallback(self.checkSender)
+        return df
+
+class KNodeWrite(KNodeRead):
     def storeValue(self, key, value, id):
         df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
         df.addErrback(self.errBack)
@@ -47,8 +57,3 @@ class KNode(Node):
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
-    def findValue(self, key, id):
-        df =  self.conn.sendRequest('find_value', {"key" : key, "id" : id})
-        df.addErrback(self.errBack)
-        df.addCallback(self.checkSender)
-        return df
index 83ddde3ef5cfbce0056c512c0b6ec180457b9378..3f4908f7c49b24d2c88be22f633cb14da0845a17 100644 (file)
@@ -78,7 +78,7 @@ class MultiTest(TestCase):
         
     def setUp(self):
         self.l = []
-        self.startport = 4044
+        self.startport = 4088
         for i in range(self.num):
             self.l.append(Khashmir('127.0.0.1', self.startport + i, '/tmp/%s.test' % (self.startport + i)))
         reactor.iterate()
@@ -139,33 +139,6 @@ class MultiTest(TestCase):
                         reactor.iterate()
 
 
-            K = khash.newID()
-            l = map(lambda a: newID(), range(8))
-            for a in range(3):
-                self.done = 0
-                def _scb(val):
-                    self.done = 1
-                self.l[randrange(0, self.num)].storeValueForKey(K, l, _scb)
-                while not self.done:
-                    reactor.iterate()
-
-
-                c = []
-                def _rcb(val):
-                    if not val:
-                        self.done = 1
-                        self.assertEqual(self.got, 1)
-                    for n in val:
-                        c.remove(n)
-                        if not c:
-                            self.got = 1
-                for x in range(3):
-                    self.got = 0
-                    self.done = 0
-                    c = copy(l)
-                    self.l[randrange(0, self.num)].valueForKey(K, _rcb)
-                    while not self.done:
-                        reactor.iterate()