self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
+ self.token_secrets = [newID()]
#self.app = service.Application("krpc")
self.udp = krpc.hostbroker(self, config)
self.udp.protocol = krpc.KRPC
return self._Node(id, host, port)
def checkpoint(self, auto=0):
+ self.token_secrets.insert(0, newID())
+ if len(self.token_secrets) > 3:
+ self.token_secrets.pop()
self.store.saveSelfNode(self.node.id)
self.store.dumpRoutingTable(self.table.buckets)
self.refreshTable()
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
nodes = self.table.findNodes(target)
- nodes = map(lambda node: node.senderDict(), nodes)
- return {"nodes" : nodes, "id" : self.node.id}
+ nodes = map(lambda node: node.contactInfo(), nodes)
+ token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
+ return {"nodes" : nodes, "token" : token, "id" : self.node.id}
## This class provides read-only access to the DHT, valueForKey
return {'values' : l, "id": self.node.id}
else:
nodes = self.table.findNodes(key)
- nodes = map(lambda node: node.senderDict(), nodes)
+ nodes = map(lambda node: node.contactInfo(), nodes)
return {'nodes' : nodes, "id": self.node.id}
### provides a generic write method, you probably don't want to deploy something that allows
class KhashmirWrite(KhashmirRead):
_Node = KNodeWrite
## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
- def storeValueForKey(self, key, value, originated, callback=None):
+ def storeValueForKey(self, key, value, callback=None):
""" stores the value and origination time for key in the global table, returns immediately, no status
in this implementation, peers respond but don't indicate status to storing values
a key can have many values
"""
- def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
if not response:
# default callback
def _storedValueHandler(key, value, sender):
pass
response=_storedValueHandler
- action = StoreValue(self.table, key, value, originated, response, self.config)
+ action = StoreValue(self.table, key, value, response, self.config)
reactor.callLater(0, action.goWithNodes, nodes)
# this call is asynch
self.findNode(key, _storeValueForKey)
#### Remote Interface - called by remote nodes
- def krpc_store_value(self, key, value, originated, id, _krpc_sender):
+ def krpc_store_value(self, key, value, token, id, _krpc_sender):
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
- self.store.storeValue(key, value, originated)
- return {"id" : self.node.id}
+ for secret in self.token_secrets:
+ this_token = sha(secret + _krpc_sender[0]).digest()
+ if token == this_token:
+ self.store.storeValue(key, value)
+ return {"id" : self.node.id}
+ raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
# the whole shebang, for testing
class Khashmir(KhashmirWrite):
timeout = 10
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
- 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+ 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
reactor.iterate()
reactor.iterate()
self.got = 0
- self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow())
+ self.a.storeValueForKey(sha('foo').digest(), 'foobar')
reactor.iterate()
reactor.iterate()
reactor.iterate()
timeout = 30
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
- 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+ 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
self.done = 0
def _scb(key, value, result):
self.done = 1
- self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb)
+ self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
while not self.done:
reactor.iterate()