from time import time
from random import randrange
+from sha import sha
import os
import sqlite ## find this at http://pysqlite.sourceforge.net/
from twisted.internet.defer import Deferred
-from twisted.internet import protocol
-from twisted.internet import reactor
+from twisted.internet import protocol, reactor
+from twisted.trial import unittest
from ktable import KTable
from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
def setup(self, config, cache_dir):
self.config = config
- self._findDB(os.path.join(cache_dir, 'khashmir.db'))
self.port = config['PORT']
+ self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
#self.app = service.Application("krpc")
self.udp = krpc.hostbroker(self)
self.udp.protocol = krpc.KRPC
- self.listenport = reactor.listenUDP(port, self.udp)
+ self.listenport = reactor.listenUDP(self.port, self.udp)
self.last = time()
self._loadRoutingTable()
- KeyExpirer(self.store, config)
+ self.expirer = KeyExpirer(self.store, config)
self.refreshTable(force=1)
- reactor.callLater(60, self.checkpoint, (1,))
+ self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
def Node(self):
n = self._Node()
self._dumpRoutingTable()
self.refreshTable()
if auto:
- reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
+ self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
self.checkpoint, (1,))
def _findDB(self, db):
+ self.db = db
try:
os.stat(db)
except OSError:
# the whole shebang, for testing
class Khashmir(KhashmirWrite):
_Node = KNodeWrite
+
+class SimpleTests(unittest.TestCase):
+ DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+ 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+ 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+ 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
+ 'KE_AGE': 3600, }
+
+ def setUp(self):
+ d = self.DHT_DEFAULTS.copy()
+ d['PORT'] = 4044
+ self.a = Khashmir(d)
+ d = self.DHT_DEFAULTS.copy()
+ d['PORT'] = 4045
+ self.b = Khashmir(d)
+
+ def tearDown(self):
+ self.a.listenport.stopListening()
+ self.b.listenport.stopListening()
+ try:
+ self.a.next_checkpoint.cancel()
+ except:
+ pass
+ try:
+ self.b.next_checkpoint.cancel()
+ except:
+ pass
+ try:
+ self.a.expirer.next_expire.cancel()
+ except:
+ pass
+ try:
+ self.b.expirer.next_expire.cancel()
+ except:
+ pass
+ self.a.store.close()
+ self.b.store.close()
+ os.unlink(self.a.db)
+ os.unlink(self.b.db)
+
+ def testAddContact(self):
+ self.assertEqual(len(self.a.table.buckets), 1)
+ self.assertEqual(len(self.a.table.buckets[0].l), 0)
+
+ self.assertEqual(len(self.b.table.buckets), 1)
+ self.assertEqual(len(self.b.table.buckets[0].l), 0)
+
+ self.a.addContact('127.0.0.1', 4045)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+
+ self.assertEqual(len(self.a.table.buckets), 1)
+ self.assertEqual(len(self.a.table.buckets[0].l), 1)
+ self.assertEqual(len(self.b.table.buckets), 1)
+ self.assertEqual(len(self.b.table.buckets[0].l), 1)
+
+ def testStoreRetrieve(self):
+ self.a.addContact('127.0.0.1', 4045)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.got = 0
+ self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.a.valueForKey(sha('foo').digest(), self._cb)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+
+ def _cb(self, val):
+ if not val:
+ self.assertEqual(self.got, 1)
+ elif 'foobar' in val:
+ self.got = 1
+
+
+class MultiTest(unittest.TestCase):
+ num = 20
+ DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+ 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
+ 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
+ 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+ 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
+ 'KE_AGE': 3600, }
+
+ def _done(self, val):
+ self.done = 1
+
+ def setUp(self):
+ self.l = []
+ self.startport = 4088
+ for i in range(self.num):
+ d = self.DHT_DEFAULTS.copy()
+ d['PORT'] = self.startport + i
+ self.l.append(Khashmir(d))
+ reactor.iterate()
+ reactor.iterate()
+
+ for i in self.l:
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ reactor.iterate()
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ reactor.iterate()
+
+ def tearDown(self):
+ for i in self.l:
+ i.listenport.stopListening()
+ try:
+ i.next_checkpoint.cancel()
+ except:
+ pass
+ try:
+ i.expirer.next_expire.cancel()
+ except:
+ pass
+ i.store.close()
+ os.unlink(i.db)
+
+ reactor.iterate()
+
+ def testStoreRetrieve(self):
+ for i in range(10):
+ K = newID()
+ V = newID()
+
+ for a in range(3):
+ self.done = 0
+ def _scb(val):
+ self.done = 1
+ self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
+ while not self.done:
+ reactor.iterate()
+
+
+ def _rcb(val):
+ if not val:
+ self.done = 1
+ self.assertEqual(self.got, 1)
+ elif V in val:
+ self.got = 1
+ for x in range(3):
+ self.got = 0
+ self.done = 0
+ self.l[randrange(0, self.num)].valueForKey(K, _rcb)
+ while not self.done:
+ reactor.iterate()