]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/DHT.py
Made the DHT join method work and added tests for it.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / DHT.py
1
2 import os
3
4 from twisted.internet import defer
5 from twisted.trial import unittest
6 from zope.interface import implements
7
8 from apt_dht.interfaces import IDHT
9 from khashmir import Khashmir
10
class DHTError(Exception):
    """Exception type used to report failures of DHT operations."""
13
class DHT:
    """An implementation of the L{IDHT} interface backed by L{Khashmir}.

    @ivar config: the options handed to Khashmir, keyed by option name
        (C{None} until a configuration has been loaded)
    @ivar cache_dir: the directory handed to Khashmir as its second argument
    @ivar bootstrap: the C{"host:port"} strings contacted when joining
    @ivar bootstrap_node: whether a join that finds no nodes still succeeds
    @ivar joining: the deferred returned by the last call to L{join}
    @ivar joined: whether a join result has already been processed
    """

    implements(IDHT)

    def __init__(self):
        self.config = None
        self.cache_dir = ''
        self.bootstrap = []
        self.bootstrap_node = False
        self.joining = None
        self.joined = False

    def loadConfig(self, config, section):
        """See L{apt_dht.interfaces.IDHT}."""
        self.config_parser = config
        self.section = section
        # Must be a mapping: the options below are stored by name.
        self.config = {}
        self.cache_dir = self.config_parser.get('DEFAULT', 'cache_dir')
        self.bootstrap = self.config_parser.getstringlist(section, 'BOOTSTRAP')
        self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
        for k in self.config_parser.options(section):
            # Integer-valued options.
            if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY',
                     'MAX_FAILURES', 'PORT']:
                self.config[k] = self.config_parser.getint(section, k)
            # Options read through the parser's gettime() accessor.
            elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
                       'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY', 'KE_AGE']:
                self.config[k] = self.config_parser.gettime(section, k)
            # Everything else is kept as the raw value.
            else:
                self.config[k] = self.config_parser.get(section, k)
        # Fall back to the DEFAULT section's port if the section has none.
        if 'PORT' not in self.config:
            self.config['PORT'] = self.config_parser.getint('DEFAULT', 'PORT')

    def join(self):
        """See L{apt_dht.interfaces.IDHT}.

        @return: a deferred that is fired from L{_join_complete}
        @raise DHTError: if no configuration has been loaded
        """
        if self.config is None:
            raise DHTError("configuration not loaded")

        self.khashmir = Khashmir(self.config, self.cache_dir)

        self.joining = defer.Deferred()
        # NOTE(review): with an empty bootstrap list nothing here ever fires
        # the returned deferred -- confirm whether that case needs handling.
        for node in self.bootstrap:
            # Split on the last colon only, so the host part may contain colons.
            host, port = node.rsplit(':', 1)
            port = int(port)
            self.khashmir.addContact(host, port, self._join_single)

        return self.joining

    def _join_single(self):
        """Called when a single bootstrap node has been added."""
        self.khashmir.findCloseNodes(self._join_complete)

    def _join_complete(self, result):
        """Called when the tables have been initialized with nodes.

        Only the first call has any effect, so that the joining deferred is
        fired at most once even when several bootstrap nodes respond.
        """
        if not self.joined:
            # NOTE(review): joined is set even when the join fails below;
            # leave() relies on it to shut Khashmir down -- confirm intent.
            self.joined = True
            if len(result) > 0 or self.bootstrap_node:
                self.joining.callback(result)
            else:
                self.joining.errback(DHTError('could not find any nodes to bootstrap to'))

    def leave(self):
        """See L{apt_dht.interfaces.IDHT}.

        @raise DHTError: if no configuration has been loaded
        """
        if self.config is None:
            raise DHTError("configuration not loaded")

        if self.joined:
            self.joined = False
            self.khashmir.shutdown()

    def getValue(self, key):
        """See L{apt_dht.interfaces.IDHT}.

        @return: a deferred whose callback is handed to Khashmir's valueForKey
        @raise DHTError: if no configuration has been loaded, or the DHT has
            not been joined
        """
        if self.config is None:
            raise DHTError("configuration not loaded")
        if not self.joined:
            raise DHTError("have not joined a network yet")

        d = defer.Deferred()
        self.khashmir.valueForKey(key, d.callback)
        return d

    def storeValue(self, key, value):
        """See L{apt_dht.interfaces.IDHT}.

        @raise DHTError: if no configuration has been loaded, or the DHT has
            not been joined
        """
        if self.config is None:
            raise DHTError("configuration not loaded")
        if not self.joined:
            raise DHTError("have not joined a network yet")

        self.khashmir.storeValueForKey(key, value)
