+from datetime import datetime
import os, sha, random
from twisted.internet import defer, reactor
from apt_dht.interfaces import IDHT
from khashmir import Khashmir
+khashmir_dir = 'apt-dht-Khashmir'
+
class DHTError(Exception):
"""Represents errors that occur in the DHT."""
self.bootstrap_node = False
self.joining = None
self.joined = False
+ self.outstandingJoins = 0
self.foundAddrs = []
self.storing = {}
self.retrieving = {}
self.config_parser = config
self.section = section
self.config = {}
- self.cache_dir = self.config_parser.get(section, 'cache_dir')
+ self.cache_dir = os.path.join(self.config_parser.get(section, 'cache_dir'), khashmir_dir)
+ if not os.path.exists(self.cache_dir):
+ os.makedirs(self.cache_dir)
self.bootstrap = self.config_parser.getstringlist(section, 'BOOTSTRAP')
self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
for k in self.config_parser.options(section):
def _join_gotIP(self, ip, port):
"""Called after an IP address has been found for a single bootstrap node."""
- self.khashmir.addContact(ip, port, self._join_single)
+ self.outstandingJoins += 1
+ self.khashmir.addContact(ip, port, self._join_single, self._join_error)
def _join_single(self, addr):
"""Called when a single bootstrap node has been added."""
+ self.outstandingJoins -= 1
if addr:
self.foundAddrs.append(addr)
+ if addr or self.outstandingJoins <= 0:
+ self.khashmir.findCloseNodes(self._join_complete, self._join_complete)
log.msg('Got back from bootstrap node: %r' % (addr,))
- self.khashmir.findCloseNodes(self._join_complete)
+ def _join_error(self, failure = None):
+ """Called when a single bootstrap node has failed."""
+ self.outstandingJoins -= 1
+ log.msg("bootstrap node could not be reached")
+ if self.outstandingJoins <= 0:
+ self.khashmir.findCloseNodes(self._join_complete, self._join_complete)
+
def _join_complete(self, result):
"""Called when the tables have been initialized with nodes."""
- if not self.joined:
+ if not self.joined and len(result) > 0:
self.joined = True
+ if self.joining and self.outstandingJoins <= 0:
df = self.joining
self.joining = None
- if len(result) > 0 or self.bootstrap_node:
- df.callback(result)
+ if self.joined or self.bootstrap_node:
+ self.joined = True
+ df.callback(self.foundAddrs)
else:
df.errback(DHTError('could not find any nodes to bootstrap to'))
d.callback(final_result)
del self.retrieving[key]
- def storeValue(self, key, value):
+ def storeValue(self, key, value, originated = None):
"""See L{apt_dht.interfaces.IDHT}."""
if self.config is None:
raise DHTError, "configuration not loaded"
if key in self.storing and value in self.storing[key]:
raise DHTError, "already storing that key with the same value"
+ if originated is None:
+ originated = datetime.utcnow()
d = defer.Deferred()
- self.khashmir.storeValueForKey(key, value, self._storeValue)
+ self.khashmir.storeValueForKey(key, value, originated, self._storeValue)
self.storing.setdefault(key, {})[value] = d
return d