HTTPServer uses the hash to lookup the file in the DB (no more directories).
authorCameron Dale <camrdale@gmail.com>
Mon, 18 Feb 2008 22:40:04 +0000 (14:40 -0800)
committerCameron Dale <camrdale@gmail.com>
Mon, 18 Feb 2008 22:40:04 +0000 (14:40 -0800)
apt_dht/CacheManager.py
apt_dht/HTTPServer.py
apt_dht/apt_dht.py
apt_dht/db.py

index 9820ae1..517a009 100644 (file)
@@ -156,10 +156,8 @@ class CacheManager:
         self.manager = manager
         self.scanning = []
         
-        # Init the database, remove old files, init the HTTP dirs
+        # Init the database, remove old files
         self.db.removeUntrackedFiles(self.all_dirs)
-        self.db.reconcileDirectories()
-        self.manager.setDirectories(self.db.getAllDirectories())
         
         
     def scanDirectories(self):
@@ -176,10 +174,7 @@ class CacheManager:
                 log.msg('started scanning directory: %s' % self.scanning[0].path)
                 walker = self.scanning[0].walk()
             else:
-                # Done, just check if the HTTP directories need updating
                 log.msg('cache directory scan complete')
-                if self.db.reconcileDirectories():
-                    self.manager.setDirectories(self.db.getAllDirectories())
                 return
             
         try:
@@ -212,16 +207,11 @@ class CacheManager:
     
         if isinstance(result, HashObject):
             log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
+            url = None
             if self.scanning[0] == self.cache_dir:
-                mirror_dir = self.cache_dir.child(file.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
-                urlpath, newdir = self.db.storeFile(file, result.digest(), mirror_dir)
                 url = 'http:/' + file.path[len(self.cache_dir.path):]
-            else:
-                urlpath, newdir = self.db.storeFile(file, result.digest(), self.scanning[0])
-                url = None
-            if newdir:
-                self.manager.setDirectories(self.db.getAllDirectories())
-            df = self.manager.new_cached_file(file, result, urlpath, url)
+            self.db.storeFile(file, result.digest())
+            df = self.manager.new_cached_file(file, result, url)
             if df is None:
                 reactor.callLater(0, self._scanDirectories, None, walker)
             else:
@@ -283,18 +273,13 @@ class CacheManager:
             else:
                 log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
                 
-            mirror_dir = self.cache_dir.child(destFile.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
-            urlpath, newdir = self.db.storeFile(destFile, hash.digest(), mirror_dir)
-            log.msg('now avaliable at %s: %s' % (urlpath, url))
+            self.db.storeFile(destFile, hash.digest())
+            log.msg('now avaliable: %s' % (url))
 
             if self.manager:
-                if newdir:
-                    log.msg('A new web directory was created, so enable it')
-                    self.manager.setDirectories(self.db.getAllDirectories())
-    
-                self.manager.new_cached_file(destFile, hash, urlpath, url)
+                self.manager.new_cached_file(destFile, hash, url)
                 if ext:
-                    self.manager.new_cached_file(decFile, None, urlpath, url[:-len(ext)])
+                    self.manager.new_cached_file(decFile, None, url[:-len(ext)])
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()
index 38c7c09..266078e 100644 (file)
@@ -40,10 +40,10 @@ class FileDownloader(static.File):
 class TopLevel(resource.Resource):
     addSlash = True
     
-    def __init__(self, directory, manager):
+    def __init__(self, directory, db, manager):
         self.directory = directory
+        self.db = db
         self.manager = manager
-        self.subdirs = {}
         self.factory = None
 
     def getHTTPFactory(self):
@@ -53,14 +53,6 @@ class TopLevel(resource.Resource):
                                                   'betweenRequestsTimeOut': 60})
         return self.factory
 
