From 7dab2471d589ea439ccbfe794258863c2af858e9 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Thu, 20 Dec 2007 17:54:53 -0800 Subject: [PATCH] Modify khashmir's config system to not use the const module. Configuration variables are all passed in as a dictionary where needed. The NULL_ID config option has been made a constant. The ConfigParser now stores all keys as upper case. --- apt-dht.py | 8 ++--- apt_dht/apt_dht.py | 4 +-- apt_dht/apt_dht_conf.py | 19 ++++++------ apt_dht_Khashmir/DHT.py | 22 ++++++++----- apt_dht_Khashmir/actions.py | 41 ++++++++++++------------ apt_dht_Khashmir/const.py | 60 ------------------------------------ apt_dht_Khashmir/khashmir.py | 41 ++++++++++++------------ apt_dht_Khashmir/knode.py | 3 +- apt_dht_Khashmir/ktable.py | 16 +++++----- apt_dht_Khashmir/node.py | 3 ++ 10 files changed, 85 insertions(+), 132 deletions(-) delete mode 100644 apt_dht_Khashmir/const.py diff --git a/apt-dht.py b/apt-dht.py index c7afe4d..0742589 100644 --- a/apt-dht.py +++ b/apt-dht.py @@ -45,8 +45,8 @@ if __name__ == '__main__': config_file = opts.opts['config-file'] config.read(config_file) -if config.defaults()['username']: - uid,gid = pwd.getpwnam(config.defaults()['username'])[2:4] +if config.has_option('DEFAULT', 'username') and config.get('DEFAULT', 'username'): + uid,gid = pwd.getpwnam(config.get('DEFAULT', 'username'))[2:4] else: uid,gid = None,None @@ -54,7 +54,7 @@ application = service.Application("apt-dht", uid, gid) print service.IProcess(application).processName service.IProcess(application).processName = 'apt-dht' -DHT = __import__(config.get('DEFAULT', 'DHT'), globals(), locals(), ['DHT']) +DHT = __import__(config.get('DEFAULT', 'DHT')+'.DHT', globals(), locals(), ['DHT']) assert(IDHT.implementedBy(DHT.DHT), "You must provide a DHT implementation that implements the IDHT interface.") myDHT = DHT.DHT() myDHT.loadConfig(config) @@ -64,7 +64,7 @@ if not config.getboolean('DEFAULT', 'DHT-only'): from apt_dht.apt_dht import AptDHT myapp = AptDHT(myDHT) site = myapp.getSite() - s = strports.service('tcp:'+config.defaults()['port'], channel.HTTPFactory(site)) + s = strports.service('tcp:'+config.get('DEFAULT', 'port'), channel.HTTPFactory(site)) s.setServiceParent(application) if __name__ == '__main__': diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py index 5897de5..8d81c3d 100644 --- a/apt_dht/apt_dht.py +++ b/apt_dht/apt_dht.py @@ -10,10 +10,10 @@ from MirrorManager import MirrorManager class AptDHT: def __init__(self, dht): self.dht = dht - self.http_server = TopLevel(config.defaults()['cache_dir'], self) + self.http_server = TopLevel(config.get('DEFAULT', 'cache_dir'), self) self.http_site = server.Site(self.http_server) self.peers = PeerManager() - self.mirrors = MirrorManager(config.defaults()['cache_dir']) + self.mirrors = MirrorManager(config.get('DEFAULT', 'cache_dir')) def getSite(self): return self.http_site diff --git a/apt_dht/apt_dht_conf.py b/apt_dht/apt_dht_conf.py index 5d59180..0c5ca52 100644 --- a/apt_dht/apt_dht_conf.py +++ b/apt_dht/apt_dht_conf.py @@ -20,31 +20,28 @@ if home == '${HOME}' or not os.path.isdir(home): DEFAULTS = { # Port to listen on for all requests (TCP and UDP) - 'port': '9977', + 'PORT': '9977', # Directory to store the downloaded files in - 'cache_dir': home + '/.apt-dht/cache', + 'CACHE_DIR': home + '/.apt-dht/cache', # User name to try and run as - 'username': '', + 'USERNAME': '', # Which DHT implementation to use. - # It must be possile to do "from import DHT" to get a class that + # It must be possile to do "from .DHT import DHT" to get a class that # implements the IDHT interface. - 'DHT': 'apt_dht_Khashmir.DHT', + 'DHT': 'apt_dht_Khashmir', # Whether to only run the DHT (for providing a login node) - 'DHT-only': 'no', + 'DHT-ONLY': 'no', } DHT_DEFAULTS = { # bootstrap nodes to contact to join the DHT - 'bootstrap': """www.camrdale.org:9977 + 'BOOTSTRAP': """www.camrdale.org:9977 steveholt.hopto.org:9977""", - # magic id to use before we know a peer's id - 'NULL_ID': 20 * '\0', - # Kademlia "K" constant, this should be an even number 'K': '8', @@ -107,6 +104,8 @@ class AptDHTConfigParser(SafeConfigParser): return self.get(section,option) def getstringlist(self, section, option): return self.get(section,option).split() + def optionxform(self, option): + return option.upper() config = AptDHTConfigParser(DEFAULTS) config.add_section(config.get('DEFAULT', 'DHT')) diff --git a/apt_dht_Khashmir/DHT.py b/apt_dht_Khashmir/DHT.py index ca5bb60..c4a0b34 100644 --- a/apt_dht_Khashmir/DHT.py +++ b/apt_dht_Khashmir/DHT.py @@ -17,21 +17,29 @@ class DHT: def loadConfig(self, config, section): """See L{apt_dht.interfaces.IDHT}.""" - self.config = config + self.config_parser = config self.section = section - if self.config.has_option(section, 'port'): - self.port = self.config.get(section, 'port') - else: - self.port = self.config.get('DEFAULT', 'port') + self.config = [] + for k in self.config_parser.options(section): + if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY', + 'MAX_FAILURES', 'PORT']: + self.config[k] = self.config_parser.getint(section, k) + elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL', + 'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY', 'KE_AGE']: + self.config[k] = self.config_parser.gettime(section, k) + else: + self.config[k] = self.config_parser.get(section, k) + if 'PORT' not in self.config: + self.config['PORT'] = self.config_parser.getint('DEFAULT', 'PORT') def join(self): """See L{apt_dht.interfaces.IDHT}.""" if self.config is None: raise DHTError, "configuration not loaded" - self.khashmir = Khashmir('', self.port) + self.khashmir = Khashmir(self.config, self.config_parser.get('DEFAULT', 'cache_dir')) - for node in self.config.get(self.section, 'bootstrap'): + for node in self.config_parser.get(self.section, 'BOOTSTRAP'): host, port = node.rsplit(':', 1) self.khashmir.addContact(host, port) diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index 013a9a7..dc743de 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -5,14 +5,14 @@ from time import time from twisted.internet import reactor -import const from khash import intify class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, target, callback): + def __init__(self, table, target, callback, config): self.table = table self.target = target + self.config = config self.num = intify(target) self.found = {} self.queried = {} @@ -70,7 +70,7 @@ class FindNode(ActionBase): return l = self.found.values() l.sort(self.sort) - for node in l[:const.K]: + for node in l[:self.config['K']]: if node.id == self.target: self.finished=1 return self.callback([node]) @@ -80,13 +80,13 @@ class FindNode(ActionBase): df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= const.CONCURRENT_REQS: + if self.outstanding >= self.config['CONCURRENT_REQS']: break assert(self.outstanding) >=0 if self.outstanding == 0: ## all done!! self.finished=1 - reactor.callLater(0, self.callback, l[:const.K]) + reactor.callLater(0, self.callback, l[:self.config['K']]) def makeMsgFailed(self, node): def defaultGotNodes(err, self=self, node=node): @@ -112,8 +112,8 @@ class FindNode(ActionBase): get_value_timeout = 15 class GetValue(FindNode): - def __init__(self, table, target, callback, find="findValue"): - FindNode.__init__(self, table, target, callback) + def __init__(self, table, target, callback, config, find="findValue"): + FindNode.__init__(self, table, target, callback, config) self.findValue = find """ get value task """ @@ -159,7 +159,7 @@ class GetValue(FindNode): l = self.found.values() l.sort(self.sort) - for node in l[:const.K]: + for node in l[:self.config['K']]: if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT try: @@ -172,7 +172,7 @@ class GetValue(FindNode): df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= const.CONCURRENT_REQS: + if self.outstanding >= self.config['CONCURRENT_REQS']: break assert(self.outstanding) >=0 if self.outstanding == 0: @@ -196,8 +196,8 @@ class GetValue(FindNode): class StoreValue(ActionBase): - def __init__(self, table, target, value, callback, store="storeValue"): - ActionBase.__init__(self, table, target, callback) + def __init__(self, table, target, value, callback, config, store="storeValue"): + ActionBase.__init__(self, table, target, callback, config) self.value = value self.stored = [] self.store = store @@ -208,11 +208,11 @@ class StoreValue(ActionBase): if self.finished: return self.stored.append(t) - if len(self.stored) >= const.STORE_REDUNDANCY: + if len(self.stored) >= self.config['STORE_REDUNDANCY']: self.finished=1 self.callback(self.stored) else: - if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY: + if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']: self.schedule() return t @@ -228,9 +228,9 @@ class StoreValue(ActionBase): def schedule(self): if self.finished: return - num = const.CONCURRENT_REQS - self.outstanding - if num > const.STORE_REDUNDANCY: - num = const.STORE_REDUNDANCY + num = self.config['CONCURRENT_REQS'] - self.outstanding + if num > self.config['STORE_REDUNDANCY']: + num = self.config['STORE_REDUNDANCY'] for i in range(num): try: node = self.nodes.pop() @@ -257,16 +257,17 @@ class StoreValue(ActionBase): class KeyExpirer: - def __init__(self, store): + def __init__(self, store, config): self.store = store - reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) + self.config = config + reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire) def doExpire(self): - self.cut = "%0.6f" % (time() - const.KE_AGE) + self.cut = "%0.6f" % (time() - self.config['KE_AGE']) self._expire() def _expire(self): c = self.store.cursor() s = "delete from kv where time < '%s';" % self.cut c.execute(s) - reactor.callLater(const.KE_DELAY, self.doExpire) + reactor.callLater(self.config['KE_DELAY'], self.doExpire) diff --git a/apt_dht_Khashmir/const.py b/apt_dht_Khashmir/const.py deleted file mode 100644 index 58d539e..0000000 --- a/apt_dht_Khashmir/const.py +++ /dev/null @@ -1,60 +0,0 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -""" -from twisted.internet.default import SelectReactor ## twistedmatrix.com - -reactor = SelectReactor() - -from twisted.internet import main -main.installReactor(reactor) - - -try: - import twisted.names.client - reactor.installResolver(twisted.names.client.theResolver) -except IOError: - print "no resolv.conf!" -""" - -# magic id to use before we know a peer's id -NULL_ID = 20 * '\0' - -# Kademlia "K" constant, this should be an even number -K = 8 - -# SHA1 is 160 bits long -HASH_LENGTH = 160 - -# checkpoint every this many seconds -CHECKPOINT_INTERVAL = 60 * 15 # fifteen minutes - - -### SEARCHING/STORING -# concurrent xmlrpc calls per find node/value request! -CONCURRENT_REQS = 4 - -# how many hosts to post to -STORE_REDUNDANCY = 3 - - -### ROUTING TABLE STUFF -# how many times in a row a node can fail to respond before it's booted from the routing table -MAX_FAILURES = 3 - -# never ping a node more often than this -MIN_PING_INTERVAL = 60 * 15 # fifteen minutes - -# refresh buckets that haven't been touched in this long -BUCKET_STALENESS = 60 * 60 # one hour - - -### KEY EXPIRER -# time before expirer starts running -KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db - -# time between expirer runs -KE_DELAY = 60 * 20 # 20 minutes - -# expire entries older than this -KE_AGE = 60 * 60 # 60 minutes diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index 0196fd2..eaee998 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -3,15 +3,15 @@ from time import time from random import randrange +import os import sqlite ## find this at http://pysqlite.sourceforge.net/ from twisted.internet.defer import Deferred from twisted.internet import protocol from twisted.internet import reactor -import const from ktable import KTable -from knode import KNodeBase, KNodeRead, KNodeWrite +from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID from khash import newID, newIDInRange from actions import FindNode, GetValue, KeyExpirer, StoreValue import krpc @@ -22,21 +22,23 @@ class KhashmirDBExcept(Exception): # this is the base class, has base functionality and find node, no key-value mappings class KhashmirBase(protocol.Factory): _Node = KNodeBase - def __init__(self, host, port, db='khashmir.db'): - self.setup(host, port, db) + def __init__(self, config, cache_dir='/tmp'): + self.config = None + self.setup(config, cache_dir) - def setup(self, host, port, db='khashmir.db'): - self._findDB(db) - self.port = port - self.node = self._loadSelfNode(host, port) - self.table = KTable(self.node) + def setup(self, config, cache_dir): + self.config = config + self._findDB(os.path.join(cache_dir, 'khashmir.db')) + self.port = config['PORT'] + self.node = self._loadSelfNode('', self.port) + self.table = KTable(self.node, config) #self.app = service.Application("krpc") self.udp = krpc.hostbroker(self) self.udp.protocol = krpc.KRPC self.listenport = reactor.listenUDP(port, self.udp) self.last = time() self._loadRoutingTable() - KeyExpirer(store=self.store) + KeyExpirer(self.store, config) self.refreshTable(force=1) reactor.callLater(60, self.checkpoint, (1,)) @@ -68,10 +70,11 @@ class KhashmirBase(protocol.Factory): self._dumpRoutingTable() self.refreshTable() if auto: - reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,)) + reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), + int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), + self.checkpoint, (1,)) def _findDB(self, db): - import os try: os.stat(db) except OSError: @@ -132,7 +135,7 @@ class KhashmirBase(protocol.Factory): """ ping this node and add the contact info to the table on pong! """ - n =self.Node().init(const.NULL_ID, host, port) + n =self.Node().init(NULL_ID, host, port) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.sendPing(n, callback=callback) @@ -150,7 +153,7 @@ class KhashmirBase(protocol.Factory): d.callback(nodes) else: # create our search state - state = FindNode(self, id, d.callback) + state = FindNode(self, id, d.callback, self.config) reactor.callLater(0, state.goWithNodes, nodes) def insertNode(self, n, contacted=1): @@ -163,7 +166,7 @@ class KhashmirBase(protocol.Factory): method needs to be a properly formed Node object with a valid ID. """ old = self.table.insertNode(n, contacted=contacted) - if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: + if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id: # the bucket is full, check to see if old node is still around and if so, replace it ## these are the callbacks used when we ping the oldest node in a bucket @@ -221,7 +224,7 @@ class KhashmirBase(protocol.Factory): pass for bucket in self.table.buckets: - if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS): + if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']): id = newIDInRange(bucket.min, bucket.max) self.findNode(id, callback) @@ -232,7 +235,7 @@ class KhashmirBase(protocol.Factory): num_nodes: number of nodes estimated in the entire dht """ num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) - num_nodes = const.K * (2**(len(self.table.buckets) - 1)) + num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1)) return (num_contacts, num_nodes) def krpc_ping(self, id, _krpc_sender): @@ -286,7 +289,7 @@ class KhashmirRead(KhashmirBase): l = [] # create our search state - state = GetValue(self, key, callback) + state = GetValue(self, key, callback, self.config) reactor.callLater(0, state.goWithNodes, nodes, l) def krpc_find_value(self, key, id, _krpc_sender): @@ -321,7 +324,7 @@ class KhashmirWrite(KhashmirRead): def _storedValueHandler(sender): pass response=_storedValueHandler - action = StoreValue(self.table, key, value, response) + action = StoreValue(self.table, key, value, response, self.config) reactor.callLater(0, action.goWithNodes, nodes) # this call is asynch diff --git a/apt_dht_Khashmir/knode.py b/apt_dht_Khashmir/knode.py index d2cea72..1e0b299 100644 --- a/apt_dht_Khashmir/knode.py +++ b/apt_dht_Khashmir/knode.py @@ -1,8 +1,7 @@ ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information -from const import NULL_ID -from node import Node +from node import Node, NULL_ID class KNodeBase(Node): def checkSender(self, dict): diff --git a/apt_dht_Khashmir/ktable.py b/apt_dht_Khashmir/ktable.py index e0ff8df..6340513 100644 --- a/apt_dht_Khashmir/ktable.py +++ b/apt_dht_Khashmir/ktable.py @@ -4,16 +4,16 @@ from time import time from bisect import bisect_left -from const import K, HASH_LENGTH, NULL_ID, MAX_FAILURES import khash -from node import Node +from node import Node, NULL_ID class KTable: """local routing table for a kademlia like distributed hash table""" - def __init__(self, node): + def __init__(self, node, config): # this is the root node, a.k.a. US! self.node = node - self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)] + self.config = config + self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])] self.insertNode(node) def _bucketIndexForInt(self, num): @@ -116,7 +116,7 @@ class KTable: return # we don't have this node, check to see if the bucket is full - if len(self.buckets[i].l) < K: + if len(self.buckets[i].l) < self.config['K']: # no, append this node and return if contacted: node.updateLastSeen() @@ -129,7 +129,7 @@ class KTable: return self.buckets[i].l[0] # this bucket is full and contains our node, split the bucket - if len(self.buckets) >= HASH_LENGTH: + if len(self.buckets) >= self.config['HASH_LENGTH']: # our table is FULL, this is really unlikely print "Hash Table is FULL! Increase K!" return @@ -164,7 +164,7 @@ class KTable: except IndexError: return None else: - if n.msgFailed() >= MAX_FAILURES: + if n.msgFailed() >= self.config['MAX_FAILURES']: self.invalidateNode(n) class KBucket: @@ -228,7 +228,7 @@ class TestKTable(unittest.TestCase): def testFail(self): self.testAddNode() - for i in range(MAX_FAILURES - 1): + for i in range(const.MAX_FAILURES - 1): self.t.nodeFailed(self.b) self.assertEqual(len(self.t.buckets[0].l), 1) self.assertEqual(self.t.buckets[0].l[0], self.b) diff --git a/apt_dht_Khashmir/node.py b/apt_dht_Khashmir/node.py index 35dadc6..fda6fbe 100644 --- a/apt_dht_Khashmir/node.py +++ b/apt_dht_Khashmir/node.py @@ -6,6 +6,9 @@ from types import InstanceType import khash +# magic id to use before we know a peer's id +NULL_ID = 20 * '\0', + class Node: """encapsulate contact info""" def __init__(self): -- 2.39.5