]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/khashmir.py
Change all unittests to use twisted's trial.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
index eaee998e384a6dff148884559e8aba01a148cb5a..b253c3859b09403ce5c9138296ba49ebe94dafca 100644 (file)
@@ -3,12 +3,13 @@
 
 from time import time
 from random import randrange
+from sha import sha
 import os
 import sqlite  ## find this at http://pysqlite.sourceforge.net/
 
 from twisted.internet.defer import Deferred
-from twisted.internet import protocol
-from twisted.internet import reactor
+from twisted.internet import protocol, reactor
+from twisted.trial import unittest
 
 from ktable import KTable
 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
@@ -28,19 +29,19 @@ class KhashmirBase(protocol.Factory):
         
     def setup(self, config, cache_dir):
         self.config = config
-        self._findDB(os.path.join(cache_dir, 'khashmir.db'))
         self.port = config['PORT']
+        self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         #self.app = service.Application("krpc")
         self.udp = krpc.hostbroker(self)
         self.udp.protocol = krpc.KRPC
-        self.listenport = reactor.listenUDP(port, self.udp)
+        self.listenport = reactor.listenUDP(self.port, self.udp)
         self.last = time()
         self._loadRoutingTable()
-        KeyExpirer(self.store, config)
+        self.expirer = KeyExpirer(self.store, config)
         self.refreshTable(force=1)
-        reactor.callLater(60, self.checkpoint, (1,))
+        self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
 
     def Node(self):
         n = self._Node()
@@ -70,11 +71,12 @@ class KhashmirBase(protocol.Factory):
         self._dumpRoutingTable()
         self.refreshTable()
         if auto:
-            reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
+            self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
                               self.checkpoint, (1,))
         
     def _findDB(self, db):
+        self.db = db
         try:
             os.stat(db)
         except OSError:
@@ -350,3 +352,175 @@ class KhashmirWrite(KhashmirRead):
 # the whole shebang, for testing
 class Khashmir(KhashmirWrite):
     _Node = KNodeWrite
+
+class SimpleTests(unittest.TestCase):
+    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
+                    'KE_AGE': 3600, }
+
+    def setUp(self):
+        d = self.DHT_DEFAULTS.copy()
+        d['PORT'] = 4044
+        self.a = Khashmir(d)
+        d = self.DHT_DEFAULTS.copy()
+        d['PORT'] = 4045
+        self.b = Khashmir(d)
+        
+    def tearDown(self):
+        self.a.listenport.stopListening()
+        self.b.listenport.stopListening()
+        try:
+            self.a.next_checkpoint.cancel()
+        except:
+            pass
+        try:
+            self.b.next_checkpoint.cancel()
+        except:
+            pass
+        try:
+            self.a.expirer.next_expire.cancel()
+        except:
+            pass
+        try:
+            self.b.expirer.next_expire.cancel()
+        except:
+            pass
+        self.a.store.close()
+        self.b.store.close()
+        os.unlink(self.a.db)
+        os.unlink(self.b.db)
+
+    def testAddContact(self):
+        self.assertEqual(len(self.a.table.buckets), 1)
+        self.assertEqual(len(self.a.table.buckets[0].l), 0)
+
+        self.assertEqual(len(self.b.table.buckets), 1)
+        self.assertEqual(len(self.b.table.buckets[0].l), 0)
+
+        self.a.addContact('127.0.0.1', 4045)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+
+        self.assertEqual(len(self.a.table.buckets), 1)
+        self.assertEqual(len(self.a.table.buckets[0].l), 1)
+        self.assertEqual(len(self.b.table.buckets), 1)
+        self.assertEqual(len(self.b.table.buckets[0].l), 1)
+
+    def testStoreRetrieve(self):
+        self.a.addContact('127.0.0.1', 4045)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.got = 0
+        self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        self.a.valueForKey(sha('foo').digest(), self._cb)
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+        reactor.iterate()
+
+    def _cb(self, val):
+        if not val:
+            self.assertEqual(self.got, 1)
+        elif 'foobar' in val:
+            self.got = 1
+
+
+class MultiTest(unittest.TestCase):
+    num = 20
+    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
+                    'KE_AGE': 3600, }
+
+    def _done(self, val):
+        self.done = 1
+        
+    def setUp(self):
+        self.l = []
+        self.startport = 4088
+        for i in range(self.num):
+            d = self.DHT_DEFAULTS.copy()
+            d['PORT'] = self.startport + i
+            self.l.append(Khashmir(d))
+        reactor.iterate()
+        reactor.iterate()
+        
+        for i in self.l:
+            i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+            i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+            i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+            reactor.iterate()
+            reactor.iterate()
+            reactor.iterate() 
+            
+        for i in self.l:
+            self.done = 0
+            i.findCloseNodes(self._done)
+            while not self.done:
+                reactor.iterate()
+        for i in self.l:
+            self.done = 0
+            i.findCloseNodes(self._done)
+            while not self.done:
+                reactor.iterate()
+
+    def tearDown(self):
+        for i in self.l:
+            i.listenport.stopListening()
+            try:
+                i.next_checkpoint.cancel()
+            except:
+                pass
+            try:
+                i.expirer.next_expire.cancel()
+            except:
+                pass
+            i.store.close()
+            os.unlink(i.db)
+            
+        reactor.iterate()
+        
+    def testStoreRetrieve(self):
+        for i in range(10):
+            K = newID()
+            V = newID()
+            
+            for a in range(3):
+                self.done = 0
+                def _scb(val):
+                    self.done = 1
+                self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
+                while not self.done:
+                    reactor.iterate()
+
+
+                def _rcb(val):
+                    if not val:
+                        self.done = 1
+                        self.assertEqual(self.got, 1)
+                    elif V in val:
+                        self.got = 1
+                for x in range(3):
+                    self.got = 0
+                    self.done = 0
+                    self.l[randrange(0, self.num)].valueForKey(K, _rcb)
+                    while not self.done:
+                        reactor.iterate()