]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/khashmir.py
Workaround old sqlite not having 'select count(distinct key)'.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
index 9bd31402afb2f95c2f68d2e3d69788a3260575ab..b3b007d079ef3e6cc4f5048f1e1158b81b0b78a3 100644 (file)
@@ -40,6 +40,8 @@ class KhashmirBase(protocol.Factory):
     @ivar table: the routing table
     @type token_secrets: C{list} of C{string}
     @ivar token_secrets: the current secrets to use to create tokens
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics gatherer
     @type udp: L{krpc.hostbroker}
     @ivar udp: the factory for the KRPC protocol
     @type listenport: L{twisted.internet.interfaces.IListeningPort}
@@ -183,7 +185,7 @@ class KhashmirBase(protocol.Factory):
             d.callback(nodes)
         else:
             # Start the finding nodes action
-            state = FindNode(self, id, d.callback, self.config)
+            state = FindNode(self, id, d.callback, self.config, self.stats)
             reactor.callLater(0, state.goWithNodes, nodes)
     
     def insertNode(self, node, contacted = True):
@@ -206,17 +208,20 @@ class KhashmirBase(protocol.Factory):
             (datetime.now() - old.lastSeen) > 
              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             
-            def _staleNodeHandler(oldnode = old, newnode = node):
+            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
                 """The pinged node never responded, so replace it."""
+                log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
+                log.err(err)
                 self.table.replaceStaleNode(oldnode, newnode)
             
-            def _notStaleNodeHandler(dict, old=old):
+            def _notStaleNodeHandler(dict, old=old, self=self):
                 """Got a pong from the old node, so update it."""
                 dict = dict['rsp']
                 if dict['id'] == old.id:
                     self.table.justSeenNode(old.id)
             
             # Bucket is full, check to see if old node is still available
+            self.stats.startedAction('ping')
             df = old.ping(self.node.id)
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
@@ -241,14 +246,17 @@ class KhashmirBase(protocol.Factory):
             if callback:
                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
 
-        def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
+        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
             """Error occurred, fail node and errback or callback with error."""
-            table.nodeFailed(node)
+            log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
+            log.err(err)
+            self.table.nodeFailed(node)
             if errback:
                 errback()
             elif callback:
                 callback(None)
         
+        self.stats.startedAction('join')
         df = node.join(self.node.id)
         df.addCallbacks(_pongHandler, _defaultPong)
 
@@ -294,6 +302,10 @@ class KhashmirBase(protocol.Factory):
         except:
             pass
         self.store.close()
+    
+    def getStats(self):
+        """Gather the statistics for the DHT."""
+        return self.stats.formatHTML()
 
     #{ Remote interface
     def krpc_ping(self, id, _krpc_sender):
@@ -368,7 +380,7 @@ class KhashmirRead(KhashmirBase):
             d.addCallback(callback)
 
         # Search for others starting with the locally found ones
-        state = FindValue(self, key, d.callback, self.config)
+        state = FindValue(self, key, d.callback, self.config, self.stats)
         reactor.callLater(0, state.goWithNodes, nodes)
 
     def valueForKey(self, key, callback, searchlocal = True):
@@ -395,7 +407,7 @@ class KhashmirRead(KhashmirBase):
 
         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
             """Use the found nodes to send requests for values to."""
-            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
+            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
             reactor.callLater(0, state.goWithNodes, nodes)
             
         # First lookup nodes that have values for the key
@@ -472,7 +484,7 @@ class KhashmirWrite(KhashmirRead):
                     """Default callback that does nothing."""
                     pass
                 response = _storedValueHandler
-            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
+            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # First find the K closest nodes to operate on.