From: Cameron Dale Date: Sun, 16 Dec 2007 03:53:41 +0000 (-0800) Subject: Moved the files to appropriate package directories. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=dd75e47b4d4ee40dae492753a226d5a42ac73c1c Moved the files to appropriate package directories. --- diff --git a/AptPackages.py b/AptPackages.py deleted file mode 100644 index a7a2a54..0000000 --- a/AptPackages.py +++ /dev/null @@ -1,507 +0,0 @@ -# Disable the FutureWarning from the apt module -import warnings -warnings.simplefilter("ignore", FutureWarning) - -import os, shelve -from random import choice -from shutil import rmtree -from copy import deepcopy -from UserDict import DictMixin - -from twisted.internet import threads, defer -from twisted.python import log -from twisted.trial import unittest - -import apt_pkg, apt_inst -from apt import OpProgress - -apt_pkg.init() - -class PackageFileList(DictMixin): - """Manages a list of package files belonging to a backend. - - @type packages: C{shelve dictionary} - @ivar packages: the files stored for this backend - """ - - def __init__(self, cache_dir): - self.cache_dir = cache_dir - if not os.path.exists(self.cache_dir): - os.makedirs(self.cache_dir) - self.packages = None - self.open() - - def open(self): - """Open the persistent dictionary of files in this backend.""" - if self.packages is None: - self.packages = shelve.open(self.cache_dir+'/packages.db') - - def close(self): - """Close the persistent dictionary.""" - if self.packages is not None: - self.packages.close() - - def update_file(self, cache_path, file_path): - """Check if an updated file needs to be tracked. - - Called from the mirror manager when files get updated so we can update our - fake lists and sources.list. 
- """ - filename = cache_path.split('/')[-1] - if filename=="Packages" or filename=="Release" or filename=="Sources": - log.msg("Registering package file: "+cache_path) - self.packages[cache_path] = file_path - return True - return False - - def check_files(self): - """Check all files in the database to make sure they exist.""" - files = self.packages.keys() - for f in files: - if not os.path.exists(self.packages[f]): - log.msg("File in packages database has been deleted: "+f) - del self.packages[f] - - # Standard dictionary implementation so this class can be used like a dictionary. - def __getitem__(self, key): return self.packages[key] - def __setitem__(self, key, item): self.packages[key] = item - def __delitem__(self, key): del self.packages[key] - def keys(self): return self.packages.keys() - -class AptPackages: - """Uses python-apt to answer queries about packages. - - Makes a fake configuration for python-apt for each backend. - """ - - DEFAULT_APT_CONFIG = { - #'APT' : '', - #'APT::Architecture' : 'i386', # Commented so the machine's config will set this - #'APT::Default-Release' : 'unstable', - 'Dir':'.', # / - 'Dir::State' : 'apt/', # var/lib/apt/ - 'Dir::State::Lists': 'lists/', # lists/ - #'Dir::State::cdroms' : 'cdroms.list', - 'Dir::State::userstatus' : 'status.user', - 'Dir::State::status': 'dpkg/status', # '/var/lib/dpkg/status' - 'Dir::Cache' : '.apt/cache/', # var/cache/apt/ - #'Dir::Cache::archives' : 'archives/', - 'Dir::Cache::srcpkgcache' : 'srcpkgcache.bin', - 'Dir::Cache::pkgcache' : 'pkgcache.bin', - 'Dir::Etc' : 'apt/etc/', # etc/apt/ - 'Dir::Etc::sourcelist' : 'sources.list', - 'Dir::Etc::vendorlist' : 'vendors.list', - 'Dir::Etc::vendorparts' : 'vendors.list.d', - #'Dir::Etc::main' : 'apt.conf', - #'Dir::Etc::parts' : 'apt.conf.d', - #'Dir::Etc::preferences' : 'preferences', - 'Dir::Bin' : '', - #'Dir::Bin::methods' : '', #'/usr/lib/apt/methods' - 'Dir::Bin::dpkg' : '/usr/bin/dpkg', - #'DPkg' : '', - #'DPkg::Pre-Install-Pkgs' : '', - 
#'DPkg::Tools' : '', - #'DPkg::Tools::Options' : '', - #'DPkg::Tools::Options::/usr/bin/apt-listchanges' : '', - #'DPkg::Tools::Options::/usr/bin/apt-listchanges::Version' : '2', - #'DPkg::Post-Invoke' : '', - } - essential_dirs = ('apt', 'apt/cache', 'apt/dpkg', 'apt/etc', 'apt/lists', - 'apt/lists/partial') - essential_files = ('apt/dpkg/status', 'apt/etc/sources.list',) - - def __init__(self, cache_dir): - """Construct a new packages manager. - - @ivar backendName: name of backend associated with this packages file - @ivar cache_dir: cache directory from config file - """ - self.cache_dir = cache_dir - self.apt_config = deepcopy(self.DEFAULT_APT_CONFIG) - - for dir in self.essential_dirs: - path = os.path.join(self.cache_dir, dir) - if not os.path.exists(path): - os.makedirs(path) - for file in self.essential_files: - path = os.path.join(self.cache_dir, file) - if not os.path.exists(path): - f = open(path,'w') - f.close() - del f - - self.apt_config['Dir'] = self.cache_dir - self.apt_config['Dir::State::status'] = os.path.join(self.cache_dir, - self.apt_config['Dir::State'], self.apt_config['Dir::State::status']) - self.packages = PackageFileList(cache_dir) - self.loaded = 0 - self.loading = None - - def __del__(self): - self.cleanup() - self.packages.close() - - def addRelease(self, cache_path, file_path): - """Dirty hack until python-apt supports apt-pkg/indexrecords.h - (see Bug #456141) - """ - self.indexrecords[cache_path] = {} - - read_packages = False - f = open(file_path, 'r') - - for line in f: - line = line.rstrip() - - if line[:1] != " ": - read_packages = False - try: - # Read the various headers from the file - h, v = line.split(":", 1) - if h == "MD5Sum" or h == "SHA1" or h == "SHA256": - read_packages = True - hash_type = h - except: - # Bad header line, just ignore it - log.msg("WARNING: Ignoring badly formatted Release line: %s" % line) - - # Skip to the next line - continue - - # Read file names from the multiple hash sections of the file - if 
read_packages: - p = line.split() - self.indexrecords[cache_path].setdefault(p[2], {})[hash_type] = (p[0], p[1]) - - f.close() - - def file_updated(self, cache_path, file_path): - """A file in the backend has changed, manage it. - - If this affects us, unload our apt database - """ - if self.packages.update_file(cache_path, file_path): - self.unload() - - def load(self): - """Make sure the package is initialized and loaded.""" - if self.loading is None: - self.loading = threads.deferToThread(self._load) - self.loading.addCallback(self.doneLoading) - return self.loading - - def doneLoading(self, loadResult): - """Cache is loaded.""" - self.loading = None - # Must pass on the result for the next callback - return loadResult - - def _load(self): - """Regenerates the fake configuration and load the packages cache.""" - if self.loaded: return True - apt_pkg.InitSystem() - rmtree(os.path.join(self.cache_dir, self.apt_config['Dir::State'], - self.apt_config['Dir::State::Lists'])) - os.makedirs(os.path.join(self.cache_dir, self.apt_config['Dir::State'], - self.apt_config['Dir::State::Lists'], 'partial')) - sources_filename = os.path.join(self.cache_dir, self.apt_config['Dir::Etc'], - self.apt_config['Dir::Etc::sourcelist']) - sources = open(sources_filename, 'w') - sources_count = 0 - self.packages.check_files() - self.indexrecords = {} - for f in self.packages: - # we should probably clear old entries from self.packages and - # take into account the recorded mtime as optimization - filepath = self.packages[f] - if f.split('/')[-1] == "Release": - self.addRelease(f, filepath) - fake_uri='http://apt-dht'+f - fake_dirname = '/'.join(fake_uri.split('/')[:-1]) - if f.endswith('Sources'): - source_line='deb-src '+fake_dirname+'/ /' - else: - source_line='deb '+fake_dirname+'/ /' - listpath=(os.path.join(self.cache_dir, self.apt_config['Dir::State'], - self.apt_config['Dir::State::Lists'], - apt_pkg.URItoFileName(fake_uri))) - sources.write(source_line+'\n') - log.msg("Sources 
line: " + source_line) - sources_count = sources_count + 1 - - try: - #we should empty the directory instead - os.unlink(listpath) - except: - pass - os.symlink(filepath, listpath) - sources.close() - - if sources_count == 0: - log.msg("No Packages files available for %s backend"%(self.cache_dir)) - return False - - log.msg("Loading Packages database for "+self.cache_dir) - for key, value in self.apt_config.items(): - apt_pkg.Config[key] = value - - self.cache = apt_pkg.GetCache(OpProgress()) - self.records = apt_pkg.GetPkgRecords(self.cache) - self.srcrecords = apt_pkg.GetPkgSrcRecords() - - self.loaded = 1 - return True - - def unload(self): - """Tries to make the packages server quit.""" - if self.loaded: - del self.cache - del self.records - del self.srcrecords - del self.indexrecords - self.loaded = 0 - - def cleanup(self): - """Cleanup and close any loaded caches.""" - self.unload() - self.packages.close() - - def findHash(self, path): - """Find the hash for a given path in this mirror. - - Returns a deferred so it can make sure the cache is loaded first. - """ - d = defer.Deferred() - - deferLoad = self.load() - deferLoad.addCallback(self._findHash, path, d) - - return d - - def _findHash(self, loadResult, path, d): - """Really find the hash for a path. - - Have to pass the returned loadResult on in case other calls to this - function are pending. 
- """ - if not loadResult: - d.callback((None, None)) - return loadResult - - # First look for the path in the cache of index files - for release in self.indexrecords: - if path.startswith(release[:-7]): - for indexFile in self.indexrecords[release]: - if release[:-7] + indexFile == path: - d.callback(self.indexrecords[release][indexFile]['SHA1']) - return loadResult - - package = path.split('/')[-1].split('_')[0] - - # Check the binary packages - try: - for version in self.cache[package].VersionList: - size = version.Size - for verFile in version.FileList: - if self.records.Lookup(verFile): - if '/' + self.records.FileName == path: - d.callback((self.records.SHA1Hash, size)) - return loadResult - except KeyError: - pass - - # Check the source packages' files - self.srcrecords.Restart() - if self.srcrecords.Lookup(package): - for f in self.srcrecords.Files: - if path == '/' + f[2]: - d.callback((f[0], f[1])) - return loadResult - - d.callback((None, None)) - return loadResult - -class TestAptPackages(unittest.TestCase): - """Unit tests for the AptPackages cache.""" - - pending_calls = [] - client = None - packagesFile = '' - sourcesFile = '' - releaseFile = '' - - def setUp(self): - self.client = AptPackages('/tmp/.apt-dht') - - self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Packages$" | tail -n 1').read().rstrip('\n') - self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Sources$" | tail -n 1').read().rstrip('\n') - for f in os.walk('/var/lib/apt/lists').next()[2]: - if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]): - self.releaseFile = f - break - - self.client.file_updated(self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/'), - '/var/lib/apt/lists/' + self.releaseFile) - self.client.file_updated(self.packagesFile[self.packagesFile.find('_debian_')+1:].replace('_','/'), - '/var/lib/apt/lists/' + self.packagesFile) - 
self.client.file_updated(self.sourcesFile[self.sourcesFile.find('_debian_')+1:].replace('_','/'), - '/var/lib/apt/lists/' + self.sourcesFile) - - def test_pkg_hash(self): - self.client._load() - - self.client.records.Lookup(self.client.cache['dpkg'].VersionList[0].FileList[0]) - - pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.packagesFile + - ' | grep -E "^SHA1:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - - self.failUnless(self.client.records.SHA1Hash == pkg_hash, - "Hashes don't match: %s != %s" % (self.client.records.SHA1Hash, pkg_hash)) - - def test_src_hash(self): - self.client._load() - - self.client.srcrecords.Lookup('dpkg') - - src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + - ' | cut -d\ -f 2').read().split('\n')[:-1] - - for f in self.client.srcrecords.Files: - self.failUnless(f[0] in src_hashes, "Couldn't find %s in: %r" % (f[0], src_hashes)) - - def test_index_hash(self): - self.client._load() - - indexhash = self.client.indexrecords[self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')]['main/binary-i386/Packages.bz2']['SHA1'][0] - - idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + - '/var/lib/apt/lists/' + self.releaseFile + - ' | grep -E " main/binary-i386/Packages.bz2$"' - ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') - - self.failUnless(indexhash == idx_hash, "Hashes don't match: %s != %s" % (indexhash, idx_hash)) - - def verifyHash(self, found_hash, path, true_hash): - self.failUnless(found_hash[0] == true_hash, - "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash)) - - def test_findIndexHash(self): - lastDefer = defer.Deferred() - - idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + - '/var/lib/apt/lists/' + self.releaseFile + - ' | grep -E " main/binary-i386/Packages.bz2$"' - ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') - idx_path = 
self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2' - - d = self.client.findHash(idx_path) - d.addCallback(self.verifyHash, idx_path, idx_hash) - - d.addCallback(lastDefer.callback) - return lastDefer - - def test_findPkgHash(self): - lastDefer = defer.Deferred() - - pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.packagesFile + - ' | grep -E "^SHA1:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - pkg_path = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.packagesFile + - ' | grep -E "^Filename:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - - d = self.client.findHash(pkg_path) - d.addCallback(self.verifyHash, pkg_path, pkg_hash) - - d.addCallback(lastDefer.callback) - return lastDefer - - def test_findSrcHash(self): - lastDefer = defer.Deferred() - - src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -E "^Directory:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + - ' | cut -d\ -f 2').read().split('\n')[:-1] - src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + - ' | cut -d\ -f 4').read().split('\n')[:-1] - - i = choice(range(len(src_hashes))) - d = self.client.findHash(src_dir + '/' + src_paths[i]) - d.addCallback(self.verifyHash, src_dir + '/' + src_paths[i], src_hashes[i]) - - d.addCallback(lastDefer.callback) - return lastDefer - - def test_multipleFindHash(self): - lastDefer = defer.Deferred() - - idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + - '/var/lib/apt/lists/' + self.releaseFile + - ' | grep -E " main/binary-i386/Packages.bz2$"' - ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') - 
idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2' - - d = self.client.findHash(idx_path) - d.addCallback(self.verifyHash, idx_path, idx_hash) - - pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.packagesFile + - ' | grep -E "^SHA1:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - pkg_path = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.packagesFile + - ' | grep -E "^Filename:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - - d = self.client.findHash(pkg_path) - d.addCallback(self.verifyHash, pkg_path, pkg_hash) - - src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -E "^Directory:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + - ' | cut -d\ -f 2').read().split('\n')[:-1] - src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + - ' | cut -d\ -f 4').read().split('\n')[:-1] - - for i in range(len(src_hashes)): - d = self.client.findHash(src_dir + '/' + src_paths[i]) - d.addCallback(self.verifyHash, src_dir + '/' + src_paths[i], src_hashes[i]) - - idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + - '/var/lib/apt/lists/' + self.releaseFile + - ' | grep -E " main/source/Sources.bz2$"' - ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') - idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/source/Sources.bz2' - - d = self.client.findHash(idx_path) - d.addCallback(self.verifyHash, idx_path, idx_hash) - - d.addCallback(lastDefer.callback) - return lastDefer - - def tearDown(self): - for p in self.pending_calls: - if p.active(): - p.cancel() - 
self.pending_calls = [] - self.client.cleanup() - self.client = None diff --git a/HTTPDownloader.py b/HTTPDownloader.py deleted file mode 100644 index 7e5a06c..0000000 --- a/HTTPDownloader.py +++ /dev/null @@ -1,226 +0,0 @@ - -from twisted.internet import reactor, defer, protocol -from twisted.internet.protocol import ClientFactory -from twisted import version as twisted_version -from twisted.web2.client.interfaces import IHTTPClientManager -from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol -from twisted.web2 import stream as stream_mod, http_headers -from twisted.web2 import version as web2_version -from twisted.trial import unittest -from zope.interface import implements - -from apt_dht_conf import version - -class HTTPClientManager(ClientFactory): - """A manager for all HTTP requests to a single site. - - Controls all requests that got to a single site (host and port). - This includes buffering requests until they can be sent and reconnecting - in the even of the connection being closed. 
- - """ - - implements(IHTTPClientManager) - - def __init__(self, host, port=80): - self.host = host - self.port = port - self.busy = False - self.pipeline = False - self.closed = True - self.connecting = False - self.request_queue = [] - self.response_queue = [] - self.proto = None - self.connector = None - - def connect(self): - assert(self.closed and not self.connecting) - self.connecting = True - d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) - d.addCallback(self.connected) - - def connected(self, proto): - self.closed = False - self.connecting = False - self.proto = proto - self.processQueue() - - def close(self): - if not self.closed: - self.proto.transport.loseConnection() - - def is_idle(self): - return not self.busy and not self.request_queue and not self.response_queue - - def submitRequest(self, request): - request.deferRequest = defer.Deferred() - self.request_queue.append(request) - self.processQueue() - return request.deferRequest - - def processQueue(self): - if not self.request_queue: - return - if self.connecting: - return - if self.closed: - self.connect() - return - if self.busy and not self.pipeline: - return - if self.response_queue and not self.pipeline: - return - - req = self.request_queue.pop(0) - self.response_queue.append(req) - req.deferResponse = self.proto.submitRequest(req, False) - req.deferResponse.addCallback(self.requestComplete) - req.deferResponse.addErrback(self.requestError) - - def requestComplete(self, resp): - req = self.response_queue.pop(0) - req.deferRequest.callback(resp) - - def requestError(self, error): - req = self.response_queue.pop(0) - req.deferRequest.errback(error) - - def clientBusy(self, proto): - self.busy = True - - def clientIdle(self, proto): - self.busy = False - self.processQueue() - - def clientPipelining(self, proto): - self.pipeline = True - self.processQueue() - - def clientGone(self, proto): - for req in self.response_queue: - 
req.deferRequest.errback(ProtocolError('lost connection')) - self.busy = False - self.pipeline = False - self.closed = True - self.connecting = False - self.response_queue = [] - self.proto = None - if self.request_queue: - self.processQueue() - - def setCommonHeaders(self): - headers = http_headers.Headers() - headers.setHeader('Host', self.host) - headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' % - (version.short(), twisted_version.short(), web2_version.short())) - return headers - - def get(self, path, method="GET"): - headers = self.setCommonHeaders() - return self.submitRequest(ClientRequest(method, path, headers, None)) - - def getRange(self, path, rangeStart, rangeEnd, method="GET"): - headers = self.setCommonHeaders() - headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)])) - return self.submitRequest(ClientRequest(method, path, headers, None)) - -class TestClientManager(unittest.TestCase): - """Unit tests for the HTTPClientManager.""" - - client = None - pending_calls = [] - - def gotResp(self, resp, num, expect): - self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) - if expect is not None: - self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) - def print_(n): - pass - def printdone(n): - pass - stream_mod.readStream(resp.stream, print_).addCallback(printdone) - - def test_download(self): - host = 'www.camrdale.org' - self.client = HTTPClientManager(host, 80) - self.timeout = 10 - - d = self.client.get('/robots.txt') - d.addCallback(self.gotResp, 1, 309) - return d - - def test_head(self): - host = 'www.camrdale.org' - self.client = HTTPClientManager(host, 80) - self.timeout = 10 - - d = self.client.get('/robots.txt', "HEAD") - d.addCallback(self.gotResp, 1, 0) - return d - - def test_multiple_downloads(self): - host = 'www.camrdale.org' - self.client = HTTPClientManager(host, 80) - self.timeout = 120 - 
lastDefer = defer.Deferred() - - def newRequest(path, num, expect, last=False): - d = self.client.get(path) - d.addCallback(self.gotResp, num, expect) - if last: - d.addCallback(lastDefer.callback) - - newRequest("/", 1, 3433) - newRequest("/blog/", 2, 37121) - newRequest("/camrdale.html", 3, 2234) - self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309)) - self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084)) - self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750)) - self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783)) - self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525)) - self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381)) - self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True)) - return lastDefer - - def test_multiple_quick_downloads(self): - host = 'www.camrdale.org' - self.client = HTTPClientManager(host, 80) - self.timeout = 30 - lastDefer = defer.Deferred() - - def newRequest(path, num, expect, last=False): - d = self.client.get(path) - d.addCallback(self.gotResp, num, expect) - if last: - d.addCallback(lastDefer.callback) - - newRequest("/", 1, 3433) - newRequest("/blog/", 2, 37121) - newRequest("/camrdale.html", 3, 2234) - self.pending_calls.append(reactor.callLater(0, newRequest, '/robots.txt', 4, 309)) - self.pending_calls.append(reactor.callLater(0, newRequest, '/wikilink.html', 5, 3084)) - self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4750)) - self.pending_calls.append(reactor.callLater(0, newRequest, '/PlanetLab.html', 7, 2783)) - self.pending_calls.append(reactor.callLater(0, newRequest, '/openid.html', 8, 2525)) - self.pending_calls.append(reactor.callLater(0, newRequest, '/subpage.html', 9, 2381)) - self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 
302362, True)) - return lastDefer - - def test_range(self): - host = 'www.camrdale.org' - self.client = HTTPClientManager(host, 80) - self.timeout = 10 - - d = self.client.getRange('/robots.txt', 100, 199) - d.addCallback(self.gotResp, 1, 100) - return d - - def tearDown(self): - for p in self.pending_calls: - if p.active(): - p.cancel() - self.pending_calls = [] - if self.client: - self.client.close() - self.client = None diff --git a/HTTPServer.py b/HTTPServer.py deleted file mode 100644 index 688b296..0000000 --- a/HTTPServer.py +++ /dev/null @@ -1,80 +0,0 @@ -import os.path, time - -from twisted.web2 import server, http, resource, channel -from twisted.web2 import static, http_headers, responsecode - -class FileDownloader(static.File): - - def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(), processors=None, indexNames=None): - self.manager = manager - super(FileDownloader, self).__init__(path, defaultType, ignoredExts, processors, indexNames) - - def render(self, req): - resp = super(FileDownloader, self).render(req) - - if self.manager: - if resp != responsecode.NOT_FOUND: - return self.manager.check_freshness(req.uri, resp.headers.getHeader('Last-Modified'), resp) - - return self.manager.get_resp(req.uri) - - return resp - - def createSimilarFile(self, path): - return self.__class__(path, self.manager, self.defaultType, self.ignoredExts, - self.processors, self.indexNames[:]) -class TopLevel(resource.Resource): - addSlash = True - - def __init__(self, directory, manager): - self.directory = directory - self.manager = manager - self.subdirs = [] - - def addDirectory(self, directory): - path = "~" + str(len(self.subdirs)) - self.subdirs.append(directory) - return path - - def removeDirectory(self, directory): - loc = self.subdirs.index(directory) - self.subdirs[loc] = '' - - def render(self, ctx): - return http.Response( - 200, - {'content-type': http_headers.MimeType('text', 'html')}, - """ -

Statistics

-

TODO: eventually some stats will be shown here.""") - - def locateChild(self, request, segments): - name = segments[0] - if len(name) > 1 and name[0] == '~': - try: - loc = int(name[1:]) - except: - return None, () - - if loc >= 0 and loc < len(self.subdirs) and self.subdirs[loc]: - return static.File(self.subdirs[loc]), segments[1:] - else: - return None, () - -# if len(name) > 1: - return FileDownloader(self.directory, self.manager), segments[0:] -# else: -# return self, () - -if __name__ == '__builtin__': - # Running from twistd -y - t = TopLevel('/home', None) - t.addDirectory('/tmp') - t.addDirectory('/var/log') - site = server.Site(t) - - # Standard twisted application Boilerplate - from twisted.application import service, strports - application = service.Application("demoserver") - s = strports.service('tcp:18080', channel.HTTPFactory(site)) - s.setServiceParent(application) diff --git a/MirrorManager.py b/MirrorManager.py deleted file mode 100644 index 6795cc0..0000000 --- a/MirrorManager.py +++ /dev/null @@ -1,174 +0,0 @@ - -import os - -from twisted.python import log -from twisted.internet import defer -from twisted.trial import unittest - -from AptPackages import AptPackages - -aptpkg_dir='.apt-dht' - -class MirrorManager: - """Manages all requests for mirror objects.""" - - def __init__(self, cache_dir): - self.cache_dir = cache_dir - self.apt_caches = {} - - def extractPath(self, path): - site, path = path.split('/',1) - if not site: - site, path = path.split('/',1) - path = '/'+path - - # Make sure a port is included for consistency - if site.find(':') < 0: - site = site + ":80" - - i = max(path.rfind('/dists/'), path.rfind('/pool/')) - if i >= 0: - baseDir = path[:i] - path = path[i:] - else: - # Uh oh, this is not good - log.msg("Couldn't find a good base directory for path: %s" % (site + path)) - baseDir = '' - if site in self.apt_caches: - longest_match = 0 - for base in self.apt_caches[site]: - base_match = '' - for dirs in path.split('/'): - if 
base.startswith(base_match + '/' + dirs): - base_match += '/' + dirs - else: - break - if len(base_match) > longest_match: - longest_match = len(base_match) - baseDir = base_match - log.msg("Settled on baseDir: %s" % baseDir) - - return site, baseDir, path - - def init(self, site, baseDir): - if site not in self.apt_caches: - self.apt_caches[site] = {} - - if baseDir not in self.apt_caches[site]: - site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_')) - self.apt_caches[site][baseDir] = AptPackages(site_cache) - - def updatedFile(self, path, file_path): - site, baseDir, path = self.extractPath(path) - self.init(site, baseDir) - self.apt_caches[site][baseDir].file_updated(path, file_path) - - def findHash(self, path): - site, baseDir, path = self.extractPath(path) - if site in self.apt_caches and baseDir in self.apt_caches[site]: - return self.apt_caches[site][baseDir].findHash(path) - d = defer.Deferred() - d.errback("Not Found") - return d - -class TestMirrorManager(unittest.TestCase): - """Unit tests for the mirror manager.""" - - pending_calls = [] - client = None - - def setUp(self): - self.client = MirrorManager('/tmp') - - def test_extractPath(self): - site, baseDir, path = self.client.extractPath('/ftp.us.debian.org/debian/dists/unstable/Release') - self.failUnless(site == "ftp.us.debian.org:80", "no match: %s" % site) - self.failUnless(baseDir == "/debian", "no match: %s" % baseDir) - self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path) - - site, baseDir, path = self.client.extractPath('/ftp.us.debian.org:16999/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz') - self.failUnless(site == "ftp.us.debian.org:16999", "no match: %s" % site) - self.failUnless(baseDir == "/debian", "no match: %s" % baseDir) - self.failUnless(path == "/pool/d/dpkg/dpkg_1.2.1-1.tar.gz", "no match: %s" % path) - - def verifyHash(self, found_hash, path, true_hash): - self.failUnless(found_hash[0] == true_hash, - "%s hashes don't 
match: %s != %s" % (path, found_hash[0], true_hash)) - - def test_findHash(self): - self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Packages$" | tail -n 1').read().rstrip('\n') - self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Sources$" | tail -n 1').read().rstrip('\n') - for f in os.walk('/var/lib/apt/lists').next()[2]: - if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]): - self.releaseFile = f - break - - self.client.updatedFile('/' + self.releaseFile.replace('_','/'), - '/var/lib/apt/lists/' + self.releaseFile) - self.client.updatedFile('/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + - self.packagesFile[self.packagesFile.find('_dists_')+1:].replace('_','/'), - '/var/lib/apt/lists/' + self.packagesFile) - self.client.updatedFile('/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + - self.sourcesFile[self.sourcesFile.find('_dists_')+1:].replace('_','/'), - '/var/lib/apt/lists/' + self.sourcesFile) - - lastDefer = defer.Deferred() - - idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + - '/var/lib/apt/lists/' + self.releaseFile + - ' | grep -E " main/binary-i386/Packages.bz2$"' - ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') - idx_path = '/' + self.releaseFile.replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2' - - d = self.client.findHash(idx_path) - d.addCallback(self.verifyHash, idx_path, idx_hash) - - pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.packagesFile + - ' | grep -E "^SHA1:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - pkg_path = '/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + \ - os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.packagesFile + - ' | grep -E "^Filename:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - - d = self.client.findHash(pkg_path) - d.addCallback(self.verifyHash, pkg_path, pkg_hash) - - 
src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -E "^Directory:" | head -n 1' + - ' | cut -d\ -f 2').read().rstrip('\n') - src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + - ' | cut -d\ -f 2').read().split('\n')[:-1] - src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' + - '/var/lib/apt/lists/' + self.sourcesFile + - ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + - ' | cut -d\ -f 4').read().split('\n')[:-1] - - for i in range(len(src_hashes)): - src_path = '/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + src_dir + '/' + src_paths[i] - d = self.client.findHash(src_path) - d.addCallback(self.verifyHash, src_path, src_hashes[i]) - - idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + - '/var/lib/apt/lists/' + self.releaseFile + - ' | grep -E " main/source/Sources.bz2$"' - ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') - idx_path = '/' + self.releaseFile.replace('_','/')[:-7] + 'main/source/Sources.bz2' - - d = self.client.findHash(idx_path) - d.addCallback(self.verifyHash, idx_path, idx_hash) - - d.addCallback(lastDefer.callback) - return lastDefer - - def tearDown(self): - for p in self.pending_calls: - if p.active(): - p.cancel() - self.client = None - \ No newline at end of file diff --git a/PeerManager.py b/PeerManager.py deleted file mode 100644 index e372f69..0000000 --- a/PeerManager.py +++ /dev/null @@ -1,100 +0,0 @@ - -from random import choice - -from twisted.internet import reactor, defer -from twisted.trial import unittest -from twisted.web2 import stream as stream_mod - -from HTTPDownloader import HTTPClientManager - -class PeerManager: - def __init__(self): - self.clients = {} - - def get(self, location_list, method="GET"): - """Download from a list of peers. 
- - @type location_list: C{list} of (C{string}, C{int}, C{string}) - @var location_list: a list of the locations where the file can be found - """ - host, port, path = choice(location_list) - return self.getPeer(host, port, path, method) - - def getPeer(self, host, port, path, method="GET"): - if not port: - port = 80 - site = host + ":" + str(port) - if site not in self.clients: - self.clients[site] = HTTPClientManager(host, port) - return self.clients[site].get(path, method) - - def close(self): - for site in self.clients: - self.clients[site].close() - self.clients = {} - -class TestPeerManager(unittest.TestCase): - """Unit tests for the PeerManager.""" - - manager = None - pending_calls = [] - - def gotResp(self, resp, num, expect): - self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) - if expect is not None: - self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) - def print_(n): - pass - def printdone(n): - pass - stream_mod.readStream(resp.stream, print_).addCallback(printdone) - - def test_download(self): - self.manager = PeerManager() - self.timeout = 10 - - host = 'www.camrdale.org' - d = self.manager.get([(host, 80, '/robots.txt')]) - d.addCallback(self.gotResp, 1, 309) - return d - - def test_head(self): - self.manager = PeerManager() - self.timeout = 10 - - host = 'www.camrdale.org' - d = self.manager.get([(host, 80, '/robots.txt')], "HEAD") - d.addCallback(self.gotResp, 1, 0) - return d - - def test_multiple_downloads(self): - self.manager = PeerManager() - self.timeout = 120 - lastDefer = defer.Deferred() - - def newRequest(host, path, num, expect, last=False): - d = self.manager.get([(host, 80, path)]) - d.addCallback(self.gotResp, num, expect) - if last: - d.addCallback(lastDefer.callback) - - newRequest('www.camrdale.org', "/", 1, 3433) - newRequest('www.camrdale.org', "/blog/", 2, 37121) - newRequest('www.google.ca', "/", 3, None) - 
self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None)) - self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084)) - self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750)) - self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None)) - self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525)) - self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381)) - self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True)) - return lastDefer - - def tearDown(self): - for p in self.pending_calls: - if p.active(): - p.cancel() - self.pending_calls = [] - if self.manager: - self.manager.close() - self.manager = None diff --git a/__init__.py b/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/actions.py b/actions.py deleted file mode 100644 index 013a9a7..0000000 --- a/actions.py +++ /dev/null @@ -1,272 +0,0 @@ -## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -from time import time - -from twisted.internet import reactor - -import const -from khash import intify - -class ActionBase: - """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, target, callback): - self.table = table - self.target = target - self.num = intify(target) - self.found = {} - self.queried = {} - self.answered = {} - self.callback = callback - self.outstanding = 0 - self.finished = 0 - - def sort(a, b, num=self.num): - """ this function is for sorting nodes relative to the ID we are looking for """ - x, y = num ^ a.num, num ^ b.num - if x > y: - return 1 - elif x < y: - return -1 - return 0 - self.sort = sort - - def 
goWithNodes(self, t): - pass - - - -FIND_NODE_TIMEOUT = 15 - -class FindNode(ActionBase): - """ find node action merits it's own class as it is a long running stateful process """ - def handleGotNodes(self, dict): - _krpc_sender = dict['_krpc_sender'] - dict = dict['rsp'] - l = dict["nodes"] - sender = {'id' : dict["id"]} - sender['port'] = _krpc_sender[1] - sender['host'] = _krpc_sender[0] - sender = self.table.Node().initWithDict(sender) - sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 - for node in l: - n = self.table.Node().initWithDict(node) - n.conn = self.table.udp.connectionForAddr((n.host, n.port)) - if not self.found.has_key(n.id): - self.found[n.id] = n - self.schedule() - - def schedule(self): - """ - send messages to new peers, if necessary - """ - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - for node in l[:const.K]: - if node.id == self.target: - self.finished=1 - return self.callback([node]) - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: - #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.id) - df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= const.CONCURRENT_REQS: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done!! 
- self.finished=1 - reactor.callLater(0, self.callback, l[:const.K]) - - def makeMsgFailed(self, node): - def defaultGotNodes(err, self=self, node=node): - print ">>> find failed %s/%s" % (node.host, node.port), err - self.table.table.nodeFailed(node) - self.outstanding = self.outstanding - 1 - self.schedule() - return defaultGotNodes - - def goWithNodes(self, nodes): - """ - this starts the process, our argument is a transaction with t.extras being our list of nodes - it's a transaction since we got called from the dispatcher - """ - for node in nodes: - if node.id == self.table.node.id: - continue - else: - self.found[node.id] = node - - self.schedule() - - -get_value_timeout = 15 -class GetValue(FindNode): - def __init__(self, table, target, callback, find="findValue"): - FindNode.__init__(self, table, target, callback) - self.findValue = find - - """ get value task """ - def handleGotNodes(self, dict): - _krpc_sender = dict['_krpc_sender'] - dict = dict['rsp'] - sender = {'id' : dict["id"]} - sender['port'] = _krpc_sender[1] - sender['host'] = _krpc_sender[0] - sender = self.table.Node().initWithDict(sender) - sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 - # go through nodes - # if we have any closer than what we already got, query them - if dict.has_key('nodes'): - for node in dict['nodes']: - n = self.table.Node().initWithDict(node) - n.conn = self.table.udp.connectionForAddr((n.host, n.port)) - if not self.found.has_key(n.id): - self.found[n.id] = n - elif dict.has_key('values'): - def x(y, z=self.results): - if not z.has_key(y): - z[y] = 1 - return y - else: - return None - z = len(dict['values']) - v = filter(None, map(x, dict['values'])) - if(len(v)): - reactor.callLater(0, self.callback, v) - self.schedule() - - ## get 
value - def schedule(self): - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:const.K]: - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: - #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT - try: - f = getattr(node, self.findValue) - except AttributeError: - print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) - else: - df = f(self.target, self.table.node.id) - df.addCallback(self.handleGotNodes) - df.addErrback(self.makeMsgFailed(node)) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= const.CONCURRENT_REQS: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done, didn't find it!! - self.finished=1 - reactor.callLater(0, self.callback,[]) - - ## get value - def goWithNodes(self, nodes, found=None): - self.results = {} - if found: - for n in found: - self.results[n] = 1 - for node in nodes: - if node.id == self.table.node.id: - continue - else: - self.found[node.id] = node - - self.schedule() - - -class StoreValue(ActionBase): - def __init__(self, table, target, value, callback, store="storeValue"): - ActionBase.__init__(self, table, target, callback) - self.value = value - self.stored = [] - self.store = store - - def storedValue(self, t, node): - self.outstanding -= 1 - self.table.insertNode(node) - if self.finished: - return - self.stored.append(t) - if len(self.stored) >= const.STORE_REDUNDANCY: - self.finished=1 - self.callback(self.stored) - else: - if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY: - self.schedule() - return t - - def storeFailed(self, t, node): - print ">>> store failed %s/%s" % (node.host, node.port) - self.table.nodeFailed(node) - self.outstanding -= 1 - if self.finished: - return t - self.schedule() - return t - - def schedule(self): - if self.finished: - return - num = const.CONCURRENT_REQS - self.outstanding - if num > const.STORE_REDUNDANCY: - num = 
const.STORE_REDUNDANCY - for i in range(num): - try: - node = self.nodes.pop() - except IndexError: - if self.outstanding == 0: - self.finished = 1 - self.callback(self.stored) - else: - if not node.id == self.table.node.id: - self.outstanding += 1 - try: - f = getattr(node, self.store) - except AttributeError: - print ">>> %s doesn't have a %s method!" % (node, self.store) - else: - df = f(self.target, self.value, self.table.node.id) - df.addCallback(self.storedValue, node=node) - df.addErrback(self.storeFailed, node=node) - - def goWithNodes(self, nodes): - self.nodes = nodes - self.nodes.sort(self.sort) - self.schedule() - - -class KeyExpirer: - def __init__(self, store): - self.store = store - reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) - - def doExpire(self): - self.cut = "%0.6f" % (time() - const.KE_AGE) - self._expire() - - def _expire(self): - c = self.store.cursor() - s = "delete from kv where time < '%s';" % self.cut - c.execute(s) - reactor.callLater(const.KE_DELAY, self.doExpire) diff --git a/apt_dht.py b/apt_dht.py deleted file mode 100644 index d30f0b8..0000000 --- a/apt_dht.py +++ /dev/null @@ -1,24 +0,0 @@ - -from twisted.web2 import server, http, http_headers - -from apt_dht_conf import config -from HTTPServer import TopLevel - -class AptDHT: - def __init__(self): - self.http_server = TopLevel(config.defaults()['cache_dir'], self) - self.http_site = server.Site(self.http_server) - - def getSite(self): - return self.http_site - - def check_freshness(self, path, modtime, resp): - return resp - - def get_resp(self, path): - return http.Response( - 200, - {'content-type': http_headers.MimeType('text', 'html')}, - """ -

Statistics

-

TODO: eventually this will cause a P2P lookup.""") diff --git a/apt_dht/AptPackages.py b/apt_dht/AptPackages.py new file mode 100644 index 0000000..a7a2a54 --- /dev/null +++ b/apt_dht/AptPackages.py @@ -0,0 +1,507 @@ +# Disable the FutureWarning from the apt module +import warnings +warnings.simplefilter("ignore", FutureWarning) + +import os, shelve +from random import choice +from shutil import rmtree +from copy import deepcopy +from UserDict import DictMixin + +from twisted.internet import threads, defer +from twisted.python import log +from twisted.trial import unittest + +import apt_pkg, apt_inst +from apt import OpProgress + +apt_pkg.init() + +class PackageFileList(DictMixin): + """Manages a list of package files belonging to a backend. + + @type packages: C{shelve dictionary} + @ivar packages: the files stored for this backend + """ + + def __init__(self, cache_dir): + self.cache_dir = cache_dir + if not os.path.exists(self.cache_dir): + os.makedirs(self.cache_dir) + self.packages = None + self.open() + + def open(self): + """Open the persistent dictionary of files in this backend.""" + if self.packages is None: + self.packages = shelve.open(self.cache_dir+'/packages.db') + + def close(self): + """Close the persistent dictionary.""" + if self.packages is not None: + self.packages.close() + + def update_file(self, cache_path, file_path): + """Check if an updated file needs to be tracked. + + Called from the mirror manager when files get updated so we can update our + fake lists and sources.list. 
+ """ + filename = cache_path.split('/')[-1] + if filename=="Packages" or filename=="Release" or filename=="Sources": + log.msg("Registering package file: "+cache_path) + self.packages[cache_path] = file_path + return True + return False + + def check_files(self): + """Check all files in the database to make sure they exist.""" + files = self.packages.keys() + for f in files: + if not os.path.exists(self.packages[f]): + log.msg("File in packages database has been deleted: "+f) + del self.packages[f] + + # Standard dictionary implementation so this class can be used like a dictionary. + def __getitem__(self, key): return self.packages[key] + def __setitem__(self, key, item): self.packages[key] = item + def __delitem__(self, key): del self.packages[key] + def keys(self): return self.packages.keys() + +class AptPackages: + """Uses python-apt to answer queries about packages. + + Makes a fake configuration for python-apt for each backend. + """ + + DEFAULT_APT_CONFIG = { + #'APT' : '', + #'APT::Architecture' : 'i386', # Commented so the machine's config will set this + #'APT::Default-Release' : 'unstable', + 'Dir':'.', # / + 'Dir::State' : 'apt/', # var/lib/apt/ + 'Dir::State::Lists': 'lists/', # lists/ + #'Dir::State::cdroms' : 'cdroms.list', + 'Dir::State::userstatus' : 'status.user', + 'Dir::State::status': 'dpkg/status', # '/var/lib/dpkg/status' + 'Dir::Cache' : '.apt/cache/', # var/cache/apt/ + #'Dir::Cache::archives' : 'archives/', + 'Dir::Cache::srcpkgcache' : 'srcpkgcache.bin', + 'Dir::Cache::pkgcache' : 'pkgcache.bin', + 'Dir::Etc' : 'apt/etc/', # etc/apt/ + 'Dir::Etc::sourcelist' : 'sources.list', + 'Dir::Etc::vendorlist' : 'vendors.list', + 'Dir::Etc::vendorparts' : 'vendors.list.d', + #'Dir::Etc::main' : 'apt.conf', + #'Dir::Etc::parts' : 'apt.conf.d', + #'Dir::Etc::preferences' : 'preferences', + 'Dir::Bin' : '', + #'Dir::Bin::methods' : '', #'/usr/lib/apt/methods' + 'Dir::Bin::dpkg' : '/usr/bin/dpkg', + #'DPkg' : '', + #'DPkg::Pre-Install-Pkgs' : '', + 
#'DPkg::Tools' : '', + #'DPkg::Tools::Options' : '', + #'DPkg::Tools::Options::/usr/bin/apt-listchanges' : '', + #'DPkg::Tools::Options::/usr/bin/apt-listchanges::Version' : '2', + #'DPkg::Post-Invoke' : '', + } + essential_dirs = ('apt', 'apt/cache', 'apt/dpkg', 'apt/etc', 'apt/lists', + 'apt/lists/partial') + essential_files = ('apt/dpkg/status', 'apt/etc/sources.list',) + + def __init__(self, cache_dir): + """Construct a new packages manager. + + @ivar backendName: name of backend associated with this packages file + @ivar cache_dir: cache directory from config file + """ + self.cache_dir = cache_dir + self.apt_config = deepcopy(self.DEFAULT_APT_CONFIG) + + for dir in self.essential_dirs: + path = os.path.join(self.cache_dir, dir) + if not os.path.exists(path): + os.makedirs(path) + for file in self.essential_files: + path = os.path.join(self.cache_dir, file) + if not os.path.exists(path): + f = open(path,'w') + f.close() + del f + + self.apt_config['Dir'] = self.cache_dir + self.apt_config['Dir::State::status'] = os.path.join(self.cache_dir, + self.apt_config['Dir::State'], self.apt_config['Dir::State::status']) + self.packages = PackageFileList(cache_dir) + self.loaded = 0 + self.loading = None + + def __del__(self): + self.cleanup() + self.packages.close() + + def addRelease(self, cache_path, file_path): + """Dirty hack until python-apt supports apt-pkg/indexrecords.h + (see Bug #456141) + """ + self.indexrecords[cache_path] = {} + + read_packages = False + f = open(file_path, 'r') + + for line in f: + line = line.rstrip() + + if line[:1] != " ": + read_packages = False + try: + # Read the various headers from the file + h, v = line.split(":", 1) + if h == "MD5Sum" or h == "SHA1" or h == "SHA256": + read_packages = True + hash_type = h + except: + # Bad header line, just ignore it + log.msg("WARNING: Ignoring badly formatted Release line: %s" % line) + + # Skip to the next line + continue + + # Read file names from the multiple hash sections of the file + if 
read_packages: + p = line.split() + self.indexrecords[cache_path].setdefault(p[2], {})[hash_type] = (p[0], p[1]) + + f.close() + + def file_updated(self, cache_path, file_path): + """A file in the backend has changed, manage it. + + If this affects us, unload our apt database + """ + if self.packages.update_file(cache_path, file_path): + self.unload() + + def load(self): + """Make sure the package is initialized and loaded.""" + if self.loading is None: + self.loading = threads.deferToThread(self._load) + self.loading.addCallback(self.doneLoading) + return self.loading + + def doneLoading(self, loadResult): + """Cache is loaded.""" + self.loading = None + # Must pass on the result for the next callback + return loadResult + + def _load(self): + """Regenerates the fake configuration and load the packages cache.""" + if self.loaded: return True + apt_pkg.InitSystem() + rmtree(os.path.join(self.cache_dir, self.apt_config['Dir::State'], + self.apt_config['Dir::State::Lists'])) + os.makedirs(os.path.join(self.cache_dir, self.apt_config['Dir::State'], + self.apt_config['Dir::State::Lists'], 'partial')) + sources_filename = os.path.join(self.cache_dir, self.apt_config['Dir::Etc'], + self.apt_config['Dir::Etc::sourcelist']) + sources = open(sources_filename, 'w') + sources_count = 0 + self.packages.check_files() + self.indexrecords = {} + for f in self.packages: + # we should probably clear old entries from self.packages and + # take into account the recorded mtime as optimization + filepath = self.packages[f] + if f.split('/')[-1] == "Release": + self.addRelease(f, filepath) + fake_uri='http://apt-dht'+f + fake_dirname = '/'.join(fake_uri.split('/')[:-1]) + if f.endswith('Sources'): + source_line='deb-src '+fake_dirname+'/ /' + else: + source_line='deb '+fake_dirname+'/ /' + listpath=(os.path.join(self.cache_dir, self.apt_config['Dir::State'], + self.apt_config['Dir::State::Lists'], + apt_pkg.URItoFileName(fake_uri))) + sources.write(source_line+'\n') + log.msg("Sources 
line: " + source_line) + sources_count = sources_count + 1 + + try: + #we should empty the directory instead + os.unlink(listpath) + except: + pass + os.symlink(filepath, listpath) + sources.close() + + if sources_count == 0: + log.msg("No Packages files available for %s backend"%(self.cache_dir)) + return False + + log.msg("Loading Packages database for "+self.cache_dir) + for key, value in self.apt_config.items(): + apt_pkg.Config[key] = value + + self.cache = apt_pkg.GetCache(OpProgress()) + self.records = apt_pkg.GetPkgRecords(self.cache) + self.srcrecords = apt_pkg.GetPkgSrcRecords() + + self.loaded = 1 + return True + + def unload(self): + """Tries to make the packages server quit.""" + if self.loaded: + del self.cache + del self.records + del self.srcrecords + del self.indexrecords + self.loaded = 0 + + def cleanup(self): + """Cleanup and close any loaded caches.""" + self.unload() + self.packages.close() + + def findHash(self, path): + """Find the hash for a given path in this mirror. + + Returns a deferred so it can make sure the cache is loaded first. + """ + d = defer.Deferred() + + deferLoad = self.load() + deferLoad.addCallback(self._findHash, path, d) + + return d + + def _findHash(self, loadResult, path, d): + """Really find the hash for a path. + + Have to pass the returned loadResult on in case other calls to this + function are pending. 
+ """ + if not loadResult: + d.callback((None, None)) + return loadResult + + # First look for the path in the cache of index files + for release in self.indexrecords: + if path.startswith(release[:-7]): + for indexFile in self.indexrecords[release]: + if release[:-7] + indexFile == path: + d.callback(self.indexrecords[release][indexFile]['SHA1']) + return loadResult + + package = path.split('/')[-1].split('_')[0] + + # Check the binary packages + try: + for version in self.cache[package].VersionList: + size = version.Size + for verFile in version.FileList: + if self.records.Lookup(verFile): + if '/' + self.records.FileName == path: + d.callback((self.records.SHA1Hash, size)) + return loadResult + except KeyError: + pass + + # Check the source packages' files + self.srcrecords.Restart() + if self.srcrecords.Lookup(package): + for f in self.srcrecords.Files: + if path == '/' + f[2]: + d.callback((f[0], f[1])) + return loadResult + + d.callback((None, None)) + return loadResult + +class TestAptPackages(unittest.TestCase): + """Unit tests for the AptPackages cache.""" + + pending_calls = [] + client = None + packagesFile = '' + sourcesFile = '' + releaseFile = '' + + def setUp(self): + self.client = AptPackages('/tmp/.apt-dht') + + self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Packages$" | tail -n 1').read().rstrip('\n') + self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Sources$" | tail -n 1').read().rstrip('\n') + for f in os.walk('/var/lib/apt/lists').next()[2]: + if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]): + self.releaseFile = f + break + + self.client.file_updated(self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/'), + '/var/lib/apt/lists/' + self.releaseFile) + self.client.file_updated(self.packagesFile[self.packagesFile.find('_debian_')+1:].replace('_','/'), + '/var/lib/apt/lists/' + self.packagesFile) + 
self.client.file_updated(self.sourcesFile[self.sourcesFile.find('_debian_')+1:].replace('_','/'), + '/var/lib/apt/lists/' + self.sourcesFile) + + def test_pkg_hash(self): + self.client._load() + + self.client.records.Lookup(self.client.cache['dpkg'].VersionList[0].FileList[0]) + + pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.packagesFile + + ' | grep -E "^SHA1:" | head -n 1' + + ' | cut -d\ -f 2').read().rstrip('\n') + + self.failUnless(self.client.records.SHA1Hash == pkg_hash, + "Hashes don't match: %s != %s" % (self.client.records.SHA1Hash, pkg_hash)) + + def test_src_hash(self): + self.client._load() + + self.client.srcrecords.Lookup('dpkg') + + src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.sourcesFile + + ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + + ' | cut -d\ -f 2').read().split('\n')[:-1] + + for f in self.client.srcrecords.Files: + self.failUnless(f[0] in src_hashes, "Couldn't find %s in: %r" % (f[0], src_hashes)) + + def test_index_hash(self): + self.client._load() + + indexhash = self.client.indexrecords[self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')]['main/binary-i386/Packages.bz2']['SHA1'][0] + + idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + + '/var/lib/apt/lists/' + self.releaseFile + + ' | grep -E " main/binary-i386/Packages.bz2$"' + ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') + + self.failUnless(indexhash == idx_hash, "Hashes don't match: %s != %s" % (indexhash, idx_hash)) + + def verifyHash(self, found_hash, path, true_hash): + self.failUnless(found_hash[0] == true_hash, + "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash)) + + def test_findIndexHash(self): + lastDefer = defer.Deferred() + + idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + + '/var/lib/apt/lists/' + self.releaseFile + + ' | grep -E " main/binary-i386/Packages.bz2$"' + ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') + idx_path = 
self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2' + + d = self.client.findHash(idx_path) + d.addCallback(self.verifyHash, idx_path, idx_hash) + + d.addCallback(lastDefer.callback) + return lastDefer + + def test_findPkgHash(self): + lastDefer = defer.Deferred() + + pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.packagesFile + + ' | grep -E "^SHA1:" | head -n 1' + + ' | cut -d\ -f 2').read().rstrip('\n') + pkg_path = os.popen('grep -A 30 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.packagesFile + + ' | grep -E "^Filename:" | head -n 1' + + ' | cut -d\ -f 2').read().rstrip('\n') + + d = self.client.findHash(pkg_path) + d.addCallback(self.verifyHash, pkg_path, pkg_hash) + + d.addCallback(lastDefer.callback) + return lastDefer + + def test_findSrcHash(self): + lastDefer = defer.Deferred() + + src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.sourcesFile + + ' | grep -E "^Directory:" | head -n 1' + + ' | cut -d\ -f 2').read().rstrip('\n') + src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.sourcesFile + + ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + + ' | cut -d\ -f 2').read().split('\n')[:-1] + src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.sourcesFile + + ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + + ' | cut -d\ -f 4').read().split('\n')[:-1] + + i = choice(range(len(src_hashes))) + d = self.client.findHash(src_dir + '/' + src_paths[i]) + d.addCallback(self.verifyHash, src_dir + '/' + src_paths[i], src_hashes[i]) + + d.addCallback(lastDefer.callback) + return lastDefer + + def test_multipleFindHash(self): + lastDefer = defer.Deferred() + + idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + + '/var/lib/apt/lists/' + self.releaseFile + + ' | grep -E " main/binary-i386/Packages.bz2$"' + ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') + 
idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2' + + d = self.client.findHash(idx_path) + d.addCallback(self.verifyHash, idx_path, idx_hash) + + pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.packagesFile + + ' | grep -E "^SHA1:" | head -n 1' + + ' | cut -d\ -f 2').read().rstrip('\n') + pkg_path = os.popen('grep -A 30 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.packagesFile + + ' | grep -E "^Filename:" | head -n 1' + + ' | cut -d\ -f 2').read().rstrip('\n') + + d = self.client.findHash(pkg_path) + d.addCallback(self.verifyHash, pkg_path, pkg_hash) + + src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.sourcesFile + + ' | grep -E "^Directory:" | head -n 1' + + ' | cut -d\ -f 2').read().rstrip('\n') + src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.sourcesFile + + ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + + ' | cut -d\ -f 2').read().split('\n')[:-1] + src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.sourcesFile + + ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + + ' | cut -d\ -f 4').read().split('\n')[:-1] + + for i in range(len(src_hashes)): + d = self.client.findHash(src_dir + '/' + src_paths[i]) + d.addCallback(self.verifyHash, src_dir + '/' + src_paths[i], src_hashes[i]) + + idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + + '/var/lib/apt/lists/' + self.releaseFile + + ' | grep -E " main/source/Sources.bz2$"' + ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') + idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/source/Sources.bz2' + + d = self.client.findHash(idx_path) + d.addCallback(self.verifyHash, idx_path, idx_hash) + + d.addCallback(lastDefer.callback) + return lastDefer + + def tearDown(self): + for p in self.pending_calls: + if p.active(): + p.cancel() + 
class HTTPClientManager(ClientFactory):
    """A manager for all HTTP requests to a single site.

    Controls all requests that go to a single site (host and port).
    This includes buffering requests until they can be sent and reconnecting
    in the event of the connection being closed.
    """

    implements(IHTTPClientManager)
    
    def __init__(self, host, port=80):
        self.host = host
        self.port = port
        self.busy = False            # a request is currently being sent
        self.pipeline = False        # the server allows pipelined requests
        self.closed = True           # no connection is currently open
        self.connecting = False      # a connection attempt is in progress
        self.request_queue = []      # requests not yet submitted to the protocol
        self.response_queue = []     # requests awaiting a response
        self.proto = None
        self.connector = None
        
    def connect(self):
        """Start a new connection attempt to the site."""
        assert(self.closed and not self.connecting)
        self.connecting = True
        d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
        # BUG FIX: a failed connection attempt used to leave self.connecting
        # True forever, so queued requests never fired and never errbacked.
        d.addCallbacks(self.connected, self.connectionError)

    def connected(self, proto):
        """The connection attempt succeeded, start sending queued requests."""
        self.closed = False
        self.connecting = False
        self.proto = proto
        self.processQueue()

    def connectionError(self, reason):
        """The connection attempt failed, fail all the queued requests."""
        self.connecting = False
        requests = self.request_queue
        self.request_queue = []
        for req in requests:
            req.deferRequest.errback(reason)
        
    def close(self):
        """Drop the current connection, if any."""
        if not self.closed:
            self.proto.transport.loseConnection()

    def is_idle(self):
        """Whether no requests are queued or in flight."""
        return not self.busy and not self.request_queue and not self.response_queue
    
    def submitRequest(self, request):
        """Queue a request and return a deferred that fires with its response."""
        request.deferRequest = defer.Deferred()
        self.request_queue.append(request)
        self.processQueue()
        return request.deferRequest

    def processQueue(self):
        """Submit the next queued request if the connection state allows it."""
        if not self.request_queue:
            return
        if self.connecting:
            return
        if self.closed:
            self.connect()
            return
        if self.busy and not self.pipeline:
            return
        if self.response_queue and not self.pipeline:
            return

        req = self.request_queue.pop(0)
        self.response_queue.append(req)
        req.deferResponse = self.proto.submitRequest(req, False)
        req.deferResponse.addCallback(self.requestComplete)
        req.deferResponse.addErrback(self.requestError)

    def requestComplete(self, resp):
        # Responses arrive in request order, so the oldest queued request
        # is the one this response belongs to.
        req = self.response_queue.pop(0)
        req.deferRequest.callback(resp)

    def requestError(self, error):
        req = self.response_queue.pop(0)
        req.deferRequest.errback(error)

    #{ IHTTPClientManager interface, called by the HTTPClientProtocol
    def clientBusy(self, proto):
        self.busy = True

    def clientIdle(self, proto):
        self.busy = False
        self.processQueue()

    def clientPipelining(self, proto):
        self.pipeline = True
        self.processQueue()

    def clientGone(self, proto):
        # Fail the requests that were in flight; queued-but-unsent requests
        # will be retried on a fresh connection.
        for req in self.response_queue:
            req.deferRequest.errback(ProtocolError('lost connection'))
        self.busy = False
        self.pipeline = False
        self.closed = True
        self.connecting = False
        self.response_queue = []
        self.proto = None
        if self.request_queue:
            self.processQueue()
    #}
            
    def setCommonHeaders(self):
        """Build the headers sent with every request."""
        headers = http_headers.Headers()
        headers.setHeader('Host', self.host)
        headers.setHeader('User-Agent', 'apt-dht/%s (twisted/%s twisted.web2/%s)' % 
                          (version.short(), twisted_version.short(), web2_version.short()))
        return headers
    
    def get(self, path, method="GET"):
        """Fetch a path from the site."""
        headers = self.setCommonHeaders()
        return self.submitRequest(ClientRequest(method, path, headers, None))
    
    def getRange(self, path, rangeStart, rangeEnd, method="GET"):
        """Fetch a byte range of a path from the site."""
        headers = self.setCommonHeaders()
        headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
        return self.submitRequest(ClientRequest(method, path, headers, None))
lastDefer = defer.Deferred() + + def newRequest(path, num, expect, last=False): + d = self.client.get(path) + d.addCallback(self.gotResp, num, expect) + if last: + d.addCallback(lastDefer.callback) + + newRequest("/", 1, 3433) + newRequest("/blog/", 2, 37121) + newRequest("/camrdale.html", 3, 2234) + self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309)) + self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084)) + self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783)) + self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525)) + self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381)) + self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True)) + return lastDefer + + def test_multiple_quick_downloads(self): + host = 'www.camrdale.org' + self.client = HTTPClientManager(host, 80) + self.timeout = 30 + lastDefer = defer.Deferred() + + def newRequest(path, num, expect, last=False): + d = self.client.get(path) + d.addCallback(self.gotResp, num, expect) + if last: + d.addCallback(lastDefer.callback) + + newRequest("/", 1, 3433) + newRequest("/blog/", 2, 37121) + newRequest("/camrdale.html", 3, 2234) + self.pending_calls.append(reactor.callLater(0, newRequest, '/robots.txt', 4, 309)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/wikilink.html', 5, 3084)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/PlanetLab.html', 7, 2783)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/openid.html', 8, 2525)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/subpage.html', 9, 2381)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 
class FileDownloader(static.File):
    """Serves cached files, deferring to the manager for misses and freshness.

    Behaves like twisted.web2's static.File, except that when a manager is
    present the response is routed through it: fresh hits get a freshness
    check, misses get a peer/mirror lookup.
    """
    
    def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(), processors=None, indexNames=None):
        self.manager = manager
        super(FileDownloader, self).__init__(path, defaultType, ignoredExts, processors, indexNames)
        
    def render(self, req):
        resp = super(FileDownloader, self).render(req)
        
        if self.manager:
            # NOTE(review): static.File.render appears to return the
            # responsecode constant (not a Response) on a miss — confirm
            # against the web2 version in use.
            if resp != responsecode.NOT_FOUND:
                return self.manager.check_freshness(req.uri, resp.headers.getHeader('Last-Modified'), resp)
            
            return self.manager.get_resp(req.uri)
        
        return resp
        
    def createSimilarFile(self, path):
        # Keep the manager when web2 clones this resource for a child path.
        return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                              self.processors, self.indexNames[:])

class TopLevel(resource.Resource):
    """The root resource: '~N' children map to registered directories,
    everything else is served from the cache directory."""

    addSlash = True
    
    def __init__(self, directory, manager):
        self.directory = directory
        self.manager = manager
        self.subdirs = []

    def addDirectory(self, directory):
        """Register a directory and return the '~N' path that serves it."""
        path = "~" + str(len(self.subdirs))
        self.subdirs.append(directory)
        return path
    
    def removeDirectory(self, directory):
        # Blank the slot instead of deleting it so existing '~N' paths
        # for other directories keep their indices.
        loc = self.subdirs.index(directory)
        self.subdirs[loc] = ''
        
    def render(self, ctx):
        # NOTE(review): HTML body reconstructed; the original markup was
        # mangled in the patch this was recovered from.
        return http.Response(
            200,
            {'content-type': http_headers.MimeType('text', 'html')},
            """<html><body>
<h2>Statistics</h2>
<p>TODO: eventually some stats will be shown here.</p>
</body></html>""")

    def locateChild(self, request, segments):
        name = segments[0]
        if len(name) > 1 and name[0] == '~':
            try:
                loc = int(name[1:])
            except ValueError:
                # FIX: was a bare except; only a malformed index is expected.
                return None, ()
            
            if loc >= 0 and loc < len(self.subdirs) and self.subdirs[loc]:
                return static.File(self.subdirs[loc]), segments[1:]
            else:
                return None, ()
        
        return FileDownloader(self.directory, self.manager), segments[0:]

if __name__ == '__builtin__':
    # Running from twistd -y
    t = TopLevel('/home', None)
    t.addDirectory('/tmp')
    t.addDirectory('/var/log')
    site = server.Site(t)
    
    # Standard twisted application Boilerplate
    from twisted.application import service, strports
    application = service.Application("demoserver")
    s = strports.service('tcp:18080', channel.HTTPFactory(site))
    s.setServiceParent(application)
path.split('/'): + if base.startswith(base_match + '/' + dirs): + base_match += '/' + dirs + else: + break + if len(base_match) > longest_match: + longest_match = len(base_match) + baseDir = base_match + log.msg("Settled on baseDir: %s" % baseDir) + + return site, baseDir, path + + def init(self, site, baseDir): + if site not in self.apt_caches: + self.apt_caches[site] = {} + + if baseDir not in self.apt_caches[site]: + site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_')) + self.apt_caches[site][baseDir] = AptPackages(site_cache) + + def updatedFile(self, path, file_path): + site, baseDir, path = self.extractPath(path) + self.init(site, baseDir) + self.apt_caches[site][baseDir].file_updated(path, file_path) + + def findHash(self, path): + site, baseDir, path = self.extractPath(path) + if site in self.apt_caches and baseDir in self.apt_caches[site]: + return self.apt_caches[site][baseDir].findHash(path) + d = defer.Deferred() + d.errback("Not Found") + return d + +class TestMirrorManager(unittest.TestCase): + """Unit tests for the mirror manager.""" + + pending_calls = [] + client = None + + def setUp(self): + self.client = MirrorManager('/tmp') + + def test_extractPath(self): + site, baseDir, path = self.client.extractPath('/ftp.us.debian.org/debian/dists/unstable/Release') + self.failUnless(site == "ftp.us.debian.org:80", "no match: %s" % site) + self.failUnless(baseDir == "/debian", "no match: %s" % baseDir) + self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path) + + site, baseDir, path = self.client.extractPath('/ftp.us.debian.org:16999/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz') + self.failUnless(site == "ftp.us.debian.org:16999", "no match: %s" % site) + self.failUnless(baseDir == "/debian", "no match: %s" % baseDir) + self.failUnless(path == "/pool/d/dpkg/dpkg_1.2.1-1.tar.gz", "no match: %s" % path) + + def verifyHash(self, found_hash, path, true_hash): + self.failUnless(found_hash[0] == 
class TestMirrorManager(unittest.TestCase):
    """Unit tests for the mirror manager (use the local apt lists)."""
    
    pending_calls = []
    client = None
    
    def setUp(self):
        self.client = MirrorManager('/tmp')
        
    def test_extractPath(self):
        site, baseDir, path = self.client.extractPath('/ftp.us.debian.org/debian/dists/unstable/Release')
        self.failUnless(site == "ftp.us.debian.org:80", "no match: %s" % site)
        self.failUnless(baseDir == "/debian", "no match: %s" % baseDir)
        self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path)

        site, baseDir, path = self.client.extractPath('/ftp.us.debian.org:16999/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz')
        self.failUnless(site == "ftp.us.debian.org:16999", "no match: %s" % site)
        self.failUnless(baseDir == "/debian", "no match: %s" % baseDir)
        self.failUnless(path == "/pool/d/dpkg/dpkg_1.2.1-1.tar.gz", "no match: %s" % path)

    def verifyHash(self, found_hash, path, true_hash):
        self.failUnless(found_hash[0] == true_hash, 
                    "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash))

    def test_findHash(self):
        # Pick the largest cached Packages/Sources lists and their Release file.
        self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Packages$" | tail -n 1').read().rstrip('\n')
        self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Sources$" | tail -n 1').read().rstrip('\n')
        for filename in os.walk('/var/lib/apt/lists').next()[2]:
            if filename[-7:] == "Release" and self.packagesFile.startswith(filename[:-7]):
                self.releaseFile = filename
                break
        
        # Register the three index files with the manager.
        self.client.updatedFile('/' + self.releaseFile.replace('_','/'), 
                                '/var/lib/apt/lists/' + self.releaseFile)
        self.client.updatedFile('/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
                                self.packagesFile[self.packagesFile.find('_dists_')+1:].replace('_','/'), 
                                '/var/lib/apt/lists/' + self.packagesFile)
        self.client.updatedFile('/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
                                self.sourcesFile[self.sourcesFile.find('_dists_')+1:].replace('_','/'), 
                                '/var/lib/apt/lists/' + self.sourcesFile)

        lastDefer = defer.Deferred()
        
        # An index file's hash, scraped from the Release file.
        idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + 
                            '/var/lib/apt/lists/' + self.releaseFile + 
                            ' | grep -E " main/binary-i386/Packages.bz2$"'
                            ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
        idx_path = '/' + self.releaseFile.replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'

        d = self.client.findHash(idx_path)
        d.addCallback(self.verifyHash, idx_path, idx_hash)

        # A binary package's hash, scraped from the Packages file.
        pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + 
                            '/var/lib/apt/lists/' + self.packagesFile + 
                            ' | grep -E "^SHA1:" | head -n 1' + 
                            ' | cut -d\ -f 2').read().rstrip('\n')
        pkg_path = '/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + \
                   os.popen('grep -A 30 -E "^Package: dpkg$" ' + 
                            '/var/lib/apt/lists/' + self.packagesFile + 
                            ' | grep -E "^Filename:" | head -n 1' + 
                            ' | cut -d\ -f 2').read().rstrip('\n')

        d = self.client.findHash(pkg_path)
        d.addCallback(self.verifyHash, pkg_path, pkg_hash)

        # Source package files, scraped from the Sources file.
        src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' + 
                            '/var/lib/apt/lists/' + self.sourcesFile + 
                            ' | grep -E "^Directory:" | head -n 1' + 
                            ' | cut -d\ -f 2').read().rstrip('\n')
        src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + 
                            '/var/lib/apt/lists/' + self.sourcesFile + 
                            ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + 
                            ' | cut -d\ -f 2').read().split('\n')[:-1]
        src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' + 
                            '/var/lib/apt/lists/' + self.sourcesFile + 
                            ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + 
                            ' | cut -d\ -f 4').read().split('\n')[:-1]

        for i, src_hash in enumerate(src_hashes):
            src_path = '/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + src_dir + '/' + src_paths[i]
            d = self.client.findHash(src_path)
            d.addCallback(self.verifyHash, src_path, src_hash)
            
        # A second index file, this time the Sources index.
        idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + 
                            '/var/lib/apt/lists/' + self.releaseFile + 
                            ' | grep -E " main/source/Sources.bz2$"'
                            ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
        idx_path = '/' + self.releaseFile.replace('_','/')[:-7] + 'main/source/Sources.bz2'

        d = self.client.findHash(idx_path)
        d.addCallback(self.verifyHash, idx_path, idx_hash)

        d.addCallback(lastDefer.callback)
        return lastDefer

    def tearDown(self):
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.client = None
+ + @type location_list: C{list} of (C{string}, C{int}, C{string}) + @var location_list: a list of the locations where the file can be found + """ + host, port, path = choice(location_list) + return self.getPeer(host, port, path, method) + + def getPeer(self, host, port, path, method="GET"): + if not port: + port = 80 + site = host + ":" + str(port) + if site not in self.clients: + self.clients[site] = HTTPClientManager(host, port) + return self.clients[site].get(path, method) + + def close(self): + for site in self.clients: + self.clients[site].close() + self.clients = {} + +class TestPeerManager(unittest.TestCase): + """Unit tests for the PeerManager.""" + + manager = None + pending_calls = [] + + def gotResp(self, resp, num, expect): + self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code) + if expect is not None: + self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect)) + def print_(n): + pass + def printdone(n): + pass + stream_mod.readStream(resp.stream, print_).addCallback(printdone) + + def test_download(self): + self.manager = PeerManager() + self.timeout = 10 + + host = 'www.camrdale.org' + d = self.manager.get([(host, 80, '/robots.txt')]) + d.addCallback(self.gotResp, 1, 309) + return d + + def test_head(self): + self.manager = PeerManager() + self.timeout = 10 + + host = 'www.camrdale.org' + d = self.manager.get([(host, 80, '/robots.txt')], "HEAD") + d.addCallback(self.gotResp, 1, 0) + return d + + def test_multiple_downloads(self): + self.manager = PeerManager() + self.timeout = 120 + lastDefer = defer.Deferred() + + def newRequest(host, path, num, expect, last=False): + d = self.manager.get([(host, 80, path)]) + d.addCallback(self.gotResp, num, expect) + if last: + d.addCallback(lastDefer.callback) + + newRequest('www.camrdale.org', "/", 1, 3433) + newRequest('www.camrdale.org', "/blog/", 2, 37121) + newRequest('www.google.ca', "/", 3, None) + 
self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None)) + self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084)) + self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None)) + self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525)) + self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381)) + self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True)) + return lastDefer + + def tearDown(self): + for p in self.pending_calls: + if p.active(): + p.cancel() + self.pending_calls = [] + if self.manager: + self.manager.close() + self.manager = None diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py new file mode 100644 index 0000000..d30f0b8 --- /dev/null +++ b/apt_dht/apt_dht.py @@ -0,0 +1,24 @@ + +from twisted.web2 import server, http, http_headers + +from apt_dht_conf import config +from HTTPServer import TopLevel + +class AptDHT: + def __init__(self): + self.http_server = TopLevel(config.defaults()['cache_dir'], self) + self.http_site = server.Site(self.http_server) + + def getSite(self): + return self.http_site + + def check_freshness(self, path, modtime, resp): + return resp + + def get_resp(self, path): + return http.Response( + 200, + {'content-type': http_headers.MimeType('text', 'html')}, + """ +

class AptDHT:
    """Ties together the HTTP server, the mirror manager and the DHT."""

    def __init__(self):
        # Serve the cache directory from the top-level web resource.
        self.http_server = TopLevel(config.defaults()['cache_dir'], self)
        self.http_site = server.Site(self.http_server)
    
    def getSite(self):
        """Return the twisted.web2 site to be served."""
        return self.http_site
    
    def check_freshness(self, path, modtime, resp):
        # Placeholder: cached responses are currently always considered fresh.
        return resp
    
    def get_resp(self, path):
        # Placeholder response for cache misses.
        # NOTE(review): HTML body reconstructed; the original markup was
        # mangled in the patch this was recovered from.
        return http.Response(
            200,
            {'content-type': http_headers.MimeType('text', 'html')},
            """<html><body>
<h2>Statistics</h2>
<p>TODO: eventually this will cause a P2P lookup.</p>
</body></html>""")
TODO: eventually this will cause a P2P lookup.""") diff --git a/apt_dht/apt_dht_conf.py b/apt_dht/apt_dht_conf.py new file mode 100644 index 0000000..32ffa06 --- /dev/null +++ b/apt_dht/apt_dht_conf.py @@ -0,0 +1,105 @@ + +import os, sys +from ConfigParser import SafeConfigParser + +from twisted.python import log, versions + +class ConfigError(Exception): + def __init__(self, message): + self.message = message + def __str__(self): + return repr(self.message) + +version = versions.Version('apt-dht', 0, 0, 0) +home = os.path.expandvars('${HOME}') +if home == '${HOME}' or not os.path.isdir(home): + home = os.path.expanduser('~') + if not os.path.isdir(home): + home = os.path.abspath(os.path.dirname(sys.argv[0])) + +DEFAULTS = { + + # Port to listen on for all requests (TCP and UDP) + 'port': '9977', + + # Directory to store the downloaded files in + 'cache_dir': home + '/.apt-dht/cache', + + # User name to try and run as + 'username': '', + + # Which DHT implementation to use + 'DHT': 'Khashmir', +} + +DHT_DEFAULTS = { + # magic id to use before we know a peer's id + 'NULL_ID': 20 * '\0', + + # Kademlia "K" constant, this should be an even number + 'K': '8', + + # SHA1 is 160 bits long + 'HASH_LENGTH': '160', + + # checkpoint every this many seconds + 'CHECKPOINT_INTERVAL': '15m', # fifteen minutes + + ### SEARCHING/STORING + # concurrent xmlrpc calls per find node/value request! 
+ 'CONCURRENT_REQS': '4', + + # how many hosts to post to + 'STORE_REDUNDANCY': '3', + + ### ROUTING TABLE STUFF + # how many times in a row a node can fail to respond before it's booted from the routing table + 'MAX_FAILURES': '3', + + # never ping a node more often than this + 'MIN_PING_INTERVAL': '15m', # fifteen minutes + + # refresh buckets that haven't been touched in this long + 'BUCKET_STALENESS': '1h', # one hour + + ### KEY EXPIRER + # time before expirer starts running + 'KEINITIAL_DELAY': '15s', # 15 seconds - to clean out old stuff in persistent db + + # time between expirer runs + 'KE_DELAY': '20m', # 20 minutes + + # expire entries older than this + 'KE_AGE': '1h', # 60 minutes +} + +class AptDHTConfigParser(SafeConfigParser): + """ + Adds 'gettime' to ConfigParser to interpret the suffixes. + """ + time_multipliers={ + 's': 1, #seconds + 'm': 60, #minutes + 'h': 3600, #hours + 'd': 86400,#days + } + + def gettime(self, section, option): + mult = 1 + value = self.get(section, option) + if len(value) == 0: + raise ConfigError("Configuration parse error: [%s] %s" % (section, option)) + suffix = value[-1].lower() + if suffix in self.time_multipliers.keys(): + mult = self.time_multipliers[suffix] + value = value[:-1] + return int(value)*mult + def getstring(self, section, option): + return self.get(section,option) + def getstringlist(self, section, option): + return self.get(section,option).split() + +config = AptDHTConfigParser(DEFAULTS) +config.add_section(config.get('DEFAULT', 'DHT')) +for k in DHT_DEFAULTS: + config.set(config.get('DEFAULT', 'DHT'), k, DHT_DEFAULTS[k]) diff --git a/apt_dht_Khashmir/__init__.py b/apt_dht_Khashmir/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py new file mode 100644 index 0000000..013a9a7 --- /dev/null +++ b/apt_dht_Khashmir/actions.py @@ -0,0 +1,272 @@ +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for 
license information + +from time import time + +from twisted.internet import reactor + +import const +from khash import intify + +class ActionBase: + """ base class for some long running asynchronous proccesses like finding nodes or values """ + def __init__(self, table, target, callback): + self.table = table + self.target = target + self.num = intify(target) + self.found = {} + self.queried = {} + self.answered = {} + self.callback = callback + self.outstanding = 0 + self.finished = 0 + + def sort(a, b, num=self.num): + """ this function is for sorting nodes relative to the ID we are looking for """ + x, y = num ^ a.num, num ^ b.num + if x > y: + return 1 + elif x < y: + return -1 + return 0 + self.sort = sort + + def goWithNodes(self, t): + pass + + + +FIND_NODE_TIMEOUT = 15 + +class FindNode(ActionBase): + """ find node action merits it's own class as it is a long running stateful process """ + def handleGotNodes(self, dict): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] + l = dict["nodes"] + sender = {'id' : dict["id"]} + sender['port'] = _krpc_sender[1] + sender['host'] = _krpc_sender[0] + sender = self.table.Node().initWithDict(sender) + sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) + self.table.table.insertNode(sender) + if self.finished or self.answered.has_key(sender.id): + # a day late and a dollar short + return + self.outstanding = self.outstanding - 1 + self.answered[sender.id] = 1 + for node in l: + n = self.table.Node().initWithDict(node) + n.conn = self.table.udp.connectionForAddr((n.host, n.port)) + if not self.found.has_key(n.id): + self.found[n.id] = n + self.schedule() + + def schedule(self): + """ + send messages to new peers, if necessary + """ + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + for node in l[:const.K]: + if node.id == self.target: + self.finished=1 + return self.callback([node]) + if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + 
#xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.id) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= const.CONCURRENT_REQS: + break + assert(self.outstanding) >=0 + if self.outstanding == 0: + ## all done!! + self.finished=1 + reactor.callLater(0, self.callback, l[:const.K]) + + def makeMsgFailed(self, node): + def defaultGotNodes(err, self=self, node=node): + print ">>> find failed %s/%s" % (node.host, node.port), err + self.table.table.nodeFailed(node) + self.outstanding = self.outstanding - 1 + self.schedule() + return defaultGotNodes + + def goWithNodes(self, nodes): + """ + this starts the process, our argument is a transaction with t.extras being our list of nodes + it's a transaction since we got called from the dispatcher + """ + for node in nodes: + if node.id == self.table.node.id: + continue + else: + self.found[node.id] = node + + self.schedule() + + +get_value_timeout = 15 +class GetValue(FindNode): + def __init__(self, table, target, callback, find="findValue"): + FindNode.__init__(self, table, target, callback) + self.findValue = find + + """ get value task """ + def handleGotNodes(self, dict): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] + sender = {'id' : dict["id"]} + sender['port'] = _krpc_sender[1] + sender['host'] = _krpc_sender[0] + sender = self.table.Node().initWithDict(sender) + sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) + self.table.table.insertNode(sender) + if self.finished or self.answered.has_key(sender.id): + # a day late and a dollar short + return + self.outstanding = self.outstanding - 1 + self.answered[sender.id] = 1 + # go through nodes + # if we have any closer than what we already got, query them + if dict.has_key('nodes'): + for node in dict['nodes']: + n = self.table.Node().initWithDict(node) + n.conn = 
self.table.udp.connectionForAddr((n.host, n.port)) + if not self.found.has_key(n.id): + self.found[n.id] = n + elif dict.has_key('values'): + def x(y, z=self.results): + if not z.has_key(y): + z[y] = 1 + return y + else: + return None + z = len(dict['values']) + v = filter(None, map(x, dict['values'])) + if(len(v)): + reactor.callLater(0, self.callback, v) + self.schedule() + + ## get value + def schedule(self): + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + + for node in l[:const.K]: + if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT + try: + f = getattr(node, self.findValue) + except AttributeError: + print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) + else: + df = f(self.target, self.table.node.id) + df.addCallback(self.handleGotNodes) + df.addErrback(self.makeMsgFailed(node)) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= const.CONCURRENT_REQS: + break + assert(self.outstanding) >=0 + if self.outstanding == 0: + ## all done, didn't find it!! 
+ self.finished=1 + reactor.callLater(0, self.callback,[]) + + ## get value + def goWithNodes(self, nodes, found=None): + self.results = {} + if found: + for n in found: + self.results[n] = 1 + for node in nodes: + if node.id == self.table.node.id: + continue + else: + self.found[node.id] = node + + self.schedule() + + +class StoreValue(ActionBase): + def __init__(self, table, target, value, callback, store="storeValue"): + ActionBase.__init__(self, table, target, callback) + self.value = value + self.stored = [] + self.store = store + + def storedValue(self, t, node): + self.outstanding -= 1 + self.table.insertNode(node) + if self.finished: + return + self.stored.append(t) + if len(self.stored) >= const.STORE_REDUNDANCY: + self.finished=1 + self.callback(self.stored) + else: + if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY: + self.schedule() + return t + + def storeFailed(self, t, node): + print ">>> store failed %s/%s" % (node.host, node.port) + self.table.nodeFailed(node) + self.outstanding -= 1 + if self.finished: + return t + self.schedule() + return t + + def schedule(self): + if self.finished: + return + num = const.CONCURRENT_REQS - self.outstanding + if num > const.STORE_REDUNDANCY: + num = const.STORE_REDUNDANCY + for i in range(num): + try: + node = self.nodes.pop() + except IndexError: + if self.outstanding == 0: + self.finished = 1 + self.callback(self.stored) + else: + if not node.id == self.table.node.id: + self.outstanding += 1 + try: + f = getattr(node, self.store) + except AttributeError: + print ">>> %s doesn't have a %s method!" 
% (node, self.store) + else: + df = f(self.target, self.value, self.table.node.id) + df.addCallback(self.storedValue, node=node) + df.addErrback(self.storeFailed, node=node) + + def goWithNodes(self, nodes): + self.nodes = nodes + self.nodes.sort(self.sort) + self.schedule() + + +class KeyExpirer: + def __init__(self, store): + self.store = store + reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) + + def doExpire(self): + self.cut = "%0.6f" % (time() - const.KE_AGE) + self._expire() + + def _expire(self): + c = self.store.cursor() + s = "delete from kv where time < '%s';" % self.cut + c.execute(s) + reactor.callLater(const.KE_DELAY, self.doExpire) diff --git a/apt_dht_Khashmir/bencode.py b/apt_dht_Khashmir/bencode.py new file mode 100644 index 0000000..b3f9a68 --- /dev/null +++ b/apt_dht_Khashmir/bencode.py @@ -0,0 +1,254 @@ +# Written by Petru Paler +# see LICENSE.txt for license information + +from types import IntType, LongType, StringType, ListType, TupleType, DictType +from re import compile +from cStringIO import StringIO + +int_filter = compile('(0|-?[1-9][0-9]*)e') + +def decode_int(x, f): + m = int_filter.match(x, f) + if m is None: + raise ValueError + return (long(m.group(1)), m.end()) + +string_filter = compile('(0|[1-9][0-9]*):') + +def decode_string(x, f): + m = string_filter.match(x, f) + if m is None: + raise ValueError + l = int(m.group(1)) + s = m.end() + return (x[s:s+l], s + l) + +def decode_list(x, f): + r = [] + while x[f] != 'e': + v, f = bdecode_rec(x, f) + r.append(v) + return (r, f + 1) + +def decode_dict(x, f): + r = {} + lastkey = None + while x[f] != 'e': + k, f = decode_string(x, f) + if lastkey is not None and lastkey >= k: + raise ValueError + lastkey = k + v, f = bdecode_rec(x, f) + r[k] = v + return (r, f + 1) + +def bdecode_rec(x, f): + t = x[f] + if t == 'i': + return decode_int(x, f + 1) + elif t == 'l': + return decode_list(x, f + 1) + elif t == 'd': + return decode_dict(x, f + 1) + else: + return decode_string(x, f) + 
+def bdecode(x): + try: + r, l = bdecode_rec(x, 0) + except IndexError: + raise ValueError + if l != len(x): + raise ValueError + return r + +def test_bdecode(): + try: + bdecode('0:0:') + assert 0 + except ValueError: + pass + try: + bdecode('ie') + assert 0 + except ValueError: + pass + try: + bdecode('i341foo382e') + assert 0 + except ValueError: + pass + assert bdecode('i4e') == 4L + assert bdecode('i0e') == 0L + assert bdecode('i123456789e') == 123456789L + assert bdecode('i-10e') == -10L + try: + bdecode('i-0e') + assert 0 + except ValueError: + pass + try: + bdecode('i123') + assert 0 + except ValueError: + pass + try: + bdecode('') + assert 0 + except ValueError: + pass + try: + bdecode('i6easd') + assert 0 + except ValueError: + pass + try: + bdecode('35208734823ljdahflajhdf') + assert 0 + except ValueError: + pass + try: + bdecode('2:abfdjslhfld') + assert 0 + except ValueError: + pass + assert bdecode('0:') == '' + assert bdecode('3:abc') == 'abc' + assert bdecode('10:1234567890') == '1234567890' + try: + bdecode('02:xy') + assert 0 + except ValueError: + pass + try: + bdecode('l') + assert 0 + except ValueError: + pass + assert bdecode('le') == [] + try: + bdecode('leanfdldjfh') + assert 0 + except ValueError: + pass + assert bdecode('l0:0:0:e') == ['', '', ''] + try: + bdecode('relwjhrlewjh') + assert 0 + except ValueError: + pass + assert bdecode('li1ei2ei3ee') == [1, 2, 3] + assert bdecode('l3:asd2:xye') == ['asd', 'xy'] + assert bdecode('ll5:Alice3:Bobeli2ei3eee') == [['Alice', 'Bob'], [2, 3]] + try: + bdecode('d') + assert 0 + except ValueError: + pass + try: + bdecode('defoobar') + assert 0 + except ValueError: + pass + assert bdecode('de') == {} + assert bdecode('d3:agei25e4:eyes4:bluee') == {'age': 25, 'eyes': 'blue'} + assert bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee') == {'spam.mp3': {'author': 'Alice', 'length': 100000}} + try: + bdecode('d3:fooe') + assert 0 + except ValueError: + pass + try: + bdecode('di1e0:e') + assert 0 + 
except ValueError: + pass + try: + bdecode('d1:b0:1:a0:e') + assert 0 + except ValueError: + pass + try: + bdecode('d1:a0:1:a0:e') + assert 0 + except ValueError: + pass + try: + bdecode('i03e') + assert 0 + except ValueError: + pass + try: + bdecode('l01:ae') + assert 0 + except ValueError: + pass + try: + bdecode('9999:x') + assert 0 + except ValueError: + pass + try: + bdecode('l0:') + assert 0 + except ValueError: + pass + try: + bdecode('d0:0:') + assert 0 + except ValueError: + pass + try: + bdecode('d0:') + assert 0 + except ValueError: + pass + +def bencode_rec(x, b): + t = type(x) + if t in (IntType, LongType): + b.write('i%de' % x) + elif t is StringType: + b.write('%d:%s' % (len(x), x)) + elif t in (ListType, TupleType): + b.write('l') + for e in x: + bencode_rec(e, b) + b.write('e') + elif t is DictType: + b.write('d') + keylist = x.keys() + keylist.sort() + for k in keylist: + assert type(k) is StringType + bencode_rec(k, b) + bencode_rec(x[k], b) + b.write('e') + else: + assert 0 + +def bencode(x): + b = StringIO() + bencode_rec(x, b) + return b.getvalue() + +def test_bencode(): + assert bencode(4) == 'i4e' + assert bencode(0) == 'i0e' + assert bencode(-10) == 'i-10e' + assert bencode(12345678901234567890L) == 'i12345678901234567890e' + assert bencode('') == '0:' + assert bencode('abc') == '3:abc' + assert bencode('1234567890') == '10:1234567890' + assert bencode([]) == 'le' + assert bencode([1, 2, 3]) == 'li1ei2ei3ee' + assert bencode([['Alice', 'Bob'], [2, 3]]) == 'll5:Alice3:Bobeli2ei3eee' + assert bencode({}) == 'de' + assert bencode({'age': 25, 'eyes': 'blue'}) == 'd3:agei25e4:eyes4:bluee' + assert bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}) == 'd8:spam.mp3d6:author5:Alice6:lengthi100000eee' + try: + bencode({1: 'foo'}) + assert 0 + except AssertionError: + pass + diff --git a/apt_dht_Khashmir/const.py b/apt_dht_Khashmir/const.py new file mode 100644 index 0000000..58d539e --- /dev/null +++ b/apt_dht_Khashmir/const.py @@ -0,0 
+1,60 @@ +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +""" +from twisted.internet.default import SelectReactor ## twistedmatrix.com + +reactor = SelectReactor() + +from twisted.internet import main +main.installReactor(reactor) + + +try: + import twisted.names.client + reactor.installResolver(twisted.names.client.theResolver) +except IOError: + print "no resolv.conf!" +""" + +# magic id to use before we know a peer's id +NULL_ID = 20 * '\0' + +# Kademlia "K" constant, this should be an even number +K = 8 + +# SHA1 is 160 bits long +HASH_LENGTH = 160 + +# checkpoint every this many seconds +CHECKPOINT_INTERVAL = 60 * 15 # fifteen minutes + + +### SEARCHING/STORING +# concurrent xmlrpc calls per find node/value request! +CONCURRENT_REQS = 4 + +# how many hosts to post to +STORE_REDUNDANCY = 3 + + +### ROUTING TABLE STUFF +# how many times in a row a node can fail to respond before it's booted from the routing table +MAX_FAILURES = 3 + +# never ping a node more often than this +MIN_PING_INTERVAL = 60 * 15 # fifteen minutes + +# refresh buckets that haven't been touched in this long +BUCKET_STALENESS = 60 * 60 # one hour + + +### KEY EXPIRER +# time before expirer starts running +KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db + +# time between expirer runs +KE_DELAY = 60 * 20 # 20 minutes + +# expire entries older than this +KE_AGE = 60 * 60 # 60 minutes diff --git a/apt_dht_Khashmir/khash.py b/apt_dht_Khashmir/khash.py new file mode 100644 index 0000000..1832edf --- /dev/null +++ b/apt_dht_Khashmir/khash.py @@ -0,0 +1,103 @@ +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +from sha import sha +from os import urandom + +def intify(hstr): + """20 bit hash, big-endian -> long python integer""" + assert len(hstr) == 20 + return long(hstr.encode('hex'), 16) + +def stringify(num): + """long int -> 20-character string""" + 
str = hex(num)[2:] + if str[-1] == 'L': + str = str[:-1] + if len(str) % 2 != 0: + str = '0' + str + str = str.decode('hex') + return (20 - len(str)) *'\x00' + str + +def distance(a, b): + """distance between two 160-bit hashes expressed as 20-character strings""" + return intify(a) ^ intify(b) + + +def newID(): + """returns a new pseudorandom globally unique ID string""" + h = sha() + h.update(urandom(20)) + return h.digest() + +def newIDInRange(min, max): + return stringify(randRange(min,max)) + +def randRange(min, max): + return min + intify(newID()) % (max - min) + +def newTID(): + return randRange(-2**30, 2**30) + +### Test Cases ### +import unittest + +class NewID(unittest.TestCase): + def testLength(self): + self.assertEqual(len(newID()), 20) + def testHundreds(self): + for x in xrange(100): + self.testLength + +class Intify(unittest.TestCase): + known = [('\0' * 20, 0), + ('\xff' * 20, 2L**160 - 1), + ] + def testKnown(self): + for str, value in self.known: + self.assertEqual(intify(str), value) + def testEndianessOnce(self): + h = newID() + while h[-1] == '\xff': + h = newID() + k = h[:-1] + chr(ord(h[-1]) + 1) + self.assertEqual(intify(k) - intify(h), 1) + def testEndianessLots(self): + for x in xrange(100): + self.testEndianessOnce() + +class Disantance(unittest.TestCase): + known = [ + (("\0" * 20, "\xff" * 20), 2**160L -1), + ((sha("foo").digest(), sha("foo").digest()), 0), + ((sha("bar").digest(), sha("bar").digest()), 0) + ] + def testKnown(self): + for pair, dist in self.known: + self.assertEqual(distance(pair[0], pair[1]), dist) + def testCommutitive(self): + for i in xrange(100): + x, y, z = newID(), newID(), newID() + self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z)) + +class RandRange(unittest.TestCase): + def testOnce(self): + a = intify(newID()) + b = intify(newID()) + if a < b: + c = randRange(a, b) + self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a)) + else: + c = randRange(b, a) + assert b <= c < a, 
    def testOneHundredTimes(self):
        for i in xrange(100):
            self.testOnce()



