Standardize the number of values retrieved from the DHT.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
index ae11dd7ef07158642637df47810aebb6a8c8ee6e..eeaab0acf638215dc2ec707d6c0d62daff48d04c 100644 (file)
@@ -5,7 +5,7 @@ import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
 
 from datetime import datetime, timedelta
-from random import randrange
+from random import randrange, shuffle
 from sha import sha
 import os
 
@@ -17,7 +17,7 @@ from db import DB
 from ktable import KTable
 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
 from khash import newID, newIDInRange
-from actions import FindNode, GetValue, KeyExpirer, StoreValue
+from actions import FindNode, FindValue, GetValue, StoreValue
 import krpc
 
 # this is the base class, has base functionality and find node, no key-value mappings
@@ -33,12 +33,12 @@ class KhashmirBase(protocol.Factory):
         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
+        self.token_secrets = [newID()]
         #self.app = service.Application("krpc")
         self.udp = krpc.hostbroker(self, config)
         self.udp.protocol = krpc.KRPC
         self.listenport = reactor.listenUDP(self.port, self.udp)
         self._loadRoutingTable()
-        self.expirer = KeyExpirer(self.store, config)
         self.refreshTable(force=1)
         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
 
@@ -59,8 +59,12 @@ class KhashmirBase(protocol.Factory):
         return self._Node(id, host, port)
         
     def checkpoint(self, auto=0):
+        self.token_secrets.insert(0, newID())
+        if len(self.token_secrets) > 3:
+            self.token_secrets.pop()
         self.store.saveSelfNode(self.node.id)
         self.store.dumpRoutingTable(self.table.buckets)
+        self.store.expireValues(self.config['KEY_EXPIRE'])
         self.refreshTable()
         if auto:
             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
@@ -192,7 +196,6 @@ class KhashmirBase(protocol.Factory):
             self.next_checkpoint.cancel()
         except:
             pass
-        self.expirer.shutdown()
         self.store.close()
 
     #### Remote Interface - called by remote nodes
@@ -210,8 +213,9 @@ class KhashmirBase(protocol.Factory):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
         nodes = self.table.findNodes(target)
-        nodes = map(lambda node: node.senderDict(), nodes)
-        return {"nodes" : nodes, "id" : self.node.id}
+        nodes = map(lambda node: node.contactInfo(), nodes)
+        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
+        return {"nodes" : nodes, "token" : token, "id" : self.node.id}
 
 
 ## This class provides read-only access to the DHT, valueForKey
@@ -220,13 +224,25 @@ class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
 
     ## also async
+    def findValue(self, key, callback, errback=None):
+        """ returns the contact info for nodes that have values for the key, from the global table """
+        # get K nodes out of local table/cache
+        nodes = self.table.findNodes(key)
+        d = Deferred()
+        if errback:
+            d.addCallbacks(callback, errback)
+        else:
+            d.addCallback(callback)
+
+        # create our search state
+        state = FindValue(self, key, d.callback, self.config)
+        reactor.callLater(0, state.goWithNodes, nodes)
+
     def valueForKey(self, key, callback, searchlocal = 1):
         """ returns the values found for key in global table
             callback will be called with a list of values for each peer that returns unique values
             final callback will be an empty list - probably should change to 'more coming' arg
         """
-        nodes = self.table.findNodes(key)
-        
         # get locals
         if searchlocal:
             l = self.store.retrieveValues(key)
@@ -234,52 +250,68 @@ class KhashmirRead(KhashmirBase):
                 reactor.callLater(0, callback, key, l)
         else:
             l = []
-        
-        # create our search state
-        state = GetValue(self, key, callback, self.config)
-        reactor.callLater(0, state.goWithNodes, nodes, l)
+
+        def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
+            # create our search state
+            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
+            reactor.callLater(0, state.goWithNodes, nodes)
+            
+        # this call is asynch
+        self.findValue(key, _getValueForKey)
 
     #### Remote Interface - called by remote nodes
     def krpc_find_value(self, key, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
     
+        nodes = self.table.findNodes(key)
+        nodes = map(lambda node: node.contactInfo(), nodes)
+        num_values = self.store.countValues(key)
+        return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
+
+    def krpc_get_value(self, key, num, id, _krpc_sender):
+        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+        self.insertNode(n, contacted=0)
+    
         l = self.store.retrieveValues(key)
-        if len(l) > 0:
+        if num == 0 or num >= len(l):
             return {'values' : l, "id": self.node.id}
         else:
-            nodes = self.table.findNodes(key)
-            nodes = map(lambda node: node.senderDict(), nodes)
-            return {'nodes' : nodes, "id": self.node.id}
+            shuffle(l)
+            return {'values' : l[:num], "id": self.node.id}
 
 ###  provides a generic write method, you probably don't want to deploy something that allows
 ###  arbitrary value storage
 class KhashmirWrite(KhashmirRead):
     _Node = KNodeWrite
     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
-    def storeValueForKey(self, key, value, originated, callback=None):
+    def storeValueForKey(self, key, value, callback=None):
         """ stores the value and origination time for key in the global table, returns immediately, no status 
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
-        def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table):
+        def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
-            action = StoreValue(self.table, key, value, originated, response, self.config)
+            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
         self.findNode(key, _storeValueForKey)
                     
     #### Remote Interface - called by remote nodes
-    def krpc_store_value(self, key, value, originated, id, _krpc_sender):
+    def krpc_store_value(self, key, value, token, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
-        self.store.storeValue(key, value, originated)
-        return {"id" : self.node.id}
+        for secret in self.token_secrets:
+            this_token = sha(secret + _krpc_sender[0]).digest()
+            if token == this_token:
+                self.store.storeValue(key, value)
+                return {"id" : self.node.id}
+        raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
 
 # the whole shebang, for testing
 class Khashmir(KhashmirWrite):
@@ -289,11 +321,11 @@ class SimpleTests(unittest.TestCase):
     
     timeout = 10
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
-                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
+                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+                    'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
-                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, 'SPEW': False, }
+                    'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
         krpc.KRPC.noisy = 0
@@ -335,7 +367,7 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
         self.got = 0
-        self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow())
+        self.a.storeValueForKey(sha('foo').digest(), 'foobar')
         reactor.iterate()
         reactor.iterate()
         reactor.iterate()
@@ -363,11 +395,11 @@ class MultiTest(unittest.TestCase):
     timeout = 30
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
-                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
+                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+                    'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
-                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, 'SPEW': False, }
+                    'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def _done(self, val):
         self.done = 1
@@ -417,7 +449,7 @@ class MultiTest(unittest.TestCase):
                 self.done = 0
                 def _scb(key, value, result):
                     self.done = 1
-                self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb)
+                self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
                 while not self.done:
                     reactor.iterate()