4 from twisted.internet import defer
5 from twisted.trial import unittest
6 from zope.interface import implements
8 from apt_dht.interfaces import IDHT
9 from khashmir import Khashmir
class DHTError(Exception):
    """Raised to signal that an operation on the DHT has failed."""
# NOTE(review): the `def` line that owns this statement is not visible in this
# excerpt; presumably it is the class initializer -- confirm.
# The flag starts out False; loadConfig later replaces it with the
# BOOTSTRAP_NODE option read from the configuration.
22 self.bootstrap_node = False
def loadConfig(self, config, section):
    """Read the DHT's settings from a parsed configuration.

    See L{apt_dht.interfaces.IDHT}.

    Every option found in C{section} is copied into C{self.config}:
    the known integer options are read with C{getint}, the known time
    options with C{gettime}, and everything else with C{get}.  If the
    section does not set C{PORT}, the value from the C{DEFAULT} section
    is used instead.

    @param config: the configuration parser to read from; it must provide
        C{get}, C{getint}, C{getboolean}, C{getstringlist}, C{gettime}
        and C{options}
    @param section: the name of the section holding the DHT's options
    """
    # Built once here instead of once per option inside the loop.
    int_options = ('K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY',
                   'MAX_FAILURES', 'PORT')
    time_options = ('CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
                    'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY',
                    'KE_AGE')

    self.config_parser = config
    self.section = section
    # The other methods treat a config of None as "not loaded", so the
    # dictionary has to be created here before it can be filled in.
    self.config = {}

    self.cache_dir = config.get('DEFAULT', 'cache_dir')
    self.bootstrap = config.getstringlist(section, 'BOOTSTRAP')
    self.bootstrap_node = config.getboolean(section, 'BOOTSTRAP_NODE')

    for k in config.options(section):
        if k in int_options:
            self.config[k] = config.getint(section, k)
        elif k in time_options:
            self.config[k] = config.gettime(section, k)
        else:
            # Anything unrecognised is kept as the raw option value.
            self.config[k] = config.get(section, k)

    if 'PORT' not in self.config:
        self.config['PORT'] = config.getint('DEFAULT', 'PORT')
# NOTE(review): the `def` line of this method is missing from this excerpt;
# the error text below suggests it is the IDHT join operation -- confirm.
47 """See L{apt_dht.interfaces.IDHT}."""
# Nothing can be done until a configuration has been loaded.
48 if self.config is None:
49 raise DHTError, "configuration not loaded"
# NOTE(review): the condition guarding this raise is not visible here.
51 raise DHTError, "a join is already in progress"
# Create the khashmir instance from the loaded settings and cache directory.
53 self.khashmir = Khashmir(self.config, self.cache_dir)
# Deferred representing the join that is now in progress.
55 self.joining = defer.Deferred()
# Contact every configured bootstrap node; each entry is "host:port".
56 for node in self.bootstrap:
# Split on the last colon only, so earlier colons remain part of the host.
57 host, port = node.rsplit(':', 1)
# NOTE(review): rsplit yields the port as a string and no conversion is
# visible in this excerpt -- confirm what type addContact receives.
# _join_single is passed as the callback for the added contact.
59 self.khashmir.addContact(host, port, self._join_single)
63 def _join_single(self):
64 """Called when a single bootstrap node has been added."""
65 self.khashmir.findCloseNodes(self._join_complete)
# Receives the result handed back by khashmir's findCloseNodes (it is passed
# as that call's callback in _join_single).
67 def _join_complete(self, result):
68 """Called when the tables have been initialized with nodes."""
# Branch taken when the lookup returned at least one node, or when this node
# is configured as a bootstrap node.
# NOTE(review): the body of this branch is not visible in this excerpt.
71 if len(result) > 0 or self.bootstrap_node:
# NOTE(review): `df` is assigned on a line not visible in this excerpt.
# Signal failure through the deferred: no nodes could be found.
78 df.errback(DHTError('could not find any nodes to bootstrap to'))
# NOTE(review): the `def` line of this method is missing from this excerpt;
# the error text below suggests it is the IDHT leave operation -- confirm.
81 """See L{apt_dht.interfaces.IDHT}."""
# Nothing can be done until a configuration has been loaded.
82 if self.config is None:
83 raise DHTError, "configuration not loaded"
# Only act when a join has completed or is still under way.
85 if self.joined or self.joining:
# Fail the pending join deferred so that whoever waits on it is told that the
# join was abandoned.
# NOTE(review): the guard directly around this call is not visible here.
87 self.joining.errback(DHTError('still joining when leave was called'))
# Shut down the khashmir instance.
90 self.khashmir.shutdown()
def getValue(self, key):
    """Look up a key in the DHT.

    See L{apt_dht.interfaces.IDHT}.

    @param key: the key to look up
    @return: a deferred whose C{callback} is given to khashmir's
        C{valueForKey} as the result handler
    @raise DHTError: if no configuration has been loaded, or no network
        has been joined yet
    """
    # The call form of raise works on both Python 2 and Python 3.
    if self.config is None:
        raise DHTError("configuration not loaded")
    if not self.joined:
        raise DHTError("have not joined a network yet")

    d = defer.Deferred()
    self.khashmir.valueForKey(key, d.callback)
    return d