if __name__ == '__main__':
    unittest.main()


## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information

from time import time
from random import randrange
import sqlite ## find this at http://pysqlite.sourceforge.net/

from twisted.internet.defer import Deferred
from twisted.internet import protocol
from twisted.internet import reactor

import const
from ktable import KTable
from knode import KNodeBase, KNodeRead, KNodeWrite
from khash import newID, newIDInRange
from actions import FindNode, GetValue, KeyExpirer, StoreValue
import krpc

class KhashmirDBExcept(Exception):
    pass

# this is the base class, has base functionality and find node, no key-value mappings
class KhashmirBase(protocol.Factory):
    _Node = KNodeBase
    def __init__(self, host, port, db='khashmir.db'):
        self.setup(host, port, db)

    def setup(self, host, port, db='khashmir.db'):
        # open (or create) the persistent store, then bring up the UDP
        # listener and the routing table, and schedule periodic maintenance
        self._findDB(db)
        self.port = port
        self.node = self._loadSelfNode(host, port)
        self.table = KTable(self.node)
        #self.app = service.Application("krpc")
        self.udp = krpc.hostbroker(self)
        self.udp.protocol = krpc.KRPC
        self.listenport = reactor.listenUDP(port, self.udp)
        self.last = time()
        self._loadRoutingTable()
        # NOTE(review): KeyExpirer instance is discarded; presumably it
        # schedules itself on the reactor -- confirm in actions.py
        KeyExpirer(store=self.store)
        self.refreshTable(force=1)
        # NOTE(review): passes the tuple (1,) as the `auto` argument; it is
        # truthy so checkpointing keeps rescheduling, but `1` was likely meant
        reactor.callLater(60, self.checkpoint, (1,))

    def Node(self):
        """Create a new node object of this server's node class, bound to our table."""
        n = self._Node()
        n.table = self.table
        return n

    def __del__(self):
        self.listenport.stopListening()

    def _loadSelfNode(self, host, port):
        # reuse our previously persisted node id if one exists, else mint one
        c = self.store.cursor()
        c.execute('select id from self where num = 0;')
        if c.rowcount > 0:
            id = c.fetchone()[0]
        else:
            id = newID()
        return self._Node().init(id, host, port)

    def _saveSelfNode(self):
        c = self.store.cursor()
        c.execute('delete from self where num = 0;')
        c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
        self.store.commit()

    def checkpoint(self, auto=0):
        # persist our id and routing table, and refresh stale buckets;
        # when auto is truthy, reschedule with jitter around the interval
        self._saveSelfNode()
        self._dumpRoutingTable()
        self.refreshTable()
        if auto:
            reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))

    def _findDB(self, db):
        import os
        try:
            os.stat(db)
        except OSError:
            self._createNewDB(db)
        else:
            self._loadDB(db)

    def _loadDB(self, db):
        try:
            self.store = sqlite.connect(db=db)
            #self.store.autocommit = 0
        except:
            # NOTE(review): bare except hides the real failure, and the
            # three-argument raise requires a traceback *object* as its third
            # operand -- format_exc() returns a string, so this raise would
            # itself fail with TypeError
            import traceback
            raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()

    def _createNewDB(self, db):
        self.store = sqlite.connect(db=db)
        s = """
            create table kv (key binary, value binary, time timestamp, primary key (key, value));
            create index kv_key on kv(key);
            create index kv_timestamp on kv(time);

            create table nodes (id binary primary key, host text, port number);

            create table self (num number primary key, id binary);
            """
        c = self.store.cursor()
        c.execute(s)
        self.store.commit()

    def _dumpRoutingTable(self):
        """
        save routing table nodes to the database
        """
        c = self.store.cursor()
        c.execute("delete from nodes where id not NULL;")
        for bucket in self.table.buckets:
            for node in bucket.l:
                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
        self.store.commit()

    def _loadRoutingTable(self):
        """
        load routing table nodes from database
        it's usually a good idea to call refreshTable(force=1) after loading the table
        """
        c = self.store.cursor()
        c.execute("select * from nodes;")
        for rec in c.fetchall():
            n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
            n.conn = self.udp.connectionForAddr((n.host, n.port))
            self.table.insertNode(n, contacted=0)


    #######
    #######  LOCAL INTERFACE    - use these methods!
    def addContact(self, host, port, callback=None):
        """
        ping this node and add the contact info to the table on pong!
        """
        n = self.Node().init(const.NULL_ID, host, port)
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        self.sendPing(n, callback=callback)

    ## this call is async!
    def findNode(self, id, callback, errback=None):
        """ returns the contact info for node, or the k closest nodes, from the global table """
        # get K nodes out of local table/cache, or the node we want
        nodes = self.table.findNodes(id)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)
        if len(nodes) == 1 and nodes[0].id == id :
            # we already know the exact node -- no network search needed
            d.callback(nodes)
        else:
            # create our search state
            state = FindNode(self, id, d.callback)
            reactor.callLater(0, state.goWithNodes, nodes)

    def insertNode(self, n, contacted=1):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary

        If all you have is a host/port, then use addContact, which calls this method after
        receiving the PONG from the remote node.  The reason for the seperation is we can't insert
        a node into the table without it's peer-ID.  That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
        """
        old = self.table.insertNode(n, contacted=contacted)
        if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
            # the bucket is full, check to see if old node is still around and if so, replace it

            ## these are the callbacks used when we ping the oldest node in a bucket
            def _staleNodeHandler(oldnode=old, newnode = n):
                """ called if the pinged node never responds """
                self.table.replaceStaleNode(old, newnode)

            def _notStaleNodeHandler(dict, old=old):
                """ called when we get a pong from the old node """
                dict = dict['rsp']
                if dict['id'] == old.id:
                    self.table.justSeenNode(old.id)

            df = old.ping(self.node.id)
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)

    def sendPing(self, node, callback=None):
        """
        ping a node
        """
        df = node.ping(self.node.id)
        ## these are the callbacks we use when we issue a PING
        def _pongHandler(dict, node=node, table=self.table, callback=callback):
            # on pong, record the responder (with the address the packet
            # actually came from) in the routing table
            _krpc_sender = dict['_krpc_sender']
            dict = dict['rsp']
            sender = {'id' : dict['id']}
            sender['host'] = _krpc_sender[0]
            sender['port'] = _krpc_sender[1]
            n = self.Node().initWithDict(sender)
            n.conn = self.udp.connectionForAddr((n.host, n.port))
            table.insertNode(n)
            if callback:
                callback()
        def _defaultPong(err, node=node, table=self.table, callback=callback):
            table.nodeFailed(node)
            if callback:
                callback()

        df.addCallbacks(_pongHandler,_defaultPong)

    def findCloseNodes(self, callback=lambda a: None):
        """
        This does a findNode on the ID one away from our own.
        This will allow us to populate our table with nodes on our network closest to our own.
        This is called as soon as we start up with an empty table
        """
        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
        self.findNode(id, callback)

    def refreshTable(self, force=0):
        """
        force=1 will refresh table regardless of last bucket access time
        """
        def callback(nodes):
            pass

        for bucket in self.table.buckets:
            if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
                # search for a random id in the stale bucket's range
                id = newIDInRange(bucket.min, bucket.max)
                self.findNode(id, callback)

    def stats(self):
        """
        Returns (num_contacts, num_nodes)
        num_contacts: number contacts in our routing table
        num_nodes: number of nodes estimated in the entire dht
        """
        num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
        num_nodes = const.K * (2**(len(self.table.buckets) - 1))
        return (num_contacts, num_nodes)

    def krpc_ping(self, id, _krpc_sender):
        # remote 'ping' request handler: learn the sender, reply with our id
        sender = {'id' : id}
        sender['host'] = _krpc_sender[0]
        sender['port'] = _krpc_sender[1]
        n = self.Node().initWithDict(sender)
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        self.insertNode(n, contacted=0)
        return {"id" : self.node.id}

    def krpc_find_node(self, target, id, _krpc_sender):
        # remote 'find_node' request handler: reply with our closest contacts
        nodes = self.table.findNodes(target)
        nodes = map(lambda node: node.senderDict(), nodes)
        sender = {'id' : id}
        sender['host'] = _krpc_sender[0]
        sender['port'] = _krpc_sender[1]
        n = self.Node().initWithDict(sender)
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        self.insertNode(n, contacted=0)
        return {"nodes" : nodes, "id" : self.node.id}


## This class provides read-only access to the DHT, valueForKey
## you probably want to use this mixin and provide your own write methods
class KhashmirRead(KhashmirBase):
    _Node = KNodeRead
    def retrieveValues(self, key):
        # fetch all locally-stored values for key from the kv table
        c = self.store.cursor()
        c.execute("select value from kv where key = %s;", sqlite.encode(key))
        t = c.fetchone()
        l = []
        while t:
            l.append(t['value'])
            t = c.fetchone()
        return l
    ## also async
    def valueForKey(self, key, callback, searchlocal = 1):
        """ returns the values found for key in global table
            callback will be called with a list of values for each peer that returns unique values
            final callback will be an empty list - probably should change to 'more coming' arg
        """
        nodes = self.table.findNodes(key)

        # get locals
        if searchlocal:
            l = self.retrieveValues(key)
            if len(l) > 0:
                reactor.callLater(0, callback, (l))
        else:
            l = []

        # create our search state
        state = GetValue(self, key, callback)
        reactor.callLater(0, state.goWithNodes, nodes, l)

    def krpc_find_value(self, key, id, _krpc_sender):
        # remote 'find_value': return stored values if we have any,
        # otherwise the closest nodes we know of (like find_node)
        sender = {'id' : id}
        sender['host'] = _krpc_sender[0]
        sender['port'] = _krpc_sender[1]
        n = self.Node().initWithDict(sender)
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        self.insertNode(n, contacted=0)

        l = self.retrieveValues(key)
        if len(l) > 0:
            return {'values' : l, "id": self.node.id}
        else:
            nodes = self.table.findNodes(key)
            nodes = map(lambda node: node.senderDict(), nodes)
            return {'nodes' : nodes, "id": self.node.id}

###  provides a generic write method, you probably don't want to deploy something that allows
###  arbitrary value storage
class KhashmirWrite(KhashmirRead):
    _Node = KNodeWrite
    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
    def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status
            in this implementation, peers respond but don't indicate status to storing values
            a key can have many values
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
            if not response:
                # default callback
                def _storedValueHandler(sender):
                    pass
                response=_storedValueHandler
            # NOTE(review): FindNode/GetValue are constructed with `self`, but
            # StoreValue gets `self.table` -- verify against actions.py
            action = StoreValue(self.table, key, value, response)
            reactor.callLater(0, action.goWithNodes, nodes)

        # this call is asynch
        self.findNode(key, _storeValueForKey)

    def krpc_store_value(self, key, value, id, _krpc_sender):
        # remote 'store_value': insert the pair, or refresh its timestamp
        # if it is already present (primary key is (key, value))
        t = "%0.6f" % time()
        c = self.store.cursor()
        try:
            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
        except sqlite.IntegrityError, reason:
            # update last insert time
            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
        self.store.commit()
        sender = {'id' : id}
        sender['host'] = _krpc_sender[0]
        sender['port'] = _krpc_sender[1]
        n = self.Node().initWithDict(sender)
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        self.insertNode(n, contacted=0)
        return {"id" : self.node.id}

# the whole shebang, for testing
class Khashmir(KhashmirWrite):
    _Node = KNodeWrite

#
# knet.py
# create a network of khashmir nodes
# usage: knet.py

from random import randrange
import sys, os

from twisted.internet import reactor

from khashmir import Khashmir

class Network:
    def __init__(self, size=0, startport=5555, localip='127.0.0.1'):
        self.num = size
        self.startport = startport
        self.localip = localip

    def _done(self, val):
        self.done = 1

    def setUp(self):
        # start `num` nodes on consecutive ports, then have each one
        # bootstrap off three random peers
        self.kfiles()
        self.l = []
        for i in range(self.num):
            self.l.append(Khashmir('', self.startport + i, '/tmp/kh%s.db' % (self.startport + i)))
        reactor.iterate()
        reactor.iterate()

        for i in self.l:
            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
            i.addContact(self.localip, self.l[randrange(0,self.num)].port)
        reactor.iterate()
        reactor.iterate()
        reactor.iterate()

        for i in self.l:
            self.done = 0
i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + + def tearDown(self): + for i in self.l: + i.listenport.stopListening() + self.kfiles() + + def kfiles(self): + for i in range(self.startport, self.startport+self.num): + try: + os.unlink('/tmp/kh%s.db' % i) + except: + pass + + reactor.iterate() + +if __name__ == "__main__": + n = Network(int(sys.argv[1]), int(sys.argv[2]), sys.argv[3]) + n.setUp() + try: + reactor.run() + finally: + n.tearDown() diff --git a/apt_dht_Khashmir/knode.py b/apt_dht_Khashmir/knode.py new file mode 100644 index 0000000..d2cea72 --- /dev/null +++ b/apt_dht_Khashmir/knode.py @@ -0,0 +1,53 @@ +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +from const import NULL_ID +from node import Node + +class KNodeBase(Node): + def checkSender(self, dict): + try: + senderid = dict['rsp']['id'] + except KeyError: + print ">>>> No peer id in response" + raise Exception, "No peer id in response." + else: + if self.id != NULL_ID and senderid != self.id: + print "Got response from different node than expected." 
+ self.table.invalidateNode(self) + + return dict + + def errBack(self, err): + print ">>> ", err + return err + + def ping(self, id): + df = self.conn.sendRequest('ping', {"id":id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + def findNode(self, target, id): + df = self.conn.sendRequest('find_node', {"target" : target, "id": id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + +class KNodeRead(KNodeBase): + def findValue(self, key, id): + df = self.conn.sendRequest('find_value', {"key" : key, "id" : id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + +class KNodeWrite(KNodeRead): + def storeValue(self, key, value, id): + df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + def storeValues(self, key, value, id): + df = self.conn.sendRequest('store_values', {"key" : key, "values" : value, "id": id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py new file mode 100644 index 0000000..8a60092 --- /dev/null +++ b/apt_dht_Khashmir/krpc.py @@ -0,0 +1,159 @@ +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +from bencode import bencode, bdecode +from time import asctime +import sys +from traceback import format_exception + +from twisted.internet.defer import Deferred +from twisted.internet import protocol +from twisted.internet import reactor + +KRPC_TIMEOUT = 20 + +KRPC_ERROR = 1 +KRPC_ERROR_METHOD_UNKNOWN = 2 +KRPC_ERROR_RECEIVED_UNKNOWN = 3 +KRPC_ERROR_TIMEOUT = 4 + +# commands +TID = 't' +REQ = 'q' +RSP = 'r' +TYP = 'y' +ARG = 'a' +ERR = 'e' + +class hostbroker(protocol.DatagramProtocol): + def __init__(self, server): + self.server = server + # this should be changed to storage that drops old entries + 
        self.connections = {}

    def datagramReceived(self, datagram, addr):
        # demultiplex: each remote address gets its own KRPC connection object
        #print `addr`, `datagram`
        #if addr != self.addr:
        c = self.connectionForAddr(addr)
        c.datagramReceived(datagram, addr)
        #if c.idle():
        #    del self.connections[addr]

    def connectionForAddr(self, addr):
        # refuse to make a connection to ourselves
        if addr == self.addr:
            raise Exception
        if not self.connections.has_key(addr):
            conn = self.protocol(addr, self.server, self.transport)
            self.connections[addr] = conn
        else:
            conn = self.connections[addr]
        return conn

    def makeConnection(self, transport):
        protocol.DatagramProtocol.makeConnection(self, transport)
        tup = transport.getHost()
        self.addr = (tup.host, tup.port)

## connection
class KRPC:
    noisy = 1
    def __init__(self, addr, server, transport):
        self.transport = transport
        self.factory = server
        self.addr = addr
        # tids maps outstanding transaction ids -> Deferreds awaiting a reply
        self.tids = {}
        self.mtid = 0

    def datagramReceived(self, str, addr):
        # bdecode
        try:
            msg = bdecode(str)
        except Exception, e:
            # NOTE(review): undecodable packets are only printed (when noisy)
            # and otherwise silently dropped
            if self.noisy:
                print "response decode error: " + `e`
        else:
            #if self.noisy:
            #    print msg
            # look at msg type
            if msg[TYP]  == REQ:
                ilen = len(str)
                # if request
                #  tell factory to handle
                f = getattr(self.factory ,"krpc_" + msg[REQ], None)
                msg[ARG]['_krpc_sender'] =  self.addr
                if f and callable(f):
                    try:
                        ret = apply(f, (), msg[ARG])
                    except Exception, e:
                        ## send error
                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
                        olen = len(out)
                        self.transport.write(out, addr)
                    else:
                        if ret:
                            #  make response
                            out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
                        else:
                            out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
                        #  send response
                        olen = len(out)
                        self.transport.write(out, addr)

                else:
                    if self.noisy:
                        print "don't know about method %s" % msg[REQ]
                    # unknown method
                    out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
                    olen = len(out)
                    self.transport.write(out, addr)
                if self.noisy:
                    print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port,
                                                    ilen, msg[REQ], olen)
            elif msg[TYP] == RSP:
                # if response
                #  lookup tid
                if self.tids.has_key(msg[TID]):
                    df = self.tids[msg[TID]]
                    #  callback
                    del(self.tids[msg[TID]])
                    df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
                else:
                    print 'timeout ' + `msg[RSP]['id']`
                    # no tid, this transaction timed out already...
            elif msg[TYP] == ERR:
                # if error
                #  lookup tid
                if self.tids.has_key(msg[TID]):
                    df = self.tids[msg[TID]]
                    #  callback
                    df.errback(msg[ERR])
                    del(self.tids[msg[TID]])
                else:
                    # day late and dollar short
                    pass
            else:
                print "unknown message type " + `msg`
                # unknown message type
                # NOTE(review): unconditional lookup -- raises KeyError if
                # msg[TID] is absent or not an outstanding transaction
                df = self.tids[msg[TID]]
                #  callback
                df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
                del(self.tids[msg[TID]])

    def sendRequest(self, method, args):
        # make message
        # send it
        # transaction id is a single byte, recycled mod 256
        msg = {TID : chr(self.mtid), TYP : REQ,  REQ : method, ARG : args}
        self.mtid = (self.mtid + 1) % 256
        str = bencode(msg)
        d = Deferred()
        self.tids[msg[TID]] = d
        def timeOut(tids = self.tids, id = msg[TID]):
            # errback the Deferred if no reply arrived within KRPC_TIMEOUT
            if tids.has_key(id):
                df = tids[id]
                del(tids[id])
                print ">>>>>> KRPC_ERROR_TIMEOUT"
                df.errback(KRPC_ERROR_TIMEOUT)
        reactor.callLater(KRPC_TIMEOUT, timeOut)
        self.transport.write(str, self.addr)
        return d


## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information

from time import time
from bisect import bisect_left

from const import K, HASH_LENGTH, NULL_ID, MAX_FAILURES
import khash
from node import Node

class KTable:
    """local routing table for a kademlia like distributed hash table"""
    def __init__(self, node):
        # this is the root node, a.k.a. US!
        self.node = node
        # start with a single bucket spanning the entire 2**160 id space
        self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
        self.insertNode(node)

    def _bucketIndexForInt(self, num):
        """the index of the bucket that should hold int"""
        # relies on KBucket's rich comparators against plain integers
        return bisect_left(self.buckets, num)

    def findNodes(self, id):
        """
        return K nodes in our own local table closest to the ID.
        """

        if isinstance(id, str):
            num = khash.intify(id)
        elif isinstance(id, Node):
            num = id.num
        elif isinstance(id, int) or isinstance(id, long):
            num = id
        else:
            raise TypeError, "findNodes requires an int, string, or Node"

        nodes = []
        i = self._bucketIndexForInt(num)

        # if this node is already in our table then return it
        try:
            index = self.buckets[i].l.index(num)
        except ValueError:
            pass
        else:
            return [self.buckets[i].l[index]]

        # don't have the node, get the K closest nodes
        nodes = nodes + self.buckets[i].l
        if len(nodes) < K:
            # need more nodes
            # widen outward one bucket at a time on both sides
            min = i - 1
            max = i + 1
            while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
                #ASw: note that this requires K be even
                if min >= 0:
                    nodes = nodes + self.buckets[min].l
                if max < len(self.buckets):
                    nodes = nodes + self.buckets[max].l
                min = min - 1
                max = max + 1

        # sort by XOR distance to the target and keep the closest K
        nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
        return nodes[:K]

    def _splitBucket(self, a):
        # halve bucket a's range; the new bucket b takes the upper half
        diff = (a.max - a.min) / 2
        b = KBucket([], a.max - diff, a.max)
        self.buckets.insert(self.buckets.index(a.min) + 1, b)
        a.max = a.max - diff
        # transfer nodes to new bucket
        for anode in a.l[:]:
            if anode.num >= a.max:
                a.l.remove(anode)
                b.l.append(anode)

    def replaceStaleNode(self, stale, new):
        """this is used by clients to replace a node returned by insertNode after
        it fails to respond to a Pong message"""
        i = self._bucketIndexForInt(stale.num)
        try:
            it = self.buckets[i].l.index(stale.num)
        except ValueError:
            # stale node already gone; nothing to replace
            return

        del(self.buckets[i].l[it])
        if new:
            self.buckets[i].l.append(new)

    def insertNode(self, node, contacted=1):
        """
        this insert the node, returning None if successful, returns the oldest node in the bucket if it's full
        the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!!
        contacted means that yes, we contacted THEM and we know the node is reachable
        """
        assert node.id != NULL_ID
        if node.id == self.node.id: return
        # get the bucket for this node
        i = self. _bucketIndexForInt(node.num)
        # check to see if node is in the bucket already
        try:
            it = self.buckets[i].l.index(node.num)
        except ValueError:
            # no
            pass
        else:
            if contacted:
                node.updateLastSeen()
                # move node to end of bucket
                xnode = self.buckets[i].l[it]
                del(self.buckets[i].l[it])
                # note that we removed the original and replaced it with the new one
                # utilizing this nodes new contact info
                self.buckets[i].l.append(xnode)
                self.buckets[i].touch()
            return

        # we don't have this node, check to see if the bucket is full
        if len(self.buckets[i].l) < K:
            # no, append this node and return
            if contacted:
                node.updateLastSeen()
            self.buckets[i].l.append(node)
            self.buckets[i].touch()
            return

        # bucket is full, check to see if self.node is in the bucket
        if not (self.buckets[i].min <= self.node < self.buckets[i].max):
            # our own node isn't in this bucket, so we can't split it;
            # hand back the oldest entry for the caller to ping
            return self.buckets[i].l[0]

        # this bucket is full and contains our node, split the bucket
        if len(self.buckets) >= HASH_LENGTH:
            # our table is FULL, this is really unlikely
            print "Hash Table is FULL!  Increase K!"
            return

        self._splitBucket(self.buckets[i])

        # now that the bucket is split and balanced, try to insert the node again
        return self.insertNode(node)

    def justSeenNode(self, id):
        """call this any time you get a message from a node
        it will update it in the table if it's there """
        try:
            n = self.findNodes(id)[0]
        except IndexError:
            return None
        else:
            tstamp = n.lastSeen
            n.updateLastSeen()
            return tstamp

    def invalidateNode(self, n):
        """
        forget about node n - use when you know that node is invalid
        """
        self.replaceStaleNode(n, None)

    def nodeFailed(self, node):
        """ call this when a node fails to respond to a message, to invalidate that node """
        try:
            n = self.findNodes(node.num)[0]
        except IndexError:
            return None
        else:
            if n.msgFailed() >= MAX_FAILURES:
                self.invalidateNode(n)

