# Source: git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/DHT.py
# Commit: Updated the DHT join and leave to check if a join is in progress.
# Path: [quix0rs-apt-p2p.git] / apt_dht_Khashmir / DHT.py
1
2 import os
3
4 from twisted.internet import defer
5 from twisted.trial import unittest
6 from zope.interface import implements
7
8 from apt_dht.interfaces import IDHT
9 from khashmir import Khashmir
10
class DHTError(Exception):
    """Exception type used to report errors that occur in the DHT."""
13
class DHT:
    """Khashmir-backed implementation of the L{apt_dht.interfaces.IDHT} interface.

    State kept on the instance:
      - C{config}: dictionary of DHT options, or C{None} until loaded.
      - C{cache_dir}: directory handed to L{Khashmir} on join.
      - C{bootstrap}: list of C{"host:port"} strings to contact on join.
      - C{bootstrap_node}: whether this node may join with no other nodes.
      - C{joining}: the pending join deferred, or C{None} when no join is
        in progress.
      - C{joined}: whether a join has completed.
    """

    implements(IDHT)

    def __init__(self):
        self.config = None
        self.cache_dir = ''
        self.bootstrap = []
        self.bootstrap_node = False
        self.joining = None
        self.joined = False

    def loadConfig(self, config, section):
        """See L{apt_dht.interfaces.IDHT}.

        Reads C{cache_dir} from the C{DEFAULT} section, the C{BOOTSTRAP}
        string list and C{BOOTSTRAP_NODE} boolean from C{section}, and then
        copies every option of C{section} into C{self.config}, converting it
        with C{getint}, C{gettime} or C{get} depending on the option name.

        @param config: the configuration parser to read from
        @param section: name of the section holding the DHT options
        """
        int_options = ('K', 'HASH_LENGTH', 'CONCURRENT_REQS',
                       'STORE_REDUNDANCY', 'MAX_FAILURES', 'PORT')
        time_options = ('CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
                        'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY',
                        'KE_AGE')

        self.config_parser = config
        self.section = section
        # Must be a mapping: the options below are stored under string keys.
        self.config = {}
        self.cache_dir = self.config_parser.get('DEFAULT', 'cache_dir')
        self.bootstrap = self.config_parser.getstringlist(section, 'BOOTSTRAP')
        self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
        for k in self.config_parser.options(section):
            if k in int_options:
                self.config[k] = self.config_parser.getint(section, k)
            elif k in time_options:
                self.config[k] = self.config_parser.gettime(section, k)
            else:
                self.config[k] = self.config_parser.get(section, k)
        # Fall back to the DEFAULT section when the DHT section has no PORT.
        if 'PORT' not in self.config:
            self.config['PORT'] = self.config_parser.getint('DEFAULT', 'PORT')

    def join(self):
        """See L{apt_dht.interfaces.IDHT}.

        Creates the L{Khashmir} node and adds every bootstrap contact to it.

        @return: a deferred that is fired by L{_join_complete}, or failed by
            L{leave} if the node leaves while the join is pending
        @raise DHTError: if the configuration has not been loaded or a join
            is already in progress
        """
        if self.config is None:
            raise DHTError("configuration not loaded")
        if self.joining is not None:
            raise DHTError("a join is already in progress")

        self.khashmir = Khashmir(self.config, self.cache_dir)

        self.joining = defer.Deferred()
        for node in self.bootstrap:
            # Split on the last colon only, so the host part may contain colons.
            host, port = node.rsplit(':', 1)
            self.khashmir.addContact(host, int(port), self._join_single)

        return self.joining

    def _join_single(self):
        """Called when a single bootstrap node has been added."""
        self.khashmir.findCloseNodes(self._join_complete)

    def _join_complete(self, result):
        """Called when the tables have been initialized with nodes.

        Only the first completion is acted upon.  The pending join deferred
        succeeds with C{result} if any nodes were found or this is a
        bootstrap node, and fails with a L{DHTError} otherwise.
        """
        if self.joined:
            return
        df = self.joining
        if df is None:
            # leave() already failed and cleared the pending deferred, so
            # there is nothing left to fire and the node must stay un-joined.
            return

        self.joined = True
        # Clear the attribute before firing so callbacks see no pending join.
        self.joining = None
        if len(result) > 0 or self.bootstrap_node:
            df.callback(result)
        else:
            df.errback(DHTError('could not find any nodes to bootstrap to'))

    def leave(self):
        """See L{apt_dht.interfaces.IDHT}.

        Fails a pending join deferred, if there is one, and shuts down the
        L{Khashmir} node.  Does nothing if the node neither joined nor
        started joining.

        @raise DHTError: if the configuration has not been loaded
        """
        if self.config is None:
            raise DHTError("configuration not loaded")

        if self.joined or self.joining is not None:
            if self.joining is not None:
                # Clear before firing, as _join_complete does.
                df, self.joining = self.joining, None
                df.errback(DHTError('still joining when leave was called'))
            self.joined = False
            self.khashmir.shutdown()

    def getValue(self, key):
        """See L{apt_dht.interfaces.IDHT}.

        @return: a deferred whose C{callback} is handed to
            C{Khashmir.valueForKey} as the result callback
        @raise DHTError: if the configuration has not been loaded or the
            node has not joined a network
        """
        if self.config is None:
            raise DHTError("configuration not loaded")
        if not self.joined:
            raise DHTError("have not joined a network yet")

        d = defer.Deferred()
        self.khashmir.valueForKey(key, d.callback)
        return d

    def storeValue(self, key, value):
        """See L{apt_dht.interfaces.IDHT}.

        @raise DHTError: if the configuration has not been loaded or the
            node has not joined a network
        """
        if self.config is None:
            raise DHTError("configuration not loaded")
        if not self.joined:
            raise DHTError("have not joined a network yet")

        self.khashmir.storeValueForKey(key, value)
