From ef16875cd9e8b3452deeff8b13768d4c10b0e9ad Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Mon, 18 Feb 2008 14:40:04 -0800 Subject: [PATCH] HTTPServer uses the hash to lookup the file in the DB (no more directories). --- apt_dht/CacheManager.py | 31 +++-------- apt_dht/HTTPServer.py | 29 +++++----- apt_dht/apt_dht.py | 14 ++--- apt_dht/db.py | 120 +++++----------------------------------- 4 files changed, 45 insertions(+), 149 deletions(-) diff --git a/apt_dht/CacheManager.py b/apt_dht/CacheManager.py index 9820ae1..517a009 100644 --- a/apt_dht/CacheManager.py +++ b/apt_dht/CacheManager.py @@ -156,10 +156,8 @@ class CacheManager: self.manager = manager self.scanning = [] - # Init the database, remove old files, init the HTTP dirs + # Init the database, remove old files self.db.removeUntrackedFiles(self.all_dirs) - self.db.reconcileDirectories() - self.manager.setDirectories(self.db.getAllDirectories()) def scanDirectories(self): @@ -176,10 +174,7 @@ class CacheManager: log.msg('started scanning directory: %s' % self.scanning[0].path) walker = self.scanning[0].walk() else: - # Done, just check if the HTTP directories need updating log.msg('cache directory scan complete') - if self.db.reconcileDirectories(): - self.manager.setDirectories(self.db.getAllDirectories()) return try: @@ -212,16 +207,11 @@ class CacheManager: if isinstance(result, HashObject): log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest())) + url = None if self.scanning[0] == self.cache_dir: - mirror_dir = self.cache_dir.child(file.path[len(self.cache_dir.path)+1:].split('/', 1)[0]) - urlpath, newdir = self.db.storeFile(file, result.digest(), mirror_dir) url = 'http:/' + file.path[len(self.cache_dir.path):] - else: - urlpath, newdir = self.db.storeFile(file, result.digest(), self.scanning[0]) - url = None - if newdir: - self.manager.setDirectories(self.db.getAllDirectories()) - df = self.manager.new_cached_file(file, result, urlpath, url) + self.db.storeFile(file, result.digest()) + df = self.manager.new_cached_file(file, result, url) if df is None: reactor.callLater(0, self._scanDirectories, None, walker) else: @@ -283,18 +273,13 @@ class CacheManager: else: log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url)) - mirror_dir = self.cache_dir.child(destFile.path[len(self.cache_dir.path)+1:].split('/', 1)[0]) - urlpath, newdir = self.db.storeFile(destFile, hash.digest(), mirror_dir) - log.msg('now avaliable at %s: %s' % (urlpath, url)) + self.db.storeFile(destFile, hash.digest()) + log.msg('now avaliable: %s' % (url)) if self.manager: - if newdir: - log.msg('A new web directory was created, so enable it') - self.manager.setDirectories(self.db.getAllDirectories()) - - self.manager.new_cached_file(destFile, hash, urlpath, url) + self.manager.new_cached_file(destFile, hash, url) if ext: - self.manager.new_cached_file(decFile, None, urlpath, url[:-len(ext)]) + self.manager.new_cached_file(decFile, None, url[:-len(ext)]) else: log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url)) destFile.remove() diff --git a/apt_dht/HTTPServer.py b/apt_dht/HTTPServer.py index 38c7c09..266078e 100644 --- a/apt_dht/HTTPServer.py +++ b/apt_dht/HTTPServer.py @@ -40,10 +40,10 @@ class FileDownloader(static.File): class TopLevel(resource.Resource): addSlash = True - def __init__(self, directory, manager): + def __init__(self, directory, db, manager): self.directory = directory + self.db = db self.manager = manager - self.subdirs = {} self.factory = None def getHTTPFactory(self): @@ -53,14 +53,6 @@ class TopLevel(resource.Resource): 'betweenRequestsTimeOut': 60}) return self.factory - def setDirectories(self, dirs): - self.subdirs = {} - for k in dirs: - # Don't allow empty subdirectory - if k: - self.subdirs[k] = dirs[k] - log.msg('new subdirectories initialized') - def render(self, ctx): return http.Response( 200, @@ -71,9 +63,17 @@ class TopLevel(resource.Resource): def locateChild(self, request, segments): name = segments[0] - if name in self.subdirs: - log.msg('Sharing %s with %s' % (request.uri, request.remoteAddr)) - return static.File(self.subdirs[name].path), segments[1:] + if name == '~': + if len(segments) != 2: + log.msg('Got a malformed request from %s' % request.remoteAddr) + return None, () + hash = segments[1] + files = self.db.lookupHash(hash) + if files: + log.msg('Sharing %s with %s' % (files[0]['path'].path, request.remoteAddr)) + return static.File(files[0]['path'].path), () + else: + log.msg('Hash could not be found in database: %s' % hash) if request.remoteAddr.host != "127.0.0.1": log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr)) @@ -83,6 +83,9 @@ class TopLevel(resource.Resource): return FileDownloader(self.directory.path, self.manager), segments[0:] else: return self, () + + log.msg('Got a malformed request for "%s" from %s' % (request.uri, request.remoteAddr)) + return None, () if __name__ == '__builtin__': # Running from twistd -y diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py index aeda56b..264be0f 100644 --- a/apt_dht/apt_dht.py +++ b/apt_dht/apt_dht.py @@ -29,8 +29,7 @@ class AptDHT: self.dht = dht self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) self.dht.join().addCallbacks(self.joinComplete, self.joinError) - self.http_server = TopLevel(self.cache_dir.child(download_dir), self) - self.setDirectories = self.http_server.setDirectories + self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self) self.getHTTPFactory = self.http_server.getHTTPFactory self.peers = PeerManager() self.mirrors = MirrorManager(self.cache_dir) @@ -153,7 +152,7 @@ class AptDHT: return getDefer return response - def new_cached_file(self, file_path, hash, urlpath, url = None): + def new_cached_file(self, file_path, hash, url = None): """Add a newly cached file to the DHT. If the file was downloaded, set url to the path it was downloaded for. @@ -163,13 +162,12 @@ class AptDHT: if self.my_addr and hash: site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT')) - full_path = urlunparse(('http', site, urlpath, None, None, None)) key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH')) - storeDefer = self.dht.storeValue(key, full_path) - storeDefer.addCallback(self.store_done, full_path) + storeDefer = self.dht.storeValue(key, site) + storeDefer.addCallback(self.store_done, hash) return storeDefer return None - def store_done(self, result, path): - log.msg('Added %s to the DHT: %r' % (path, result)) + def store_done(self, result, hash): + log.msg('Added %s to the DHT: %r' % (hash, result)) \ No newline at end of file diff --git a/apt_dht/db.py b/apt_dht/db.py index 1f4d6e3..7b225a1 100644 --- a/apt_dht/db.py +++ b/apt_dht/db.py @@ -46,12 +46,9 @@ class DB: self.db.parent().makedirs() self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES) c = self.conn.cursor() - c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)") + c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)") c.execute("CREATE INDEX files_hash ON files(hash)") - c.execute("CREATE INDEX files_urldir ON files(urldir)") c.execute("CREATE INDEX files_refreshed ON files(refreshed)") - c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)") - c.execute("CREATE INDEX dirs_path ON dirs(path)") c.close() self.conn.commit() @@ -68,28 +65,20 @@ class DB: c.close() return res - def storeFile(self, file, hash, directory): - """Store or update a file in the database. - - @return: the urlpath to access the file, and whether a - new url top-level directory was needed - """ + def storeFile(self, file, hash): + """Store or update a file in the database.""" file.restat() c = self.conn.cursor() - c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (file.path, )) + c.execute("SELECT path FROM files WHERE path = ?", (file.path, )) row = c.fetchone() - if row and directory == row['directory']: + if row: c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?", (khash(hash), file.getsize(), file.getmtime(), datetime.now())) - newdir = False - urldir = row['urldir'] else: - urldir, newdir = self.findDirectory(directory) - c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?, ?, ?)", - (file.path, khash(hash), urldir, len(directory.path), file.getsize(), file.getmtime(), datetime.now())) + c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?)", + (file.path, khash(hash), file.getsize(), file.getmtime(), datetime.now())) self.conn.commit() c.close() - return '/~' + str(urldir) + file.path[len(directory.path):], newdir def getFile(self, file): """Get a file from the database. @@ -100,7 +89,7 @@ class DB: None if not in database or missing """ c = self.conn.cursor() - c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, )) + c.execute("SELECT hash, size, mtime FROM files WHERE path = ?", (file.path, )) row = c.fetchone() res = None if row: @@ -109,7 +98,6 @@ class DB: res = {} res['hash'] = row['hash'] res['size'] = row['size'] - res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:] c.close() return res @@ -122,7 +110,7 @@ class DB: @return: list of dictionaries of info for the found files """ c = self.conn.cursor() - c.execute("SELECT path, urldir, dirlength, size, mtime FROM files WHERE hash = ? ORDER BY urldir", (khash(hash), )) + c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), )) row = c.fetchone() files = [] while row: @@ -132,7 +120,6 @@ class DB: res = {} res['path'] = file res['size'] = row['size'] - res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:] files.append(res) row = c.fetchone() c.close() @@ -172,17 +159,17 @@ class DB: Also removes any entries from the table that no longer exist. - @return: dictionary with keys the hashes, values a list of url paths + @return: dictionary with keys the hashes, values a list of FilePaths """ t = datetime.now() - timedelta(seconds=expireAfter) c = self.conn.cursor() - c.execute("SELECT path, hash, urldir, dirlength, size, mtime FROM files WHERE refreshed < ?", (t, )) + c.execute("SELECT path, hash, size, mtime FROM files WHERE refreshed < ?", (t, )) row = c.fetchone() expired = {} while row: res = self._removeChanged(FilePath(row['path']), row) if res: - expired.setdefault(row['hash'], []).append('/~' + str(row['urldir']) + row['path'][row['dirlength']:]) + expired.setdefault(row['hash'], []).append(FilePath(row['path'])) row = c.fetchone() c.close() return expired @@ -215,45 +202,6 @@ class DB: self.conn.commit() return removed - def findDirectory(self, directory): - """Store or update a directory in the database. - - @return: the index of the url directory, and whether it is new or not - """ - c = self.conn.cursor() - c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory.path, )) - row = c.fetchone() - c.close() - if row['urldir']: - return row['urldir'], False - - # Not found, need to add a new one - c = self.conn.cursor() - c.execute("INSERT INTO dirs (path) VALUES (?)", (directory.path, )) - self.conn.commit() - urldir = c.lastrowid - c.close() - return urldir, True - - def getAllDirectories(self): - """Get all the current directories avaliable.""" - c = self.conn.cursor() - c.execute("SELECT urldir, path FROM dirs") - row = c.fetchone() - dirs = {} - while row: - dirs['~' + str(row['urldir'])] = FilePath(row['path']) - row = c.fetchone() - c.close() - return dirs - - def reconcileDirectories(self): - """Remove any unneeded directories by checking which are used by files.""" - c = self.conn.cursor() - c.execute('DELETE FROM dirs WHERE urldir NOT IN (SELECT DISTINCT urldir FROM files)') - self.conn.commit() - return bool(c.rowcount) - def close(self): self.conn.close() @@ -262,10 +210,9 @@ class TestDB(unittest.TestCase): timeout = 5 db = FilePath('/tmp/khashmir.db') - file = FilePath('/tmp/apt-dht/khashmir.test') hash = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!' directory = FilePath('/tmp/apt-dht/') - urlpath = '/~1/khashmir.test' + file = FilePath('/tmp/apt-dht/khashmir.test') testfile = 'tmp/khashmir.test' dirs = [FilePath('/tmp/apt-dht/top1'), FilePath('/tmp/apt-dht/top2/sub1'), @@ -277,7 +224,7 @@ class TestDB(unittest.TestCase): self.file.setContent('fgfhds') self.file.touch() self.store = DB(self.db) - self.store.storeFile(self.file, self.hash, self.directory) + self.store.storeFile(self.file, self.hash) def test_openExistsingDB(self): self.store.close() @@ -291,14 +238,6 @@ class TestDB(unittest.TestCase): res = self.store.getFile(self.file) self.failUnless(res) self.failUnlessEqual(res['hash'], self.hash) - self.failUnlessEqual(res['urlpath'], self.urlpath) - - def test_getAllDirectories(self): - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 1) - self.failUnlessEqual(res.keys()[0], '~1') - self.failUnlessEqual(res['~1'], self.directory) def test_isUnchanged(self): res = self.store.isUnchanged(self.file) @@ -319,7 +258,6 @@ class TestDB(unittest.TestCase): self.failUnlessEqual(len(res.keys()), 1) self.failUnlessEqual(res.keys()[0], self.hash) self.failUnlessEqual(len(res[self.hash]), 1) - self.failUnlessEqual(res[self.hash][0], self.urlpath) res = self.store.refreshFile(self.file) self.failUnless(res) res = self.store.expiredFiles(1) @@ -332,7 +270,7 @@ class TestDB(unittest.TestCase): file.parent().makedirs() file.setContent(file.path) file.touch() - self.store.storeFile(file, self.hash, dir) + self.store.storeFile(file, self.hash) def test_removeUntracked(self): self.build_dirs() @@ -349,34 +287,6 @@ class TestDB(unittest.TestCase): self.failUnlessIn(self.dirs[1].preauthChild(self.testfile), res, 'Got removed paths: %r' % res) self.failUnlessIn(self.dirs[2].preauthChild(self.testfile), res, 'Got removed paths: %r' % res) - def test_reconcileDirectories(self): - self.build_dirs() - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 4) - res = self.store.reconcileDirectories() - self.failUnlessEqual(res, False) - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 4) - res = self.store.removeUntrackedFiles(self.dirs) - res = self.store.reconcileDirectories() - self.failUnlessEqual(res, True) - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 3) - res = self.store.removeUntrackedFiles(self.dirs[:1]) - res = self.store.reconcileDirectories() - self.failUnlessEqual(res, True) - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 1) - res = self.store.removeUntrackedFiles([FilePath('/what')]) - res = self.store.reconcileDirectories() - self.failUnlessEqual(res, True) - res = self.store.getAllDirectories() - self.failUnlessEqual(len(res.keys()), 0) - def tearDown(self): self.directory.remove() self.store.close() -- 2.39.5