class KBucket:
    def __init__(self, contents, min, max):
        # l holds the Node objects; [min, max) is this bucket's id range
        self.l = contents
        self.min = min
        self.max = max
        self.lastAccessed = time()

    def touch(self):
        self.lastAccessed = time()

    def getNodeWithInt(self, num):
        if num in self.l: return num
        else: raise ValueError

    def __repr__(self):
        # NOTE(review): the format string appears to have been lost in
        # extraction -- "" % (three args) raises TypeError; presumably it
        # was something like "<KBucket %d nodes (%d - %d)>"
        return "" % (len(self.l), self.min, self.max)

    ## Comparators
    # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
    # compares integer or node object with the bucket's range
    def __lt__(self, a):
        if isinstance(a, Node): a = a.num
        return self.max <= a
    def __le__(self, a):
        if isinstance(a, Node): a = a.num
        return self.min < a
    def __gt__(self, a):
        if isinstance(a, Node): a = a.num
        return self.min > a
    def __ge__(self, a):
        if isinstance(a, Node): a = a.num
        return self.max >= a
    def __eq__(self, a):
        # "equal" means the value falls inside this bucket's range
        if isinstance(a, Node): a = a.num
        return self.min <= a and self.max > a
    def __ne__(self, a):
        if isinstance(a, Node): a = a.num
        return self.min >= a or self.max < a


