]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/khashmir.py
Move the key expiring to the checkpoint function.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
index 80d811878260d54ac3f76bc69ce2150fb0063bf8..a9a78674a048c8e3d6e0f0e3392bcb285bd7bfce 100644 (file)
@@ -17,7 +17,7 @@ from db import DB
 from ktable import KTable
 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
 from khash import newID, newIDInRange
-from actions import FindNode, GetValue, KeyExpirer, StoreValue
+from actions import FindNode, GetValue, StoreValue
 import krpc
 
 # this is the base class, has base functionality and find node, no key-value mappings
@@ -33,12 +33,12 @@ class KhashmirBase(protocol.Factory):
         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
+        self.token_secrets = [newID()]
         #self.app = service.Application("krpc")
         self.udp = krpc.hostbroker(self, config)
         self.udp.protocol = krpc.KRPC
         self.listenport = reactor.listenUDP(self.port, self.udp)
         self._loadRoutingTable()
-        self.expirer = KeyExpirer(self.store, config)
         self.refreshTable(force=1)
         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
 
@@ -59,8 +59,12 @@ class KhashmirBase(protocol.Factory):
         return self._Node(id, host, port)
         
     def checkpoint(self, auto=0):
+        self.token_secrets.insert(0, newID())
+        if len(self.token_secrets) > 3:
+            self.token_secrets.pop()
         self.store.saveSelfNode(self.node.id)
         self.store.dumpRoutingTable(self.table.buckets)
+        self.store.expireValues(self.config['KEY_EXPIRE'])
         self.refreshTable()
         if auto:
             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
@@ -80,12 +84,12 @@ class KhashmirBase(protocol.Factory):
 
     #######
     #######  LOCAL INTERFACE    - use these methods!
-    def addContact(self, host, port, callback=None):
+    def addContact(self, host, port, callback=None, errback=None):
         """
             ping this node and add the contact info to the table on pong!
         """
         n = self.Node(NULL_ID, host, port)
-        self.sendJoin(n, callback=callback)
+        self.sendJoin(n, callback=callback, errback=errback)
 
     ## this call is async!
     def findNode(self, id, callback, errback=None):
@@ -133,7 +137,7 @@ class KhashmirBase(protocol.Factory):
             df = old.ping(self.node.id)
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
-    def sendJoin(self, node, callback=None):
+    def sendJoin(self, node, callback=None, errback=None):
         """
             ping a node
         """
@@ -144,21 +148,23 @@ class KhashmirBase(protocol.Factory):
             self.insertNode(n)
             if callback:
                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
-        def _defaultPong(err, node=node, table=self.table, callback=callback):
+        def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
             table.nodeFailed(node)
-            if callback:
+            if errback:
+                errback()
+            else:
                 callback(None)
         
         df.addCallbacks(_pongHandler,_defaultPong)
 
-    def findCloseNodes(self, callback=lambda a: None):
+    def findCloseNodes(self, callback=lambda a: None, errback = None):
         """
             This does a findNode on the ID one away from our own.  
             This will allow us to populate our table with nodes on our network closest to our own.
             This is called as soon as we start up with an empty table
         """
         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-        self.findNode(id, callback)
+        self.findNode(id, callback, errback)
 
     def refreshTable(self, force=0):
         """
@@ -190,7 +196,6 @@ class KhashmirBase(protocol.Factory):
             self.next_checkpoint.cancel()
         except:
             pass
-        self.expirer.shutdown()
         self.store.close()
 
     #### Remote Interface - called by remote nodes
@@ -208,8 +213,9 @@ class KhashmirBase(protocol.Factory):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
         nodes = self.table.findNodes(target)
-        nodes = map(lambda node: node.senderDict(), nodes)
-        return {"nodes" : nodes, "id" : self.node.id}
+        nodes = map(lambda node: node.contactInfo(), nodes)
+        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
+        return {"nodes" : nodes, "token" : token, "id" : self.node.id}
 
 
 ## This class provides read-only access to the DHT, valueForKey
@@ -247,7 +253,7 @@ class KhashmirRead(KhashmirBase):
             return {'values' : l, "id": self.node.id}
         else:
             nodes = self.table.findNodes(key)
-            nodes = map(lambda node: node.senderDict(), nodes)
+            nodes = map(lambda node: node.contactInfo(), nodes)
             return {'nodes' : nodes, "id": self.node.id}
 
 ###  provides a generic write method, you probably don't want to deploy something that allows
@@ -256,7 +262,7 @@ class KhashmirWrite(KhashmirRead):
     _Node = KNodeWrite
     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
     def storeValueForKey(self, key, value, callback=None):
-        """ stores the value for key in the global table, returns immediately, no status 
+        """ stores the value and origination time for key in the global table, returns immediately, no status 
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
@@ -273,11 +279,15 @@ class KhashmirWrite(KhashmirRead):
         self.findNode(key, _storeValueForKey)
                     
     #### Remote Interface - called by remote nodes
-    def krpc_store_value(self, key, value, id, _krpc_sender):
+    def krpc_store_value(self, key, value, token, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
-        self.store.storeValue(key, value)
-        return {"id" : self.node.id}
+        for secret in self.token_secrets:
+            this_token = sha(secret + _krpc_sender[0]).digest()
+            if token == this_token:
+                self.store.storeValue(key, value)
+                return {"id" : self.node.id}
+        raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
 
 # the whole shebang, for testing
 class Khashmir(KhashmirWrite):
@@ -287,11 +297,10 @@ class SimpleTests(unittest.TestCase):
     
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
-                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, 'SPEW': False, }
+                    'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
         krpc.KRPC.noisy = 0
@@ -361,11 +370,10 @@ class MultiTest(unittest.TestCase):
     timeout = 30
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
-                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, 'SPEW': False, }
+                    'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def _done(self, val):
         self.done = 1