]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/DHT.py
Return a token in find_node responses, use it in store_value requests.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / DHT.py
index d16f59837811e0bc561718a130c2fe0ddcaf3832..e62f30441b1bd555fa95e1bdeee2846c56c08958 100644 (file)
@@ -10,6 +10,7 @@ from zope.interface import implements
 
 from apt_dht.interfaces import IDHT
 from khashmir import Khashmir
+from bencode import bencode, bdecode
 
 khashmir_dir = 'apt-dht-Khashmir'
 
@@ -139,7 +140,7 @@ class DHT:
         
     def _getValue(self, key, result):
         if result:
-            self.retrieved.setdefault(key, []).extend(result)
+            self.retrieved.setdefault(key, []).extend([bdecode(r) for r in result])
         else:
             final_result = []
             if key in self.retrieved:
@@ -150,30 +151,30 @@ class DHT:
                 d.callback(final_result)
             del self.retrieving[key]
 
-    def storeValue(self, key, value, originated = None):
+    def storeValue(self, key, value):
         """See L{apt_dht.interfaces.IDHT}."""
         if self.config is None:
             raise DHTError, "configuration not loaded"
         if not self.joined:
             raise DHTError, "have not joined a network yet"
 
-        if key in self.storing and value in self.storing[key]:
+        bvalue = bencode(value)
+
+        if key in self.storing and bvalue in self.storing[key]:
             raise DHTError, "already storing that key with the same value"
 
-        if originated is None:
-            originated = datetime.utcnow()
         d = defer.Deferred()
-        self.khashmir.storeValueForKey(key, value, originated, self._storeValue)
-        self.storing.setdefault(key, {})[value] = d
+        self.khashmir.storeValueForKey(key, bvalue, self._storeValue)
+        self.storing.setdefault(key, {})[bvalue] = d
         return d
     
-    def _storeValue(self, key, value, result):
-        if key in self.storing and value in self.storing[key]:
+    def _storeValue(self, key, bvalue, result):
+        if key in self.storing and bvalue in self.storing[key]:
             if len(result) > 0:
-                self.storing[key][value].callback(result)
+                self.storing[key][bvalue].callback(result)
             else:
-                self.storing[key][value].errback(DHTError('could not store value %s in key %s' % (value, key)))
-            del self.storing[key][value]
+                self.storing[key][bvalue].errback(DHTError('could not store value %s in key %s' % (bvalue, key)))
+            del self.storing[key][bvalue]
             if len(self.storing[key].keys()) == 0:
                 del self.storing[key]
 
@@ -182,7 +183,7 @@ class TestSimpleDHT(unittest.TestCase):
     
     timeout = 2
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
@@ -275,7 +276,7 @@ class TestMultiDHT(unittest.TestCase):
     timeout = 60
     num = 20
     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
-                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,