### UNIT TESTS ###
import unittest

class TestKTable(unittest.TestCase):
    def setUp(self):
        self.a = Node().init(khash.newID(), 'localhost', 2002)
Node().init(khash.newID(), 'localhost', 2002) + self.t = KTable(self.a) + + def testAddNode(self): + self.b = Node().init(khash.newID(), 'localhost', 2003) + self.t.insertNode(self.b) + self.assertEqual(len(self.t.buckets[0].l), 1) + self.assertEqual(self.t.buckets[0].l[0], self.b) + + def testRemove(self): + self.testAddNode() + self.t.invalidateNode(self.b) + self.assertEqual(len(self.t.buckets[0].l), 0) + + def testFail(self): + self.testAddNode() + for i in range(MAX_FAILURES - 1): + self.t.nodeFailed(self.b) + self.assertEqual(len(self.t.buckets[0].l), 1) + self.assertEqual(self.t.buckets[0].l[0], self.b) + + self.t.nodeFailed(self.b) + self.assertEqual(len(self.t.buckets[0].l), 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/apt_dht_Khashmir/node.py b/apt_dht_Khashmir/node.py new file mode 100644 index 0000000..35dadc6 --- /dev/null +++ b/apt_dht_Khashmir/node.py @@ -0,0 +1,82 @@ +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +from time import time +from types import InstanceType + +import khash + +class Node: + """encapsulate contact info""" + def __init__(self): + self.fails = 0 + self.lastSeen = 0 + self.id = self.host = self.port = '' + + def init(self, id, host, port): + self.id = id + self.num = khash.intify(id) + self.host = host + self.port = port + self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host} + return self + + def initWithDict(self, dict): + self._senderDict = dict + self.id = dict['id'] + self.num = khash.intify(self.id) + self.port = dict['port'] + self.host = dict['host'] + return self + + def updateLastSeen(self): + self.lastSeen = time() + self.fails = 0 + + def msgFailed(self): + self.fails = self.fails + 1 + return self.fails + + def senderDict(self): + return self._senderDict + + def __repr__(self): + return `(self.id, self.host, self.port)` + + ## these comparators let us bisect/index a list full of nodes with either a node or an 
int/long + def __lt__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num < a + def __le__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num <= a + def __gt__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num > a + def __ge__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num >= a + def __eq__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num == a + def __ne__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num != a + + +import unittest + +class TestNode(unittest.TestCase): + def setUp(self): + self.node = Node().init(khash.newID(), 'localhost', 2002) + def testUpdateLastSeen(self): + t = self.node.lastSeen + self.node.updateLastSeen() + assert t < self.node.lastSeen + \ No newline at end of file diff --git a/apt_dht_Khashmir/test.py b/apt_dht_Khashmir/test.py new file mode 100644 index 0000000..bd949fd --- /dev/null +++ b/apt_dht_Khashmir/test.py @@ -0,0 +1,7 @@ +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +import unittest + +tests = unittest.defaultTestLoader.loadTestsFromNames(['khash', 'node', 'knode', 'actions', 'ktable', 'test_krpc']) +result = unittest.TextTestRunner().run(tests) diff --git a/apt_dht_Khashmir/test_khashmir.py b/apt_dht_Khashmir/test_khashmir.py new file mode 100644 index 0000000..24a8760 --- /dev/null +++ b/apt_dht_Khashmir/test_khashmir.py @@ -0,0 +1,145 @@ +from unittest import defaultTestLoader, TextTestRunner, TestCase +from sha import sha +from random import randrange +import os, sys + +from twisted.internet import reactor + +from khashmir import Khashmir +from khash import newID + +if __name__ =="__main__": + tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]]) + result = TextTestRunner().run(tests) + +class SimpleTests(TestCase): + def setUp(self): + self.a = Khashmir('127.0.0.1', 4044, '/tmp/a.test') + self.b = Khashmir('127.0.0.1', 4045, 
'/tmp/b.test') + + def tearDown(self): + self.a.listenport.stopListening() + self.b.listenport.stopListening() + os.unlink('/tmp/a.test') + os.unlink('/tmp/b.test') + reactor.iterate() + reactor.iterate() + + def addContacts(self): + self.a.addContact('127.0.0.1', 4045) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + + def testAddContact(self): + self.assertEqual(len(self.a.table.buckets), 1) + self.assertEqual(len(self.a.table.buckets[0].l), 0) + + self.assertEqual(len(self.b.table.buckets), 1) + self.assertEqual(len(self.b.table.buckets[0].l), 0) + + self.addContacts() + + self.assertEqual(len(self.a.table.buckets), 1) + self.assertEqual(len(self.a.table.buckets[0].l), 1) + self.assertEqual(len(self.b.table.buckets), 1) + self.assertEqual(len(self.b.table.buckets[0].l), 1) + + def testStoreRetrieve(self): + self.addContacts() + self.got = 0 + self.a.storeValueForKey(sha('foo').digest(), 'foobar') + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.a.valueForKey(sha('foo').digest(), self._cb) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + + def _cb(self, val): + if not val: + self.assertEqual(self.got, 1) + elif 'foobar' in val: + self.got = 1 + + +class MultiTest(TestCase): + num = 20 + def _done(self, val): + self.done = 1 + + def setUp(self): + self.l = [] + self.startport = 4088 + for i in range(self.num): + self.l.append(Khashmir('127.0.0.1', self.startport + i, '/tmp/%s.test' % (self.startport + i))) + reactor.iterate() + reactor.iterate() + + for i in self.l: + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + reactor.iterate() + reactor.iterate() + reactor.iterate() + + for i in self.l: + self.done = 0 + 
i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + for i in self.l: + self.done = 0 + i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + + def tearDown(self): + for i in self.l: + i.listenport.stopListening() + for i in range(self.startport, self.startport+self.num): + os.unlink('/tmp/%s.test' % i) + + reactor.iterate() + + def testStoreRetrieve(self): + for i in range(10): + K = newID() + V = newID() + + for a in range(3): + self.done = 0 + def _scb(val): + self.done = 1 + self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb) + while not self.done: + reactor.iterate() + + + def _rcb(val): + if not val: + self.done = 1 + self.assertEqual(self.got, 1) + elif V in val: + self.got = 1 + for x in range(3): + self.got = 0 + self.done = 0 + self.l[randrange(0, self.num)].valueForKey(K, _rcb) + while not self.done: + reactor.iterate() + + + + + diff --git a/apt_dht_Khashmir/test_krpc.py b/apt_dht_Khashmir/test_krpc.py new file mode 100644 index 0000000..3c41d07 --- /dev/null +++ b/apt_dht_Khashmir/test_krpc.py @@ -0,0 +1,174 @@ +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +from unittest import defaultTestLoader, TestCase, TextTestRunner +import sys + +from twisted.internet import protocol +from twisted.internet import reactor + +from krpc import KRPC, hostbroker, KRPC_ERROR_METHOD_UNKNOWN + +KRPC.noisy = 0 + +if __name__ =="__main__": + tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]]) + result = TextTestRunner().run(tests) + + +def connectionForAddr(host, port): + return host + + +class Receiver(protocol.Factory): + protocol = KRPC + def __init__(self): + self.buf = [] + def krpc_store(self, msg, _krpc_sender): + self.buf += [msg] + def krpc_echo(self, msg, _krpc_sender): + return msg + +def make(port): + af = Receiver() + a = hostbroker(af) + a.protocol = KRPC + p = reactor.listenUDP(port, a) + return af, a, p + +class KRPCTests(TestCase): + 
def setUp(self): + self.noisy = 0 + self.af, self.a, self.ap = make(1180) + self.bf, self.b, self.bp = make(1181) + + def tearDown(self): + self.ap.stopListening() + self.bp.stopListening() + reactor.iterate() + reactor.iterate() + + def testSimpleMessage(self): + self.noisy = 0 + self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.bf.buf, ["This is a test."]) + + def testMessageBlast(self): + self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.bf.buf, ["This is a test."]) + self.bf.buf = [] + + for i in range(100): + self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) + reactor.iterate() + #self.bf.buf = [] + self.assertEqual(self.bf.buf, ["This is a test."] * 100) + + def testEcho(self): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + + def gotMsg(self, dict): + _krpc_sender = dict['_krpc_sender'] + msg = dict['rsp'] + self.msg = msg + + def testManyEcho(self): + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + for i in xrange(100): + self.msg = None + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + + def testMultiEcho(self): + self.noisy = 1 + 
df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is another test.") + + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is yet another test.") + + def testEchoReset(self): + self.noisy = 1 + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is another test.") + + del(self.a.connections[('127.0.0.1', 1181)]) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is yet another test.") + + def testLotsofEchoReset(self): + for i in range(100): + self.testEchoReset() + + def testUnknownMeth(self): + self.noisy = 1 + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."}) + df.addErrback(self.gotErr) + 
def bucket_stats(l):
    """Given a list of khashmir instances, report the minimum, maximum,
    and average number of routing-table contacts per instance.

    Returns a dict with keys 'min', 'max' and 'avg'.
    """
    # One total per khashmir instance: contacts summed across its buckets.
    counts = [sum(len(bucket.l) for bucket in node.table.buckets)
              for node in l]
    # Compute the average before min()/max() so an empty input list fails
    # the same way the original did (ZeroDivisionError).
    mean = sum(counts) / len(l)
    return {'min': min(counts), 'max': max(counts), 'avg': mean}
constant, this should be an even number - 'K': '8', - - # SHA1 is 160 bits long - 'HASH_LENGTH': '160', - - # checkpoint every this many seconds - 'CHECKPOINT_INTERVAL': '15m', # fifteen minutes - - ### SEARCHING/STORING - # concurrent xmlrpc calls per find node/value request! - 'CONCURRENT_REQS': '4', - - # how many hosts to post to - 'STORE_REDUNDANCY': '3', - - ### ROUTING TABLE STUFF - # how many times in a row a node can fail to respond before it's booted from the routing table - 'MAX_FAILURES': '3', - - # never ping a node more often than this - 'MIN_PING_INTERVAL': '15m', # fifteen minutes - - # refresh buckets that haven't been touched in this long - 'BUCKET_STALENESS': '1h', # one hour - - ### KEY EXPIRER - # time before expirer starts running - 'KEINITIAL_DELAY': '15s', # 15 seconds - to clean out old stuff in persistent db - - # time between expirer runs - 'KE_DELAY': '20m', # 20 minutes - - # expire entries older than this - 'KE_AGE': '1h', # 60 minutes -} - -class AptDHTConfigParser(SafeConfigParser): - """ - Adds 'gettime' to ConfigParser to interpret the suffixes. 
# Written by Petru Paler
# see LICENSE.txt for license information
#
# Bencode serializer/deserializer.  This revision runs unchanged on both
# Python 2 and Python 3: long(), the types-module constants, cStringIO
# and the list.sort() on dict keys are replaced with equivalents that
# exist everywhere, without changing accepted/rejected inputs.

from re import compile

try:
    from cStringIO import StringIO      # Python 2
except ImportError:
    from io import StringIO             # Python 3

# Integer types accepted by bencode().  Exact type() membership (rather
# than isinstance) is deliberate: bool is a subclass of int but must
# still be rejected, exactly as the original behaved.
try:
    _int_types = (int, long)
except NameError:
    _int_types = (int,)

# 'i' payload: zero, or an optionally-negative number with no leading zeros
int_filter = compile('(0|-?[1-9][0-9]*)e')

def decode_int(x, f):
    """Parse the digits of an integer at offset f (just past the 'i').

    Returns (value, offset past the terminating 'e').
    Raises ValueError on malformed input ('-0', leading zeros, missing 'e').
    """
    m = int_filter.match(x, f)
    if m is None:
        raise ValueError
    # int() auto-promotes to long on Python 2, so dropping long() is safe
    return (int(m.group(1)), m.end())

# string length prefix: no leading zeros allowed
string_filter = compile('(0|[1-9][0-9]*):')

def decode_string(x, f):
    """Parse a '<len>:<chars>' string at offset f; returns (string, next offset)."""
    m = string_filter.match(x, f)
    if m is None:
        raise ValueError
    l = int(m.group(1))
    s = m.end()
    return (x[s:s+l], s + l)

