from twisted.web2.http import splitHostPort
from Hash import HashObject
+from apt_p2p_conf import config
DECOMPRESS_EXTS = ['.gz', '.bz2']
DECOMPRESS_FILES = ['release', 'sources', 'packages']
+class CacheError(Exception):
+ """Error occurred downloading a file to the cache."""
+
class ProxyFileStream(stream.SimpleStream):
"""Saves a stream to a file while providing a new stream.
if self.bz2file:
self.bz2file.close()
self.bz2file = None
-
- self.doneDefer.callback(self.hash)
+ def _error(self, err):
+ """Close all the output files, return the error."""
+ if not self.outFile.closed:
+ self._done()
+ self.stream.close()
+ self.doneDefer.errback(err)
+
def read(self):
"""Read some data from the stream."""
if self.outFile.closed:
# Read data from the stream, deal with the possible deferred
data = self.stream.read()
if isinstance(data, defer.Deferred):
- data.addCallbacks(self._write, self._done)
+ data.addCallbacks(self._write, self._error)
return data
self._write(data)
Also optionally decompresses it.
"""
if data is None:
- self._done()
+ if not self.outFile.closed:
+ self._done()
+ self.doneDefer.callback(self.hash)
return data
# Write and hash the streamed data
def close(self):
"""Clean everything up and return None to future reads."""
+ if self.hash.size < self.length:
+ log.msg('ProxyFileStream was prematurely closed after only %d/%d bytes' % (self.hash.size, self.length))
+ self._error(CacheError('Prematurely closed, all data was not written'))
+ elif not self.outFile.closed:
+ self._done()
+ self.doneDefer.callback(self.hash)
self.length = 0
- self._done()
self.stream.close()
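# Illustrative usage sketch only: ProxyFileStream's constructor arguments are
# assumed here, since __init__ is not shown in this excerpt. The contract
# shown above is that doneDefer fires with the HashObject on success and
# errbacks (e.g. with CacheError) on failure.
def _example_wrap(response, outFile, hash):
    wrapped = ProxyFileStream(response.stream, outFile, hash)  # assumed signature
    wrapped.doneDefer.addCallback(lambda h: log.msg('saved, hash: %s' % h.hexdigest()))
    wrapped.doneDefer.addErrback(lambda f: log.msg('save failed: %s' % f.getErrorMessage()))
    response.stream = wrapped
    return response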
class CacheManager:
@ivar scanning: all the directories that are currently being scanned or waiting to be scanned
"""
- def __init__(self, cache_dir, db, other_dirs = [], manager = None):
+ def __init__(self, cache_dir, db, manager = None):
"""Initialize the instance and remove any untracked files from the DB..
@type cache_dir: L{twisted.python.filepath.FilePath}
@param cache_dir: the directory to use for storing all files
@type db: L{db.DB}
@param db: the database to use for tracking files and hashes
- @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
- @param other_dirs: the other directories that have shared files in them
- (optional, defaults to only using the cache directory)
@type manager: L{apt_p2p.AptP2P}
@param manager: the main program object to send requests to
(optional, defaults to not calling back with cached files)
"""
self.cache_dir = cache_dir
- self.other_dirs = other_dirs
+ self.other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
self.all_dirs = self.other_dirs[:]
self.all_dirs.insert(0, self.cache_dir)
self.db = db
self.db.removeUntrackedFiles(self.all_dirs)
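# Illustrative sketch only: with other_dirs now read from config (OTHER_DIRS
# above), callers pass just the cache directory, the database and optionally
# the main program object. The path and the db/manager objects below are
# placeholders, not values from this excerpt.
cache = CacheManager(FilePath('/var/cache/apt-p2p/cache'), db, manager)
cache.scanDirectories()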
#{ Scanning directories
- def scanDirectories(self):
+ def scanDirectories(self, result = None):
"""Scan the cache directories, hashing new and rehashing changed files."""
assert not self.scanning, "a directory scan is already under way"
self.scanning = self.all_dirs[:]
# If it's not a file ignore it
if not file.isfile():
- log.msg('entering directory: %s' % file.path)
reactor.callLater(0, self._scanDirectories, None, walker)
return
# If it's already properly in the DB, ignore it
db_status = self.db.isUnchanged(file)
if db_status:
- log.msg('file is unchanged: %s' % file.path)
reactor.callLater(0, self._scanDirectories, None, walker)
return
hash = HashObject()
df = hash.hashInThread(file)
df.addBoth(self._doneHashing, file, walker)
- df.addErrback(log.err)
def _doneHashing(self, result, file, walker):
"""If successful, add the hashed file to the DB and inform the main program."""
url = 'http:/' + file.path[len(self.cache_dir.path):]
# Store the hashed file in the database
- new_hash = self.db.storeFile(file, result.digest())
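+ # The piece digests are stored concatenated; the added boolean appears to
+ # flag a verified hash (an assumption based on the dht flag used in
+ # _save_complete below, since storeFile's new signature is not shown here).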
+ new_hash = self.db.storeFile(file, result.digest(), True,
+ ''.join(result.pieceDigests()))
# Tell the main program to handle the new cache file
df = self.manager.new_cached_file(file, result, new_hash, url, True)
if destFile.exists():
log.msg('File already exists, removing: %s' % destFile.path)
destFile.remove()
- elif not destFile.parent().exists():
+ if not destFile.parent().exists():
destFile.parent().makedirs()
# Determine whether it needs to be decompressed and how
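# Hedged sketch of the elided decompression decision: the actual code is not
# in this excerpt, but DECOMPRESS_FILES/DECOMPRESS_EXTS above suggest a check
# roughly along these lines, with decFile left as None when no decompression
# is needed.
root, ext = os.path.splitext(destFile.basename())
if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
    decFile = destFile.sibling(root)   # e.g. 'Packages.gz' -> 'Packages'
else:
    decFile = None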
response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
response.headers.getHeader('Last-Modified'),
decFile)
- response.stream.doneDefer.addErrback(self.save_error, url)
+ response.stream.doneDefer.addErrback(self._save_error, url, destFile, decFile)
# Return the modified response with the new stream
return response
@param decFile: the file where the decompressed download was written to
(optional, defaults to the file not having been compressed)
"""
- if modtime:
- os.utime(destFile.path, (modtime, modtime))
- if decFile:
- os.utime(decFile.path, (modtime, modtime))
-
result = hash.verify()
if result or result is None:
+ if modtime:
+ os.utime(destFile.path, (modtime, modtime))
+
if result:
log.msg('Hashes match: %s' % url)
+ dht = True
else:
log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
+ dht = False
- new_hash = self.db.storeFile(destFile, hash.digest())
- log.msg('now avaliable: %s' % (url))
+ new_hash = self.db.storeFile(destFile, hash.digest(), dht,
+ ''.join(hash.pieceDigests()))
if self.manager:
self.manager.new_cached_file(destFile, hash, new_hash, url)
- if decFile:
- ext_len = len(destFile.path) - len(decFile.path)
- self.manager.new_cached_file(decFile, None, False, url[:-ext_len])
+
+ if decFile:
+ # Hash the decompressed file and add it to the DB
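+ # This new HashObject has no expected digest, so verify() returns None in
+ # the recursive _save_complete call below and the decompressed file is
+ # stored with dht=False.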
+ decHash = HashObject()
+ ext_len = len(destFile.path) - len(decFile.path)
+ df = decHash.hashInThread(decFile)
+ df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime)
+ df.addErrback(self._save_error, url[:-ext_len], decFile)
else:
log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
destFile.remove()
if decFile:
decFile.remove()
+ def _save_error(self, failure, url, destFile, decFile = None):
+ """Remove the destination files."""
+ log.msg('Error occurred downloading %s' % url)
+ log.err(failure)
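+ # restat(False) refreshes the cached stat without raising if the file is
+ # already gone, so the exists() checks below see the current on-disk state.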
+ destFile.restat(False)
+ if destFile.exists():
+ log.msg('Removing the incomplete file: %s' % destFile.path)
+ destFile.remove()
+ if decFile:
+ decFile.restat(False)
+ if decFile.exists():
+ log.msg('Removing the incomplete file: %s' % decFile.path)
+ decFile.remove()
+
def save_error(self, failure, url):
- """An error has occurred in downloadign or saving the file."""
+ """An error has occurred in downloading or saving the file"""
log.msg('Error occurred downloading %s' % url)
log.err(failure)
return failure