111
class TestSimpleDHT(unittest.TestCase):
    """Unit tests for the DHT using one bootstrap node and one other node."""

    timeout = 2
    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
                    'KE_AGE': 3600, }

    def setUp(self):
        """Create node a (bootstrap node, port 4044) and node b (port 4045).

        Both nodes bootstrap to 127.0.0.1:4044, i.e. to node a.
        """
        self.a = DHT()
        self.a.config = self.DHT_DEFAULTS.copy()
        self.a.config['PORT'] = 4044
        self.a.bootstrap = ["127.0.0.1:4044"]
        self.a.bootstrap_node = True
        self.a.cache_dir = '/tmp'

        self.b = DHT()
        self.b.config = self.DHT_DEFAULTS.copy()
        self.b.config['PORT'] = 4045
        self.b.bootstrap = ["127.0.0.1:4044"]
        self.b.cache_dir = '/tmp'

    def test_bootstrap_join(self):
        """The bootstrap node joins on its own."""
        return self.a.join()

    def node_join(self, result):
        """Callback that starts node b's join and returns its deferred."""
        return self.b.join()

    def test_join(self):
        """Node a joins first, then node b joins through it."""
        self.lastDefer = defer.Deferred()
        d = self.a.join()
        d.addCallback(self.node_join)
        # Forward failures as well as successes, so a failed join is reported
        # as that failure instead of leaving lastDefer unfired until timeout.
        d.addCallbacks(self.lastDefer.callback, self.lastDefer.errback)
        return self.lastDefer

    def tearDown(self):
        """Shut both nodes down and remove their database files."""
        for node in (self.a, self.b):
            node.leave()
            try:
                os.unlink(node.khashmir.db)
            except Exception:
                # Best-effort cleanup: the node may never have been joined,
                # or the file may already be gone.
                pass
162
class TestMultiDHT(unittest.TestCase):
    """Unit tests joining C{num} DHT nodes one after another."""

    timeout = 10
    num = 20
    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
                    'KE_AGE': 3600, }

    def setUp(self):
        """Create C{num} nodes on consecutive ports starting at 4088.

        Every node bootstraps to the first port; the first node is marked
        as the bootstrap node.
        """
        self.l = []
        self.startport = 4088
        for i in range(self.num):
            node = DHT()
            node.config = self.DHT_DEFAULTS.copy()
            node.config['PORT'] = self.startport + i
            node.bootstrap = ["127.0.0.1:" + str(self.startport)]
            node.cache_dir = '/tmp'
            self.l.append(node)
        self.l[0].bootstrap_node = True

    def node_join(self, result, next_node):
        """Join node C{next_node}, then chain the following node's join.

        The last node's join fires C{self.lastDefer}.  A failure at any step
        is forwarded to C{self.lastDefer} so the test reports it instead of
        waiting for the timeout.
        """
        d = self.l[next_node].join()
        if next_node + 1 < len(self.l):
            d.addCallback(self.node_join, next_node + 1)
        else:
            d.addCallback(self.lastDefer.callback)
        d.addErrback(self.lastDefer.errback)

    def test_join(self):
        """All nodes join in sequence, starting with the bootstrap node."""
        self.lastDefer = defer.Deferred()
        d = self.l[0].join()
        d.addCallback(self.node_join, 1)
        # Covers a failed first join and errors raised while starting node 1.
        d.addErrback(self.lastDefer.errback)
        return self.lastDefer

    def tearDown(self):
        """Shut every node down and remove its database file."""
        for node in self.l:
            try:
                node.leave()
                os.unlink(node.khashmir.db)
            except Exception:
                # Best-effort cleanup: nodes that never joined have nothing
                # to shut down or delete.
                pass