]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/khashmir.py
Reorganize the DHT's remote interface parameters.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 6734ff7e614173218eaaee1123e069ae80cb5dd4..94af8ae0ddc8aae044f1bad704a5ebca140cb631 100644 (file)
@@ -9,10 +9,12 @@ warnings.simplefilter("ignore", DeprecationWarning)
 from datetime import datetime, timedelta
 from random import randrange, shuffle
 from sha import sha
+from copy import copy
 import os
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol, reactor
+from twisted.python import log
 from twisted.trial import unittest
 
 from db import DB
@@ -174,6 +176,7 @@ class KhashmirBase(protocol.Factory):
         """
         # Get K nodes out of local table/cache
         nodes = self.table.findNodes(id)
+        nodes = [copy(node) for node in nodes]
         d = Deferred()
         if errback:
             d.addCallbacks(callback, errback)
@@ -185,7 +188,7 @@ class KhashmirBase(protocol.Factory):
             d.callback(nodes)
         else:
             # Start the finding nodes action
-            state = FindNode(self, id, d.callback, self.config)
+            state = FindNode(self, id, d.callback, self.config, self.stats)
             reactor.callLater(0, state.goWithNodes, nodes)
     
     def insertNode(self, node, contacted = True):
@@ -208,17 +211,20 @@ class KhashmirBase(protocol.Factory):
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(oldnode = old, newnode = node):
+            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
                 """The pinged node never responded, so replace it."""
+                log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
+                log.err(err)
                 self.table.replaceStaleNode(oldnode, newnode)
             
-            def _notStaleNodeHandler(dict, old=old):
+            def _notStaleNodeHandler(dict, old=old, self=self):
                 """Got a pong from the old node, so update it."""
                 dict = dict['rsp']
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
             # Bucket is full, check to see if old node is still available
+            self.stats.startedAction('ping')
             df = old.ping(self.node.id)
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
@@ -243,14 +249,17 @@ class KhashmirBase(protocol.Factory):
             if callback:
                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
 
-        def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
+        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
             """Error occurred, fail node and errback or callback with error."""
-            table.nodeFailed(node)
+            log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
+            log.err(err)
+            self.table.nodeFailed(node)
             if errback:
                 errback()
             elif callback:
                 callback(None)
         
+        self.stats.startedAction('join')
         df = node.join(self.node.id)
         df.addCallbacks(_pongHandler, _defaultPong)
 
@@ -299,10 +308,10 @@ class KhashmirBase(protocol.Factory):
     
     def getStats(self):
         """Gather the statistics for the DHT."""
-        return self.stats.gather()
+        return self.stats.formatHTML()
 
     #{ Remote interface
-    def krpc_ping(self, id, _krpc_sender):
+    def krpc_ping(self, id, _krpc_sender = None):
         """Pong with our ID.
         
         @type id: C{string}
@@ -310,12 +319,13 @@ class KhashmirBase(protocol.Factory):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
 
         return {"id" : self.node.id}
         
-    def krpc_join(self, id, _krpc_sender):
+    def krpc_join(self, id, _krpc_sender = None):
         """Add the node by responding with its address and port.
         
         @type id: C{string}
@@ -323,12 +333,15 @@ class KhashmirBase(protocol.Factory):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
+        else:
+            _krpc_sender = ('127.0.0.1', self.port)
 
         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
         
-    def krpc_find_node(self, target, id, _krpc_sender):
+    def krpc_find_node(self, id, target, _krpc_sender = None):
         """Find the K closest nodes to the target in the local routing table.
         
         @type target: C{string}
@@ -338,8 +351,11 @@ class KhashmirBase(protocol.Factory):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
+        else:
+            _krpc_sender = ('127.0.0.1', self.port)
 
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.contactInfo(), nodes)
@@ -367,6 +383,7 @@ class KhashmirRead(KhashmirBase):
         """
         # Get K nodes out of local table/cache
         nodes = self.table.findNodes(key)
+        nodes = [copy(node) for node in nodes]
         d = Deferred()
         if errback:
             d.addCallbacks(callback, errback)
@@ -374,7 +391,7 @@ class KhashmirRead(KhashmirBase):
             d.addCallback(callback)
 
         # Search for others starting with the locally found ones
-        state = FindValue(self, key, d.callback, self.config)
+        state = FindValue(self, key, d.callback, self.config, self.stats)
         reactor.callLater(0, state.goWithNodes, nodes)
 
     def valueForKey(self, key, callback, searchlocal = True):
@@ -401,14 +418,14 @@ class KhashmirRead(KhashmirBase):
 
         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
             """Use the found nodes to send requests for values to."""
-            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
+            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
             reactor.callLater(0, state.goWithNodes, nodes)
             
         # First lookup nodes that have values for the key
         self.findValue(key, _getValueForKey)
 
     #{ Remote interface
-    def krpc_find_value(self, key, id, _krpc_sender):
+    def krpc_find_value(self, id, key, _krpc_sender = None):
         """Find the number of values stored locally for the key, and the K closest nodes.
         
         @type key: C{string}
@@ -418,15 +435,16 @@ class KhashmirRead(KhashmirBase):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
     
         nodes = self.table.findNodes(key)
         nodes = map(lambda node: node.contactInfo(), nodes)
         num_values = self.store.countValues(key)
         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
 
-    def krpc_get_value(self, key, num, id, _krpc_sender):
+    def krpc_get_value(self, id, key, num, _krpc_sender = None):
         """Retrieve the values stored locally for the key.
         
         @type key: C{string}
@@ -439,8 +457,9 @@ class KhashmirRead(KhashmirBase):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
     
         l = self.store.retrieveValues(key)
         if num == 0 or num >= len(l):
@@ -478,14 +497,14 @@ class KhashmirWrite(KhashmirRead):
                     """Default callback that does nothing."""
                     pass
                 response = _storedValueHandler
-            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
+            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # First find the K closest nodes to operate on.
         self.findNode(key, _storeValueForKey)
                     
     #{ Remote interface
-    def krpc_store_value(self, key, value, token, id, _krpc_sender):
+    def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
         """Store the value locally with the key.
         
         @type key: C{string}
@@ -498,8 +517,12 @@ class KhashmirWrite(KhashmirRead):
         @type _krpc_sender: (C{string}, C{int})
         @param _krpc_sender: the sender node's IP address and port
         """
-        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
-        self.insertNode(n, contacted = False)
+        if _krpc_sender is not None:
+            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+            self.insertNode(n, contacted = False)
+        else:
+            _krpc_sender = ('127.0.0.1', self.port)
+
         for secret in self.token_secrets:
             this_token = sha(secret + _krpc_sender[0]).digest()
             if token == this_token: