warnings.simplefilter("ignore", DeprecationWarning)
from datetime import datetime, timedelta
-from random import randrange
+from random import randrange, shuffle
from sha import sha
import os
from ktable import KTable
from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
from khash import newID, newIDInRange
-from actions import FindNode, GetValue, KeyExpirer, StoreValue
+from actions import FindNode, FindValue, GetValue, StoreValue
import krpc
# this is the base class, has base functionality and find node, no key-value mappings
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(self.port, self.udp)
self._loadRoutingTable()
- self.expirer = KeyExpirer(self.store, config)
self.refreshTable(force=1)
self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
self.token_secrets.pop()
self.store.saveSelfNode(self.node.id)
self.store.dumpRoutingTable(self.table.buckets)
+ self.store.expireValues(self.config['KEY_EXPIRE'])
self.refreshTable()
if auto:
self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
self.next_checkpoint.cancel()
except:
pass
- self.expirer.shutdown()
self.store.close()
#### Remote Interface - called by remote nodes
_Node = KNodeRead
## also async
+ def findValue(self, key, callback, errback=None):
+ """ returns the contact info for nodes that have values for the key, from the global table """
+ # get K nodes out of local table/cache
+ nodes = self.table.findNodes(key)
+ d = Deferred()
+ if errback:
+ d.addCallbacks(callback, errback)
+ else:
+ d.addCallback(callback)
+
+ # create our search state
+ state = FindValue(self, key, d.callback, self.config)
+ reactor.callLater(0, state.goWithNodes, nodes)
+
def valueForKey(self, key, callback, searchlocal = 1):
""" returns the values found for key in global table
callback will be called with a list of values for each peer that returns unique values
final callback will be an empty list - probably should change to 'more coming' arg
"""
- nodes = self.table.findNodes(key)
-
# get locals
if searchlocal:
l = self.store.retrieveValues(key)
reactor.callLater(0, callback, key, l)
else:
l = []
-
- # create our search state
- state = GetValue(self, key, callback, self.config)
- reactor.callLater(0, state.goWithNodes, nodes, l)
+
+ def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
+ # create our search state
+ state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
+ reactor.callLater(0, state.goWithNodes, nodes)
+
+ # this call is asynch
+ self.findValue(key, _getValueForKey)
#### Remote Interface - called by remote nodes
def krpc_find_value(self, key, id, _krpc_sender):
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
+ nodes = self.table.findNodes(key)
+ nodes = map(lambda node: node.contactInfo(), nodes)
+ num_values = self.store.countValues(key)
+ return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
+
+ def krpc_get_value(self, key, num, id, _krpc_sender):
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted=0)
+
l = self.store.retrieveValues(key)
- if len(l) > 0:
+ if num == 0 or num >= len(l):
return {'values' : l, "id": self.node.id}
else:
- nodes = self.table.findNodes(key)
- nodes = map(lambda node: node.contactInfo(), nodes)
- return {'nodes' : nodes, "id": self.node.id}
+ shuffle(l)
+ return {'values' : l[:num], "id": self.node.id}
### provides a generic write method, you probably don't want to deploy something that allows
### arbitrary value storage
in this implementation, peers respond but don't indicate status to storing values
a key can have many values
"""
- def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
if not response:
# default callback
def _storedValueHandler(key, value, sender):
pass
response=_storedValueHandler
- action = StoreValue(self.table, key, value, response, self.config)
+ action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
reactor.callLater(0, action.goWithNodes, nodes)
# this call is asynch
timeout = 10
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
- 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+ 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
- 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, 'SPEW': False, }
+ 'KEY_EXPIRE': 3600, 'SPEW': False, }
def setUp(self):
krpc.KRPC.noisy = 0
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
- 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
+ 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
- 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, 'SPEW': False, }
+ 'KEY_EXPIRE': 3600, 'SPEW': False, }
def _done(self, val):
self.done = 1