CacheManager scans the cache directory during initialization.
author     Cameron Dale <camrdale@gmail.com>
           Tue, 15 Jan 2008 05:55:22 +0000 (21:55 -0800)
committer  Cameron Dale <camrdale@gmail.com>
           Tue, 15 Jan 2008 05:55:22 +0000 (21:55 -0800)
The scanning is actually done after a successful DHT join, so
that the files it finds can be added to the DHT.
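
A stand-alone sketch of the non-blocking walk pattern the new
CacheManager._scanDirectories uses below: pull one entry at a time
from FilePath.walk() and reschedule with reactor.callLater(0, ...),
so a large cache directory never blocks the reactor (the cache path
and the do-nothing callback are assumptions, not project code):

    from twisted.internet import reactor
    from twisted.python.filepath import FilePath

    def scan(walker, on_file):
        """Process one directory entry per reactor iteration."""
        try:
            entry = walker.next()
        except StopIteration:
            reactor.stop()      # scan complete (example only)
            return
        if entry.isfile():
            on_file(entry)      # e.g. hash it and announce it to the DHT
        reactor.callLater(0, scan, walker, on_file)

    cache_dir = FilePath('/var/cache/apt-dht')   # hypothetical path
    reactor.callLater(0, scan, cache_dir.walk(), lambda f: None)
    reactor.run()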

The HashObjects can now hash a file in a separate thread.
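
A minimal stand-alone sketch of the same pattern
HashObject.hashInThread uses: run the blocking read-and-hash loop in
a worker thread via twisted.internet.threads.deferToThread and get
the digest back through a Deferred (hashlib and the 4096-byte reads
mirror the diff; everything else here is illustrative):

    import hashlib
    from twisted.internet import threads

    def _hash_file(path):
        """Blocking read/hash loop; safe inside a thread-pool thread."""
        h = hashlib.sha1()
        f = open(path, 'rb')
        try:
            data = f.read(4096)
            while data:
                h.update(data)
                data = f.read(4096)
        finally:
            f.close()
        return h.hexdigest()

    def hash_in_thread(path):
        """Return a Deferred that fires with the file's hex digest."""
        return threads.deferToThread(_hash_file, path)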

The download cache is scanned for new files too, causing the
decompressed versions of compressed Packages files to be
hashed and added to the DHT. This isn't currently consistent,
since they are not added when they are first downloaded
and decompressed.
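
Announcement goes through the reworked new_cached_file (apt_dht.py
hunk below), whose url argument is now optional; an illustrative
call, with manager standing in for the AptDHT instance and the other
names taken from the CacheManager code:

    # A decompressed Packages file found by the scan is hashed and
    # announced like any other cached file. url is None when the file
    # was not fetched for a particular URL, in which case only the
    # DHT announcement happens and the mirror manager is skipped.
    manager.new_cached_file(file, hash, urlpath, url)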

apt_dht/CacheManager.py
apt_dht/Hash.py
apt_dht/apt_dht.py

index 368714d017d6856de43f8fb69f4d1ddb35f6e5d2..4fdcf40c0a1069ec548d48bffa8b5bac7fe89ca2 100644 (file)
@@ -7,12 +7,12 @@ import os
 
 from twisted.python import log
 from twisted.python.filepath import FilePath
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import splitHostPort
 
-from AptPackages import AptPackages
+from Hash import HashObject
 
 aptpkg_dir='apt-packages'
 
@@ -151,7 +151,79 @@ class CacheManager:
         self.cache_dir = cache_dir
         self.db = db
         self.manager = manager
+        self.scanning = []
+        
+        # Init the database, remove old files, init the HTTP dirs
+        self.db.removeUntrackedFiles([self.cache_dir])
+        self.db.reconcileDirectories()
+        self.manager.setDirectories(self.db.getAllDirectories())
+        
+        
+    def scanDirectories(self):
+        """Scan the cache directories, hashing new and rehashing changed files."""
+        assert not self.scanning, "a directory scan is already under way"
+        self.scanning.append(self.cache_dir)
+        self._scanDirectories()
+
+    def _scanDirectories(self, walker = None):
+        # Need to start walking a new directory
+        if walker is None:
+            # If there are any left, get them
+            if self.scanning:
+                log.msg('started scanning directory: %s' % self.scanning[0].path)
+                walker = self.scanning[0].walk()
+            else:
+                # Done, just check if the HTTP directories need updating
+                log.msg('cache directory scan complete')
+                if self.db.reconcileDirectories():
+                    self.manager.setDirectories(self.db.getAllDirectories())
+                return
+            
+        try:
+            # Get the next file in the directory
+            file = walker.next()
+        except StopIteration:
+            # No files left, go to the next directory
+            log.msg('done scanning directory: %s' % self.scanning[0].path)
+            self.scanning.pop(0)
+            reactor.callLater(0, self._scanDirectories)
+            return
+
+        # If it's not a file, or it's already properly in the DB, ignore it
+        if not file.isfile() or self.db.isUnchanged(file):
+            if not file.isfile():
+                log.msg('entering directory: %s' % file.path)
+            else:
+                log.msg('file is unchanged: %s' % file.path)
+            reactor.callLater(0, self._scanDirectories, walker)
+            return
+
+        # Otherwise hash it
+        log.msg('start hash checking file: %s' % file.path)
+        hash = HashObject()
+        df = hash.hashInThread(file)
+        df.addBoth(self._doneHashing, file, walker)
+        df.addErrback(log.err)
+    
+    def _doneHashing(self, result, file, walker):
+        reactor.callLater(0, self._scanDirectories, walker)
     
+        if isinstance(result, HashObject):
+            log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
+            if self.scanning[0] == self.cache_dir:
+                mirror_dir = self.cache_dir.child(file.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
+                urlpath, newdir = self.db.storeFile(file, result.digest(), mirror_dir)
+                url = 'http:/' + file.path[len(self.cache_dir.path):]
+            else:
+                urlpath, newdir = self.db.storeFile(file, result.digest(), self.scanning[0])
+                url = None
+            if newdir:
+                self.manager.setDirectories(self.db.getAllDirectories())
+            self.manager.new_cached_file(file, result, urlpath, url)
+        else:
+            log.msg('hash check of %s failed' % file.path)
+            log.err(result)
+
     def save_file(self, response, hash, url):
         """Save a downloaded file to the cache and stream it."""
         if response.code != 200:
@@ -207,14 +279,15 @@ class CacheManager:
             mirror_dir = self.cache_dir.child(destFile.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
             urlpath, newdir = self.db.storeFile(destFile, hash.digest(), mirror_dir)
             log.msg('now avaliable at %s: %s' % (urlpath, url))
-            if newdir and self.manager:
-                log.msg('A new web directory was created, so enable it')
-                self.manager.setDirectories(self.db.getAllDirectories())
 
             if self.manager:
-                self.manager.new_cached_file(url, destFile, hash, urlpath)
+                if newdir:
+                    log.msg('A new web directory was created, so enable it')
+                    self.manager.setDirectories(self.db.getAllDirectories())
+    
+                self.manager.new_cached_file(destFile, hash, urlpath, url)
                 if ext:
-                    self.manager.new_cached_file(url[:-len(ext)], decFile, None, urlpath)
+                    self.manager.new_cached_file(decFile, None, urlpath, url[:-len(ext)])
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()
index bb993f1755bbbb80d5b8a23a2afb40a17b2c53fb..2223876947bda6ebe9fcbfc6aebf729ec4604da7 100644 (file)
@@ -2,8 +2,12 @@
 from binascii import b2a_hex, a2b_hex
 import sys
 
+from twisted.internet import threads, defer
 from twisted.trial import unittest
 
+class HashError(ValueError):
+    """An error has occurred while hashing a file."""
+    
 class HashObject:
     """Manages hashes and hashing for a file."""
     
@@ -53,7 +57,8 @@ class HashObject:
         if bits is not None:
             bytes = (bits - 1) // 8 + 1
         else:
-            assert bytes is not None, "you must specify one of bits or bytes"
+            if bytes is None:
+                raise HashError, "you must specify one of bits or bytes"
         if len(hashString) < bytes:
             hashString = hashString + '\000'*(bytes - len(hashString))
         elif len(hashString) > bytes:
@@ -102,15 +107,18 @@ class HashObject:
     def update(self, data):
         """Add more data to the file hasher."""
         if self.result is None:
-            assert self.done == False, "Already done, you can't add more data after calling digest() or verify()"
-            assert self.fileHasher is not None, "file hasher not initialized"
+            if self.done:
+                raise HashError, "Already done, you can't add more data after calling digest() or verify()"
+            if self.fileHasher is None:
+                raise HashError, "file hasher not initialized"
             self.fileHasher.update(data)
             self.size += len(data)
         
     def digest(self):
         """Get the hash of the added file data."""
         if self.fileHash is None:
-            assert self.fileHasher is not None, "you must hash some data first"
+            if self.fileHasher is None:
+                raise HashError, "you must hash some data first"
             self.fileHash = self.fileHasher.digest()
             self.done = True
         return self.fileHash
@@ -136,6 +144,28 @@ class HashObject:
             self.result = (self.fileHash == self.expHash and self.size == self.expSize)
         return self.result
     
+    def hashInThread(self, file):
+        """Hashes a file in a separate thread, callback with the result."""
+        file.restat(False)
+        if not file.exists():
+            df = defer.Deferred()
+            df.errback(HashError("file not found"))
+            return df
+        
+        df = threads.deferToThread(self._hashInThread, file)
+        return df
+    
+    def _hashInThread(self, file):
+        """Hashes a file, returning itself as the result."""
+        f = file.open()
+        self.new(force = True)
+        data = f.read(4096)
+        while data:
+            self.update(data)
+            data = f.read(4096)
+        self.digest()
+        return self
+
     #### Methods for setting the expected hash
     def set(self, hashType, hashHex, size):
         """Initialize the hash object.
@@ -213,10 +243,10 @@ class TestHashObject(unittest.TestCase):
     def test_failure(self):
         h = HashObject()
         h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
-        self.failUnlessRaises(AssertionError, h.normexpected)
-        self.failUnlessRaises(AssertionError, h.digest)
-        self.failUnlessRaises(AssertionError, h.hexdigest)
-        self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+        self.failUnlessRaises(HashError, h.normexpected)
+        self.failUnlessRaises(HashError, h.digest)
+        self.failUnlessRaises(HashError, h.hexdigest)
+        self.failUnlessRaises(HashError, h.update, 'gfgf')
     
     def test_sha1(self):
         h = HashObject()
@@ -230,7 +260,7 @@ class TestHashObject(unittest.TestCase):
         h.new()
         h.update('apt-dht is the best')
         self.failUnless(h.hexdigest() == 'c722df87e1acaa64b27aac4e174077afc3623540')
-        self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+        self.failUnlessRaises(HashError, h.update, 'gfgf')
         self.failUnless(h.verify() == True)
         
     def test_md5(self):
@@ -245,7 +275,7 @@ class TestHashObject(unittest.TestCase):
         h.new()
         h.update('apt-dht is the best')
         self.failUnless(h.hexdigest() == '2a586bcd1befc5082c872dcd96a01403')
-        self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+        self.failUnlessRaises(HashError, h.update, 'gfgf')
         self.failUnless(h.verify() == True)
         
     def test_sha256(self):
@@ -260,7 +290,7 @@ class TestHashObject(unittest.TestCase):
         h.new()
         h.update('apt-dht is the best')
         self.failUnless(h.hexdigest() == '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7')
-        self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+        self.failUnlessRaises(HashError, h.update, 'gfgf')
         self.failUnless(h.verify() == True)
 
     if sys.version_info < (2, 5):
index 1abeaa5323ec0bf5b690ec2646bc309e7158387f..2211c1dfacd4dd64d4fe4ffcb2d6d8805ebeb39a 100644 (file)
@@ -30,12 +30,12 @@ class AptDHT:
         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
         self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
+        self.setDirectories = self.http_server.setDirectories
         self.http_site = server.Site(self.http_server)
         self.peers = PeerManager()
         self.mirrors = MirrorManager(self.cache_dir)
         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
         self.my_addr = None
-        self.setDirectories = self.http_server.setDirectories
     
     def getSite(self):
         return self.http_site
@@ -44,6 +44,7 @@ class AptDHT:
         self.my_addr = findMyIPAddr(result, config.getint(config.get('DEFAULT', 'DHT'), 'PORT'))
         if not self.my_addr:
             raise RuntimeError, "IP address for this machine could not be found"
+        self.cache.scanDirectories()
 
     def joinError(self, failure):
         log.msg("joining DHT failed miserably")
@@ -113,8 +114,13 @@ class AptDHT:
             return getDefer
         return response
         
-    def new_cached_file(self, url, file_path, hash, urlpath):
-        self.mirrors.updatedFile(url, file_path)
+    def new_cached_file(self, file_path, hash, urlpath, url = None):
+        """Add a newly cached file to the DHT.
+        
+        If the file was downloaded, set url to the URL it was downloaded for.
+        """
+        if url:
+            self.mirrors.updatedFile(url, file_path)
         
         if self.my_addr and hash:
             site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))