-    def setDirectories(self, dirs):
-        self.subdirs = {}
-        for k in dirs:
-            # Don't allow empty subdirectory
-            if k:
-                self.subdirs[k] = dirs[k]
-        log.msg('new subdirectories initialized')
-    
     def render(self, ctx):
         return http.Response(
             200,
@@ -71,9 +63,17 @@ class TopLevel(resource.Resource):
 
     def locateChild(self, request, segments):
         name = segments[0]
-        if name in self.subdirs:
-            log.msg('Sharing %s with %s' % (request.uri, request.remoteAddr))
-            return static.File(self.subdirs[name].path), segments[1:]
+        if name == '~':
+            if len(segments) != 2:
+                log.msg('Got a malformed request from %s' % request.remoteAddr)
+                return None, ()
+            hash = segments[1]
+            files = self.db.lookupHash(hash)
+            if files:
+                log.msg('Sharing %s with %s' % (files[0]['path'].path, request.remoteAddr))
+                return static.File(files[0]['path'].path), ()
+            else:
+                log.msg('Hash could not be found in database: %s' % hash)
         
         if request.remoteAddr.host != "127.0.0.1":
             log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
@@ -83,6 +83,9 @@ class TopLevel(resource.Resource):
             return FileDownloader(self.directory.path, self.manager), segments[0:]
         else:
             return self, ()
+        
+        log.msg('Got a malformed request for "%s" from %s' % (request.uri, request.remoteAddr))
+        return None, ()
 
 if __name__ == '__builtin__':
     # Running from twistd -y
index aeda56b..264be0f 100644 (file)
@@ -29,8 +29,7 @@ class AptDHT:
         self.dht = dht
         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
-        self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
-        self.setDirectories = self.http_server.setDirectories
+        self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
         self.getHTTPFactory = self.http_server.getHTTPFactory
         self.peers = PeerManager()
         self.mirrors = MirrorManager(self.cache_dir)
@@ -153,7 +152,7 @@ class AptDHT:
             return getDefer
         return response
         
-    def new_cached_file(self, file_path, hash, urlpath, url = None):
+    def new_cached_file(self, file_path, hash, url = None):
         """Add a newly cached file to the DHT.
         
         If the file was downloaded, set url to the path it was downloaded for.
@@ -163,13 +162,12 @@ class AptDHT:
         
         if self.my_addr and hash:
             site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
-            full_path = urlunparse(('http', site, urlpath, None, None, None))
             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
-            storeDefer = self.dht.storeValue(key, full_path)
-            storeDefer.addCallback(self.store_done, full_path)
+            storeDefer = self.dht.storeValue(key, site)
+            storeDefer.addCallback(self.store_done, hash)
             return storeDefer
         return None
 
-    def store_done(self, result, path):
-        log.msg('Added %s to the DHT: %r' % (path, result))
+    def store_done(self, result, hash):
+        log.msg('Added %s to the DHT: %r' % (hash, result))
         
\ No newline at end of file
index 1f4d6e3..7b225a1 100644 (file)
@@ -46,12 +46,9 @@ class DB:
             self.db.parent().makedirs()
         self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
         c = self.conn.cursor()
-        c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
+        c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
         c.execute("CREATE INDEX files_hash ON files(hash)")
-        c.execute("CREATE INDEX files_urldir ON files(urldir)")
         c.execute("CREATE INDEX files_refreshed ON files(refreshed)")
-        c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)")
-        c.execute("CREATE INDEX dirs_path ON dirs(path)")
         c.close()
         self.conn.commit()
 
@@ -68,28 +65,20 @@ class DB:
                 c.close()
         return res
         
-    def storeFile(self, file, hash, directory):
-        """Store or update a file in the database.
-        
-        @return: the urlpath to access the file, and whether a
-            new url top-level directory was needed
-        """
+    def storeFile(self, file, hash):
+        """Store or update a file in the database."""
         file.restat()
         c = self.conn.cursor()
-        c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (file.path, ))
+        c.execute("SELECT path FROM files WHERE path = ?", (file.path, ))
         row = c.fetchone()
-        if row and directory == row['directory']:
+        if row:
             c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?", 
                       (khash(hash), file.getsize(), file.getmtime(), datetime.now()))
-            newdir = False
-            urldir = row['urldir']
         else:
-            urldir, newdir = self.findDirectory(directory)
-            c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?, ?, ?)",
-                      (file.path, khash(hash), urldir, len(directory.path), file.getsize(), file.getmtime(), datetime.now()))
+            c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?)",
+                      (file.path, khash(hash), file.getsize(), file.getmtime(), datetime.now()))
         self.conn.commit()
         c.close()
-        return '/~' + str(urldir) + file.path[len(directory.path):], newdir
         
     def getFile(self, file):
         """Get a file from the database.
@@ -100,7 +89,7 @@ class DB:
             None if not in database or missing
         """
         c = self.conn.cursor()
-        c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, ))
+        c.execute("SELECT hash, size, mtime FROM files WHERE path = ?", (file.path, ))
         row = c.fetchone()
         res = None
         if row:
@@ -109,7 +98,6 @@ class DB:
                 res = {}
                 res['hash'] = row['hash']
                 res['size'] = row['size']
-                res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
         c.close()
         return res
         
@@ -122,7 +110,7 @@ class DB:
         @return: list of dictionaries of info for the found files
         """
         c = self.conn.cursor()
-        c.execute("SELECT path, urldir, dirlength, size, mtime FROM files WHERE hash = ? ORDER BY urldir", (khash(hash), ))
+        c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), ))
         row = c.fetchone()
         files = []
         while row:
