from datetime import datetime, timedelta
from random import randrange, shuffle
from sha import sha
+from copy import copy
import os
from twisted.internet.defer import Deferred
from twisted.internet import protocol, reactor
+from twisted.python import log
from twisted.trial import unittest
from db import DB
"""
# Get K nodes out of local table/cache
nodes = self.table.findNodes(id)
+ nodes = [copy(node) for node in nodes]
d = Deferred()
if errback:
d.addCallbacks(callback, errback)
d.callback(nodes)
else:
# Start the finding nodes action
- state = FindNode(self, id, d.callback, self.config)
+ state = FindNode(self, id, d.callback, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
def insertNode(self, node, contacted = True):
(datetime.now() - old.lastSeen) >
timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
- def _staleNodeHandler(oldnode = old, newnode = node):
+ def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
"""The pinged node never responded, so replace it."""
+ log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
+ log.err(err)
self.table.replaceStaleNode(oldnode, newnode)
- def _notStaleNodeHandler(dict, old=old):
+ def _notStaleNodeHandler(dict, old=old, self=self):
"""Got a pong from the old node, so update it."""
dict = dict['rsp']
if dict['id'] == old.id:
self.table.justSeenNode(old.id)
# Bucket is full, check to see if old node is still available
+ self.stats.startedAction('ping')
df = old.ping(self.node.id)
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
if callback:
callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
- def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
+ def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
"""Error occurred, fail node and errback or callback with error."""
- table.nodeFailed(node)
+ log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
+ log.err(err)
+ self.table.nodeFailed(node)
if errback:
errback()
elif callback:
callback(None)
+ self.stats.startedAction('join')
df = node.join(self.node.id)
df.addCallbacks(_pongHandler, _defaultPong)
def getStats(self):
"""Gather the statistics for the DHT."""
- return self.stats.gather()
+ return self.stats.formatHTML()
#{ Remote interface
- def krpc_ping(self, id, _krpc_sender):
+ def krpc_ping(self, id, _krpc_sender = None):
"""Pong with our ID.
@type id: C{string}
@type _krpc_sender: (C{string}, C{int})
@param _krpc_sender: the sender node's IP address and port
"""
- n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ if _krpc_sender is not None:
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
return {"id" : self.node.id}
- def krpc_join(self, id, _krpc_sender):
+ def krpc_join(self, id, _krpc_sender = None):
"""Add the node by responding with its address and port.
@type id: C{string}
@type _krpc_sender: (C{string}, C{int})
@param _krpc_sender: the sender node's IP address and port
"""
- n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ if _krpc_sender is not None:
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
+ else:
+ _krpc_sender = ('127.0.0.1', self.port)
return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
- def krpc_find_node(self, target, id, _krpc_sender):
+ def krpc_find_node(self, id, target, _krpc_sender = None):
"""Find the K closest nodes to the target in the local routing table.
@type target: C{string}
@type _krpc_sender: (C{string}, C{int})
@param _krpc_sender: the sender node's IP address and port
"""
- n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ if _krpc_sender is not None:
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
+ else:
+ _krpc_sender = ('127.0.0.1', self.port)
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.contactInfo(), nodes)
"""
# Get K nodes out of local table/cache
nodes = self.table.findNodes(key)
+ nodes = [copy(node) for node in nodes]
d = Deferred()
if errback:
d.addCallbacks(callback, errback)
d.addCallback(callback)
# Search for others starting with the locally found ones
- state = FindValue(self, key, d.callback, self.config)
+ state = FindValue(self, key, d.callback, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
def valueForKey(self, key, callback, searchlocal = True):
def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
"""Use the found nodes to send requests for values to."""
- state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
+ state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
# First lookup nodes that have values for the key
self.findValue(key, _getValueForKey)
#{ Remote interface
- def krpc_find_value(self, key, id, _krpc_sender):
+ def krpc_find_value(self, id, key, _krpc_sender = None):
"""Find the number of values stored locally for the key, and the K closest nodes.
@type key: C{string}
@type _krpc_sender: (C{string}, C{int})
@param _krpc_sender: the sender node's IP address and port
"""
- n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ if _krpc_sender is not None:
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
nodes = self.table.findNodes(key)
nodes = map(lambda node: node.contactInfo(), nodes)
num_values = self.store.countValues(key)
return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
- def krpc_get_value(self, key, num, id, _krpc_sender):
+ def krpc_get_value(self, id, key, num, _krpc_sender = None):
"""Retrieve the values stored locally for the key.
@type key: C{string}
@type _krpc_sender: (C{string}, C{int})
@param _krpc_sender: the sender node's IP address and port
"""
- n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ if _krpc_sender is not None:
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
l = self.store.retrieveValues(key)
if num == 0 or num >= len(l):
"""Default callback that does nothing."""
pass
response = _storedValueHandler
- action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
+ action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
reactor.callLater(0, action.goWithNodes, nodes)
# First find the K closest nodes to operate on.
self.findNode(key, _storeValueForKey)
#{ Remote interface
- def krpc_store_value(self, key, value, token, id, _krpc_sender):
+ def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
"""Store the value locally with the key.
@type key: C{string}
@type _krpc_sender: (C{string}, C{int})
@param _krpc_sender: the sender node's IP address and port
"""
- n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ if _krpc_sender is not None:
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted = False)
+ else:
+ _krpc_sender = ('127.0.0.1', self.port)
+
for secret in self.token_secrets:
this_token = sha(secret + _krpc_sender[0]).digest()
if token == this_token: