from twisted.python import log
from twisted.python.filepath import FilePath
-from twisted.internet import defer
+from twisted.internet import defer, reactor
from twisted.trial import unittest
from twisted.web2 import stream
from twisted.web2.http import splitHostPort
-from AptPackages import AptPackages
+from Hash import HashObject
aptpkg_dir='apt-packages'
self.cache_dir = cache_dir
self.db = db
self.manager = manager
+ self.scanning = []
+
+ # Init the database, remove old files, init the HTTP dirs
+ self.db.removeUntrackedFiles([self.cache_dir])
+ self.db.reconcileDirectories()
+ self.manager.setDirectories(self.db.getAllDirectories())
+
+
+ def scanDirectories(self):
+ """Scan the cache directories, hashing new and rehashing changed files."""
+ assert not self.scanning, "a directory scan is already under way"
+ self.scanning.append(self.cache_dir)
+ self._scanDirectories()
+
+ def _scanDirectories(self, walker = None):
+ # Need to start waling a new directory
+ if walker is None:
+ # If there are any left, get them
+ if self.scanning:
+ log.msg('started scanning directory: %s' % self.scanning[0].path)
+ walker = self.scanning[0].walk()
+ else:
+ # Done, just check if the HTTP directories need updating
+ log.msg('cache directory scan complete')
+ if self.db.reconcileDirectories():
+ self.manager.setDirectories(self.db.getAllDirectories())
+ return
+
+ try:
+ # Get the next file in the directory
+ file = walker.next()
+ except StopIteration:
+ # No files left, go to the next directory
+ log.msg('done scanning directory: %s' % self.scanning[0].path)
+ self.scanning.pop(0)
+ reactor.callLater(0, self._scanDirectories)
+ return
+
+ # If it's not a file, or it's already properly in the DB, ignore it
+ if not file.isfile() or self.db.isUnchanged(file):
+ if not file.isfile():
+ log.msg('entering directory: %s' % file.path)
+ else:
+ log.msg('file is unchanged: %s' % file.path)
+ reactor.callLater(0, self._scanDirectories, walker)
+ return
+
+ # Otherwise hash it
+ log.msg('start hash checking file: %s' % file.path)
+ hash = HashObject()
+ df = hash.hashInThread(file)
+ df.addBoth(self._doneHashing, file, walker)
+ df.addErrback(log.err)
+
+ def _doneHashing(self, result, file, walker):
+ reactor.callLater(0, self._scanDirectories, walker)
+ if isinstance(result, HashObject):
+ log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
+ if self.scanning[0] == self.cache_dir:
+ mirror_dir = self.cache_dir.child(file.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
+ urlpath, newdir = self.db.storeFile(file, result.digest(), mirror_dir)
+ url = 'http:/' + file.path[len(self.cache_dir.path):]
+ else:
+ urlpath, newdir = self.db.storeFile(file, result.digest(), self.scanning[0])
+ url = None
+ if newdir:
+ self.manager.setDirectories(self.db.getAllDirectories())
+ self.manager.new_cached_file(file, result, urlpath, url)
+ else:
+ log.msg('hash check of %s failed' % file.path)
+ log.err(result)
+
def save_file(self, response, hash, url):
"""Save a downloaded file to the cache and stream it."""
if response.code != 200:
mirror_dir = self.cache_dir.child(destFile.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
urlpath, newdir = self.db.storeFile(destFile, hash.digest(), mirror_dir)
log.msg('now avaliable at %s: %s' % (urlpath, url))
- if newdir and self.manager:
- log.msg('A new web directory was created, so enable it')
- self.manager.setDirectories(self.db.getAllDirectories())
if self.manager:
- self.manager.new_cached_file(url, destFile, hash, urlpath)
+ if newdir:
+ log.msg('A new web directory was created, so enable it')
+ self.manager.setDirectories(self.db.getAllDirectories())
+
+ self.manager.new_cached_file(destFile, hash, urlpath, url)
if ext:
- self.manager.new_cached_file(url[:-len(ext)], decFile, None, urlpath)
+ self.manager.new_cached_file(decFile, None, urlpath, url[:-len(ext)])
else:
log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
destFile.remove()
from binascii import b2a_hex, a2b_hex
import sys
+from twisted.internet import threads, defer
from twisted.trial import unittest
+class HashError(ValueError):
+ """An error has occurred while hashing a file."""
+
class HashObject:
"""Manages hashes and hashing for a file."""
if bits is not None:
bytes = (bits - 1) // 8 + 1
else:
- assert bytes is not None, "you must specify one of bits or bytes"
+ if bytes is None:
+ raise HashError, "you must specify one of bits or bytes"
if len(hashString) < bytes:
hashString = hashString + '\000'*(bytes - len(hashString))
elif len(hashString) > bytes:
def update(self, data):
"""Add more data to the file hasher."""
if self.result is None:
- assert self.done == False, "Already done, you can't add more data after calling digest() or verify()"
- assert self.fileHasher is not None, "file hasher not initialized"
+ if self.done:
+ raise HashError, "Already done, you can't add more data after calling digest() or verify()"
+ if self.fileHasher is None:
+ raise HashError, "file hasher not initialized"
self.fileHasher.update(data)
self.size += len(data)
def digest(self):
"""Get the hash of the added file data."""
if self.fileHash is None:
- assert self.fileHasher is not None, "you must hash some data first"
+ if self.fileHasher is None:
+ raise HashError, "you must hash some data first"
self.fileHash = self.fileHasher.digest()
self.done = True
return self.fileHash
self.result = (self.fileHash == self.expHash and self.size == self.expSize)
return self.result
+ def hashInThread(self, file):
+ """Hashes a file in a separate thread, callback with the result."""
+ file.restat(False)
+ if not file.exists():
+ df = defer.Deferred()
+ df.errback(HashError("file not found"))
+ return df
+
+ df = threads.deferToThread(self._hashInThread, file)
+ return df
+
+ def _hashInThread(self, file):
+ """Hashes a file, returning itself as the result."""
+ f = file.open()
+ self.new(force = True)
+ data = f.read(4096)
+ while data:
+ self.update(data)
+ data = f.read(4096)
+ self.digest()
+ return self
+
#### Methods for setting the expected hash
def set(self, hashType, hashHex, size):
"""Initialize the hash object.
def test_failure(self):
h = HashObject()
h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
- self.failUnlessRaises(AssertionError, h.normexpected)
- self.failUnlessRaises(AssertionError, h.digest)
- self.failUnlessRaises(AssertionError, h.hexdigest)
- self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+ self.failUnlessRaises(HashError, h.normexpected)
+ self.failUnlessRaises(HashError, h.digest)
+ self.failUnlessRaises(HashError, h.hexdigest)
+ self.failUnlessRaises(HashError, h.update, 'gfgf')
def test_sha1(self):
h = HashObject()
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == 'c722df87e1acaa64b27aac4e174077afc3623540')
- self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+ self.failUnlessRaises(HashError, h.update, 'gfgf')
self.failUnless(h.verify() == True)
def test_md5(self):
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == '2a586bcd1befc5082c872dcd96a01403')
- self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+ self.failUnlessRaises(HashError, h.update, 'gfgf')
self.failUnless(h.verify() == True)
def test_sha256(self):
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7')
- self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+ self.failUnlessRaises(HashError, h.update, 'gfgf')
self.failUnless(h.verify() == True)
if sys.version_info < (2, 5):
self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
self.dht.join().addCallbacks(self.joinComplete, self.joinError)
self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
+ self.setDirectories = self.http_server.setDirectories
self.http_site = server.Site(self.http_server)
self.peers = PeerManager()
self.mirrors = MirrorManager(self.cache_dir)
self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
self.my_addr = None
- self.setDirectories = self.http_server.setDirectories
def getSite(self):
return self.http_site
self.my_addr = findMyIPAddr(result, config.getint(config.get('DEFAULT', 'DHT'), 'PORT'))
if not self.my_addr:
raise RuntimeError, "IP address for this machine could not be found"
+ self.cache.scanDirectories()
def joinError(self, failure):
log.msg("joining DHT failed miserably")
return getDefer
return response
- def new_cached_file(self, url, file_path, hash, urlpath):
- self.mirrors.updatedFile(url, file_path)
+ def new_cached_file(self, file_path, hash, urlpath, url = None):
+ """Add a newly cached file to the DHT.
+
+ If the file was downloaded, set url to the path it was downloaded for.
+ """
+ if url:
+ self.mirrors.updatedFile(url, file_path)
if self.my_addr and hash:
site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))