4 from twisted.internet import defer
5 from twisted.trial import unittest
6 from zope.interface import implements
8 from apt_dht.interfaces import IDHT
9 from khashmir import Khashmir
class DHTError(Exception):
    """Error raised for failures in the DHT layer.

    Raised by the DHT methods when they are used before a configuration
    has been loaded or before a network has been joined, and used to fail
    the join deferred when no nodes could be found to bootstrap to.
    """
22 self.bootstrap_node = False
# Load this DHT's settings from a parsed configuration.
#   config  -- parser object; it is called through get, getint, getboolean,
#              getstringlist, gettime and options.
#   section -- name of the configuration section holding the DHT options.
26 def loadConfig(self, config, section):
27 """See L{apt_dht.interfaces.IDHT}."""
# Keep the parser and the section name on the instance.
28 self.config_parser = config
29 self.section = section
# cache_dir is read from the DEFAULT section; the two bootstrap settings
# are read from `section`.
31 self.cache_dir = self.config_parser.get('DEFAULT', 'cache_dir')
32 self.bootstrap = self.config_parser.getstringlist(section, 'BOOTSTRAP')
33 self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
# Walk every option of the section and store it in self.config under its
# own name, picking the parser accessor by option name.
34 for k in self.config_parser.options(section):
# Options read with getint.
35 if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY',
36 'MAX_FAILURES', 'PORT']:
37 self.config[k] = self.config_parser.getint(section, k)
# Options read with gettime.
38 elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
39 'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY', 'KE_AGE']:
40 self.config[k] = self.config_parser.gettime(section, k)
# Here the plain get() value is stored.
42 self.config[k] = self.config_parser.get(section, k)
# When no PORT entry has been stored, take it from the DEFAULT section.
43 if 'PORT' not in self.config:
44 self.config['PORT'] = self.config_parser.getint('DEFAULT', 'PORT')
47 """See L{apt_dht.interfaces.IDHT}."""
48 if self.config is None:
49 raise DHTError, "configuration not loaded"
51 self.khashmir = Khashmir(self.config, self.cache_dir)
53 self.joining = defer.Deferred()
54 for node in self.bootstrap:
55 host, port = node.rsplit(':', 1)
57 self.khashmir.addContact(host, port, self._join_single)
def _join_single(self):
    """Continue the join once a single bootstrap node has been added.

    Starts a close-node lookup on the Khashmir instance; the outcome of
    that lookup is handed to L{_join_complete}.
    """
    on_lookup_done = self._join_complete
    self.khashmir.findCloseNodes(on_lookup_done)
# Settle the self.joining deferred with the outcome of the node lookup.
#   result -- sized collection produced by the lookup (len() is taken of it).
65 def _join_complete(self, result):
66 """Called when the tables have been initialized with nodes."""
# A non-empty result, or this node being flagged as a bootstrap node,
# counts as success and fires the deferred with the result.
69 if len(result) > 0 or self.bootstrap_node:
70 self.joining.callback(result)
# Failure path: the deferred is failed with a DHTError.
72 self.joining.errback(DHTError('could not find any nodes to bootstrap to'))
75 """See L{apt_dht.interfaces.IDHT}."""
76 if self.config is None:
77 raise DHTError, "configuration not loaded"
81 self.khashmir.shutdown()
# Look up `key` through the Khashmir instance.
# Raises DHTError when no configuration is loaded or no network has been
# joined.
# NOTE(review): "raise DHTError, msg" is Python 2-only syntax;
# "raise DHTError(msg)" is also valid on Python 2.
83 def getValue(self, key):
84 """See L{apt_dht.interfaces.IDHT}."""
# Refuse to run before a configuration has been loaded.
85 if self.config is None:
86 raise DHTError, "configuration not loaded"
# Raised when the node has not joined a network.
88 raise DHTError, "have not joined a network yet"
# The lookup outcome is delivered to d.callback
# (d is presumably a Deferred -- confirm).
91 self.khashmir.valueForKey(key, d.callback)
# Store `value` under `key` through the Khashmir instance.
# Raises DHTError when no configuration is loaded or no network has been
# joined.
# NOTE(review): "raise DHTError, msg" is Python 2-only syntax;
# "raise DHTError(msg)" is also valid on Python 2.
94 def storeValue(self, key, value):
95 """See L{apt_dht.interfaces.IDHT}."""
# Refuse to run before a configuration has been loaded.
96 if self.config is None:
97 raise DHTError, "configuration not loaded"
# Raised when the node has not joined a network.
99 raise DHTError, "have not joined a network yet"
# Hand the key/value pair to Khashmir for storage.
101 self.khashmir.storeValueForKey(key, value)
# Trial test case working with two DHT instances, self.a and self.b, that
# both use 127.0.0.1.
103 class TestSimpleDHT(unittest.TestCase):
104 """Unit tests for the DHT."""
# Baseline configuration values; each node gets its own copy.
107 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
108 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
109 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
110 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
111 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
# Node a: port 4044, its bootstrap list names its own port, and it is
# flagged as a bootstrap node.
117 self.a.config = self.DHT_DEFAULTS.copy()
118 self.a.config['PORT'] = 4044
119 self.a.bootstrap = ["127.0.0.1:4044"]
120 self.a.bootstrap_node = True
121 self.a.cache_dir = '/tmp'
# Node b: port 4045, bootstrapping from node a's address.
122 self.b.config = self.DHT_DEFAULTS.copy()
123 self.b.config['PORT'] = 4045
124 self.b.bootstrap = ["127.0.0.1:4044"]
125 self.b.cache_dir = '/tmp'
127 def test_bootstrap_join(self):
131 def node_join(self, result):
# lastDefer is the deferred that gets returned; it fires after node_join
# has run as a callback on d.
136 self.lastDefer = defer.Deferred()
138 d.addCallback(self.node_join)
139 d.addCallback(self.lastDefer.callback)
140 return self.lastDefer
# Remove the files named by each node's khashmir.db.
145 os.unlink(self.a.khashmir.db)
150 os.unlink(self.b.khashmir.db)
# Trial test case working with a list of DHT instances, self.l, on
# consecutive ports starting at self.startport.
154 class TestMultiDHT(unittest.TestCase):
# Baseline configuration values; each node gets its own copy.
158 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
159 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
160 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
161 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
162 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
# Node i uses port startport + i; every node bootstraps from the address
# of the first port, and node 0 is flagged as the bootstrap node.
167 self.startport = 4088
168 for i in range(self.num):
170 self.l[i].config = self.DHT_DEFAULTS.copy()
171 self.l[i].config['PORT'] = self.startport + i
172 self.l[i].bootstrap = ["127.0.0.1:" + str(self.startport)]
173 self.l[i].cache_dir = '/tmp'
174 self.l[0].bootstrap_node = True
# Join node `next_node`; while further nodes remain, chain the join of
# the following one onto this node's deferred.
176 def node_join(self, result, next_node):
177 d = self.l[next_node].join()
178 if next_node + 1 < len(self.l):
179 d.addCallback(self.node_join, next_node + 1)
# Here the join deferred is wired to fire lastDefer.
181 d.addCallback(self.lastDefer.callback)
# lastDefer is the deferred that gets returned; the join chain on d
# continues with node index 1.
184 self.lastDefer = defer.Deferred()
186 d.addCallback(self.node_join, 1)
187 return self.lastDefer
# Remove the file named by a node's khashmir.db.
193 os.unlink(i.khashmir.db)