]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/khashmir.py
Added complicated testing to find our IP address.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
index be60243dcd23bb26a0ed8831c16019eb5cc5adc6..8c75c48df2727c4d98a60db575bfe24367eb3df3 100644 (file)
@@ -4,7 +4,7 @@
 import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
 
-from time import time
+from datetime import datetime, timedelta
 from random import randrange
 from sha import sha
 import os
@@ -34,18 +34,19 @@ class KhashmirBase(protocol.Factory):
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         #self.app = service.Application("krpc")
-        self.udp = krpc.hostbroker(self)
+        self.udp = krpc.hostbroker(self, config)
         self.udp.protocol = krpc.KRPC
         self.listenport = reactor.listenUDP(self.port, self.udp)
-        self.last = time()
         self._loadRoutingTable()
         self.expirer = KeyExpirer(self.store, config)
         self.refreshTable(force=1)
         self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
 
-    def Node(self):
-        n = self._Node()
+    def Node(self, id, host = None, port = None):
+        """Create a new node."""
+        n = self._Node(id, host, port)
         n.table = self.table
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
         return n
     
     def __del__(self):
@@ -55,7 +56,7 @@ class KhashmirBase(protocol.Factory):
         id = self.store.getSelfNode()
         if not id:
             id = newID()
-        return self._Node().init(id, host, port)
+        return self._Node(id, host, port)
         
     def checkpoint(self, auto=0):
         self.store.saveSelfNode(self.node.id)
@@ -73,20 +74,18 @@ class KhashmirBase(protocol.Factory):
         """
         nodes = self.store.getRoutingTable()
         for rec in nodes:
-            n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
-            n.conn = self.udp.connectionForAddr((n.host, n.port))
+            n = self.Node(rec[0], rec[1], int(rec[2]))
             self.table.insertNode(n, contacted=0)
             
 
     #######
     #######  LOCAL INTERFACE    - use these methods!
-    def addContact(self, host, port, callback=None):
+    def addContact(self, host, port, callback=None, errback=None):
         """
             ping this node and add the contact info to the table on pong!
         """
-        n =self.Node().init(NULL_ID, host, port) 
-        n.conn = self.udp.connectionForAddr((n.host, n.port))
-        self.sendPing(n, callback=callback)
+        n = self.Node(NULL_ID, host, port)
+        self.sendJoin(n, callback=callback, errback=errback)
 
     ## this call is async!
     def findNode(self, id, callback, errback=None):
@@ -115,7 +114,9 @@ class KhashmirBase(protocol.Factory):
         method needs to be a properly formed Node object with a valid ID.
         """
         old = self.table.insertNode(n, contacted=contacted)
