Configuration variables are all passed in as a dictionary where needed.
The NULL_ID config option has been made a constant.
The ConfigParser now stores all keys as upper case.
config_file = opts.opts['config-file']
config.read(config_file)
-if config.defaults()['username']:
- uid,gid = pwd.getpwnam(config.defaults()['username'])[2:4]
+if config.has_option('DEFAULT', 'username') and config.get('DEFAULT', 'username'):
+ uid,gid = pwd.getpwnam(config.get('DEFAULT', 'username'))[2:4]
else:
uid,gid = None,None
print service.IProcess(application).processName
service.IProcess(application).processName = 'apt-dht'
-DHT = __import__(config.get('DEFAULT', 'DHT'), globals(), locals(), ['DHT'])
+DHT = __import__(config.get('DEFAULT', 'DHT')+'.DHT', globals(), locals(), ['DHT'])
assert(IDHT.implementedBy(DHT.DHT), "You must provide a DHT implementation that implements the IDHT interface.")
myDHT = DHT.DHT()
myDHT.loadConfig(config)
from apt_dht.apt_dht import AptDHT
myapp = AptDHT(myDHT)
site = myapp.getSite()
- s = strports.service('tcp:'+config.defaults()['port'], channel.HTTPFactory(site))
+ s = strports.service('tcp:'+config.get('DEFAULT', 'port'), channel.HTTPFactory(site))
s.setServiceParent(application)
if __name__ == '__main__':
class AptDHT:
def __init__(self, dht):
self.dht = dht
- self.http_server = TopLevel(config.defaults()['cache_dir'], self)
+ self.http_server = TopLevel(config.get('DEFAULT', 'cache_dir'), self)
self.http_site = server.Site(self.http_server)
self.peers = PeerManager()
- self.mirrors = MirrorManager(config.defaults()['cache_dir'])
+ self.mirrors = MirrorManager(config.get('DEFAULT', 'cache_dir'))
def getSite(self):
return self.http_site
DEFAULTS = {
# Port to listen on for all requests (TCP and UDP)
- 'port': '9977',
+ 'PORT': '9977',
# Directory to store the downloaded files in
- 'cache_dir': home + '/.apt-dht/cache',
+ 'CACHE_DIR': home + '/.apt-dht/cache',
# User name to try and run as
- 'username': '',
+ 'USERNAME': '',
# Which DHT implementation to use.
- # It must be possile to do "from <DHT> import DHT" to get a class that
+ # It must be possile to do "from <DHT>.DHT import DHT" to get a class that
# implements the IDHT interface.
- 'DHT': 'apt_dht_Khashmir.DHT',
+ 'DHT': 'apt_dht_Khashmir',
# Whether to only run the DHT (for providing a login node)
- 'DHT-only': 'no',
+ 'DHT-ONLY': 'no',
}
DHT_DEFAULTS = {
# bootstrap nodes to contact to join the DHT
- 'bootstrap': """www.camrdale.org:9977
+ 'BOOTSTRAP': """www.camrdale.org:9977
steveholt.hopto.org:9977""",
- # magic id to use before we know a peer's id
- 'NULL_ID': 20 * '\0',
-
# Kademlia "K" constant, this should be an even number
'K': '8',
return self.get(section,option)
def getstringlist(self, section, option):
return self.get(section,option).split()
+ def optionxform(self, option):
+ return option.upper()
config = AptDHTConfigParser(DEFAULTS)
config.add_section(config.get('DEFAULT', 'DHT'))
def loadConfig(self, config, section):
"""See L{apt_dht.interfaces.IDHT}."""
- self.config = config
+ self.config_parser = config
self.section = section
- if self.config.has_option(section, 'port'):
- self.port = self.config.get(section, 'port')
- else:
- self.port = self.config.get('DEFAULT', 'port')
+ self.config = []
+ for k in self.config_parser.options(section):
+ if k in ['K', 'HASH_LENGTH', 'CONCURRENT_REQS', 'STORE_REDUNDANCY',
+ 'MAX_FAILURES', 'PORT']:
+ self.config[k] = self.config_parser.getint(section, k)
+ elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
+ 'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY', 'KE_AGE']:
+ self.config[k] = self.config_parser.gettime(section, k)
+ else:
+ self.config[k] = self.config_parser.get(section, k)
+ if 'PORT' not in self.config:
+ self.config['PORT'] = self.config_parser.getint('DEFAULT', 'PORT')
def join(self):
"""See L{apt_dht.interfaces.IDHT}."""
if self.config is None:
raise DHTError, "configuration not loaded"
- self.khashmir = Khashmir('', self.port)
+ self.khashmir = Khashmir(self.config, self.config_parser.get('DEFAULT', 'cache_dir'))
- for node in self.config.get(self.section, 'bootstrap'):
+ for node in self.config_parser.get(self.section, 'BOOTSTRAP'):
host, port = node.rsplit(':', 1)
self.khashmir.addContact(host, port)
from twisted.internet import reactor
-import const
from khash import intify
class ActionBase:
""" base class for some long running asynchronous proccesses like finding nodes or values """
- def __init__(self, table, target, callback):
+ def __init__(self, table, target, callback, config):
self.table = table
self.target = target
+ self.config = config
self.num = intify(target)
self.found = {}
self.queried = {}
return
l = self.found.values()
l.sort(self.sort)
- for node in l[:const.K]:
+ for node in l[:self.config['K']]:
if node.id == self.target:
self.finished=1
return self.callback([node])
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= const.CONCURRENT_REQS:
+ if self.outstanding >= self.config['CONCURRENT_REQS']:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
## all done!!
self.finished=1
- reactor.callLater(0, self.callback, l[:const.K])
+ reactor.callLater(0, self.callback, l[:self.config['K']])
def makeMsgFailed(self, node):
def defaultGotNodes(err, self=self, node=node):
get_value_timeout = 15
class GetValue(FindNode):
- def __init__(self, table, target, callback, find="findValue"):
- FindNode.__init__(self, table, target, callback)
+ def __init__(self, table, target, callback, config, find="findValue"):
+ FindNode.__init__(self, table, target, callback, config)
self.findValue = find
""" get value task """
l = self.found.values()
l.sort(self.sort)
- for node in l[:const.K]:
+ for node in l[:self.config['K']]:
if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
try:
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= const.CONCURRENT_REQS:
+ if self.outstanding >= self.config['CONCURRENT_REQS']:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
class StoreValue(ActionBase):
- def __init__(self, table, target, value, callback, store="storeValue"):
- ActionBase.__init__(self, table, target, callback)
+ def __init__(self, table, target, value, callback, config, store="storeValue"):
+ ActionBase.__init__(self, table, target, callback, config)
self.value = value
self.stored = []
self.store = store
if self.finished:
return
self.stored.append(t)
- if len(self.stored) >= const.STORE_REDUNDANCY:
+ if len(self.stored) >= self.config['STORE_REDUNDANCY']:
self.finished=1
self.callback(self.stored)
else:
- if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
+ if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
self.schedule()
return t
def schedule(self):
if self.finished:
return
- num = const.CONCURRENT_REQS - self.outstanding
- if num > const.STORE_REDUNDANCY:
- num = const.STORE_REDUNDANCY
+ num = self.config['CONCURRENT_REQS'] - self.outstanding
+ if num > self.config['STORE_REDUNDANCY']:
+ num = self.config['STORE_REDUNDANCY']
for i in range(num):
try:
node = self.nodes.pop()
class KeyExpirer:
- def __init__(self, store):
+ def __init__(self, store, config):
self.store = store
- reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
+ self.config = config
+ reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
def doExpire(self):
- self.cut = "%0.6f" % (time() - const.KE_AGE)
+ self.cut = "%0.6f" % (time() - self.config['KE_AGE'])
self._expire()
def _expire(self):
c = self.store.cursor()
s = "delete from kv where time < '%s';" % self.cut
c.execute(s)
- reactor.callLater(const.KE_DELAY, self.doExpire)
+ reactor.callLater(self.config['KE_DELAY'], self.doExpire)
+++ /dev/null
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-"""
-from twisted.internet.default import SelectReactor ## twistedmatrix.com
-
-reactor = SelectReactor()
-
-from twisted.internet import main
-main.installReactor(reactor)
-
-
-try:
- import twisted.names.client
- reactor.installResolver(twisted.names.client.theResolver)
-except IOError:
- print "no resolv.conf!"
-"""
-
-# magic id to use before we know a peer's id
-NULL_ID = 20 * '\0'
-
-# Kademlia "K" constant, this should be an even number
-K = 8
-
-# SHA1 is 160 bits long
-HASH_LENGTH = 160
-
-# checkpoint every this many seconds
-CHECKPOINT_INTERVAL = 60 * 15 # fifteen minutes
-
-
-### SEARCHING/STORING
-# concurrent xmlrpc calls per find node/value request!
-CONCURRENT_REQS = 4
-
-# how many hosts to post to
-STORE_REDUNDANCY = 3
-
-
-### ROUTING TABLE STUFF
-# how many times in a row a node can fail to respond before it's booted from the routing table
-MAX_FAILURES = 3
-
-# never ping a node more often than this
-MIN_PING_INTERVAL = 60 * 15 # fifteen minutes
-
-# refresh buckets that haven't been touched in this long
-BUCKET_STALENESS = 60 * 60 # one hour
-
-
-### KEY EXPIRER
-# time before expirer starts running
-KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db
-
-# time between expirer runs
-KE_DELAY = 60 * 20 # 20 minutes
-
-# expire entries older than this
-KE_AGE = 60 * 60 # 60 minutes
from time import time
from random import randrange
+import os
import sqlite ## find this at http://pysqlite.sourceforge.net/
from twisted.internet.defer import Deferred
from twisted.internet import protocol
from twisted.internet import reactor
-import const
from ktable import KTable
-from knode import KNodeBase, KNodeRead, KNodeWrite
+from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
from khash import newID, newIDInRange
from actions import FindNode, GetValue, KeyExpirer, StoreValue
import krpc
# this is the base class, has base functionality and find node, no key-value mappings
class KhashmirBase(protocol.Factory):
_Node = KNodeBase
- def __init__(self, host, port, db='khashmir.db'):
- self.setup(host, port, db)
+ def __init__(self, config, cache_dir='/tmp'):
+ self.config = None
+ self.setup(config, cache_dir)
- def setup(self, host, port, db='khashmir.db'):
- self._findDB(db)
- self.port = port
- self.node = self._loadSelfNode(host, port)
- self.table = KTable(self.node)
+ def setup(self, config, cache_dir):
+ self.config = config
+ self._findDB(os.path.join(cache_dir, 'khashmir.db'))
+ self.port = config['PORT']
+ self.node = self._loadSelfNode('', self.port)
+ self.table = KTable(self.node, config)
#self.app = service.Application("krpc")
self.udp = krpc.hostbroker(self)
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(port, self.udp)
self.last = time()
self._loadRoutingTable()
- KeyExpirer(store=self.store)
+ KeyExpirer(self.store, config)
self.refreshTable(force=1)
reactor.callLater(60, self.checkpoint, (1,))
self._dumpRoutingTable()
self.refreshTable()
if auto:
- reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
+ reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
+ int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
+ self.checkpoint, (1,))
def _findDB(self, db):
- import os
try:
os.stat(db)
except OSError:
"""
ping this node and add the contact info to the table on pong!
"""
- n =self.Node().init(const.NULL_ID, host, port)
+ n =self.Node().init(NULL_ID, host, port)
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.sendPing(n, callback=callback)
d.callback(nodes)
else:
# create our search state
- state = FindNode(self, id, d.callback)
+ state = FindNode(self, id, d.callback, self.config)
reactor.callLater(0, state.goWithNodes, nodes)
def insertNode(self, n, contacted=1):
method needs to be a properly formed Node object with a valid ID.
"""
old = self.table.insertNode(n, contacted=contacted)
- if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
+ if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
# the bucket is full, check to see if old node is still around and if so, replace it
## these are the callbacks used when we ping the oldest node in a bucket
pass
for bucket in self.table.buckets:
- if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
+ if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
num_nodes: number of nodes estimated in the entire dht
"""
num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
- num_nodes = const.K * (2**(len(self.table.buckets) - 1))
+ num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
return (num_contacts, num_nodes)
def krpc_ping(self, id, _krpc_sender):
l = []
# create our search state
- state = GetValue(self, key, callback)
+ state = GetValue(self, key, callback, self.config)
reactor.callLater(0, state.goWithNodes, nodes, l)
def krpc_find_value(self, key, id, _krpc_sender):
def _storedValueHandler(sender):
pass
response=_storedValueHandler
- action = StoreValue(self.table, key, value, response)
+ action = StoreValue(self.table, key, value, response, self.config)
reactor.callLater(0, action.goWithNodes, nodes)
# this call is asynch
## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
-from const import NULL_ID
-from node import Node
+from node import Node, NULL_ID
class KNodeBase(Node):
def checkSender(self, dict):
from time import time
from bisect import bisect_left
-from const import K, HASH_LENGTH, NULL_ID, MAX_FAILURES
import khash
-from node import Node
+from node import Node, NULL_ID
class KTable:
"""local routing table for a kademlia like distributed hash table"""
- def __init__(self, node):
+ def __init__(self, node, config):
# this is the root node, a.k.a. US!
self.node = node
- self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
+ self.config = config
+ self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])]
self.insertNode(node)
def _bucketIndexForInt(self, num):
return
# we don't have this node, check to see if the bucket is full
- if len(self.buckets[i].l) < K:
+ if len(self.buckets[i].l) < self.config['K']:
# no, append this node and return
if contacted:
node.updateLastSeen()
return self.buckets[i].l[0]
# this bucket is full and contains our node, split the bucket
- if len(self.buckets) >= HASH_LENGTH:
+ if len(self.buckets) >= self.config['HASH_LENGTH']:
# our table is FULL, this is really unlikely
print "Hash Table is FULL! Increase K!"
return
except IndexError:
return None
else:
- if n.msgFailed() >= MAX_FAILURES:
+ if n.msgFailed() >= self.config['MAX_FAILURES']:
self.invalidateNode(n)
class KBucket:
def testFail(self):
self.testAddNode()
- for i in range(MAX_FAILURES - 1):
+ for i in range(const.MAX_FAILURES - 1):
self.t.nodeFailed(self.b)
self.assertEqual(len(self.t.buckets[0].l), 1)
self.assertEqual(self.t.buckets[0].l[0], self.b)
import khash
+# magic id to use before we know a peer's id
+NULL_ID = 20 * '\0',
+
class Node:
"""encapsulate contact info"""
def __init__(self):