def storeValue(self, key, value):
    """Store a value in the DHT under a key.

    See L{apt_dht.interfaces.IDHT}.

    @param key: the key to store the value under
    @param value: the value to store
    @raise DHTError: if no configuration has been loaded, or no network
        has been joined yet
    """
    # The call form of raise works on both Python 2 and Python 3.
    if self.config is None:
        raise DHTError("configuration not loaded")
    if not self.joined:
        raise DHTError("have not joined a network yet")

    self.khashmir.storeValueForKey(key, value)
# Test case working with two DHT nodes, `a` and `b`, on the loopback address.
# NOTE(review): several lines of this class (method headers, the closing of
# the dict literal, node construction) are not visible in this excerpt.
112 class TestSimpleDHT(unittest.TestCase):
113 """Unit tests for the DHT."""
# Baseline settings; each node receives its own copy and then overrides PORT.
116 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
117 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
118 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
119 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
120 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
# Node a: port 4044, bootstraps from its own address and is flagged as the
# bootstrap node.
126 self.a.config = self.DHT_DEFAULTS.copy()
127 self.a.config['PORT'] = 4044
128 self.a.bootstrap = ["127.0.0.1:4044"]
129 self.a.bootstrap_node = True
130 self.a.cache_dir = '/tmp'
# Node b: port 4045, bootstraps from node a's address.
131 self.b.config = self.DHT_DEFAULTS.copy()
132 self.b.config['PORT'] = 4045
133 self.b.bootstrap = ["127.0.0.1:4044"]
134 self.b.cache_dir = '/tmp'
# NOTE(review): the body of this test is not visible in this excerpt.
136 def test_bootstrap_join(self):
# Written to be used as a Deferred callback (see the addCallback below).
# NOTE(review): its body is not visible in this excerpt.
140 def node_join(self, result):
# lastDefer is what gets returned to the caller; it fires only after the
# node_join callback chained onto `d` has run.
# NOTE(review): the method header and the assignment of `d` are not visible.
145 self.lastDefer = defer.Deferred()
147 d.addCallback(self.node_join)
148 d.addCallback(self.lastDefer.callback)
149 return self.lastDefer
# Delete the database file of each node's khashmir instance.
# NOTE(review): `os` is not among the imports visible in this excerpt --
# confirm that it is imported.
154 os.unlink(self.a.khashmir.db)
159 os.unlink(self.b.khashmir.db)
# Test case working with a list of DHT nodes (self.l) on consecutive ports.
# NOTE(review): several lines of this class (docstring, method headers, the
# closing of the dict literal, node construction) are not visible here.
163 class TestMultiDHT(unittest.TestCase):
# Baseline settings; each node receives its own copy and then overrides PORT.
167 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
168 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
169 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
170 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
171 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
# Node i listens on startport + i; every node bootstraps from the address of
# node 0 (startport itself).
# NOTE(review): self.num and self.l are set on lines not visible here.
176 self.startport = 4088
177 for i in range(self.num):
179 self.l[i].config = self.DHT_DEFAULTS.copy()
180 self.l[i].config['PORT'] = self.startport + i
181 self.l[i].bootstrap = ["127.0.0.1:" + str(self.startport)]
182 self.l[i].cache_dir = '/tmp'
# Only the first node is flagged as the bootstrap node.
183 self.l[0].bootstrap_node = True
# Deferred callback that joins node `next_node`; while more nodes remain it
# chains itself for the following index, so the joins happen one at a time.
185 def node_join(self, result, next_node):
186 d = self.l[next_node].join()
187 if next_node + 1 < len(self.l):
188 d.addCallback(self.node_join, next_node + 1)
# Fires lastDefer when this join's deferred fires.
# NOTE(review): the line between these two addCallback calls (presumably an
# `else:`) is not visible in this excerpt.
190 d.addCallback(self.lastDefer.callback)
# lastDefer is what gets returned to the caller; the chain starts node_join at
# index 1.
# NOTE(review): the method header and the assignment of `d` are not visible.
193 self.lastDefer = defer.Deferred()
195 d.addCallback(self.node_join, 1)
196 return self.lastDefer
# Delete the database file of a node's khashmir instance.
# NOTE(review): the loop that binds `i` is not visible, and `os` is not among
# the imports visible in this excerpt -- confirm both.
202 os.unlink(i.khashmir.db)