From: Cameron Dale
Date: Mon, 14 Jan 2008 06:49:16 +0000 (-0800)
Subject: Use FilePath everywhere and create new CacheManager module.
X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=8297bc9a2aa8132ea1a7363761d9d5c73a1efca2

Use FilePath everywhere and create new CacheManager module.

All cache-related functionality moves from MirrorManager to the new
CacheManager module.
---
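[Reviewer note] For readers unfamiliar with it, twisted.python.filepath.FilePath is the abstraction this commit standardizes on. A minimal sketch of the calls used below, with hypothetical paths; note that exists() consults a cached stat result, which is why the code pairs it with restat(False):

    from twisted.python.filepath import FilePath

    cache_dir = FilePath('/tmp/.apt-dht')          # hypothetical cache root
    packages = cache_dir.child('packages.db')      # child() allows only a single path segment
    status = cache_dir.preauthChild('apt/status')  # preauthChild() allows relative subpaths
    status.restat(False)                           # refresh cached stat; False = don't raise if missing
    if not status.parent().exists():
        status.parent().makedirs()                 # like os.makedirs, creates intermediate dirs
    status.touch()                                 # replaces the open(path, 'w'); close() idiom
    print status.path                              # .path is the plain string for stdlib APIs
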
diff --git a/apt_dht/AptPackages.py b/apt_dht/AptPackages.py
index 1987f8c..11a1c78 100644
--- a/apt_dht/AptPackages.py
+++ b/apt_dht/AptPackages.py
@@ -10,6 +10,7 @@ from UserDict import DictMixin
 
 from twisted.internet import threads, defer
 from twisted.python import log
+from twisted.python.filepath import FilePath
 from twisted.trial import unittest
 
 import apt_pkg, apt_inst
@@ -30,15 +31,16 @@ class PackageFileList(DictMixin):
 
     def __init__(self, cache_dir):
         self.cache_dir = cache_dir
-        if not os.path.exists(self.cache_dir):
-            os.makedirs(self.cache_dir)
+        self.cache_dir.restat(False)
+        if not self.cache_dir.exists():
+            self.cache_dir.makedirs()
         self.packages = None
         self.open()
 
     def open(self):
         """Open the persistent dictionary of files in this backend."""
         if self.packages is None:
-            self.packages = shelve.open(self.cache_dir+'/packages.db')
+            self.packages = shelve.open(self.cache_dir.child('packages.db').path)
 
     def close(self):
         """Close the persistent dictionary."""
@@ -62,7 +64,8 @@ class PackageFileList(DictMixin):
         """Check all files in the database to make sure they exist."""
         files = self.packages.keys()
         for f in files:
-            if not os.path.exists(self.packages[f]):
+            self.packages[f].restat(False)
+            if not self.packages[f].exists():
                 log.msg("File in packages database has been deleted: "+f)
                 del self.packages[f]
@@ -124,19 +127,16 @@ class AptPackages:
         self.apt_config = deepcopy(self.DEFAULT_APT_CONFIG)
 
         for dir in self.essential_dirs:
-            path = os.path.join(self.cache_dir, dir)
-            if not os.path.exists(path):
-                os.makedirs(path)
+            path = self.cache_dir.preauthChild(dir)
+            if not path.exists():
+                path.makedirs()
         for file in self.essential_files:
-            path = os.path.join(self.cache_dir, file)
-            if not os.path.exists(path):
-                f = open(path,'w')
-                f.close()
-                del f
+            path = self.cache_dir.preauthChild(file)
+            if not path.exists():
+                path.touch()
 
-        self.apt_config['Dir'] = self.cache_dir
-        self.apt_config['Dir::State::status'] = os.path.join(self.cache_dir, 
-                self.apt_config['Dir::State'], self.apt_config['Dir::State::status'])
+        self.apt_config['Dir'] = self.cache_dir.path
+        self.apt_config['Dir::State::status'] = self.cache_dir.preauthChild(self.apt_config['Dir::State']).preauthChild(self.apt_config['Dir::State::status']).path
         self.packages = PackageFileList(cache_dir)
         self.loaded = 0
         self.loading = None
@@ -152,7 +152,7 @@ class AptPackages:
             self.indexrecords[cache_path] = {}
 
             read_packages = False
-            f = open(file_path, 'r')
+            f = file_path.open('r')
             
             for line in f:
                 line = line.rstrip()
@@ -204,13 +204,14 @@ class AptPackages:
         """Regenerates the fake configuration and load the packages cache."""
         if self.loaded: return True
         apt_pkg.InitSystem()
-        rmtree(os.path.join(self.cache_dir, self.apt_config['Dir::State'], 
-                            self.apt_config['Dir::State::Lists']))
-        os.makedirs(os.path.join(self.cache_dir, self.apt_config['Dir::State'], 
-                                 self.apt_config['Dir::State::Lists'], 'partial'))
-        sources_filename = os.path.join(self.cache_dir, self.apt_config['Dir::Etc'], 
-                                        self.apt_config['Dir::Etc::sourcelist'])
-        sources = open(sources_filename, 'w')
+        self.cache_dir.preauthChild(self.apt_config['Dir::State']
+                    ).preauthChild(self.apt_config['Dir::State::Lists']).remove()
+        self.cache_dir.preauthChild(self.apt_config['Dir::State']
+                    ).preauthChild(self.apt_config['Dir::State::Lists']
+                    ).child('partial').makedirs()
+        sources_file = self.cache_dir.preauthChild(self.apt_config['Dir::Etc']
+                    ).preauthChild(self.apt_config['Dir::Etc::sourcelist'])
+        sources = sources_file.open('w')
         sources_count = 0
         deb_src_added = False
         self.packages.check_files()
@@ -218,9 +219,9 @@ class AptPackages:
         for f in self.packages:
             # we should probably clear old entries from self.packages and
             # take into account the recorded mtime as optimization
-            filepath = self.packages[f]
+            file = self.packages[f]
             if f.split('/')[-1] == "Release":
-                self.addRelease(f, filepath)
+                self.addRelease(f, file)
             fake_uri='http://apt-dht'+f
             fake_dirname = '/'.join(fake_uri.split('/')[:-1])
             if f.endswith('Sources'):
@@ -228,26 +229,24 @@ class AptPackages:
                 source_line='deb-src '+fake_dirname+'/ /'
             else:
                 source_line='deb '+fake_dirname+'/ /'
-            listpath=(os.path.join(self.cache_dir, self.apt_config['Dir::State'], 
-                                   self.apt_config['Dir::State::Lists'], 
-                                   apt_pkg.URItoFileName(fake_uri)))
+            listpath = self.cache_dir.preauthChild(self.apt_config['Dir::State']
+                    ).preauthChild(self.apt_config['Dir::State::Lists']
+                    ).child(apt_pkg.URItoFileName(fake_uri))
             sources.write(source_line+'\n')
             log.msg("Sources line: " + source_line)
             sources_count = sources_count + 1
-            try:
+            if listpath.exists():
                 #we should empty the directory instead
-                os.unlink(listpath)
-            except:
-                pass
-            os.symlink(filepath, listpath)
+                listpath.remove()
+            os.symlink(file.path, listpath.path)
         sources.close()
 
         if sources_count == 0:
-            log.msg("No Packages files available for %s backend"%(self.cache_dir))
+            log.msg("No Packages files available for %s backend"%(self.cache_dir.path))
             return False
 
-        log.msg("Loading Packages database for "+self.cache_dir)
+        log.msg("Loading Packages database for "+self.cache_dir.path)
         for key, value in self.apt_config.items():
             apt_pkg.Config[key] = value
@@ -355,7 +354,7 @@ class TestAptPackages(unittest.TestCase):
     releaseFile = ''
 
     def setUp(self):
-        self.client = AptPackages('/tmp/.apt-dht')
+        self.client = AptPackages(FilePath('/tmp/.apt-dht'))
         self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Packages$" | tail -n 1').read().rstrip('\n')
         self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Sources$" | tail -n 1').read().rstrip('\n')
@@ -365,11 +364,11 @@ class TestAptPackages(unittest.TestCase):
                 break
 
         self.client.file_updated(self.releaseFile[self.releaseFile.find('_dists_'):].replace('_','/'), 
-                                 '/var/lib/apt/lists/' + self.releaseFile)
+                                 FilePath('/var/lib/apt/lists/' + self.releaseFile))
         self.client.file_updated(self.packagesFile[self.packagesFile.find('_dists_'):].replace('_','/'), 
-                                 '/var/lib/apt/lists/' + self.packagesFile)
+                                 FilePath('/var/lib/apt/lists/' + self.packagesFile))
         self.client.file_updated(self.sourcesFile[self.sourcesFile.find('_dists_'):].replace('_','/'), 
-                                 '/var/lib/apt/lists/' + self.sourcesFile)
+                                 FilePath('/var/lib/apt/lists/' + self.sourcesFile))
 
     def test_pkg_hash(self):
         self.client._load()
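[Reviewer note] One step in _load() above deserves a gloss: each known index file is symlinked into apt's Dir::State::Lists directory under the flattened name apt_pkg.URItoFileName() derives from the fake http://apt-dht URI written to the generated sources.list. A rough illustration; the exact flattening is apt's, so treat the output shown as approximate:

    import apt_pkg

    fake_uri = 'http://apt-dht/dists/unstable/main/binary-i386/Packages'
    listname = apt_pkg.URItoFileName(fake_uri)
    # roughly 'apt-dht_dists_unstable_main_binary-i386_Packages':
    # the scheme is dropped and '/' becomes '_'
    print listname
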
diff --git a/apt_dht/CacheManager.py b/apt_dht/CacheManager.py
new file mode 100644
index 0000000..3601619
--- /dev/null
+++ b/apt_dht/CacheManager.py
@@ -0,0 +1,241 @@
+
+from bz2 import BZ2Decompressor
+from zlib import decompressobj, MAX_WBITS
+from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
+from urlparse import urlparse
+import os
+
+from twisted.python import log
+from twisted.python.filepath import FilePath
+from twisted.internet import defer
+from twisted.trial import unittest
+from twisted.web2 import stream
+from twisted.web2.http import splitHostPort
+
+from AptPackages import AptPackages
+
+aptpkg_dir='apt-packages'
+
+DECOMPRESS_EXTS = ['.gz', '.bz2']
+DECOMPRESS_FILES = ['release', 'sources', 'packages']
+
+class ProxyFileStream(stream.SimpleStream):
+    """Saves a stream to a file while providing a new stream."""
+    
+    def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
+        """Initializes the proxy.
+        
+        @type stream: C{twisted.web2.stream.IByteStream}
+        @param stream: the input stream to read from
+        @type outFile: C{twisted.python.filepath.FilePath}
+        @param outFile: the file to write to
+        @type hash: L{Hash.HashObject}
+        @param hash: the hash object to use for the file
+        @type decompress: C{string}
+        @param decompress: also decompress the file as this type
+            (currently only '.gz' and '.bz2' are supported)
+        @type decFile: C{twisted.python.filepath.FilePath}
+        @param decFile: the file to write the decompressed data to
+        """
+        self.stream = stream
+        self.outFile = outFile.open('w')
+        self.hash = hash
+        self.hash.new()
+        self.gzfile = None
+        self.bz2file = None
+        if decompress == ".gz":
+            self.gzheader = True
+            self.gzfile = decFile.open('w')
+            self.gzdec = decompressobj(-MAX_WBITS)
+        elif decompress == ".bz2":
+            self.bz2file = decFile.open('w')
+            self.bz2dec = BZ2Decompressor()
+        self.length = self.stream.length
+        self.start = 0
+        self.doneDefer = defer.Deferred()
+
+    def _done(self):
+        """Close the output file."""
+        if not self.outFile.closed:
+            self.outFile.close()
+            self.hash.digest()
+            if self.gzfile:
+                data_dec = self.gzdec.flush()
+                self.gzfile.write(data_dec)
+                self.gzfile.close()
+                self.gzfile = None
+            if self.bz2file:
+                self.bz2file.close()
+                self.bz2file = None
+
+            self.doneDefer.callback(self.hash)
+    
+    def read(self):
+        """Read some data from the stream."""
+        if self.outFile.closed:
+            return None
+        
+        data = self.stream.read()
+        if isinstance(data, defer.Deferred):
+            data.addCallbacks(self._write, self._done)
+            return data
+        
+        self._write(data)
+        return data
+    
+    def _write(self, data):
+        """Write the stream data to the file and return it for others to use."""
+        if data is None:
+            self._done()
+            return data
+        
+        self.outFile.write(data)
+        self.hash.update(data)
+        if self.gzfile:
+            if self.gzheader:
+                self.gzheader = False
+                new_data = self._remove_gzip_header(data)
+                dec_data = self.gzdec.decompress(new_data)
+            else:
+                dec_data = self.gzdec.decompress(data)
+            self.gzfile.write(dec_data)
+        if self.bz2file:
+            dec_data = self.bz2dec.decompress(data)
+            self.bz2file.write(dec_data)
+        return data
+    
+    def _remove_gzip_header(self, data):
+        if data[:2] != '\037\213':
+            raise IOError, 'Not a gzipped file'
+        if ord(data[2]) != 8:
+            raise IOError, 'Unknown compression method'
+        flag = ord(data[3])
+        # modtime = self.fileobj.read(4)
+        # extraflag = self.fileobj.read(1)
+        # os = self.fileobj.read(1)
+
+        skip = 10
+        if flag & FEXTRA:
+            # Read & discard the extra field, if present
+            xlen = ord(data[10])
+            xlen = xlen + 256*ord(data[11])
+            skip = skip + 2 + xlen
+        if flag & FNAME:
+            # Read and discard a null-terminated string containing the filename
+            while True:
+                if not data[skip] or data[skip] == '\000':
+                    break
+                skip += 1
+            skip += 1
+        if flag & FCOMMENT:
+            # Read and discard a null-terminated string containing a comment
+            while True:
+                if not data[skip] or data[skip] == '\000':
+                    break
+                skip += 1
+            skip += 1
+        if flag & FHCRC:
+            skip += 2     # Read & discard the 16-bit header CRC
+        return data[skip:]
+
+    def close(self):
+        """Clean everything up and return None to future reads."""
+        self.length = 0
+        self._done()
+        self.stream.close()
+
+class CacheManager:
+    """Manages all requests for cached objects."""
+    
+    def __init__(self, cache_dir, db, manager = None):
+        self.cache_dir = cache_dir
+        self.db = db
+        self.manager = manager
+        
+    def save_file(self, response, hash, url):
+        """Save a downloaded file to the cache and stream it."""
+        if response.code != 200:
+            log.msg('File was not found (%r): %s' % (response, url))
+            return response
+        
+        log.msg('Returning file: %s' % url)
+
+        parsed = urlparse(url)
+        destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
+        log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
+        
+        if destFile.exists():
+            log.msg('File already exists, removing: %s' % destFile.path)
+            destFile.remove()
+        elif not destFile.parent().exists():
+            destFile.parent().makedirs()
+
+        root, ext = os.path.splitext(destFile.basename())
+        if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
+            ext = ext.lower()
+            decFile = destFile.sibling(root)
+            log.msg('Decompressing to: %s' % decFile.path)
+            if decFile.exists():
+                log.msg('File already exists, removing: %s' % decFile.path)
+                decFile.remove()
+        else:
+            ext = None
+            decFile = None
+            
+        orig_stream = response.stream
+        response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
+        response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
+                                              response.headers.getHeader('Last-Modified'),
+                                              ext, decFile)
+        response.stream.doneDefer.addErrback(self.save_error, url)
+        return response
+
+    def _save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
+        """Update the modification time and AptPackages."""
+        if modtime:
+            os.utime(destFile.path, (modtime, modtime))
+            if ext:
+                os.utime(decFile.path, (modtime, modtime))
+        
+        result = hash.verify()
+        if result or result is None:
+            if result:
+                log.msg('Hashes match: %s' % url)
+            else:
+                log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
+                
+            urlpath, newdir = self.db.storeFile(destFile, hash.digest(), self.cache_dir)
+            log.msg('now available at %s: %s' % (urlpath, url))
+
+            if self.manager:
+                self.manager.new_cached_file(url, destFile, hash, urlpath)
+                if ext:
+                    self.manager.new_cached_file(url[:-len(ext)], decFile, None, urlpath)
+        else:
+            log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
+            destFile.remove()
+            if ext:
+                decFile.remove()
+
+    def save_error(self, failure, url):
+        """An error has occurred in downloading or saving the file."""
+        log.msg('Error occurred downloading %s' % url)
+        log.err(failure)
+        return failure
+
+class TestCacheManager(unittest.TestCase):
+    """Unit tests for the cache manager."""
+    
+    timeout = 20
+    pending_calls = []
+    client = None
+    
+    def setUp(self):
+        self.client = CacheManager(FilePath('/tmp/.apt-dht'), None)
+        
+    def tearDown(self):
+        for p in self.pending_calls:
+            if p.active():
+                p.cancel()
+        self.client = None
+        
\ No newline at end of file
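[Reviewer note] ProxyFileStream above tees each chunk to disk and, for gzip, feeds it through a raw-deflate decompressor; zlib with a negative window size expects the gzip header to already be gone, which is what _remove_gzip_header does by hand. A standalone sketch of that decompression idea, under the same assumption the class makes, namely that the whole gzip header arrives in the first chunk; gunzip_chunks and strip_header are hypothetical names:

    from zlib import decompressobj, MAX_WBITS

    def gunzip_chunks(chunks, strip_header):
        """Incrementally decompress an iterable of gzip chunks.

        strip_header removes the gzip header from the first chunk,
        as _remove_gzip_header does above."""
        dec = decompressobj(-MAX_WBITS)  # negative wbits: raw deflate, no header/trailer
        first = True
        for chunk in chunks:
            if first:
                first = False
                chunk = strip_header(chunk)
            yield dec.decompress(chunk)  # trailing CRC bytes end up in dec.unused_data
        yield dec.flush()
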
diff --git a/apt_dht/HTTPServer.py b/apt_dht/HTTPServer.py
index 7e2ac68..0e53c23 100644
--- a/apt_dht/HTTPServer.py
+++ b/apt_dht/HTTPServer.py
@@ -1,4 +1,3 @@
-import os.path, time
 
 from twisted.python import log
 from twisted.internet import defer
@@ -65,14 +64,14 @@ class TopLevel(resource.Resource):
         name = segments[0]
         if name in self.subdirs:
             log.msg('Sharing %s with %s' % (request.uri, request.remoteAddr))
-            return static.File(self.subdirs[name]), segments[1:]
+            return static.File(self.subdirs[name].path), segments[1:]
         
         if request.remoteAddr.host != "127.0.0.1":
             log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
             return None, ()
             
         if len(name) > 1:
-            return FileDownloader(self.directory, self.manager), segments[0:]
+            return FileDownloader(self.directory.path, self.manager), segments[0:]
         else:
             return self, ()
diff --git a/apt_dht/MirrorManager.py b/apt_dht/MirrorManager.py
index c41dbe2..84065c9 100644
--- a/apt_dht/MirrorManager.py
+++ b/apt_dht/MirrorManager.py
@@ -1,158 +1,25 @@
 
-from bz2 import BZ2Decompressor
-from zlib import decompressobj, MAX_WBITS
-from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
 from urlparse import urlparse
 import os
 
-from twisted.python import log, filepath
+from twisted.python import log
+from twisted.python.filepath import FilePath
 from twisted.internet import defer
 from twisted.trial import unittest
-from twisted.web2 import stream
 from twisted.web2.http import splitHostPort
 
 from AptPackages import AptPackages
 
-aptpkg_dir='.apt-dht'
-
-DECOMPRESS_EXTS = ['.gz', '.bz2']
-DECOMPRESS_FILES = ['release', 'sources', 'packages']
+aptpkg_dir='apt-packages'
 
 class MirrorError(Exception):
     """Exception raised when there's a problem with the mirror."""
 
-class ProxyFileStream(stream.SimpleStream):
-    """Saves a stream to a file while providing a new stream."""
-    
-    def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
-        """Initializes the proxy.
-        
-        @type stream: C{twisted.web2.stream.IByteStream}
-        @param stream: the input stream to read from
-        @type outFile: C{twisted.python.filepath.FilePath}
-        @param outFile: the file to write to
-        @type hash: L{Hash.HashObject}
-        @param hash: the hash object to use for the file
-        @type decompress: C{string}
-        @param decompress: also decompress the file as this type
-            (currently only '.gz' and '.bz2' are supported)
-        @type decFile: C{twisted.python.filepath.FilePath}
-        @param decFile: the file to write the decompressed data to
-        """
-        self.stream = stream
-        self.outFile = outFile.open('w')
-        self.hash = hash
-        self.hash.new()
-        self.gzfile = None
-        self.bz2file = None
-        if decompress == ".gz":
-            self.gzheader = True
-            self.gzfile = decFile.open('w')
-            self.gzdec = decompressobj(-MAX_WBITS)
-        elif decompress == ".bz2":
-            self.bz2file = decFile.open('w')
-            self.bz2dec = BZ2Decompressor()
-        self.length = self.stream.length
-        self.start = 0
-        self.doneDefer = defer.Deferred()
-
-    def _done(self):
-        """Close the output file."""
-        if not self.outFile.closed:
-            self.outFile.close()
-            self.hash.digest()
-            if self.gzfile:
-                data_dec = self.gzdec.flush()
-                self.gzfile.write(data_dec)
-                self.gzfile.close()
-                self.gzfile = None
-            if self.bz2file:
-                self.bz2file.close()
-                self.bz2file = None
-
-            self.doneDefer.callback(self.hash)
-    
-    def read(self):
-        """Read some data from the stream."""
-        if self.outFile.closed:
-            return None
-        
-        data = self.stream.read()
-        if isinstance(data, defer.Deferred):
-            data.addCallbacks(self._write, self._done)
-            return data
-        
-        self._write(data)
-        return data
-    
-    def _write(self, data):
-        """Write the stream data to the file and return it for others to use."""
-        if data is None:
-            self._done()
-            return data
-        
-        self.outFile.write(data)
-        self.hash.update(data)
-        if self.gzfile:
-            if self.gzheader:
-                self.gzheader = False
-                new_data = self._remove_gzip_header(data)
-                dec_data = self.gzdec.decompress(new_data)
-            else:
-                dec_data = self.gzdec.decompress(data)
-            self.gzfile.write(dec_data)
-        if self.bz2file:
-            dec_data = self.bz2dec.decompress(data)
-            self.bz2file.write(dec_data)
-        return data
-    
-    def _remove_gzip_header(self, data):
-        if data[:2] != '\037\213':
-            raise IOError, 'Not a gzipped file'
-        if ord(data[2]) != 8:
-            raise IOError, 'Unknown compression method'
-        flag = ord(data[3])
-        # modtime = self.fileobj.read(4)
-        # extraflag = self.fileobj.read(1)
-        # os = self.fileobj.read(1)
-
-        skip = 10
-        if flag & FEXTRA:
-            # Read & discard the extra field, if present
-            xlen = ord(data[10])
-            xlen = xlen + 256*ord(data[11])
-            skip = skip + 2 + xlen
-        if flag & FNAME:
-            # Read and discard a null-terminated string containing the filename
-            while True:
-                if not data[skip] or data[skip] == '\000':
-                    break
-                skip += 1
-            skip += 1
-        if flag & FCOMMENT:
-            # Read and discard a null-terminated string containing a comment
-            while True:
-                if not data[skip] or data[skip] == '\000':
-                    break
-                skip += 1
-            skip += 1
-        if flag & FHCRC:
-            skip += 2     # Read & discard the 16-bit header CRC
-        return data[skip:]
-
-    def close(self):
-        """Clean everything up and return None to future reads."""
-        self.length = 0
-        self._done()
-        self.stream.close()
-
 class MirrorManager:
     """Manages all requests for mirror objects."""
     
-    def __init__(self, cache_dir, manager = None):
-        self.manager = manager
+    def __init__(self, cache_dir):
         self.cache_dir = cache_dir
-        self.cache = filepath.FilePath(self.cache_dir)
         self.apt_caches = {}
     
     def extractPath(self, url):
@@ -190,7 +57,9 @@ class MirrorManager:
             self.apt_caches[site] = {}
             
         if baseDir not in self.apt_caches[site]:
-            site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_'))
+            site_cache = self.cache_dir.child(aptpkg_dir).child('mirrors').child(site + baseDir.replace('/', '_'))
+            if not site_cache.exists():
+                site_cache.makedirs()
             self.apt_caches[site][baseDir] = AptPackages(site_cache)
     
     def updatedFile(self, url, file_path):
@@ -206,73 +75,6 @@ class MirrorManager:
             d.errback(MirrorError("Site Not Found"))
         return d
     
-    def save_file(self, response, hash, url):
-        """Save a downloaded file to the cache and stream it."""
-        if response.code != 200:
-            log.msg('File was not found (%r): %s' % (response, url))
-            return response
-        
-        log.msg('Returning file: %s' % url)
-
-        parsed = urlparse(url)
-        destFile = self.cache.preauthChild(parsed[1] + parsed[2])
-        log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
-        
-        if destFile.exists():
-            log.msg('File already exists, removing: %s' % destFile.path)
-            destFile.remove()
-        else:
-            destFile.parent().makedirs()
-
-        root, ext = os.path.splitext(destFile.basename())
-        if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
-            ext = ext.lower()
-            decFile = destFile.sibling(root)
-            log.msg('Decompressing to: %s' % decFile.path)
-            if decFile.exists():
-                log.msg('File already exists, removing: %s' % decFile.path)
-                decFile.remove()
-        else:
-            ext = None
-            decFile = None
-            
-        orig_stream = response.stream
-        response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
-        response.stream.doneDefer.addCallback(self.save_complete, url, destFile,
-                                              response.headers.getHeader('Last-Modified'),
-                                              ext, decFile)
-        response.stream.doneDefer.addErrback(self.save_error, url)
-        return response
-
-    def save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
-        """Update the modification time and AptPackages."""
-        if modtime:
-            os.utime(destFile.path, (modtime, modtime))
-            if ext:
-                os.utime(decFile.path, (modtime, modtime))
-        
-        result = hash.verify()
-        if result or result is None:
-            if result:
-                log.msg('Hashes match: %s' % url)
-            else:
-                log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
-                
-            self.updatedFile(url, destFile.path)
-            if ext:
-                self.updatedFile(url[:-len(ext)], decFile.path)
-
-            if self.manager:
-                self.manager.cached_file(hash, url, destFile.path)
-        else:
-            log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
-
-    def save_error(self, failure, url):
-        """An error has occurred in downloadign or saving the file."""
-        log.msg('Error occurred downloading %s' % url)
-        log.err(failure)
-        return failure
-
 class TestMirrorManager(unittest.TestCase):
     """Unit tests for the mirror manager."""
 
@@ -281,7 +83,7 @@ class TestMirrorManager(unittest.TestCase):
     client = None
     
     def setUp(self):
-        self.client = MirrorManager('/tmp')
+        self.client = MirrorManager(FilePath('/tmp/.apt-dht'))
        
     def test_extractPath(self):
         site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org/debian/dists/unstable/Release')
@@ -312,13 +114,13 @@ class TestMirrorManager(unittest.TestCase):
                 break
 
         self.client.updatedFile('http://' + self.releaseFile.replace('_','/'), 
-                                '/var/lib/apt/lists/' + self.releaseFile)
+                                FilePath('/var/lib/apt/lists/' + self.releaseFile))
         self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + 
                                 self.packagesFile[self.packagesFile.find('_dists_')+1:].replace('_','/'), 
-                                '/var/lib/apt/lists/' + self.packagesFile)
+                                FilePath('/var/lib/apt/lists/' + self.packagesFile))
         self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + 
                                 self.sourcesFile[self.sourcesFile.find('_dists_')+1:].replace('_','/'), 
-                                '/var/lib/apt/lists/' + self.sourcesFile)
+                                FilePath('/var/lib/apt/lists/' + self.sourcesFile))
 
         lastDefer = defer.Deferred()
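[Reviewer note] The surviving test_extractPath test pins down the URL splitting that keys the per-site AptPackages caches. The expected values are not quoted in this hunk, so the ones below are inferred from the mirror layout rather than taken from the test:

    mirrors = MirrorManager(FilePath('/tmp/.apt-dht'))
    site, baseDir, path = mirrors.extractPath(
        'http://ftp.us.debian.org/debian/dists/unstable/Release')
    # presumably: site    == 'ftp.us.debian.org'
    #             baseDir == '/debian'
    #             path    == '/dists/unstable/Release'
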
diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py
index f25e234..3679685 100644
--- a/apt_dht/apt_dht.py
+++ b/apt_dht/apt_dht.py
@@ -6,26 +6,34 @@ import os, re
 from twisted.internet import defer
 from twisted.web2 import server, http, http_headers
 from twisted.python import log
+from twisted.python.filepath import FilePath
 
 from apt_dht_conf import config
 from PeerManager import PeerManager
 from HTTPServer import TopLevel
 from MirrorManager import MirrorManager
+from CacheManager import CacheManager
 from Hash import HashObject
 from db import DB
 from util import findMyIPAddr
 
+download_dir = 'cache'
+
 class AptDHT:
     def __init__(self, dht):
         log.msg('Initializing the main apt_dht application')
-        self.db = DB(config.get('DEFAULT', 'cache_dir') + '/.apt-dht.db')
+        self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
+        if not self.cache_dir.child(download_dir).exists():
+            self.cache_dir.child(download_dir).makedirs()
+        self.db = DB(self.cache_dir.child('apt-dht.db'))
         self.dht = dht
         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
-        self.http_server = TopLevel(config.get('DEFAULT', 'cache_dir'), self)
+        self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
         self.http_site = server.Site(self.http_server)
         self.peers = PeerManager()
-        self.mirrors = MirrorManager(config.get('DEFAULT', 'cache_dir'), self)
+        self.mirrors = MirrorManager(self.cache_dir)
+        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
         self.my_addr = None
     
     def getSite(self):
@@ -39,6 +47,7 @@ class AptDHT:
     def joinError(self, failure):
         log.msg("joining DHT failed miserably")
         log.err(failure)
+        raise RuntimeError, "IP address for this machine could not be found"
     
     def check_freshness(self, path, modtime, resp):
         log.msg('Checking if %s is still fresh' % path)
@@ -84,16 +93,16 @@ class AptDHT:
         if not locations:
             log.msg('Peers for %s were not found' % path)
             getDefer = self.peers.get([path])
-            getDefer.addCallback(self.mirrors.save_file, hash, path)
-            getDefer.addErrback(self.mirrors.save_error, path)
+            getDefer.addCallback(self.cache.save_file, hash, path)
+            getDefer.addErrback(self.cache.save_error, path)
             getDefer.addCallbacks(d.callback, d.errback)
         else:
             log.msg('Found peers for %s: %r' % (path, locations))
             # Download from the found peers
             getDefer = self.peers.get(locations)
             getDefer.addCallback(self.check_response, hash, path)
-            getDefer.addCallback(self.mirrors.save_file, hash, path)
-            getDefer.addErrback(self.mirrors.save_error, path)
+            getDefer.addCallback(self.cache.save_file, hash, path)
+            getDefer.addErrback(self.cache.save_error, path)
             getDefer.addCallbacks(d.callback, d.errback)
 
     def check_response(self, response, hash, path):
@@ -103,12 +112,10 @@ class AptDHT:
             return getDefer
         return response
    
-    def cached_file(self, hash, url, file_path):
-        assert file_path.startswith(config.get('DEFAULT', 'cache_dir'))
-        urlpath, newdir = self.db.storeFile(file_path, hash.digest(), config.get('DEFAULT', 'cache_dir'))
-        log.msg('now avaliable at %s: %s' % (urlpath, url))
-        
-        if self.my_addr:
+    def new_cached_file(self, url, file_path, hash, urlpath):
+        self.mirrors.updatedFile(url, file_path)
+        
+        if self.my_addr and hash:
             site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
             full_path = urlunparse(('http', site, urlpath, None, None, None))
             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
diff --git a/apt_dht/db.py b/apt_dht/db.py
index 9725aa8..1d2e342 100644
--- a/apt_dht/db.py
+++ b/apt_dht/db.py
@@ -5,6 +5,7 @@ from binascii import a2b_base64, b2a_base64
 from time import sleep
 import os
 
+from twisted.python.filepath import FilePath
 from twisted.trial import unittest
 
 assert sqlite.version_info >= (2, 1)
@@ -25,24 +26,25 @@ class DB:
    
     def __init__(self, db):
         self.db = db
-        try:
-            os.stat(db)
-        except OSError:
-            self._createNewDB(db)
+        self.db.restat(False)
+        if self.db.exists():
+            self._loadDB()
         else:
-            self._loadDB(db)
+            self._createNewDB()
         self.conn.text_factory = str
         self.conn.row_factory = sqlite.Row
        
-    def _loadDB(self, db):
+    def _loadDB(self):
         try:
-            self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
+            self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
         except:
             import traceback
             raise DBExcept, "Couldn't open DB", traceback.format_exc()
        
-    def _createNewDB(self, db):
-        self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
+    def _createNewDB(self):
+        if not self.db.parent().exists():
+            self.db.parent().makedirs()
+        self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
         c = self.conn.cursor()
         c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
         c.execute("CREATE INDEX files_urldir ON files(urldir)")
@@ -52,49 +54,43 @@ class DB:
         c.close()
         self.conn.commit()
 
-    def _removeChanged(self, path, row):
+    def _removeChanged(self, file, row):
         res = None
         if row:
-            try:
-                stat = os.stat(path)
-            except:
-                stat = None
-            if stat:
-                res = (row['size'] == stat.st_size and row['mtime'] == stat.st_mtime)
+            file.restat(False)
+            if file.exists():
+                res = (row['size'] == file.getsize() and row['mtime'] == file.getmtime())
             if not res:
                 c = self.conn.cursor()
-                c.execute("DELETE FROM files WHERE path = ?", (path, ))
+                c.execute("DELETE FROM files WHERE path = ?", (file.path, ))
                 self.conn.commit()
                 c.close()
         return res
        
-    def storeFile(self, path, hash, directory):
+    def storeFile(self, file, hash, directory):
         """Store or update a file in the database.
        
         @return: the urlpath to access the file, and whether a new url top-level
             directory was needed
         """
-        path = os.path.abspath(path)
-        directory = os.path.abspath(directory)
-        assert path.startswith(directory)
-        stat = os.stat(path)
+        file.restat()
         c = self.conn.cursor()
-        c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (path, ))
+        c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (file.path, ))
         row = c.fetchone()
         if row and directory == row['directory']:
             c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?", 
-                      (khash(hash), stat.st_size, stat.st_mtime, datetime.now()))
+                      (khash(hash), file.getsize(), file.getmtime(), datetime.now()))
             newdir = False
             urldir = row['urldir']
         else:
             urldir, newdir = self.findDirectory(directory)
             c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?, ?, ?)",
-                      (path, khash(hash), urldir, len(directory), stat.st_size, stat.st_mtime, datetime.now()))
+                      (file.path, khash(hash), urldir, len(directory.path), file.getsize(), file.getmtime(), datetime.now()))
         self.conn.commit()
         c.close()
        
-        return '/~' + str(urldir) + path[len(directory):], newdir
+        return '/~' + str(urldir) + file.path[len(directory.path):], newdir
        
-    def getFile(self, path):
+    def getFile(self, file):
         """Get a file from the database.
        
         If it has changed or is missing, it is removed from the database.
@@ -102,45 +98,43 @@ class DB:
         @return: dictionary of info for the file, False if changed, or None if
             not in database or missing
         """
-        path = os.path.abspath(path)
         c = self.conn.cursor()
-        c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (path, ))
+        c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, ))
         row = c.fetchone()
-        res = self._removeChanged(path, row)
+        res = self._removeChanged(file, row)
         if res:
             res = {}
             res['hash'] = row['hash']
-            res['urlpath'] = '/~' + str(row['urldir']) + path[row['dirlength']:]
+            res['size'] = row['size']
+            res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
         c.close()
         return res
        
-    def isUnchanged(self, path):
+    def isUnchanged(self, file):
         """Check if a file in the file system has changed.
        
         If it has changed, it is removed from the table.
        
         @return: True if unchanged, False if changed, None if not in database
         """
-        path = os.path.abspath(path)
         c = self.conn.cursor()
-        c.execute("SELECT size, mtime FROM files WHERE path = ?", (path, ))
+        c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
         row = c.fetchone()
-        return self._removeChanged(path, row)
+        return self._removeChanged(file, row)
 
-    def refreshFile(self, path):
+    def refreshFile(self, file):
         """Refresh the publishing time of a file.
        
         If it has changed or is missing, it is removed from the table.
        
         @return: True if unchanged, False if changed, None if not in database
         """
-        path = os.path.abspath(path)
         c = self.conn.cursor()
-        c.execute("SELECT size, mtime FROM files WHERE path = ?", (path, ))
+        c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
         row = c.fetchone()
-        res = self._removeChanged(path, row)
+        res = self._removeChanged(file, row)
         if res:
-            c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), path))
+            c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
         return res
@@ -156,7 +150,7 @@ class DB:
         row = c.fetchone()
         expired = {}
         while row:
-            res = self._removeChanged(row['path'], row)
+            res = self._removeChanged(FilePath(row['path']), row)
             if res:
                 expired.setdefault(row['hash'], []).append('/~' + str(row['urldir']) + row['path'][row['dirlength']:])
             row = c.fetchone()
@@ -174,7 +168,7 @@ class DB:
         newdirs = []
         sql = "WHERE"
         for dir in dirs:
-            newdirs.append(os.path.abspath(dir) + os.sep + '*')
+            newdirs.append(dir.child('*').path)
             sql += " path NOT GLOB ? AND"
         sql = sql[:-4]
@@ -183,7 +177,7 @@ class DB:
         row = c.fetchone()
         removed = []
         while row:
-            removed.append(row['path'])
+            removed.append(FilePath(row['path']))
             row = c.fetchone()
 
         if removed:
@@ -196,9 +190,8 @@ class DB:
        
         @return: the index of the url directory, and whether it is new or not
         """
-        directory = os.path.abspath(directory)
         c = self.conn.cursor()
-        c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory, ))
+        c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory.path, ))
         row = c.fetchone()
         c.close()
         if row['urldir']:
@@ -206,7 +199,7 @@ class DB:
        
         # Not found, need to add a new one
         c = self.conn.cursor()
-        c.execute("INSERT INTO dirs (path) VALUES (?)", (directory, ))
+        c.execute("INSERT INTO dirs (path) VALUES (?)", (directory.path, ))
         self.conn.commit()
         urldir = c.lastrowid
         c.close()
@@ -219,7 +212,7 @@ class DB:
         row = c.fetchone()
         dirs = {}
         while row:
-            dirs['~' + str(row['urldir'])] = row['path']
+            dirs['~' + str(row['urldir'])] = FilePath(row['path'])
             row = c.fetchone()
         c.close()
         return dirs
@@ -238,23 +231,34 @@ class TestDB(unittest.TestCase):
     """Tests for the khashmir database."""
    
     timeout = 5
-    db = '/tmp/khashmir.db'
-    path = '/tmp/khashmir.test'
+    db = FilePath('/tmp/khashmir.db')
+    file = FilePath('/tmp/apt-dht/khashmir.test')
     hash = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
-    directory = '/tmp/'
+    directory = FilePath('/tmp/apt-dht/')
     urlpath = '/~1/khashmir.test'
-    dirs = ['/tmp/apt-dht/top1', '/tmp/apt-dht/top2/sub1', '/tmp/apt-dht/top2/sub2/']
+    testfile = 'tmp/khashmir.test'
+    dirs = [FilePath('/tmp/apt-dht/top1'),
+            FilePath('/tmp/apt-dht/top2/sub1'),
+            FilePath('/tmp/apt-dht/top2/sub2/')]
 
     def setUp(self):
-        f = open(self.path, 'w')
-        f.write('fgfhds')
-        f.close()
-        os.utime(self.path, None)
+        if not self.file.parent().exists():
+            self.file.parent().makedirs()
+        self.file.setContent('fgfhds')
+        self.file.touch()
         self.store = DB(self.db)
-        self.store.storeFile(self.path, self.hash, self.directory)
+        self.store.storeFile(self.file, self.hash, self.directory)
+
+    def test_openExistingDB(self):
+        self.store.close()
+        self.store = None
+        sleep(1)
+        self.store = DB(self.db)
+        res = self.store.isUnchanged(self.file)
+        self.failUnless(res)
 
     def test_getFile(self):
-        res = self.store.getFile(self.path)
+        res = self.store.getFile(self.file)
         self.failUnless(res)
         self.failUnlessEqual(res['hash'], self.hash)
         self.failUnlessEqual(res['urlpath'], self.urlpath)
@@ -264,17 +268,17 @@ class TestDB(unittest.TestCase):
         self.failUnless(res)
         self.failUnlessEqual(len(res.keys()), 1)
         self.failUnlessEqual(res.keys()[0], '~1')
-        self.failUnlessEqual(res['~1'], os.path.abspath(self.directory))
+        self.failUnlessEqual(res['~1'], self.directory)
 
     def test_isUnchanged(self):
-        res = self.store.isUnchanged(self.path)
+        res = self.store.isUnchanged(self.file)
         self.failUnless(res)
         sleep(2)
-        os.utime(self.path, None)
-        res = self.store.isUnchanged(self.path)
+        self.file.touch()
+        res = self.store.isUnchanged(self.file)
         self.failUnless(res == False)
-        os.unlink(self.path)
-        res = self.store.isUnchanged(self.path)
+        self.file.remove()
+        res = self.store.isUnchanged(self.file)
         self.failUnless(res == None)
 
     def test_expiry(self):
@@ -286,35 +290,34 @@ class TestDB(unittest.TestCase):
         self.failUnlessEqual(res.keys()[0], self.hash)
         self.failUnlessEqual(len(res[self.hash]), 1)
         self.failUnlessEqual(res[self.hash][0], self.urlpath)
-        res = self.store.refreshFile(self.path)
+        res = self.store.refreshFile(self.file)
         self.failUnless(res)
         res = self.store.expiredFiles(1)
         self.failUnlessEqual(len(res.keys()), 0)
        
     def build_dirs(self):
         for dir in self.dirs:
-            path = os.path.join(dir, self.path[1:])
-            os.makedirs(os.path.dirname(path))
-            f = open(path, 'w')
-            f.write(path)
-            f.close()
-            os.utime(path, None)
-            self.store.storeFile(path, self.hash, dir)
+            file = dir.preauthChild(self.testfile)
+            if not file.parent().exists():
+                file.parent().makedirs()
+            file.setContent(file.path)
+            file.touch()
+            self.store.storeFile(file, self.hash, dir)
        
     def test_removeUntracked(self):
         self.build_dirs()
         res = self.store.removeUntrackedFiles(self.dirs)
         self.failUnlessEqual(len(res), 1, 'Got removed paths: %r' % res)
-        self.failUnlessEqual(res[0], self.path, 'Got removed paths: %r' % res)
+        self.failUnlessEqual(res[0], self.file, 'Got removed paths: %r' % res)
         res = self.store.removeUntrackedFiles(self.dirs)
         self.failUnlessEqual(len(res), 0, 'Got removed paths: %r' % res)
        
         res = self.store.removeUntrackedFiles(self.dirs[1:])
         self.failUnlessEqual(len(res), 1, 'Got removed paths: %r' % res)
-        self.failUnlessEqual(res[0], os.path.join(self.dirs[0], self.path[1:]), 'Got removed paths: %r' % res)
+        self.failUnlessEqual(res[0], self.dirs[0].preauthChild(self.testfile), 'Got removed paths: %r' % res)
        
         res = self.store.removeUntrackedFiles(self.dirs[:1])
         self.failUnlessEqual(len(res), 2, 'Got removed paths: %r' % res)
-        self.failUnlessIn(os.path.join(self.dirs[1], self.path[1:]), res, 'Got removed paths: %r' % res)
-        self.failUnlessIn(os.path.join(self.dirs[2], self.path[1:]), res, 'Got removed paths: %r' % res)
+        self.failUnlessIn(self.dirs[1].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
+        self.failUnlessIn(self.dirs[2].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
 
     def test_reconcileDirectories(self):
         self.build_dirs()
@@ -338,17 +341,13 @@ class TestDB(unittest.TestCase):
         res = self.store.getAllDirectories()
         self.failUnless(res)
         self.failUnlessEqual(len(res.keys()), 1)
-        res = self.store.removeUntrackedFiles(['/what'])
+        res = self.store.removeUntrackedFiles([FilePath('/what')])
         res = self.store.reconcileDirectories()
         self.failUnlessEqual(res, True)
         res = self.store.getAllDirectories()
         self.failUnlessEqual(len(res.keys()), 0)
        
     def tearDown(self):
-        for root, dirs, files in os.walk('/tmp/apt-dht', topdown=False):
-            for name in files:
-                os.remove(os.path.join(root, name))
-            for name in dirs:
-                os.rmdir(os.path.join(root, name))
+        self.directory.remove()
         self.store.close()
-        os.unlink(self.db)
+        self.db.remove()
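[Reviewer note] The TestDB changes also document the url-path scheme storeFile implements: the shared directory prefix is replaced by a short '/~N' alias, where N indexes the dirs table. A sketch using the test's fixtures; hash stands for the 20-byte digest defined in TestDB:

    store = DB(FilePath('/tmp/khashmir.db'))
    file = FilePath('/tmp/apt-dht/khashmir.test')
    urlpath, newdir = store.storeFile(file, hash, FilePath('/tmp/apt-dht/'))
    # urlpath == '/~1/khashmir.test'; newdir should be True the first time
    # this directory is seen, per the storeFile docstring
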
diff --git a/apt_dht_Khashmir/DHT.py b/apt_dht_Khashmir/DHT.py
index 235c8d0..d16f598 100644
--- a/apt_dht_Khashmir/DHT.py
+++ b/apt_dht_Khashmir/DHT.py
@@ -11,6 +11,8 @@ from zope.interface import implements
 
 from apt_dht.interfaces import IDHT
 from khashmir import Khashmir
 
+khashmir_dir = 'apt-dht-Khashmir'
+
 class DHTError(Exception):
     """Represents errors that occur in the DHT."""
@@ -36,7 +38,9 @@ class DHT:
         self.config_parser = config
         self.section = section
         self.config = {}
-        self.cache_dir = self.config_parser.get(section, 'cache_dir')
+        self.cache_dir = os.path.join(self.config_parser.get(section, 'cache_dir'), khashmir_dir)
+        if not os.path.exists(self.cache_dir):
+            os.makedirs(self.cache_dir)
         self.bootstrap = self.config_parser.getstringlist(section, 'BOOTSTRAP')
         self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
         for k in self.config_parser.options(section):
@@ -182,7 +186,7 @@ class TestSimpleDHT(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, 'SPEW': True, }
+                    'KE_AGE': 3600, 'SPEW': False, }
 
     def setUp(self):
         self.a = DHT()
diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py
index 5d14c7b..ae11dd7 100644
--- a/apt_dht_Khashmir/khashmir.py
+++ b/apt_dht_Khashmir/khashmir.py
@@ -30,7 +30,7 @@ class KhashmirBase(protocol.Factory):
     def setup(self, config, cache_dir):
         self.config = config
         self.port = config['PORT']
-        self.store = DB(os.path.join(cache_dir, '.khashmir.' + str(self.port) + '.db'))
+        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         #self.app = service.Application("krpc")
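
[Reviewer note] Taken together, the directory constants introduced in this commit (download_dir, aptpkg_dir, khashmir_dir, plus the de-dotted database names) imply a cache layout along these lines, relative to the configured cache_dir; the port number shown is illustrative:

    cache_dir/
        apt-dht.db              # main file database (was .apt-dht.db)
        cache/                  # downloaded files, served by HTTPServer
        apt-packages/mirrors/   # per-mirror AptPackages caches
        apt-dht-Khashmir/
            khashmir.9977.db    # DHT store (was .khashmir.<port>.db)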