def decode_list(x, f):
    """Parse list items at offset f (just past the 'l') until the closing 'e'."""
    r = []
    while x[f] != 'e':
        v, f = bdecode_rec(x, f)
        r.append(v)
    return (r, f + 1)

def decode_dict(x, f):
    """Parse dict entries at offset f (just past the 'd') until the closing 'e'.

    Keys must be strings and appear in strictly increasing order
    (canonical bencoding); violations raise ValueError.
    """
    r = {}
    lastkey = None
    while x[f] != 'e':
        k, f = decode_string(x, f)
        if lastkey is not None and lastkey >= k:
            raise ValueError
        lastkey = k
        v, f = bdecode_rec(x, f)
        r[k] = v
    return (r, f + 1)

def bdecode_rec(x, f):
    """Decode one bencoded value at offset f, dispatching on the tag character."""
    t = x[f]
    if t == 'i':
        return decode_int(x, f + 1)
    elif t == 'l':
        return decode_list(x, f + 1)
    elif t == 'd':
        return decode_dict(x, f + 1)
    else:
        return decode_string(x, f)

def bdecode(x):
    """Decode a complete bencoded string.

    Raises ValueError on any malformed input, including trailing bytes
    after an otherwise valid value.
    """
    try:
        r, l = bdecode_rec(x, 0)
    except IndexError:
        # ran off the end of a truncated value
        raise ValueError
    if l != len(x):
        raise ValueError
    return r

def test_bdecode():
    """Self-test: accepted and rejected inputs for bdecode."""
    try:
        bdecode('0:0:')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('ie')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('i341foo382e')
        assert 0
    except ValueError:
        pass
    # plain int literals: == holds against Python 2 longs, so behavior is
    # identical while staying valid Python 3 syntax
    assert bdecode('i4e') == 4
    assert bdecode('i0e') == 0
    assert bdecode('i123456789e') == 123456789
    assert bdecode('i-10e') == -10
    try:
        bdecode('i-0e')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('i123')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('i6easd')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('35208734823ljdahflajhdf')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('2:abfdjslhfld')
        assert 0
    except ValueError:
        pass
    assert bdecode('0:') == ''
    assert bdecode('3:abc') == 'abc'
    assert bdecode('10:1234567890') == '1234567890'
    try:
        bdecode('02:xy')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('l')
        assert 0
    except ValueError:
        pass
    assert bdecode('le') == []
    try:
        bdecode('leanfdldjfh')
        assert 0
    except ValueError:
        pass
    assert bdecode('l0:0:0:e') == ['', '', '']
    try:
        bdecode('relwjhrlewjh')
        assert 0
    except ValueError:
        pass
    assert bdecode('li1ei2ei3ee') == [1, 2, 3]
    assert bdecode('l3:asd2:xye') == ['asd', 'xy']
    assert bdecode('ll5:Alice3:Bobeli2ei3eee') == [['Alice', 'Bob'], [2, 3]]
    try:
        bdecode('d')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('defoobar')
        assert 0
    except ValueError:
        pass
    assert bdecode('de') == {}
    assert bdecode('d3:agei25e4:eyes4:bluee') == {'age': 25, 'eyes': 'blue'}
    assert bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee') == {'spam.mp3': {'author': 'Alice', 'length': 100000}}
    try:
        bdecode('d3:fooe')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('di1e0:e')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('d1:b0:1:a0:e')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('d1:a0:1:a0:e')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('i03e')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('l01:ae')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('9999:x')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('l0:')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('d0:0:')
        assert 0
    except ValueError:
        pass
    try:
        bdecode('d0:')
        assert 0
    except ValueError:
        pass

def bencode_rec(x, b):
    """Append the bencoding of x to the StringIO b.

    Accepts ints (and longs on Python 2), strings, lists/tuples, and
    dicts with string keys; anything else trips the assert, as before.
    """
    t = type(x)
    if t in _int_types:
        b.write('i%de' % x)
    elif t is str:
        b.write('%d:%s' % (len(x), x))
    elif t in (list, tuple):
        b.write('l')
        for e in x:
            bencode_rec(e, b)
        b.write('e')
    elif t is dict:
        b.write('d')
        # sorted() replaces keys()+sort(): same canonical key order, and
        # it works on the key views Python 3 returns
        for k in sorted(x.keys()):
            assert type(k) is str
            bencode_rec(k, b)
            bencode_rec(x[k], b)
        b.write('e')
    else:
        assert 0

def bencode(x):
    """Return the bencoded string form of x."""
    b = StringIO()
    bencode_rec(x, b)
    return b.getvalue()

def test_bencode():
    """Self-test: known encodings, rounding out the decoder tests above."""
    assert bencode(4) == 'i4e'
    assert bencode(0) == 'i0e'
    assert bencode(-10) == 'i-10e'
    assert bencode(12345678901234567890) == 'i12345678901234567890e'
    assert bencode('') == '0:'
    assert bencode('abc') == '3:abc'
    assert bencode('1234567890') == '10:1234567890'
    assert bencode([]) == 'le'
    assert bencode([1, 2, 3]) == 'li1ei2ei3ee'
    assert bencode([['Alice', 'Bob'], [2, 3]]) == 'll5:Alice3:Bobeli2ei3eee'
    assert bencode({}) == 'de'
    assert bencode({'age': 25, 'eyes': 'blue'}) == 'd3:agei25e4:eyes4:bluee'
    assert bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}) == 'd8:spam.mp3d6:author5:Alice6:lengthi100000eee'
    try:
        bencode({1: 'foo'})
        assert 0
    except AssertionError:
        pass
## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information

# --- DHT tunables (originally const.py) ---

# magic id to use before we know a peer's id
NULL_ID = 20 * '\0'

# Kademlia "K" constant, this should be an even number
K = 8

# SHA1 is 160 bits long
HASH_LENGTH = 160

# checkpoint every this many seconds
CHECKPOINT_INTERVAL = 60 * 15  # fifteen minutes

### SEARCHING/STORING
# concurrent xmlrpc calls per find node/value request!
CONCURRENT_REQS = 4

# how many hosts to post to
STORE_REDUNDANCY = 3

### ROUTING TABLE STUFF
# how many times in a row a node can fail to respond before it's booted
# from the routing table
MAX_FAILURES = 3

# never ping a node more often than this
MIN_PING_INTERVAL = 60 * 15  # fifteen minutes

# refresh buckets that haven't been touched in this long
BUCKET_STALENESS = 60 * 60  # one hour

### KEY EXPIRER
# time before expirer starts running
KEINITIAL_DELAY = 15  # 15 seconds - to clean out old stuff in persistent db

# time between expirer runs
KE_DELAY = 60 * 20  # 20 minutes

# expire entries older than this
KE_AGE = 60 * 60  # 60 minutes

# --- 160-bit hash/ID helpers (originally khash.py) ---

from os import urandom
from binascii import hexlify, unhexlify

# hashlib replaces the long-deprecated sha module; both give SHA-1
try:
    from hashlib import sha1 as sha
except ImportError:
    from sha import sha          # ancient Pythons only

def intify(hstr):
    """20-byte hash, big-endian -> long python integer"""
    assert len(hstr) == 20
    # hexlify accepts both Python 2 str and Python 3 bytes, unlike the
    # removed str.encode('hex')
    return int(hexlify(hstr), 16)

def stringify(num):
    """long python integer -> 20-byte big-endian string"""
    # '%x' never appends the 'L' suffix that hex() adds to Python 2
    # longs, so no manual stripping is needed
    s = '%x' % num
    if len(s) % 2 != 0:
        s = '0' + s
    s = unhexlify(s)
    # b'\x00' is a plain str on Python 2, so this pads correctly everywhere
    return (20 - len(s)) * b'\x00' + s

def distance(a, b):
    """XOR distance between two 160-bit hashes expressed as 20-byte strings"""
    return intify(a) ^ intify(b)

def newID():
    """returns a new pseudorandom globally unique ID string"""
    h = sha()
    h.update(urandom(20))
    return h.digest()

def newIDInRange(min, max):
    """Random 20-byte ID whose integer value lies in [min, max)."""
    return stringify(randRange(min, max))

def randRange(min, max):
    """Pseudorandom integer in [min, max), driven by newID() entropy."""
    return min + intify(newID()) % (max - min)

def newTID():
    """Random transaction id in [-2**30, 2**30)."""
    return randRange(-2**30, 2**30)

### Test Cases ###
import unittest

class NewID(unittest.TestCase):
    def testLength(self):
        self.assertEqual(len(newID()), 20)
    def testHundreds(self):
        for x in range(100):
            # BUG FIX: the original said 'self.testLength' without the
            # call parentheses, so this loop tested nothing
            self.testLength()

class Intify(unittest.TestCase):
    known = [(b'\0' * 20, 0),
             (b'\xff' * 20, 2**160 - 1),
             ]
    def testKnown(self):
        for str, value in self.known:
            self.assertEqual(intify(str), value)
    def testEndianessOnce(self):
        h = newID()
        # byte-at-a-time access via bytearray behaves identically on
        # Python 2 str and Python 3 bytes
        while bytearray(h)[-1] == 0xff:
            h = newID()
        k = h[:-1] + bytes(bytearray([bytearray(h)[-1] + 1]))
        self.assertEqual(intify(k) - intify(h), 1)
    def testEndianessLots(self):
        for x in range(100):
            self.testEndianessOnce()

class Disantance(unittest.TestCase):
    known = [
        ((b"\0" * 20, b"\xff" * 20), 2**160 - 1),
        ((sha(b"foo").digest(), sha(b"foo").digest()), 0),
        ((sha(b"bar").digest(), sha(b"bar").digest()), 0)
        ]
    def testKnown(self):
        for pair, dist in self.known:
            self.assertEqual(distance(pair[0], pair[1]), dist)
    def testCommutitive(self):
        for i in range(100):
            x, y, z = newID(), newID(), newID()
            self.assertEqual(distance(x, y) ^ distance(y, z), distance(x, z))

class RandRange(unittest.TestCase):
    def testOnce(self):
        a = intify(newID())
        b = intify(newID())
        if a < b:
            c = randRange(a, b)
            self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a))
        else:
            c = randRange(b, a)
            assert b <= c < a, "output out of range %d %d %d" % (b, c, a)

    def testOneHundredTimes(self):
        for i in range(100):
            self.testOnce()
