Comply with the newly defined protocol on the web page.
Various things need to done to comply with the newly defined protocol:
- - add the token to find_node responses
- - use the token in store_node requests
- standardize the error messages (especially for a bad token)
HASH_LENGTH = 160
# interval between saving the running state
-CHECKPOINT_INTERVAL = 15m
+CHECKPOINT_INTERVAL = 5m
# concurrent number of calls per find node/value request!
CONCURRENT_REQS = 4
'HASH_LENGTH': '160',
# checkpoint every this many seconds
- 'CHECKPOINT_INTERVAL': '15m', # fifteen minutes
+ 'CHECKPOINT_INTERVAL': '5m', # five minutes
### SEARCHING/STORING
# concurrent xmlrpc calls per find node/value request!
timeout = 2
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
- 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+ 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
timeout = 60
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
- 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+ 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
dict = dict['rsp']
n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
self.caller.insertNode(n)
+ if dict["id"] in self.found:
+ self.found[dict["id"]].updateToken(dict.get('token', ''))
l = dict["nodes"]
if self.finished or self.answered.has_key(dict["id"]):
# a day late and a dollar short
except AttributeError:
log.msg("%s doesn't have a %s method!" % (node, self.store))
else:
- df = f(self.target, self.value, self.caller.node.id)
+ df = f(self.target, self.value, node.token, self.caller.node.id)
df.addCallback(self.storedValue, node=node)
df.addErrback(self.storeFailed, node=node)
self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
+ self.token_secrets = [newID()]
#self.app = service.Application("krpc")
self.udp = krpc.hostbroker(self, config)
self.udp.protocol = krpc.KRPC
return self._Node(id, host, port)
def checkpoint(self, auto=0):
+ self.token_secrets.insert(0, newID())
+ if len(self.token_secrets) > 3:
+ self.token_secrets.pop()
self.store.saveSelfNode(self.node.id)
self.store.dumpRoutingTable(self.table.buckets)
self.refreshTable()
self.insertNode(n, contacted=0)
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.contactInfo(), nodes)
- return {"nodes" : nodes, "id" : self.node.id}
+ token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
+ return {"nodes" : nodes, "token" : token, "id" : self.node.id}
## This class provides read-only access to the DHT, valueForKey
self.findNode(key, _storeValueForKey)
#### Remote Interface - called by remote nodes
- def krpc_store_value(self, key, value, id, _krpc_sender):
+ def krpc_store_value(self, key, value, token, id, _krpc_sender):
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
- self.store.storeValue(key, value)
+ for secret in self.token_secrets:
+ this_token = sha(secret + _krpc_sender[0]).digest()
+ if token == this_token:
+ self.store.storeValue(key, value)
+ break;
return {"id" : self.node.id}
# the whole shebang, for testing
timeout = 10
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
- 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+ 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
timeout = 30
num = 20
DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
- 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+ 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
return df
class KNodeWrite(KNodeRead):
- def storeValue(self, key, value, id):
- df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id})
+ def storeValue(self, key, value, token, id):
+ df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id})
df.addErrback(self.errBack)
df.addCallback(self.checkSender)
return df
self.num = khash.intify(id)
self.host = host
self.port = int(port)
+ self.token = ''
self._contactInfo = None
def updateLastSeen(self):
self.lastSeen = datetime.now()
self.fails = 0
+
+ def updateToken(self, token):
+ self.token = token
def msgFailed(self):
self.fails = self.fails + 1
<term><option>CHECKPOINT_INTERVAL = <replaceable>time</replaceable></option></term>
<listitem>
<para>The <replaceable>time</replaceable> to wait between saves of the running state.
- (Default is 15 minutes.)</para>
+ (Default is 5 minutes.)</para>
</listitem>
</varlistentry>
<varlistentry>
HASH_LENGTH = 160
# checkpoint every this many seconds
-CHECKPOINT_INTERVAL = 15m
+CHECKPOINT_INTERVAL = 5m
# concurrent xmlrpc calls per find node/value request!
CONCURRENT_REQS = 4