]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/khashmir.py
More and better error messages in the DHT.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
index ae11dd7ef07158642637df47810aebb6a8c8ee6e..d3479e661e4f610723ff9a1cc825b19078b9e153 100644 (file)
@@ -33,6 +33,7 @@ class KhashmirBase(protocol.Factory):
         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
+        self.token_secrets = [newID()]
         #self.app = service.Application("krpc")
         self.udp = krpc.hostbroker(self, config)
         self.udp.protocol = krpc.KRPC
         #self.app = service.Application("krpc")
         self.udp = krpc.hostbroker(self, config)
         self.udp.protocol = krpc.KRPC
@@ -59,6 +60,9 @@ class KhashmirBase(protocol.Factory):
         return self._Node(id, host, port)
         
     def checkpoint(self, auto=0):
         return self._Node(id, host, port)
         
     def checkpoint(self, auto=0):
+        self.token_secrets.insert(0, newID())
+        if len(self.token_secrets) > 3:
+            self.token_secrets.pop()
         self.store.saveSelfNode(self.node.id)
         self.store.dumpRoutingTable(self.table.buckets)
         self.refreshTable()
         self.store.saveSelfNode(self.node.id)
         self.store.dumpRoutingTable(self.table.buckets)
         self.refreshTable()
@@ -210,8 +214,9 @@ class KhashmirBase(protocol.Factory):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
         nodes = self.table.findNodes(target)
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
         nodes = self.table.findNodes(target)
-        nodes = map(lambda node: node.senderDict(), nodes)
-        return {"nodes" : nodes, "id" : self.node.id}
+        nodes = map(lambda node: node.contactInfo(), nodes)
+        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
+        return {"nodes" : nodes, "token" : token, "id" : self.node.id}
 
 
 ## This class provides read-only access to the DHT, valueForKey
 
 
 ## This class provides read-only access to the DHT, valueForKey
@@ -249,7 +254,7 @@ class KhashmirRead(KhashmirBase):
             return {'values' : l, "id": self.node.id}
         else:
             nodes = self.table.findNodes(key)
             return {'values' : l, "id": self.node.id}
         else:
             nodes = self.table.findNodes(key)
-            nodes = map(lambda node: node.senderDict(), nodes)
+            nodes = map(lambda node: node.contactInfo(), nodes)
             return {'nodes' : nodes, "id": self.node.id}
 
 ###  provides a generic write method, you probably don't want to deploy something that allows
             return {'nodes' : nodes, "id": self.node.id}
 
 ###  provides a generic write method, you probably don't want to deploy something that allows
@@ -257,29 +262,33 @@ class KhashmirRead(KhashmirBase):
 class KhashmirWrite(KhashmirRead):
     _Node = KNodeWrite
     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
 class KhashmirWrite(KhashmirRead):
     _Node = KNodeWrite
     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-    def storeValueForKey(self, key, value, originated, callback=None):
+    def storeValueForKey(self, key, value, callback=None):
         """ stores the value and origination time for key in the global table, returns immediately, no status 
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
         """ stores the value and origination time for key in the global table, returns immediately, no status 
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
-        def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table):
+        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
-            action = StoreValue(self.table, key, value, originated, response, self.config)
+            action = StoreValue(self.table, key, value, response, self.config)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
         self.findNode(key, _storeValueForKey)
                     
     #### Remote Interface - called by remote nodes
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
         self.findNode(key, _storeValueForKey)
                     
     #### Remote Interface - called by remote nodes
-    def krpc_store_value(self, key, value, originated, id, _krpc_sender):
+    def krpc_store_value(self, key, value, token, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
-        self.store.storeValue(key, value, originated)
-        return {"id" : self.node.id}
+        for secret in self.token_secrets:
+            this_token = sha(secret + _krpc_sender[0]).digest()
+            if token == this_token:
+                self.store.storeValue(key, value)
+                return {"id" : self.node.id}
+        raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
 
 # the whole shebang, for testing
 class Khashmir(KhashmirWrite):
 
 # the whole shebang, for testing
 class Khashmir(KhashmirWrite):
@@ -289,7 +298,7 @@ class SimpleTests(unittest.TestCase):
     
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
     
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
@@ -335,7 +344,7 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         self.got = 0
         reactor.iterate()
         reactor.iterate()
         self.got = 0
-        self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow())
+        self.a.storeValueForKey(sha('foo').digest(), 'foobar')
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
@@ -363,7 +372,7 @@ class MultiTest(unittest.TestCase):
     timeout = 30
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
     timeout = 30
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
@@ -417,7 +426,7 @@ class MultiTest(unittest.TestCase):
                 self.done = 0
                 def _scb(key, value, result):
                     self.done = 1
                 self.done = 0
                 def _scb(key, value, result):
                     self.done = 1
-                self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb)
+                self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
                 while not self.done:
                     reactor.iterate()
 
                 while not self.done:
                     reactor.iterate()