<= c < a, "output out of range %d %d %d" % (b, c, a) - - def testOneHundredTimes(self): - for i in xrange(100): - self.testOnce() - - - -if __name__ == '__main__': - unittest.main() - - diff --git a/khashmir.py b/khashmir.py deleted file mode 100644 index 0196fd2..0000000 --- a/khashmir.py +++ /dev/null @@ -1,349 +0,0 @@ -## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -from time import time -from random import randrange -import sqlite ## find this at http://pysqlite.sourceforge.net/ - -from twisted.internet.defer import Deferred -from twisted.internet import protocol -from twisted.internet import reactor - -import const -from ktable import KTable -from knode import KNodeBase, KNodeRead, KNodeWrite -from khash import newID, newIDInRange -from actions import FindNode, GetValue, KeyExpirer, StoreValue -import krpc - -class KhashmirDBExcept(Exception): - pass - -# this is the base class, has base functionality and find node, no key-value mappings -class KhashmirBase(protocol.Factory): - _Node = KNodeBase - def __init__(self, host, port, db='khashmir.db'): - self.setup(host, port, db) - - def setup(self, host, port, db='khashmir.db'): - self._findDB(db) - self.port = port - self.node = self._loadSelfNode(host, port) - self.table = KTable(self.node) - #self.app = service.Application("krpc") - self.udp = krpc.hostbroker(self) - self.udp.protocol = krpc.KRPC - self.listenport = reactor.listenUDP(port, self.udp) - self.last = time() - self._loadRoutingTable() - KeyExpirer(store=self.store) - self.refreshTable(force=1) - reactor.callLater(60, self.checkpoint, (1,)) - - def Node(self): - n = self._Node() - n.table = self.table - return n - - def __del__(self): - self.listenport.stopListening() - - def _loadSelfNode(self, host, port): - c = self.store.cursor() - c.execute('select id from self where num = 0;') - if c.rowcount > 0: - id = c.fetchone()[0] - else: - id = newID() - return self._Node().init(id, host, port) 
- - def _saveSelfNode(self): - c = self.store.cursor() - c.execute('delete from self where num = 0;') - c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) - self.store.commit() - - def checkpoint(self, auto=0): - self._saveSelfNode() - self._dumpRoutingTable() - self.refreshTable() - if auto: - reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,)) - - def _findDB(self, db): - import os - try: - os.stat(db) - except OSError: - self._createNewDB(db) - else: - self._loadDB(db) - - def _loadDB(self, db): - try: - self.store = sqlite.connect(db=db) - #self.store.autocommit = 0 - except: - import traceback - raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc() - - def _createNewDB(self, db): - self.store = sqlite.connect(db=db) - s = """ - create table kv (key binary, value binary, time timestamp, primary key (key, value)); - create index kv_key on kv(key); - create index kv_timestamp on kv(time); - - create table nodes (id binary primary key, host text, port number); - - create table self (num number primary key, id binary); - """ - c = self.store.cursor() - c.execute(s) - self.store.commit() - - def _dumpRoutingTable(self): - """ - save routing table nodes to the database - """ - c = self.store.cursor() - c.execute("delete from nodes where id not NULL;") - for bucket in self.table.buckets: - for node in bucket.l: - c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) - self.store.commit() - - def _loadRoutingTable(self): - """ - load routing table nodes from database - it's usually a good idea to call refreshTable(force=1) after loading the table - """ - c = self.store.cursor() - c.execute("select * from nodes;") - for rec in c.fetchall(): - n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.table.insertNode(n, contacted=0) 
- - - ####### - ####### LOCAL INTERFACE - use these methods! - def addContact(self, host, port, callback=None): - """ - ping this node and add the contact info to the table on pong! - """ - n =self.Node().init(const.NULL_ID, host, port) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.sendPing(n, callback=callback) - - ## this call is async! - def findNode(self, id, callback, errback=None): - """ returns the contact info for node, or the k closest nodes, from the global table """ - # get K nodes out of local table/cache, or the node we want - nodes = self.table.findNodes(id) - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) - if len(nodes) == 1 and nodes[0].id == id : - d.callback(nodes) - else: - # create our search state - state = FindNode(self, id, d.callback) - reactor.callLater(0, state.goWithNodes, nodes) - - def insertNode(self, n, contacted=1): - """ - insert a node in our local table, pinging oldest contact in bucket, if necessary - - If all you have is a host/port, then use addContact, which calls this method after - receiving the PONG from the remote node. The reason for the seperation is we can't insert - a node into the table without it's peer-ID. That means of course the node passed into this - method needs to be a properly formed Node object with a valid ID. 
- """ - old = self.table.insertNode(n, contacted=contacted) - if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: - # the bucket is full, check to see if old node is still around and if so, replace it - - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(oldnode=old, newnode = n): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(old, newnode) - - def _notStaleNodeHandler(dict, old=old): - """ called when we get a pong from the old node """ - dict = dict['rsp'] - if dict['id'] == old.id: - self.table.justSeenNode(old.id) - - df = old.ping(self.node.id) - df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) - - def sendPing(self, node, callback=None): - """ - ping a node - """ - df = node.ping(self.node.id) - ## these are the callbacks we use when we issue a PING - def _pongHandler(dict, node=node, table=self.table, callback=callback): - _krpc_sender = dict['_krpc_sender'] - dict = dict['rsp'] - sender = {'id' : dict['id']} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - table.insertNode(n) - if callback: - callback() - def _defaultPong(err, node=node, table=self.table, callback=callback): - table.nodeFailed(node) - if callback: - callback() - - df.addCallbacks(_pongHandler,_defaultPong) - - def findCloseNodes(self, callback=lambda a: None): - """ - This does a findNode on the ID one away from our own. - This will allow us to populate our table with nodes on our network closest to our own. 
- This is called as soon as we start up with an empty table - """ - id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - self.findNode(id, callback) - - def refreshTable(self, force=0): - """ - force=1 will refresh table regardless of last bucket access time - """ - def callback(nodes): - pass - - for bucket in self.table.buckets: - if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS): - id = newIDInRange(bucket.min, bucket.max) - self.findNode(id, callback) - - def stats(self): - """ - Returns (num_contacts, num_nodes) - num_contacts: number contacts in our routing table - num_nodes: number of nodes estimated in the entire dht - """ - num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) - num_nodes = const.K * (2**(len(self.table.buckets) - 1)) - return (num_contacts, num_nodes) - - def krpc_ping(self, id, _krpc_sender): - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"id" : self.node.id} - - def krpc_find_node(self, target, id, _krpc_sender): - nodes = self.table.findNodes(target) - nodes = map(lambda node: node.senderDict(), nodes) - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"nodes" : nodes, "id" : self.node.id} - - -## This class provides read-only access to the DHT, valueForKey -## you probably want to use this mixin and provide your own write methods -class KhashmirRead(KhashmirBase): - _Node = KNodeRead - def retrieveValues(self, key): - c = self.store.cursor() - c.execute("select value from kv where key = %s;", sqlite.encode(key)) - t = c.fetchone() - l = [] - while t: - l.append(t['value']) - t = c.fetchone() - return l - ## also async - def 
valueForKey(self, key, callback, searchlocal = 1): - """ returns the values found for key in global table - callback will be called with a list of values for each peer that returns unique values - final callback will be an empty list - probably should change to 'more coming' arg - """ - nodes = self.table.findNodes(key) - - # get locals - if searchlocal: - l = self.retrieveValues(key) - if len(l) > 0: - reactor.callLater(0, callback, (l)) - else: - l = [] - - # create our search state - state = GetValue(self, key, callback) - reactor.callLater(0, state.goWithNodes, nodes, l) - - def krpc_find_value(self, key, id, _krpc_sender): - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - - l = self.retrieveValues(key) - if len(l) > 0: - return {'values' : l, "id": self.node.id} - else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes, "id": self.node.id} - -### provides a generic write method, you probably don't want to deploy something that allows -### arbitrary value storage -class KhashmirWrite(KhashmirRead): - _Node = KNodeWrite - ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) - def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - a key can have many values - """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): - if not response: - # default callback - def _storedValueHandler(sender): - pass - response=_storedValueHandler - action = StoreValue(self.table, key, value, response) - reactor.callLater(0, action.goWithNodes, nodes) - - # this call is asynch - 
self.findNode(key, _storeValueForKey) - - def krpc_store_value(self, key, value, id, _krpc_sender): - t = "%0.6f" % time() - c = self.store.cursor() - try: - c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) - except sqlite.IntegrityError, reason: - # update last insert time - c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) - self.store.commit() - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"id" : self.node.id} - -# the whole shebang, for testing -class Khashmir(KhashmirWrite): - _Node = KNodeWrite diff --git a/knet.py b/knet.py deleted file mode 100644 index eb9215c..0000000 --- a/knet.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# knet.py -# create a network of khashmir nodes -# usage: knet.py - -from random import randrange -import sys, os - -from twisted.internet import reactor - -from khashmir import Khashmir - -class Network: - def __init__(self, size=0, startport=5555, localip='127.0.0.1'): - self.num = size - self.startport = startport - self.localip = localip - - def _done(self, val): - self.done = 1 - - def setUp(self): - self.kfiles() - self.l = [] - for i in range(self.num): - self.l.append(Khashmir('', self.startport + i, '/tmp/kh%s.db' % (self.startport + i))) - reactor.iterate() - reactor.iterate() - - for i in self.l: - i.addContact(self.localip, self.l[randrange(0,self.num)].port) - i.addContact(self.localip, self.l[randrange(0,self.num)].port) - i.addContact(self.localip, self.l[randrange(0,self.num)].port) - reactor.iterate() - reactor.iterate() - reactor.iterate() - - for i in self.l: - self.done = 0 - i.findCloseNodes(self._done) - while not self.done: - reactor.iterate() - for i in self.l: - self.done = 0 - i.findCloseNodes(self._done) - while not 
self.done: - reactor.iterate() - - def tearDown(self): - for i in self.l: - i.listenport.stopListening() - self.kfiles() - - def kfiles(self): - for i in range(self.startport, self.startport+self.num): - try: - os.unlink('/tmp/kh%s.db' % i) - except: - pass - - reactor.iterate() - -if __name__ == "__main__": - n = Network(int(sys.argv[1]), int(sys.argv[2]), sys.argv[3]) - n.setUp() - try: - reactor.run() - finally: - n.tearDown() diff --git a/knode.py b/knode.py deleted file mode 100644 index d2cea72..0000000 --- a/knode.py +++ /dev/null @@ -1,53 +0,0 @@ -## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -from const import NULL_ID -from node import Node - -class KNodeBase(Node): - def checkSender(self, dict): - try: - senderid = dict['rsp']['id'] - except KeyError: - print ">>>> No peer id in response" - raise Exception, "No peer id in response." - else: - if self.id != NULL_ID and senderid != self.id: - print "Got response from different node than expected." 
- self.table.invalidateNode(self) - - return dict - - def errBack(self, err): - print ">>> ", err - return err - - def ping(self, id): - df = self.conn.sendRequest('ping', {"id":id}) - df.addErrback(self.errBack) - df.addCallback(self.checkSender) - return df - def findNode(self, target, id): - df = self.conn.sendRequest('find_node', {"target" : target, "id": id}) - df.addErrback(self.errBack) - df.addCallback(self.checkSender) - return df - -class KNodeRead(KNodeBase): - def findValue(self, key, id): - df = self.conn.sendRequest('find_value', {"key" : key, "id" : id}) - df.addErrback(self.errBack) - df.addCallback(self.checkSender) - return df - -class KNodeWrite(KNodeRead): - def storeValue(self, key, value, id): - df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id}) - df.addErrback(self.errBack) - df.addCallback(self.checkSender) - return df - def storeValues(self, key, value, id): - df = self.conn.sendRequest('store_values', {"key" : key, "values" : value, "id": id}) - df.addErrback(self.errBack) - df.addCallback(self.checkSender) - return df diff --git a/krpc.py b/krpc.py deleted file mode 100644 index 8a60092..0000000 --- a/krpc.py +++ /dev/null @@ -1,159 +0,0 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -from bencode import bencode, bdecode -from time import asctime -import sys -from traceback import format_exception - -from twisted.internet.defer import Deferred -from twisted.internet import protocol -from twisted.internet import reactor - -KRPC_TIMEOUT = 20 - -KRPC_ERROR = 1 -KRPC_ERROR_METHOD_UNKNOWN = 2 -KRPC_ERROR_RECEIVED_UNKNOWN = 3 -KRPC_ERROR_TIMEOUT = 4 - -# commands -TID = 't' -REQ = 'q' -RSP = 'r' -TYP = 'y' -ARG = 'a' -ERR = 'e' - -class hostbroker(protocol.DatagramProtocol): - def __init__(self, server): - self.server = server - # this should be changed to storage that drops old entries - self.connections = {} - - def datagramReceived(self, 
datagram, addr): - #print `addr`, `datagram` - #if addr != self.addr: - c = self.connectionForAddr(addr) - c.datagramReceived(datagram, addr) - #if c.idle(): - # del self.connections[addr] - - def connectionForAddr(self, addr): - if addr == self.addr: - raise Exception - if not self.connections.has_key(addr): - conn = self.protocol(addr, self.server, self.transport) - self.connections[addr] = conn - else: - conn = self.connections[addr] - return conn - - def makeConnection(self, transport): - protocol.DatagramProtocol.makeConnection(self, transport) - tup = transport.getHost() - self.addr = (tup.host, tup.port) - -## connection -class KRPC: - noisy = 1 - def __init__(self, addr, server, transport): - self.transport = transport - self.factory = server - self.addr = addr - self.tids = {} - self.mtid = 0 - - def datagramReceived(self, str, addr): - # bdecode - try: - msg = bdecode(str) - except Exception, e: - if self.noisy: - print "response decode error: " + `e` - else: - #if self.noisy: - # print msg - # look at msg type - if msg[TYP] == REQ: - ilen = len(str) - # if request - # tell factory to handle - f = getattr(self.factory ,"krpc_" + msg[REQ], None) - msg[ARG]['_krpc_sender'] = self.addr - if f and callable(f): - try: - ret = apply(f, (), msg[ARG]) - except Exception, e: - ## send error - out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`}) - olen = len(out) - self.transport.write(out, addr) - else: - if ret: - # make response - out = bencode({TID : msg[TID], TYP : RSP, RSP : ret}) - else: - out = bencode({TID : msg[TID], TYP : RSP, RSP : {}}) - # send response - olen = len(out) - self.transport.write(out, addr) - - else: - if self.noisy: - print "don't know about method %s" % msg[REQ] - # unknown method - out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN}) - olen = len(out) - self.transport.write(out, addr) - if self.noisy: - print "%s %s >>> %s - %s %s %s" % (asctime(), addr, 
self.factory.node.port, - ilen, msg[REQ], olen) - elif msg[TYP] == RSP: - # if response - # lookup tid - if self.tids.has_key(msg[TID]): - df = self.tids[msg[TID]] - # callback - del(self.tids[msg[TID]]) - df.callback({'rsp' : msg[RSP], '_krpc_sender': addr}) - else: - print 'timeout ' + `msg[RSP]['id']` - # no tid, this transaction timed out already... - elif msg[TYP] == ERR: - # if error - # lookup tid - if self.tids.has_key(msg[TID]): - df = self.tids[msg[TID]] - # callback - df.errback(msg[ERR]) - del(self.tids[msg[TID]]) - else: - # day late and dollar short - pass - else: - print "unknown message type " + `msg` - # unknown message type - df = self.tids[msg[TID]] - # callback - df.errback(KRPC_ERROR_RECEIVED_UNKNOWN) - del(self.tids[msg[TID]]) - - def sendRequest(self, method, args): - # make message - # send it - msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args} - self.mtid = (self.mtid + 1) % 256 - str = bencode(msg) - d = Deferred() - self.tids[msg[TID]] = d - def timeOut(tids = self.tids, id = msg[TID]): - if tids.has_key(id): - df = tids[id] - del(tids[id]) - print ">>>>>> KRPC_ERROR_TIMEOUT" - df.errback(KRPC_ERROR_TIMEOUT) - reactor.callLater(KRPC_TIMEOUT, timeOut) - self.transport.write(str, self.addr) - return d - diff --git a/ktable.py b/ktable.py deleted file mode 100644 index e0ff8df..0000000 --- a/ktable.py +++ /dev/null @@ -1,241 +0,0 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -from time import time -from bisect import bisect_left - -from const import K, HASH_LENGTH, NULL_ID, MAX_FAILURES -import khash -from node import Node - -class KTable: - """local routing table for a kademlia like distributed hash table""" - def __init__(self, node): - # this is the root node, a.k.a. US! 
- self.node = node - self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)] - self.insertNode(node) - - def _bucketIndexForInt(self, num): - """the index of the bucket that should hold int""" - return bisect_left(self.buckets, num) - - def findNodes(self, id): - """ - return K nodes in our own local table closest to the ID. - """ - - if isinstance(id, str): - num = khash.intify(id) - elif isinstance(id, Node): - num = id.num - elif isinstance(id, int) or isinstance(id, long): - num = id - else: - raise TypeError, "findNodes requires an int, string, or Node" - - nodes = [] - i = self._bucketIndexForInt(num) - - # if this node is already in our table then return it - try: - index = self.buckets[i].l.index(num) - except ValueError: - pass - else: - return [self.buckets[i].l[index]] - - # don't have the node, get the K closest nodes - nodes = nodes + self.buckets[i].l - if len(nodes) < K: - # need more nodes - min = i - 1 - max = i + 1 - while len(nodes) < K and (min >= 0 or max < len(self.buckets)): - #ASw: note that this requires K be even - if min >= 0: - nodes = nodes + self.buckets[min].l - if max < len(self.buckets): - nodes = nodes + self.buckets[max].l - min = min - 1 - max = max + 1 - - nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num)) - return nodes[:K] - - def _splitBucket(self, a): - diff = (a.max - a.min) / 2 - b = KBucket([], a.max - diff, a.max) - self.buckets.insert(self.buckets.index(a.min) + 1, b) - a.max = a.max - diff - # transfer nodes to new bucket - for anode in a.l[:]: - if anode.num >= a.max: - a.l.remove(anode) - b.l.append(anode) - - def replaceStaleNode(self, stale, new): - """this is used by clients to replace a node returned by insertNode after - it fails to respond to a Pong message""" - i = self._bucketIndexForInt(stale.num) - try: - it = self.buckets[i].l.index(stale.num) - except ValueError: - return - - del(self.buckets[i].l[it]) - if new: - self.buckets[i].l.append(new) - - def insertNode(self, node, contacted=1): - """ - 
this insert the node, returning None if successful, returns the oldest node in the bucket if it's full - the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!! - contacted means that yes, we contacted THEM and we know the node is reachable - """ - assert node.id != NULL_ID - if node.id == self.node.id: return - # get the bucket for this node - i = self. _bucketIndexForInt(node.num) - # check to see if node is in the bucket already - try: - it = self.buckets[i].l.index(node.num) - except ValueError: - # no - pass - else: - if contacted: - node.updateLastSeen() - # move node to end of bucket - xnode = self.buckets[i].l[it] - del(self.buckets[i].l[it]) - # note that we removed the original and replaced it with the new one - # utilizing this nodes new contact info - self.buckets[i].l.append(xnode) - self.buckets[i].touch() - return - - # we don't have this node, check to see if the bucket is full - if len(self.buckets[i].l) < K: - # no, append this node and return - if contacted: - node.updateLastSeen() - self.buckets[i].l.append(node) - self.buckets[i].touch() - return - - # bucket is full, check to see if self.node is in the bucket - if not (self.buckets[i].min <= self.node < self.buckets[i].max): - return self.buckets[i].l[0] - - # this bucket is full and contains our node, split the bucket - if len(self.buckets) >= HASH_LENGTH: - # our table is FULL, this is really unlikely - print "Hash Table is FULL! Increase K!" 
- return - - self._splitBucket(self.buckets[i]) - - # now that the bucket is split and balanced, try to insert the node again - return self.insertNode(node) - - def justSeenNode(self, id): - """call this any time you get a message from a node - it will update it in the table if it's there """ - try: - n = self.findNodes(id)[0] - except IndexError: - return None - else: - tstamp = n.lastSeen - n.updateLastSeen() - return tstamp - - def invalidateNode(self, n): - """ - forget about node n - use when you know that node is invalid - """ - self.replaceStaleNode(n, None) - - def nodeFailed(self, node): - """ call this when a node fails to respond to a message, to invalidate that node """ - try: - n = self.findNodes(node.num)[0] - except IndexError: - return None - else: - if n.msgFailed() >= MAX_FAILURES: - self.invalidateNode(n) - -class KBucket: - def __init__(self, contents, min, max): - self.l = contents - self.min = min - self.max = max - self.lastAccessed = time() - - def touch(self): - self.lastAccessed = time() - - def getNodeWithInt(self, num): - if num in self.l: return num - else: raise ValueError - - def __repr__(self): - return "" % (len(self.l), self.min, self.max) - - ## Comparators - # necessary for bisecting list of buckets with a hash expressed as an integer or a distance - # compares integer or node object with the bucket's range - def __lt__(self, a): - if isinstance(a, Node): a = a.num - return self.max <= a - def __le__(self, a): - if isinstance(a, Node): a = a.num - return self.min < a - def __gt__(self, a): - if isinstance(a, Node): a = a.num - return self.min > a - def __ge__(self, a): - if isinstance(a, Node): a = a.num - return self.max >= a - def __eq__(self, a): - if isinstance(a, Node): a = a.num - return self.min <= a and self.max > a - def __ne__(self, a): - if isinstance(a, Node): a = a.num - return self.min >= a or self.max < a - - -### UNIT TESTS ### -import unittest - -class TestKTable(unittest.TestCase): - def setUp(self): - self.a = 
Node().init(khash.newID(), 'localhost', 2002) - self.t = KTable(self.a) - - def testAddNode(self): - self.b = Node().init(khash.newID(), 'localhost', 2003) - self.t.insertNode(self.b) - self.assertEqual(len(self.t.buckets[0].l), 1) - self.assertEqual(self.t.buckets[0].l[0], self.b) - - def testRemove(self): - self.testAddNode() - self.t.invalidateNode(self.b) - self.assertEqual(len(self.t.buckets[0].l), 0) - - def testFail(self): - self.testAddNode() - for i in range(MAX_FAILURES - 1): - self.t.nodeFailed(self.b) - self.assertEqual(len(self.t.buckets[0].l), 1) - self.assertEqual(self.t.buckets[0].l[0], self.b) - - self.t.nodeFailed(self.b) - self.assertEqual(len(self.t.buckets[0].l), 0) - - -if __name__ == "__main__": - unittest.main() diff --git a/node.py b/node.py deleted file mode 100644 index 35dadc6..0000000 --- a/node.py +++ /dev/null @@ -1,82 +0,0 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -from time import time -from types import InstanceType - -import khash - -class Node: - """encapsulate contact info""" - def __init__(self): - self.fails = 0 - self.lastSeen = 0 - self.id = self.host = self.port = '' - - def init(self, id, host, port): - self.id = id - self.num = khash.intify(id) - self.host = host - self.port = port - self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host} - return self - - def initWithDict(self, dict): - self._senderDict = dict - self.id = dict['id'] - self.num = khash.intify(self.id) - self.port = dict['port'] - self.host = dict['host'] - return self - - def updateLastSeen(self): - self.lastSeen = time() - self.fails = 0 - - def msgFailed(self): - self.fails = self.fails + 1 - return self.fails - - def senderDict(self): - return self._senderDict - - def __repr__(self): - return `(self.id, self.host, self.port)` - - ## these comparators let us bisect/index a list full of nodes with either a node or an int/long - def __lt__(self, a): - if type(a) == 
InstanceType: - a = a.num - return self.num < a - def __le__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num <= a - def __gt__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num > a - def __ge__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num >= a - def __eq__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num == a - def __ne__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num != a - - -import unittest - -class TestNode(unittest.TestCase): - def setUp(self): - self.node = Node().init(khash.newID(), 'localhost', 2002) - def testUpdateLastSeen(self): - t = self.node.lastSeen - self.node.updateLastSeen() - assert t < self.node.lastSeen - \ No newline at end of file diff --git a/test.py b/test.py deleted file mode 100644 index bd949fd..0000000 --- a/test.py +++ /dev/null @@ -1,7 +0,0 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -import unittest - -tests = unittest.defaultTestLoader.loadTestsFromNames(['khash', 'node', 'knode', 'actions', 'ktable', 'test_krpc']) -result = unittest.TextTestRunner().run(tests) diff --git a/test_khashmir.py b/test_khashmir.py deleted file mode 100644 index 24a8760..0000000 --- a/test_khashmir.py +++ /dev/null @@ -1,145 +0,0 @@ -from unittest import defaultTestLoader, TextTestRunner, TestCase -from sha import sha -from random import randrange -import os, sys - -from twisted.internet import reactor - -from khashmir import Khashmir -from khash import newID - -if __name__ =="__main__": - tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]]) - result = TextTestRunner().run(tests) - -class SimpleTests(TestCase): - def setUp(self): - self.a = Khashmir('127.0.0.1', 4044, '/tmp/a.test') - self.b = Khashmir('127.0.0.1', 4045, '/tmp/b.test') - - def tearDown(self): - self.a.listenport.stopListening() - self.b.listenport.stopListening() - os.unlink('/tmp/a.test') - 
os.unlink('/tmp/b.test') - reactor.iterate() - reactor.iterate() - - def addContacts(self): - self.a.addContact('127.0.0.1', 4045) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - - def testAddContact(self): - self.assertEqual(len(self.a.table.buckets), 1) - self.assertEqual(len(self.a.table.buckets[0].l), 0) - - self.assertEqual(len(self.b.table.buckets), 1) - self.assertEqual(len(self.b.table.buckets[0].l), 0) - - self.addContacts() - - self.assertEqual(len(self.a.table.buckets), 1) - self.assertEqual(len(self.a.table.buckets[0].l), 1) - self.assertEqual(len(self.b.table.buckets), 1) - self.assertEqual(len(self.b.table.buckets[0].l), 1) - - def testStoreRetrieve(self): - self.addContacts() - self.got = 0 - self.a.storeValueForKey(sha('foo').digest(), 'foobar') - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.a.valueForKey(sha('foo').digest(), self._cb) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - - def _cb(self, val): - if not val: - self.assertEqual(self.got, 1) - elif 'foobar' in val: - self.got = 1 - - -class MultiTest(TestCase): - num = 20 - def _done(self, val): - self.done = 1 - - def setUp(self): - self.l = [] - self.startport = 4088 - for i in range(self.num): - self.l.append(Khashmir('127.0.0.1', self.startport + i, '/tmp/%s.test' % (self.startport + i))) - reactor.iterate() - reactor.iterate() - - for i in self.l: - i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) - i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) - i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) - reactor.iterate() - reactor.iterate() - reactor.iterate() - - for i in self.l: - self.done = 0 - i.findCloseNodes(self._done) - while not self.done: - reactor.iterate() - for i in self.l: - self.done = 0 - i.findCloseNodes(self._done) - while not 
self.done: - reactor.iterate() - - def tearDown(self): - for i in self.l: - i.listenport.stopListening() - for i in range(self.startport, self.startport+self.num): - os.unlink('/tmp/%s.test' % i) - - reactor.iterate() - - def testStoreRetrieve(self): - for i in range(10): - K = newID() - V = newID() - - for a in range(3): - self.done = 0 - def _scb(val): - self.done = 1 - self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb) - while not self.done: - reactor.iterate() - - - def _rcb(val): - if not val: - self.done = 1 - self.assertEqual(self.got, 1) - elif V in val: - self.got = 1 - for x in range(3): - self.got = 0 - self.done = 0 - self.l[randrange(0, self.num)].valueForKey(K, _rcb) - while not self.done: - reactor.iterate() - - - - - diff --git a/test_krpc.py b/test_krpc.py deleted file mode 100644 index 3c41d07..0000000 --- a/test_krpc.py +++ /dev/null @@ -1,174 +0,0 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -from unittest import defaultTestLoader, TestCase, TextTestRunner -import sys - -from twisted.internet import protocol -from twisted.internet import reactor - -from krpc import KRPC, hostbroker, KRPC_ERROR_METHOD_UNKNOWN - -KRPC.noisy = 0 - -if __name__ =="__main__": - tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]]) - result = TextTestRunner().run(tests) - - -def connectionForAddr(host, port): - return host - - -class Receiver(protocol.Factory): - protocol = KRPC - def __init__(self): - self.buf = [] - def krpc_store(self, msg, _krpc_sender): - self.buf += [msg] - def krpc_echo(self, msg, _krpc_sender): - return msg - -def make(port): - af = Receiver() - a = hostbroker(af) - a.protocol = KRPC - p = reactor.listenUDP(port, a) - return af, a, p - -class KRPCTests(TestCase): - def setUp(self): - self.noisy = 0 - self.af, self.a, self.ap = make(1180) - self.bf, self.b, self.bp = make(1181) - - def tearDown(self): - self.ap.stopListening() - self.bp.stopListening() - 
reactor.iterate() - reactor.iterate() - - def testSimpleMessage(self): - self.noisy = 0 - self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.bf.buf, ["This is a test."]) - - def testMessageBlast(self): - self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.bf.buf, ["This is a test."]) - self.bf.buf = [] - - for i in range(100): - self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) - reactor.iterate() - #self.bf.buf = [] - self.assertEqual(self.bf.buf, ["This is a test."] * 100) - - def testEcho(self): - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.msg, "This is a test.") - - def gotMsg(self, dict): - _krpc_sender = dict['_krpc_sender'] - msg = dict['rsp'] - self.msg = msg - - def testManyEcho(self): - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.msg, "This is a test.") - for i in xrange(100): - self.msg = None - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.msg, "This is a test.") - - def testMultiEcho(self): - self.noisy = 1 - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - 
reactor.iterate() - self.assertEqual(self.msg, "This is a test.") - - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.msg, "This is another test.") - - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.msg, "This is yet another test.") - - def testEchoReset(self): - self.noisy = 1 - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.msg, "This is a test.") - - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.msg, "This is another test.") - - del(self.a.connections[('127.0.0.1', 1181)]) - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."}) - df.addCallback(self.gotMsg) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.msg, "This is yet another test.") - - def testLotsofEchoReset(self): - for i in range(100): - self.testEchoReset() - - def testUnknownMeth(self): - self.noisy = 1 - df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."}) - df.addErrback(self.gotErr) - reactor.iterate() - reactor.iterate() - reactor.iterate() - reactor.iterate() - self.assertEqual(self.err, KRPC_ERROR_METHOD_UNKNOWN) - - def gotErr(self, err): - self.err = err.value - \ No newline at 
end of file diff --git a/util.py b/util.py deleted file mode 100644 index 7808857..0000000 --- a/util.py +++ /dev/null @@ -1,23 +0,0 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -def bucket_stats(l): - """given a list of khashmir instances, finds min, max, and average number of nodes in tables""" - max = avg = 0 - min = None - def count(buckets): - c = 0 - for bucket in buckets: - c = c + len(bucket.l) - return c - for node in l: - c = count(node.table.buckets) - if min == None: - min = c - elif c < min: - min = c - if c > max: - max = c - avg = avg + c - avg = avg / len(l) - return {'min':min, 'max':max, 'avg':avg}