102
class TestSimpleDHT(unittest.TestCase):
    """Unit tests for the DHT."""

    timeout = 2
    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
                    'KE_AGE': 3600, }

    def setUp(self):
        """Create two nodes: a bootstrap node (a) and a node (b) that
        bootstraps to it."""
        self.a = DHT()
        self.b = DHT()
        self.a.config = self.DHT_DEFAULTS.copy()
        self.a.config['PORT'] = 4044
        self.a.bootstrap = ["127.0.0.1:4044"]
        self.a.bootstrap_node = True
        self.a.cache_dir = '/tmp'
        self.b.config = self.DHT_DEFAULTS.copy()
        self.b.config['PORT'] = 4045
        self.b.bootstrap = ["127.0.0.1:4044"]
        self.b.cache_dir = '/tmp'

    def test_bootstrap_join(self):
        """The bootstrap node can join on its own."""
        d = self.a.join()
        return d

    def node_join(self, result):
        """Join the second node; used as a callback once the first joined."""
        d = self.b.join()
        return d

    def test_join(self):
        """The second node can join once the bootstrap node has joined."""
        d = self.a.join()
        d.addCallback(self.node_join)
        # Return the chain itself so that a failed join fails the test
        # with the real error instead of running into the timeout.
        return d

    def tearDown(self):
        """Shut both nodes down and remove their database files."""
        for node in (self.a, self.b):
            node.leave()
            try:
                # Best effort: a node whose join() was never called has no
                # khashmir attribute, and the file may not exist.
                os.unlink(node.khashmir.db)
            except Exception:
                pass
153
class TestMultiDHT(unittest.TestCase):
    """Unit tests joining many DHT nodes, one after the other."""

    timeout = 10
    num = 20
    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
                    'KE_AGE': 3600, }

    def setUp(self):
        """Create C{num} nodes on consecutive ports, all bootstrapping to the
        first one, which is marked as a bootstrap node."""
        self.l = []
        self.startport = 4088
        for i in range(self.num):
            node = DHT()
            node.config = self.DHT_DEFAULTS.copy()
            node.config['PORT'] = self.startport + i
            node.bootstrap = ["127.0.0.1:" + str(self.startport)]
            node.cache_dir = '/tmp'
            self.l.append(node)
        self.l[0].bootstrap_node = True

    def node_join(self, result, next_node):
        """Join node number C{next_node}, then the nodes after it.

        @return: the deferred for the remaining joins, so that the caller's
            chain waits for them and sees any failure
        """
        d = self.l[next_node].join()
        if next_node + 1 < len(self.l):
            d.addCallback(self.node_join, next_node + 1)
        return d

    def test_join(self):
        """All the nodes can join, one at a time."""
        d = self.l[0].join()
        if len(self.l) > 1:
            d.addCallback(self.node_join, 1)
        # Return the chain itself so that a failed join fails the test
        # with the real error instead of running into the timeout.
        return d

    def tearDown(self):
        """Shut every node down and remove its database file."""
        for i in self.l:
            try:
                # Best effort: a node whose join() was never called has no
                # khashmir attribute, and the file may not exist.
                i.leave()
                os.unlink(i.khashmir.db)
            except Exception:
                pass