@@ -132,7 +120,6 @@ class DB:
                 res = {}
                 res['path'] = file
                 res['size'] = row['size']
-                res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
                 files.append(res)
             row = c.fetchone()
         c.close()
@@ -172,17 +159,17 @@ class DB:
         
         Also removes any entries from the table that no longer exist.
         
-        @return: dictionary with keys the hashes, values a list of url paths
+        @return: dictionary with keys the hashes, values a list of FilePaths
         """
         t = datetime.now() - timedelta(seconds=expireAfter)
         c = self.conn.cursor()
-        c.execute("SELECT path, hash, urldir, dirlength, size, mtime FROM files WHERE refreshed < ?", (t, ))
+        c.execute("SELECT path, hash, size, mtime FROM files WHERE refreshed < ?", (t, ))
         row = c.fetchone()
         expired = {}
         while row:
             res = self._removeChanged(FilePath(row['path']), row)
             if res:
-                expired.setdefault(row['hash'], []).append('/~' + str(row['urldir']) + row['path'][row['dirlength']:])
+                expired.setdefault(row['hash'], []).append(FilePath(row['path']))
             row = c.fetchone()
         c.close()
         return expired
@@ -215,45 +202,6 @@ class DB:
         self.conn.commit()
         return removed
     
-    def findDirectory(self, directory):
-        """Store or update a directory in the database.
-        
-        @return: the index of the url directory, and whether it is new or not
-        """
-        c = self.conn.cursor()
-        c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory.path, ))
-        row = c.fetchone()
-        c.close()
-        if row['urldir']:
-            return row['urldir'], False
-
-        # Not found, need to add a new one
-        c = self.conn.cursor()
-        c.execute("INSERT INTO dirs (path) VALUES (?)", (directory.path, ))
-        self.conn.commit()
-        urldir = c.lastrowid
-        c.close()
-        return urldir, True
-        
-    def getAllDirectories(self):
-        """Get all the current directories avaliable."""
-        c = self.conn.cursor()
-        c.execute("SELECT urldir, path FROM dirs")
-        row = c.fetchone()
-        dirs = {}
-        while row:
-            dirs['~' + str(row['urldir'])] = FilePath(row['path'])
-            row = c.fetchone()
-        c.close()
-        return dirs
-    
-    def reconcileDirectories(self):
-        """Remove any unneeded directories by checking which are used by files."""
-        c = self.conn.cursor()
-        c.execute('DELETE FROM dirs WHERE urldir NOT IN (SELECT DISTINCT urldir FROM files)')
-        self.conn.commit()
-        return bool(c.rowcount)
-        
     def close(self):
         self.conn.close()
 
@@ -262,10 +210,9 @@ class TestDB(unittest.TestCase):
     
     timeout = 5
     db = FilePath('/tmp/khashmir.db')
-    file = FilePath('/tmp/apt-dht/khashmir.test')
     hash = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
     directory = FilePath('/tmp/apt-dht/')
-    urlpath = '/~1/khashmir.test'
+    file = FilePath('/tmp/apt-dht/khashmir.test')
     testfile = 'tmp/khashmir.test'
     dirs = [FilePath('/tmp/apt-dht/top1'),
             FilePath('/tmp/apt-dht/top2/sub1'),
@@ -277,7 +224,7 @@ class TestDB(unittest.TestCase):
         self.file.setContent('fgfhds')
         self.file.touch()
         self.store = DB(self.db)
-        self.store.storeFile(self.file, self.hash, self.directory)
+        self.store.storeFile(self.file, self.hash)
 
     def test_openExistsingDB(self):
         self.store.close()
@@ -291,14 +238,6 @@ class TestDB(unittest.TestCase):
         res = self.store.getFile(self.file)
         self.failUnless(res)
         self.failUnlessEqual(res['hash'], self.hash)
-        self.failUnlessEqual(res['urlpath'], self.urlpath)
-        
-    def test_getAllDirectories(self):
-        res = self.store.getAllDirectories()
-        self.failUnless(res)
-        self.failUnlessEqual(len(res.keys()), 1)
-        self.failUnlessEqual(res.keys()[0], '~1')
-        self.failUnlessEqual(res['~1'], self.directory)
         
     def test_isUnchanged(self):
         res = self.store.isUnchanged(self.file)
@@ -319,7 +258,6 @@ class TestDB(unittest.TestCase):
         self.failUnlessEqual(len(res.keys()), 1)
         self.failUnlessEqual(res.keys()[0], self.hash)
         self.failUnlessEqual(len(res[self.hash]), 1)
-        self.failUnlessEqual(res[self.hash][0], self.urlpath)
         res = self.store.refreshFile(self.file)
         self.failUnless(res)
         res = self.store.expiredFiles(1)
@@ -332,7 +270,7 @@ class TestDB(unittest.TestCase):
                 file.parent().makedirs()
             file.setContent(file.path)
             file.touch()
-            self.store.storeFile(file, self.hash, dir)
+            self.store.storeFile(file, self.hash)
     
     def test_removeUntracked(self):
         self.build_dirs()
@@ -349,34 +287,6 @@ class TestDB(unittest.TestCase):
         self.failUnlessIn(self.dirs[1].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
         self.failUnlessIn(self.dirs[2].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
         
-    def test_reconcileDirectories(self):
-        self.build_dirs()
-        res = self.store.getAllDirectories()
-        self.failUnless(res)
-        self.failUnlessEqual(len(res.keys()), 4)
-        res = self.store.reconcileDirectories()
-        self.failUnlessEqual(res, False)
-        res = self.store.getAllDirectories()
-        self.failUnless(res)
-        self.failUnlessEqual(len(res.keys()), 4)
-        res = self.store.removeUntrackedFiles(self.dirs)
-        res = self.store.reconcileDirectories()
-        self.failUnlessEqual(res, True)
-        res = self.store.getAllDirectories()
-        self.failUnless(res)
-        self.failUnlessEqual(len(res.keys()), 3)
-        res = self.store.removeUntrackedFiles(self.dirs[:1])
-        res = self.store.reconcileDirectories()
-        self.failUnlessEqual(res, True)
-        res = self.store.getAllDirectories()
-        self.failUnless(res)
-        self.failUnlessEqual(len(res.keys()), 1)
-        res = self.store.removeUntrackedFiles([FilePath('/what')])
-        res = self.store.reconcileDirectories()
-        self.failUnlessEqual(res, True)
-        res = self.store.getAllDirectories()
-        self.failUnlessEqual(len(res.keys()), 0)
-        
     def tearDown(self):
         self.directory.remove()
         self.store.close()