-        if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
+        if (old and old.id != self.node.id and
+            (datetime.now() - old.lastSeen) > 
+             timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
             # the bucket is full, check to see if old node is still around and if so, replace it
             
             ## these are the callbacks used when we ping the oldest node in a bucket
@@ -132,38 +133,34 @@ class KhashmirBase(protocol.Factory):
             df = old.ping(self.node.id)
             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
 
-    def sendPing(self, node, callback=None):
+    def sendJoin(self, node, callback=None, errback=None):
         """
             ping a node
         """
-        df = node.ping(self.node.id)
+        df = node.join(self.node.id)
         ## these are the callbacks we use when we issue a PING
-        def _pongHandler(dict, node=node, table=self.table, callback=callback):
-            _krpc_sender = dict['_krpc_sender']
-            dict = dict['rsp']
-            sender = {'id' : dict['id']}
-            sender['host'] = _krpc_sender[0]
-            sender['port'] = _krpc_sender[1]
-            n = self.Node().initWithDict(sender)
-            n.conn = self.udp.connectionForAddr((n.host, n.port))
-            table.insertNode(n)
+        def _pongHandler(dict, node=node, self=self, callback=callback):
+            n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+            self.insertNode(n)
             if callback:
-                callback()
-        def _defaultPong(err, node=node, table=self.table, callback=callback):
+                callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
+        def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
             table.nodeFailed(node)
-            if callback:
-                callback()
+            if errback:
+                errback()
+            else:
+                callback(None)
         
         df.addCallbacks(_pongHandler,_defaultPong)
 
-    def findCloseNodes(self, callback=lambda a: None):
+    def findCloseNodes(self, callback=lambda a: None, errback = None):
         """
             This does a findNode on the ID one away from our own.  
             This will allow us to populate our table with nodes on our network closest to our own.
             This is called as soon as we start up with an empty table
         """
         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-        self.findNode(id, callback)
+        self.findNode(id, callback, errback)
 
     def refreshTable(self, force=0):
         """
@@ -173,7 +170,8 @@ class KhashmirBase(protocol.Factory):
             pass
     
         for bucket in self.table.buckets:
-            if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
+            if force or (datetime.now() - bucket.lastAccessed > 
+                         timedelta(seconds=self.config['BUCKET_STALENESS'])):
                 id = newIDInRange(bucket.min, bucket.max)
                 self.findNode(id, callback)
 
@@ -197,24 +195,22 @@ class KhashmirBase(protocol.Factory):
         self.expirer.shutdown()
         self.store.close()
 
+    #### Remote Interface - called by remote nodes
     def krpc_ping(self, id, _krpc_sender):
-        sender = {'id' : id}
-        sender['host'] = _krpc_sender[0]
-        sender['port'] = _krpc_sender[1]        
-        n = self.Node().initWithDict(sender)
-        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
         return {"id" : self.node.id}
         
+    def krpc_join(self, id, _krpc_sender):
+        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+        self.insertNode(n, contacted=0)
+        return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
+        
     def krpc_find_node(self, target, id, _krpc_sender):
+        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+        self.insertNode(n, contacted=0)
         nodes = self.table.findNodes(target)
         nodes = map(lambda node: node.senderDict(), nodes)
-        sender = {'id' : id}
-        sender['host'] = _krpc_sender[0]
-        sender['port'] = _krpc_sender[1]        
-        n = self.Node().initWithDict(sender)
-        n.conn = self.udp.connectionForAddr((n.host, n.port))
-        self.insertNode(n, contacted=0)
         return {"nodes" : nodes, "id" : self.node.id}
 
 
@@ -243,12 +239,9 @@ class KhashmirRead(KhashmirBase):
         state = GetValue(self, key, callback, self.config)
         reactor.callLater(0, state.goWithNodes, nodes, l)
 
+    #### Remote Interface - called by remote nodes
     def krpc_find_value(self, key, id, _krpc_sender):
-        sender = {'id' : id}
-        sender['host'] = _krpc_sender[0]
-        sender['port'] = _krpc_sender[1]        
-        n = self.Node().initWithDict(sender)
-        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
     
         l = self.store.retrieveValues(key)
@@ -281,14 +274,11 @@ class KhashmirWrite(KhashmirRead):
         # this call is asynch
         self.findNode(key, _storeValueForKey)
                     
+    #### Remote Interface - called by remote nodes
     def krpc_store_value(self, key, value, id, _krpc_sender):
-        self.store.storeValue(key, value)
-        sender = {'id' : id}
-        sender['host'] = _krpc_sender[0]
-        sender['port'] = _krpc_sender[1]        
-        n = self.Node().initWithDict(sender)
-        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
+        self.store.storeValue(key, value)
         return {"id" : self.node.id}
 
 # the whole shebang, for testing
@@ -303,7 +293,7 @@ class SimpleTests(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, }
+                    'KE_AGE': 3600, 'SPEW': False, }
 
     def setUp(self):
         krpc.KRPC.noisy = 0
@@ -321,11 +311,11 @@ class SimpleTests(unittest.TestCase):
         os.unlink(self.b.store.db)
 
     def testAddContact(self):
-        self.assertEqual(len(self.a.table.buckets), 1)
-        self.assertEqual(len(self.a.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.a.table.buckets), 1)
+        self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
 
-        self.assertEqual(len(self.b.table.buckets), 1)
-        self.assertEqual(len(self.b.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.b.table.buckets), 1)
+        self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
 
         self.a.addContact('127.0.0.1', 4045)
         reactor.iterate()
@@ -333,10 +323,10 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
 
-        self.assertEqual(len(self.a.table.buckets), 1)
-        self.assertEqual(len(self.a.table.buckets[0].l), 1)
-        self.assertEqual(len(self.b.table.buckets), 1)
-        self.assertEqual(len(self.b.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.a.table.buckets), 1)
+        self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.b.table.buckets), 1)
+        self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
 
     def testStoreRetrieve(self):
         self.a.addContact('127.0.0.1', 4045)
@@ -363,7 +353,7 @@ class SimpleTests(unittest.TestCase):
 
     def _cb(self, key, val):
         if not val:
-            self.assertEqual(self.got, 1)
+            self.failUnlessEqual(self.got, 1)
         elif 'foobar' in val:
             self.got = 1
 
@@ -377,7 +367,7 @@ class MultiTest(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, }
+                    'KE_AGE': 3600, 'SPEW': False, }
 
     def _done(self, val):
         self.done = 1
@@ -435,7 +425,7 @@ class MultiTest(unittest.TestCase):
                 def _rcb(key, val):
                     if not val:
                         self.done = 1
-                        self.assertEqual(self.got, 1)
+                        self.failUnlessEqual(self.got, 1)
                     elif V in val:
                         self.got = 1
                 for x in range(3):