from twisted.internet import threads, defer
from twisted.python import log
+from twisted.python.filepath import FilePath
from twisted.trial import unittest
import apt_pkg, apt_inst
def __init__(self, cache_dir):
self.cache_dir = cache_dir
- if not os.path.exists(self.cache_dir):
- os.makedirs(self.cache_dir)
+ self.cache_dir.restat(False)
+ if not self.cache_dir.exists():
+ self.cache_dir.makedirs()
self.packages = None
self.open()
def open(self):
"""Open the persistent dictionary of files in this backend."""
if self.packages is None:
- self.packages = shelve.open(self.cache_dir+'/packages.db')
+ self.packages = shelve.open(self.cache_dir.child('packages.db').path)
def close(self):
"""Close the persistent dictionary."""
"""Check all files in the database to make sure they exist."""
files = self.packages.keys()
for f in files:
- if not os.path.exists(self.packages[f]):
+ self.packages[f].restat(False)
+ if not self.packages[f].exists():
log.msg("File in packages database has been deleted: "+f)
del self.packages[f]
self.apt_config = deepcopy(self.DEFAULT_APT_CONFIG)
for dir in self.essential_dirs:
- path = os.path.join(self.cache_dir, dir)
- if not os.path.exists(path):
- os.makedirs(path)
+ path = self.cache_dir.preauthChild(dir)
+ if not path.exists():
+ path.makedirs()
for file in self.essential_files:
- path = os.path.join(self.cache_dir, file)
- if not os.path.exists(path):
- f = open(path,'w')
- f.close()
- del f
+ path = self.cache_dir.preauthChild(file)
+ if not path.exists():
+ path.touch()
- self.apt_config['Dir'] = self.cache_dir
- self.apt_config['Dir::State::status'] = os.path.join(self.cache_dir,
- self.apt_config['Dir::State'], self.apt_config['Dir::State::status'])
+ self.apt_config['Dir'] = self.cache_dir.path
+ self.apt_config['Dir::State::status'] = self.cache_dir.preauthChild(self.apt_config['Dir::State']
+ ).preauthChild(self.apt_config['Dir::State::status']).path
self.packages = PackageFileList(cache_dir)
self.loaded = 0
self.loading = None
self.indexrecords[cache_path] = {}
read_packages = False
- f = open(file_path, 'r')
+ f = file_path.open('r')
for line in f:
line = line.rstrip()
"""Regenerates the fake configuration and load the packages cache."""
if self.loaded: return True
apt_pkg.InitSystem()
- rmtree(os.path.join(self.cache_dir, self.apt_config['Dir::State'],
- self.apt_config['Dir::State::Lists']))
- os.makedirs(os.path.join(self.cache_dir, self.apt_config['Dir::State'],
- self.apt_config['Dir::State::Lists'], 'partial'))
- sources_filename = os.path.join(self.cache_dir, self.apt_config['Dir::Etc'],
- self.apt_config['Dir::Etc::sourcelist'])
- sources = open(sources_filename, 'w')
+ self.cache_dir.preauthChild(self.apt_config['Dir::State']
+ ).preauthChild(self.apt_config['Dir::State::Lists']).remove()
+ self.cache_dir.preauthChild(self.apt_config['Dir::State']
+ ).preauthChild(self.apt_config['Dir::State::Lists']
+ ).child('partial').makedirs()
+ sources_file = self.cache_dir.preauthChild(self.apt_config['Dir::Etc']
+ ).preauthChild(self.apt_config['Dir::Etc::sourcelist'])
+ sources = sources_file.open('w')
sources_count = 0
deb_src_added = False
self.packages.check_files()
for f in self.packages:
# we should probably clear old entries from self.packages and
# take into account the recorded mtime as optimization
- filepath = self.packages[f]
+ file = self.packages[f]
if f.split('/')[-1] == "Release":
- self.addRelease(f, filepath)
+ self.addRelease(f, file)
fake_uri='http://apt-dht'+f
fake_dirname = '/'.join(fake_uri.split('/')[:-1])
if f.endswith('Sources'):
source_line='deb-src '+fake_dirname+'/ /'
else:
source_line='deb '+fake_dirname+'/ /'
- listpath=(os.path.join(self.cache_dir, self.apt_config['Dir::State'],
- self.apt_config['Dir::State::Lists'],
- apt_pkg.URItoFileName(fake_uri)))
+ listpath = self.cache_dir.preauthChild(self.apt_config['Dir::State']
+ ).preauthChild(self.apt_config['Dir::State::Lists']
+ ).child(apt_pkg.URItoFileName(fake_uri))
sources.write(source_line+'\n')
log.msg("Sources line: " + source_line)
sources_count = sources_count + 1
- try:
+ if listpath.exists():
#we should empty the directory instead
- os.unlink(listpath)
- except:
- pass
- os.symlink(filepath, listpath)
+ listpath.remove()
+ os.symlink(file.path, listpath.path)
sources.close()
if sources_count == 0:
- log.msg("No Packages files available for %s backend"%(self.cache_dir))
+ log.msg("No Packages files available for %s backend"%(self.cache_dir.path))
return False
- log.msg("Loading Packages database for "+self.cache_dir)
+ log.msg("Loading Packages database for "+self.cache_dir.path)
for key, value in self.apt_config.items():
apt_pkg.Config[key] = value
releaseFile = ''
def setUp(self):
- self.client = AptPackages('/tmp/.apt-dht')
+ self.client = AptPackages(FilePath('/tmp/.apt-dht'))
self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Packages$" | tail -n 1').read().rstrip('\n')
self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Sources$" | tail -n 1').read().rstrip('\n')
break
self.client.file_updated(self.releaseFile[self.releaseFile.find('_dists_'):].replace('_','/'),
- '/var/lib/apt/lists/' + self.releaseFile)
+ FilePath('/var/lib/apt/lists/' + self.releaseFile))
self.client.file_updated(self.packagesFile[self.packagesFile.find('_dists_'):].replace('_','/'),
- '/var/lib/apt/lists/' + self.packagesFile)
+ FilePath('/var/lib/apt/lists/' + self.packagesFile))
self.client.file_updated(self.sourcesFile[self.sourcesFile.find('_dists_'):].replace('_','/'),
- '/var/lib/apt/lists/' + self.sourcesFile)
+ FilePath('/var/lib/apt/lists/' + self.sourcesFile))
def test_pkg_hash(self):
self.client._load()
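The pattern above recurs throughout this patch: string paths and os.path calls are replaced with twisted.python.filepath.FilePath objects. A minimal sketch of the equivalences being relied on (the directory name is arbitrary):

from twisted.python.filepath import FilePath

cache = FilePath('/tmp/.apt-dht-example')   # hypothetical directory

# os.path.join(cache, 'a')             ->  cache.child('a')
db_file = cache.child('packages.db')

# child() rejects segments containing '/', so joining a multi-segment
# relative path uses preauthChild() instead.
lists = cache.preauthChild('var/lib/lists')

# os.path.exists(p) / os.makedirs(p)   ->  p.exists() / p.makedirs()
if not lists.exists():
    lists.makedirs()

# FilePath caches stat() results; restat(False) refreshes the cache
# without raising if the file has meanwhile disappeared.
lists.restat(False)

print lists.path    # the plain string path, for APIs that still want one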
--- /dev/null
+++ b/CacheManager.py
+
+from bz2 import BZ2Decompressor
+from zlib import decompressobj, MAX_WBITS
+from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
+from urlparse import urlparse
+import os
+
+from twisted.python import log
+from twisted.python.filepath import FilePath
+from twisted.internet import defer
+from twisted.trial import unittest
+from twisted.web2 import stream
+
+DECOMPRESS_EXTS = ['.gz', '.bz2']
+DECOMPRESS_FILES = ['release', 'sources', 'packages']
+
+class ProxyFileStream(stream.SimpleStream):
+ """Saves a stream to a file while providing a new stream."""
+
+ def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
+ """Initializes the proxy.
+
+ @type stream: C{twisted.web2.stream.IByteStream}
+ @param stream: the input stream to read from
+ @type outFile: C{twisted.python.FilePath}
+ @param outFile: the file to write to
+ @type hash: L{Hash.HashObject}
+ @param hash: the hash object to use for the file
+ @type decompress: C{string}
+ @param decompress: also decompress the file as this type
+ (currently only '.gz' and '.bz2' are supported)
+ @type decFile: C{twisted.python.FilePath}
+ @param decFile: the file to write the decompressed data to
+ """
+ self.stream = stream
+ self.outFile = outFile.open('w')
+ self.hash = hash
+ self.hash.new()
+ self.gzfile = None
+ self.bz2file = None
+ if decompress == ".gz":
+ self.gzheader = True
+ self.gzfile = decFile.open('w')
+ self.gzdec = decompressobj(-MAX_WBITS)
+ elif decompress == ".bz2":
+ self.bz2file = decFile.open('w')
+ self.bz2dec = BZ2Decompressor()
+ self.length = self.stream.length
+ self.start = 0
+ self.doneDefer = defer.Deferred()
+
+ def _done(self, result=None):
+ """Close the output file (also used as an errback, with result the failure)."""
+ if not self.outFile.closed:
+ self.outFile.close()
+ self.hash.digest()
+ if self.gzfile:
+ data_dec = self.gzdec.flush()
+ self.gzfile.write(data_dec)
+ self.gzfile.close()
+ self.gzfile = None
+ if self.bz2file:
+ self.bz2file.close()
+ self.bz2file = None
+
+ self.doneDefer.callback(self.hash)
+
+ def read(self):
+ """Read some data from the stream."""
+ if self.outFile.closed:
+ return None
+
+ data = self.stream.read()
+ if isinstance(data, defer.Deferred):
+ data.addCallbacks(self._write, self._done)
+ return data
+
+ self._write(data)
+ return data
+
+ def _write(self, data):
+ """Write the stream data to the file and return it for others to use."""
+ if data is None:
+ self._done()
+ return data
+
+ self.outFile.write(data)
+ self.hash.update(data)
+ if self.gzfile:
+ if self.gzheader:
+ self.gzheader = False
+ new_data = self._remove_gzip_header(data)
+ dec_data = self.gzdec.decompress(new_data)
+ else:
+ dec_data = self.gzdec.decompress(data)
+ self.gzfile.write(dec_data)
+ if self.bz2file:
+ dec_data = self.bz2dec.decompress(data)
+ self.bz2file.write(dec_data)
+ return data
+
+ def _remove_gzip_header(self, data):
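+ # NOTE: assumes the entire gzip header is contained in the first chunk
+ # read from the stream; header fields split across chunks aren't handled.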
+ if data[:2] != '\037\213':
+ raise IOError, 'Not a gzipped file'
+ if ord(data[2]) != 8:
+ raise IOError, 'Unknown compression method'
+ flag = ord(data[3])
+ # modtime = self.fileobj.read(4)
+ # extraflag = self.fileobj.read(1)
+ # os = self.fileobj.read(1)
+
+ skip = 10
+ if flag & FEXTRA:
+ # Read & discard the extra field, if present
+ xlen = ord(data[10])
+ xlen = xlen + 256*ord(data[11])
+ skip = skip + 2 + xlen
+ if flag & FNAME:
+ # Read and discard a null-terminated string containing the filename
+ while True:
+ if not data[skip] or data[skip] == '\000':
+ break
+ skip += 1
+ skip += 1
+ if flag & FCOMMENT:
+ # Read and discard a null-terminated string containing a comment
+ while True:
+ if not data[skip] or data[skip] == '\000':
+ break
+ skip += 1
+ skip += 1
+ if flag & FHCRC:
+ skip += 2 # Read & discard the 16-bit header CRC
+ return data[skip:]
+
+ def close(self):
+ """Clean everything up and return None to future reads."""
+ self.length = 0
+ self._done()
+ self.stream.close()
+
+class CacheManager:
+ """Manages all requests for cached objects."""
+
+ def __init__(self, cache_dir, db, manager = None):
+ self.cache_dir = cache_dir
+ self.db = db
+ self.manager = manager
+
+ def save_file(self, response, hash, url):
+ """Save a downloaded file to the cache and stream it."""
+ if response.code != 200:
+ log.msg('File was not found (%r): %s' % (response, url))
+ return response
+
+ log.msg('Returning file: %s' % url)
+
+ parsed = urlparse(url)
+ destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
+ log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
+
+ if destFile.exists():
+ log.msg('File already exists, removing: %s' % destFile.path)
+ destFile.remove()
+ elif not destFile.parent().exists():
+ destFile.parent().makedirs()
+
+ root, ext = os.path.splitext(destFile.basename())
+ if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
+ ext = ext.lower()
+ decFile = destFile.sibling(root)
+ log.msg('Decompressing to: %s' % decFile.path)
+ if decFile.exists():
+ log.msg('File already exists, removing: %s' % decFile.path)
+ decFile.remove()
+ else:
+ ext = None
+ decFile = None
+
+ orig_stream = response.stream
+ response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
+ response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
+ response.headers.getHeader('Last-Modified'),
+ ext, decFile)
+ response.stream.doneDefer.addErrback(self.save_error, url)
+ return response
+
+ def _save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
+ """Update the modification time and AptPackages."""
+ if modtime:
+ os.utime(destFile.path, (modtime, modtime))
+ if ext:
+ os.utime(decFile.path, (modtime, modtime))
+
+ result = hash.verify()
+ if result or result is None:
+ if result:
+ log.msg('Hashes match: %s' % url)
+ else:
+ log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
+
+ urlpath, newdir = self.db.storeFile(destFile, hash.digest(), self.cache_dir)
+ log.msg('now available at %s: %s' % (urlpath, url))
+
+ if self.manager:
+ self.manager.new_cached_file(url, destFile, hash, urlpath)
+ if ext:
+ self.manager.new_cached_file(url[:-len(ext)], decFile, None, urlpath)
+ else:
+ log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
+ destFile.remove()
+ if ext:
+ decFile.remove()
+
+ def save_error(self, failure, url):
+ """An error has occurred in downloadign or saving the file."""
+ log.msg('Error occurred downloading %s' % url)
+ log.err(failure)
+ return failure
+
+class TestCacheManager(unittest.TestCase):
+ """Unit tests for the cache manager."""
+
+ timeout = 20
+ pending_calls = []
+ client = None
+
+ def setUp(self):
+ self.client = CacheManager(FilePath('/tmp/.apt-dht'), None)  # no DB needed for these tests
+
+ def tearDown(self):
+ for p in self.pending_calls:
+ if p.active():
+ p.cancel()
+ self.client = None
+
\ No newline at end of file
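ProxyFileStream decompresses gzip data incrementally by stripping the header itself and handing the raw deflate stream to zlib with a negative window size. A minimal sketch of that technique on an in-memory gzip file (assuming the simple case with no optional header fields, which ProxyFileStream also handles):

import gzip, StringIO
from zlib import decompressobj, MAX_WBITS

# Build a small gzip file in memory purely for demonstration.
buf = StringIO.StringIO()
g = gzip.GzipFile(fileobj=buf, mode='wb')
g.write('hello world')
g.close()
data = buf.getvalue()

# With no FEXTRA/FNAME/FCOMMENT/FHCRC flags set, the fixed 10-byte
# header is all there is to skip.
assert data[:2] == '\037\213' and ord(data[3]) == 0

# -MAX_WBITS tells zlib to expect a raw deflate stream with no header;
# the 8-byte gzip trailer ends up in unused_data and is ignored.
dec = decompressobj(-MAX_WBITS)
print dec.decompress(data[10:]) + dec.flush()   # -> 'hello world'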
-import os.path, time
from twisted.python import log
from twisted.internet import defer
name = segments[0]
if name in self.subdirs:
log.msg('Sharing %s with %s' % (request.uri, request.remoteAddr))
- return static.File(self.subdirs[name]), segments[1:]
+ return static.File(self.subdirs[name].path), segments[1:]
if request.remoteAddr.host != "127.0.0.1":
log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
return None, ()
if len(name) > 1:
- return FileDownloader(self.directory, self.manager), segments[0:]
+ return FileDownloader(self.directory.path, self.manager), segments[0:]
else:
return self, ()
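static.File and FileDownloader still take plain string paths, which is why the FilePath objects above are unwrapped with .path before being handed over. A minimal sketch, with a hypothetical directory:

from twisted.python.filepath import FilePath
from twisted.web2 import static

download_cache = FilePath('/tmp/.apt-dht-example/cache')  # hypothetical
if not download_cache.exists():
    download_cache.makedirs()

# web2's static resources expect a string path, not a FilePath.
resource = static.File(download_cache.path)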
-from bz2 import BZ2Decompressor
-from zlib import decompressobj, MAX_WBITS
-from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
from urlparse import urlparse
import os
-from twisted.python import log, filepath
+from twisted.python import log
+from twisted.python.filepath import FilePath
from twisted.internet import defer
from twisted.trial import unittest
-from twisted.web2 import stream
from twisted.web2.http import splitHostPort
from AptPackages import AptPackages
-aptpkg_dir='.apt-dht'
-
-DECOMPRESS_EXTS = ['.gz', '.bz2']
-DECOMPRESS_FILES = ['release', 'sources', 'packages']
+aptpkg_dir='apt-packages'
class MirrorError(Exception):
"""Exception raised when there's a problem with the mirror."""
-class ProxyFileStream(stream.SimpleStream):
- """Saves a stream to a file while providing a new stream."""
-
- def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
- """Initializes the proxy.
-
- @type stream: C{twisted.web2.stream.IByteStream}
- @param stream: the input stream to read from
- @type outFile: C{twisted.python.filepath.FilePath}
- @param outFile: the file to write to
- @type hash: L{Hash.HashObject}
- @param hash: the hash object to use for the file
- @type decompress: C{string}
- @param decompress: also decompress the file as this type
- (currently only '.gz' and '.bz2' are supported)
- @type decFile: C{twisted.python.filepath.FilePath}
- @param decFile: the file to write the decompressed data to
- """
- self.stream = stream
- self.outFile = outFile.open('w')
- self.hash = hash
- self.hash.new()
- self.gzfile = None
- self.bz2file = None
- if decompress == ".gz":
- self.gzheader = True
- self.gzfile = decFile.open('w')
- self.gzdec = decompressobj(-MAX_WBITS)
- elif decompress == ".bz2":
- self.bz2file = decFile.open('w')
- self.bz2dec = BZ2Decompressor()
- self.length = self.stream.length
- self.start = 0
- self.doneDefer = defer.Deferred()
-
- def _done(self):
- """Close the output file."""
- if not self.outFile.closed:
- self.outFile.close()
- self.hash.digest()
- if self.gzfile:
- data_dec = self.gzdec.flush()
- self.gzfile.write(data_dec)
- self.gzfile.close()
- self.gzfile = None
- if self.bz2file:
- self.bz2file.close()
- self.bz2file = None
-
- self.doneDefer.callback(self.hash)
-
- def read(self):
- """Read some data from the stream."""
- if self.outFile.closed:
- return None
-
- data = self.stream.read()
- if isinstance(data, defer.Deferred):
- data.addCallbacks(self._write, self._done)
- return data
-
- self._write(data)
- return data
-
- def _write(self, data):
- """Write the stream data to the file and return it for others to use."""
- if data is None:
- self._done()
- return data
-
- self.outFile.write(data)
- self.hash.update(data)
- if self.gzfile:
- if self.gzheader:
- self.gzheader = False
- new_data = self._remove_gzip_header(data)
- dec_data = self.gzdec.decompress(new_data)
- else:
- dec_data = self.gzdec.decompress(data)
- self.gzfile.write(dec_data)
- if self.bz2file:
- dec_data = self.bz2dec.decompress(data)
- self.bz2file.write(dec_data)
- return data
-
- def _remove_gzip_header(self, data):
- if data[:2] != '\037\213':
- raise IOError, 'Not a gzipped file'
- if ord(data[2]) != 8:
- raise IOError, 'Unknown compression method'
- flag = ord(data[3])
- # modtime = self.fileobj.read(4)
- # extraflag = self.fileobj.read(1)
- # os = self.fileobj.read(1)
-
- skip = 10
- if flag & FEXTRA:
- # Read & discard the extra field, if present
- xlen = ord(data[10])
- xlen = xlen + 256*ord(data[11])
- skip = skip + 2 + xlen
- if flag & FNAME:
- # Read and discard a null-terminated string containing the filename
- while True:
- if not data[skip] or data[skip] == '\000':
- break
- skip += 1
- skip += 1
- if flag & FCOMMENT:
- # Read and discard a null-terminated string containing a comment
- while True:
- if not data[skip] or data[skip] == '\000':
- break
- skip += 1
- skip += 1
- if flag & FHCRC:
- skip += 2 # Read & discard the 16-bit header CRC
- return data[skip:]
-
- def close(self):
- """Clean everything up and return None to future reads."""
- self.length = 0
- self._done()
- self.stream.close()
-
class MirrorManager:
"""Manages all requests for mirror objects."""
- def __init__(self, cache_dir, manager = None):
- self.manager = manager
+ def __init__(self, cache_dir):
self.cache_dir = cache_dir
- self.cache = filepath.FilePath(self.cache_dir)
self.apt_caches = {}
def extractPath(self, url):
self.apt_caches[site] = {}
if baseDir not in self.apt_caches[site]:
- site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_'))
+ site_cache = self.cache_dir.child(aptpkg_dir).child('mirrors').child(site + baseDir.replace('/', '_'))
+ if not site_cache.exists():
+ site_cache.makedirs()
self.apt_caches[site][baseDir] = AptPackages(site_cache)
def updatedFile(self, url, file_path):
d.errback(MirrorError("Site Not Found"))
return d
- def save_file(self, response, hash, url):
- """Save a downloaded file to the cache and stream it."""
- if response.code != 200:
- log.msg('File was not found (%r): %s' % (response, url))
- return response
-
- log.msg('Returning file: %s' % url)
-
- parsed = urlparse(url)
- destFile = self.cache.preauthChild(parsed[1] + parsed[2])
- log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
-
- if destFile.exists():
- log.msg('File already exists, removing: %s' % destFile.path)
- destFile.remove()
- else:
- destFile.parent().makedirs()
-
- root, ext = os.path.splitext(destFile.basename())
- if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
- ext = ext.lower()
- decFile = destFile.sibling(root)
- log.msg('Decompressing to: %s' % decFile.path)
- if decFile.exists():
- log.msg('File already exists, removing: %s' % decFile.path)
- decFile.remove()
- else:
- ext = None
- decFile = None
-
- orig_stream = response.stream
- response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
- response.stream.doneDefer.addCallback(self.save_complete, url, destFile,
- response.headers.getHeader('Last-Modified'),
- ext, decFile)
- response.stream.doneDefer.addErrback(self.save_error, url)
- return response
-
- def save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
- """Update the modification time and AptPackages."""
- if modtime:
- os.utime(destFile.path, (modtime, modtime))
- if ext:
- os.utime(decFile.path, (modtime, modtime))
-
- result = hash.verify()
- if result or result is None:
- if result:
- log.msg('Hashes match: %s' % url)
- else:
- log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
-
- self.updatedFile(url, destFile.path)
- if ext:
- self.updatedFile(url[:-len(ext)], decFile.path)
-
- if self.manager:
- self.manager.cached_file(hash, url, destFile.path)
- else:
- log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
-
- def save_error(self, failure, url):
- """An error has occurred in downloadign or saving the file."""
- log.msg('Error occurred downloading %s' % url)
- log.err(failure)
- return failure
-
class TestMirrorManager(unittest.TestCase):
"""Unit tests for the mirror manager."""
client = None
def setUp(self):
- self.client = MirrorManager('/tmp')
+ self.client = MirrorManager(FilePath('/tmp/.apt-dht'))
def test_extractPath(self):
site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org/debian/dists/unstable/Release')
break
self.client.updatedFile('http://' + self.releaseFile.replace('_','/'),
- '/var/lib/apt/lists/' + self.releaseFile)
+ FilePath('/var/lib/apt/lists/' + self.releaseFile))
self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
self.packagesFile[self.packagesFile.find('_dists_')+1:].replace('_','/'),
- '/var/lib/apt/lists/' + self.packagesFile)
+ FilePath('/var/lib/apt/lists/' + self.packagesFile))
self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
self.sourcesFile[self.sourcesFile.find('_dists_')+1:].replace('_','/'),
- '/var/lib/apt/lists/' + self.sourcesFile)
+ FilePath('/var/lib/apt/lists/' + self.sourcesFile))
lastDefer = defer.Deferred()
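For reference, extractPath() (exercised by test_extractPath above) splits a mirror URL into the site, the base mirror directory, and the repository-relative path. A rough sketch of the decomposition, assuming a '/dists/' split purely for illustration (the real method also handles pool/ paths and ports via splitHostPort):

from urlparse import urlparse

def extract_path_sketch(url):
    parsed = urlparse(url)
    site = parsed[1]      # e.g. 'ftp.us.debian.org'
    full = parsed[2]      # e.g. '/debian/dists/unstable/Release'
    i = full.find('/dists/')
    return site, full[:i], full[i:]

print extract_path_sketch('http://ftp.us.debian.org/debian/dists/unstable/Release')
# -> ('ftp.us.debian.org', '/debian', '/dists/unstable/Release')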
from twisted.internet import defer
from twisted.web2 import server, http, http_headers
from twisted.python import log
+from twisted.python.filepath import FilePath
from apt_dht_conf import config
from PeerManager import PeerManager
from HTTPServer import TopLevel
from MirrorManager import MirrorManager
+from CacheManager import CacheManager
from Hash import HashObject
from db import DB
from util import findMyIPAddr
+download_dir = 'cache'
+
class AptDHT:
def __init__(self, dht):
log.msg('Initializing the main apt_dht application')
- self.db = DB(config.get('DEFAULT', 'cache_dir') + '/.apt-dht.db')
+ self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
+ if not self.cache_dir.child(download_dir).exists():
+ self.cache_dir.child(download_dir).makedirs()
+ self.db = DB(self.cache_dir.child('apt-dht.db'))
self.dht = dht
self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
self.dht.join().addCallbacks(self.joinComplete, self.joinError)
- self.http_server = TopLevel(config.get('DEFAULT', 'cache_dir'), self)
+ self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
self.http_site = server.Site(self.http_server)
self.peers = PeerManager()
- self.mirrors = MirrorManager(config.get('DEFAULT', 'cache_dir'), self)
+ self.mirrors = MirrorManager(self.cache_dir)
+ self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
self.my_addr = None
def getSite(self):
def joinError(self, failure):
log.msg("joining DHT failed miserably")
log.err(failure)
+ raise RuntimeError, "joining the DHT failed"
def check_freshness(self, path, modtime, resp):
log.msg('Checking if %s is still fresh' % path)
if not locations:
log.msg('Peers for %s were not found' % path)
getDefer = self.peers.get([path])
- getDefer.addCallback(self.mirrors.save_file, hash, path)
- getDefer.addErrback(self.mirrors.save_error, path)
+ getDefer.addCallback(self.cache.save_file, hash, path)
+ getDefer.addErrback(self.cache.save_error, path)
getDefer.addCallbacks(d.callback, d.errback)
else:
log.msg('Found peers for %s: %r' % (path, locations))
# Download from the found peers
getDefer = self.peers.get(locations)
getDefer.addCallback(self.check_response, hash, path)
- getDefer.addCallback(self.mirrors.save_file, hash, path)
- getDefer.addErrback(self.mirrors.save_error, path)
+ getDefer.addCallback(self.cache.save_file, hash, path)
+ getDefer.addErrback(self.cache.save_error, path)
getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, path):
return getDefer
return response
- def cached_file(self, hash, url, file_path):
- assert file_path.startswith(config.get('DEFAULT', 'cache_dir'))
- urlpath, newdir = self.db.storeFile(file_path, hash.digest(), config.get('DEFAULT', 'cache_dir'))
- log.msg('now avaliable at %s: %s' % (urlpath, url))
-
- if self.my_addr:
+ def new_cached_file(self, url, file_path, hash, urlpath):
+ self.mirrors.updatedFile(url, file_path)
+
+ if self.my_addr and hash:
site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
full_path = urlunparse(('http', site, urlpath, None, None, None))
key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
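Both download paths now funnel through CacheManager.save_file, so every fetched file is hashed, cached, and registered in the database whether it came from a peer or from the mirror. A condensed sketch of the callback chain being built above (the names are stand-ins for the objects in this file):

from twisted.internet import defer

def fetch(peers, cache, hash, path):
    d = defer.Deferred()
    getDefer = peers.get([path])                       # returns a Deferred
    getDefer.addCallback(cache.save_file, hash, path)  # cache on success
    getDefer.addErrback(cache.save_error, path)        # log on failure
    getDefer.addCallbacks(d.callback, d.errback)       # hand off the result
    return d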
from time import sleep
import os
+from twisted.python.filepath import FilePath
from twisted.trial import unittest
assert sqlite.version_info >= (2, 1)
def __init__(self, db):
self.db = db
- try:
- os.stat(db)
- except OSError:
- self._createNewDB(db)
+ self.db.restat(False)
+ if self.db.exists():
+ self._loadDB()
else:
- self._loadDB(db)
+ self._createNewDB()
self.conn.text_factory = str
self.conn.row_factory = sqlite.Row
- def _loadDB(self, db):
+ def _loadDB(self):
try:
- self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
+ self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
except:
import traceback
raise DBExcept, "Couldn't open DB", traceback.format_exc()
- def _createNewDB(self, db):
- self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
+ def _createNewDB(self):
+ if not self.db.parent().exists():
+ self.db.parent().makedirs()
+ self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
c = self.conn.cursor()
c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
c.execute("CREATE INDEX files_urldir ON files(urldir)")
c.close()
self.conn.commit()
- def _removeChanged(self, path, row):
+ def _removeChanged(self, file, row):
res = None
if row:
- try:
- stat = os.stat(path)
- except:
- stat = None
- if stat:
- res = (row['size'] == stat.st_size and row['mtime'] == stat.st_mtime)
+ file.restat(False)
+ if file.exists():
+ res = (row['size'] == file.getsize() and row['mtime'] == file.getmtime())
if not res:
c = self.conn.cursor()
- c.execute("DELETE FROM files WHERE path = ?", (path, ))
+ c.execute("DELETE FROM files WHERE path = ?", (file.path, ))
self.conn.commit()
c.close()
return res
- def storeFile(self, path, hash, directory):
+ def storeFile(self, file, hash, directory):
"""Store or update a file in the database.
@return: the urlpath to access the file, and whether a
new url top-level directory was needed
"""
- path = os.path.abspath(path)
- directory = os.path.abspath(directory)
- assert path.startswith(directory)
- stat = os.stat(path)
+ file.restat()
c = self.conn.cursor()
- c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (path, ))
+ c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (file.path, ))
row = c.fetchone()
if row and directory == row['directory']:
c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?",
- (khash(hash), stat.st_size, stat.st_mtime, datetime.now()))
+ (khash(hash), file.getsize(), file.getmtime(), datetime.now()))
newdir = False
urldir = row['urldir']
else:
urldir, newdir = self.findDirectory(directory)
c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?, ?, ?)",
- (path, khash(hash), urldir, len(directory), stat.st_size, stat.st_mtime, datetime.now()))
+ (file.path, khash(hash), urldir, len(directory.path), file.getsize(), file.getmtime(), datetime.now()))
self.conn.commit()
c.close()
- return '/~' + str(urldir) + path[len(directory):], newdir
+ return '/~' + str(urldir) + file.path[len(directory.path):], newdir
- def getFile(self, path):
+ def getFile(self, file):
"""Get a file from the database.
If it has changed or is missing, it is removed from the database.
@return: dictionary of info for the file, False if changed, or
None if not in database or missing
"""
- path = os.path.abspath(path)
c = self.conn.cursor()
- c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (path, ))
+ c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
- res = self._removeChanged(path, row)
+ res = self._removeChanged(file, row)
if res:
res = {}
res['hash'] = row['hash']
- res['urlpath'] = '/~' + str(row['urldir']) + path[row['dirlength']:]
+ res['size'] = row['size']
+ res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
c.close()
return res
- def isUnchanged(self, path):
+ def isUnchanged(self, file):
"""Check if a file in the file system has changed.
If it has changed, it is removed from the table.
@return: True if unchanged, False if changed, None if not in database
"""
- path = os.path.abspath(path)
c = self.conn.cursor()
- c.execute("SELECT size, mtime FROM files WHERE path = ?", (path, ))
+ c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
- return self._removeChanged(path, row)
+ return self._removeChanged(file, row)
- def refreshFile(self, path):
+ def refreshFile(self, file):
"""Refresh the publishing time of a file.
If it has changed or is missing, it is removed from the table.
@return: True if unchanged, False if changed, None if not in database
"""
- path = os.path.abspath(path)
c = self.conn.cursor()
- c.execute("SELECT size, mtime FROM files WHERE path = ?", (path, ))
+ c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
- res = self._removeChanged(path, row)
+ res = self._removeChanged(file, row)
if res:
- c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), path))
+ c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
return res
def expiredFiles(self, expireAfter):
row = c.fetchone()
expired = {}
while row:
- res = self._removeChanged(row['path'], row)
+ res = self._removeChanged(FilePath(row['path']), row)
if res:
expired.setdefault(row['hash'], []).append('/~' + str(row['urldir']) + row['path'][row['dirlength']:])
row = c.fetchone()
newdirs = []
sql = "WHERE"
for dir in dirs:
- newdirs.append(os.path.abspath(dir) + os.sep + '*')
+ newdirs.append(dir.child('*').path)
sql += " path NOT GLOB ? AND"
sql = sql[:-4]
row = c.fetchone()
removed = []
while row:
- removed.append(row['path'])
+ removed.append(FilePath(row['path']))
row = c.fetchone()
if removed:
@return: the index of the url directory, and whether it is new or not
"""
- directory = os.path.abspath(directory)
c = self.conn.cursor()
- c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory, ))
+ c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory.path, ))
row = c.fetchone()
c.close()
if row['urldir']:
# Not found, need to add a new one
c = self.conn.cursor()
- c.execute("INSERT INTO dirs (path) VALUES (?)", (directory, ))
+ c.execute("INSERT INTO dirs (path) VALUES (?)", (directory.path, ))
self.conn.commit()
urldir = c.lastrowid
c.close()
row = c.fetchone()
dirs = {}
while row:
- dirs['~' + str(row['urldir'])] = row['path']
+ dirs['~' + str(row['urldir'])] = FilePath(row['path'])
row = c.fetchone()
c.close()
return dirs
"""Tests for the khashmir database."""
timeout = 5
- db = '/tmp/khashmir.db'
- path = '/tmp/khashmir.test'
+ db = FilePath('/tmp/khashmir.db')
+ file = FilePath('/tmp/apt-dht/khashmir.test')
hash = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
- directory = '/tmp/'
+ directory = FilePath('/tmp/apt-dht/')
urlpath = '/~1/khashmir.test'
- dirs = ['/tmp/apt-dht/top1', '/tmp/apt-dht/top2/sub1', '/tmp/apt-dht/top2/sub2/']
+ testfile = 'tmp/khashmir.test'
+ dirs = [FilePath('/tmp/apt-dht/top1'),
+ FilePath('/tmp/apt-dht/top2/sub1'),
+ FilePath('/tmp/apt-dht/top2/sub2/')]
def setUp(self):
- f = open(self.path, 'w')
- f.write('fgfhds')
- f.close()
- os.utime(self.path, None)
+ if not self.file.parent().exists():
+ self.file.parent().makedirs()
+ self.file.setContent('fgfhds')
+ self.file.touch()
self.store = DB(self.db)
- self.store.storeFile(self.path, self.hash, self.directory)
+ self.store.storeFile(self.file, self.hash, self.directory)
+
+ def test_openExistingDB(self):
+ self.store.close()
+ self.store = None
+ sleep(1)
+ self.store = DB(self.db)
+ res = self.store.isUnchanged(self.file)
+ self.failUnless(res)
def test_getFile(self):
- res = self.store.getFile(self.path)
+ res = self.store.getFile(self.file)
self.failUnless(res)
self.failUnlessEqual(res['hash'], self.hash)
self.failUnlessEqual(res['urlpath'], self.urlpath)
self.failUnless(res)
self.failUnlessEqual(len(res.keys()), 1)
self.failUnlessEqual(res.keys()[0], '~1')
- self.failUnlessEqual(res['~1'], os.path.abspath(self.directory))
+ self.failUnlessEqual(res['~1'], self.directory)
def test_isUnchanged(self):
- res = self.store.isUnchanged(self.path)
+ res = self.store.isUnchanged(self.file)
self.failUnless(res)
sleep(2)
- os.utime(self.path, None)
- res = self.store.isUnchanged(self.path)
+ self.file.touch()
+ res = self.store.isUnchanged(self.file)
self.failUnless(res == False)
- os.unlink(self.path)
- res = self.store.isUnchanged(self.path)
+ self.file.remove()
+ res = self.store.isUnchanged(self.file)
self.failUnless(res == None)
def test_expiry(self):
self.failUnlessEqual(res.keys()[0], self.hash)
self.failUnlessEqual(len(res[self.hash]), 1)
self.failUnlessEqual(res[self.hash][0], self.urlpath)
- res = self.store.refreshFile(self.path)
+ res = self.store.refreshFile(self.file)
self.failUnless(res)
res = self.store.expiredFiles(1)
self.failUnlessEqual(len(res.keys()), 0)
def build_dirs(self):
for dir in self.dirs:
- path = os.path.join(dir, self.path[1:])
- os.makedirs(os.path.dirname(path))
- f = open(path, 'w')
- f.write(path)
- f.close()
- os.utime(path, None)
- self.store.storeFile(path, self.hash, dir)
+ file = dir.preauthChild(self.testfile)
+ if not file.parent().exists():
+ file.parent().makedirs()
+ file.setContent(file.path)
+ file.touch()
+ self.store.storeFile(file, self.hash, dir)
def test_removeUntracked(self):
self.build_dirs()
res = self.store.removeUntrackedFiles(self.dirs)
self.failUnlessEqual(len(res), 1, 'Got removed paths: %r' % res)
- self.failUnlessEqual(res[0], self.path, 'Got removed paths: %r' % res)
+ self.failUnlessEqual(res[0], self.file, 'Got removed paths: %r' % res)
res = self.store.removeUntrackedFiles(self.dirs)
self.failUnlessEqual(len(res), 0, 'Got removed paths: %r' % res)
res = self.store.removeUntrackedFiles(self.dirs[1:])
self.failUnlessEqual(len(res), 1, 'Got removed paths: %r' % res)
- self.failUnlessEqual(res[0], os.path.join(self.dirs[0], self.path[1:]), 'Got removed paths: %r' % res)
+ self.failUnlessEqual(res[0], self.dirs[0].preauthChild(self.testfile), 'Got removed paths: %r' % res)
res = self.store.removeUntrackedFiles(self.dirs[:1])
self.failUnlessEqual(len(res), 2, 'Got removed paths: %r' % res)
- self.failUnlessIn(os.path.join(self.dirs[1], self.path[1:]), res, 'Got removed paths: %r' % res)
- self.failUnlessIn(os.path.join(self.dirs[2], self.path[1:]), res, 'Got removed paths: %r' % res)
+ self.failUnlessIn(self.dirs[1].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
+ self.failUnlessIn(self.dirs[2].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
def test_reconcileDirectories(self):
self.build_dirs()
res = self.store.getAllDirectories()
self.failUnless(res)
self.failUnlessEqual(len(res.keys()), 1)
- res = self.store.removeUntrackedFiles(['/what'])
+ res = self.store.removeUntrackedFiles([FilePath('/what')])
res = self.store.reconcileDirectories()
self.failUnlessEqual(res, True)
res = self.store.getAllDirectories()
self.failUnlessEqual(len(res.keys()), 0)
def tearDown(self):
- for root, dirs, files in os.walk('/tmp/apt-dht', topdown=False):
- for name in files:
- os.remove(os.path.join(root, name))
- for name in dirs:
- os.rmdir(os.path.join(root, name))
+ self.directory.remove()
self.store.close()
- os.unlink(self.db)
+ self.db.remove()
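The DB class now takes and returns FilePath objects throughout. A minimal sketch of the round trip the tests above exercise (the import location, paths, and placeholder hash are assumptions):

from twisted.python.filepath import FilePath
from apt_dht.db import DB    # assumed import location

db = DB(FilePath('/tmp/.apt-dht-example/apt-dht.db'))
f = FilePath('/tmp/.apt-dht-example/some.file')
if not f.parent().exists():
    f.parent().makedirs()
f.setContent('data')

# storeFile() takes the file, a 20-byte hash, and the containing directory,
# and returns the URL path plus whether a new top-level dir was created.
urlpath, newdir = db.storeFile(f, 'x'*20, FilePath('/tmp/.apt-dht-example'))
info = db.getFile(f)          # {'hash': ..., 'size': ..., 'urlpath': ...}
assert db.isUnchanged(f)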
from apt_dht.interfaces import IDHT
from khashmir import Khashmir
+khashmir_dir = 'apt-dht-Khashmir'
+
class DHTError(Exception):
"""Represents errors that occur in the DHT."""
self.config_parser = config
self.section = section
self.config = {}
- self.cache_dir = self.config_parser.get(section, 'cache_dir')
+ self.cache_dir = os.path.join(self.config_parser.get(section, 'cache_dir'), khashmir_dir)
+ if not os.path.exists(self.cache_dir):
+ os.makedirs(self.cache_dir)
self.bootstrap = self.config_parser.getstringlist(section, 'BOOTSTRAP')
self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
for k in self.config_parser.options(section):
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, 'SPEW': True, }
+ 'KE_AGE': 3600, 'SPEW': False, }
def setUp(self):
self.a = DHT()
def setup(self, config, cache_dir):
self.config = config
self.port = config['PORT']
- self.store = DB(os.path.join(cache_dir, '.khashmir.' + str(self.port) + '.db'))
+ self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
#self.app = service.Application("krpc")