self.manager = manager
self.scanning = []
- # Init the database, remove old files, init the HTTP dirs
+ # Init the database, remove old files
self.db.removeUntrackedFiles(self.all_dirs)
- self.db.reconcileDirectories()
- self.manager.setDirectories(self.db.getAllDirectories())
def scanDirectories(self):
log.msg('started scanning directory: %s' % self.scanning[0].path)
walker = self.scanning[0].walk()
else:
- # Done, just check if the HTTP directories need updating
log.msg('cache directory scan complete')
- if self.db.reconcileDirectories():
- self.manager.setDirectories(self.db.getAllDirectories())
return
try:
if isinstance(result, HashObject):
log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
+ url = None
if self.scanning[0] == self.cache_dir:
- mirror_dir = self.cache_dir.child(file.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
- urlpath, newdir = self.db.storeFile(file, result.digest(), mirror_dir)
url = 'http:/' + file.path[len(self.cache_dir.path):]
- else:
- urlpath, newdir = self.db.storeFile(file, result.digest(), self.scanning[0])
- url = None
- if newdir:
- self.manager.setDirectories(self.db.getAllDirectories())
- df = self.manager.new_cached_file(file, result, urlpath, url)
+ self.db.storeFile(file, result.digest())
+ df = self.manager.new_cached_file(file, result, url)
if df is None:
reactor.callLater(0, self._scanDirectories, None, walker)
else:
else:
log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
- mirror_dir = self.cache_dir.child(destFile.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
- urlpath, newdir = self.db.storeFile(destFile, hash.digest(), mirror_dir)
- log.msg('now avaliable at %s: %s' % (urlpath, url))
+ self.db.storeFile(destFile, hash.digest())
+ log.msg('now avaliable: %s' % (url))
if self.manager:
- if newdir:
- log.msg('A new web directory was created, so enable it')
- self.manager.setDirectories(self.db.getAllDirectories())
-
- self.manager.new_cached_file(destFile, hash, urlpath, url)
+ self.manager.new_cached_file(destFile, hash, url)
if ext:
- self.manager.new_cached_file(decFile, None, urlpath, url[:-len(ext)])
+ self.manager.new_cached_file(decFile, None, url[:-len(ext)])
else:
log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
destFile.remove()
self.dht = dht
self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
self.dht.join().addCallbacks(self.joinComplete, self.joinError)
- self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
- self.setDirectories = self.http_server.setDirectories
+ self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
self.getHTTPFactory = self.http_server.getHTTPFactory
self.peers = PeerManager()
self.mirrors = MirrorManager(self.cache_dir)
return getDefer
return response
- def new_cached_file(self, file_path, hash, urlpath, url = None):
+ def new_cached_file(self, file_path, hash, url = None):
"""Add a newly cached file to the DHT.
If the file was downloaded, set url to the path it was downloaded for.
if self.my_addr and hash:
site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
- full_path = urlunparse(('http', site, urlpath, None, None, None))
key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
- storeDefer = self.dht.storeValue(key, full_path)
- storeDefer.addCallback(self.store_done, full_path)
+ storeDefer = self.dht.storeValue(key, site)
+ storeDefer.addCallback(self.store_done, hash)
return storeDefer
return None
- def store_done(self, result, path):
- log.msg('Added %s to the DHT: %r' % (path, result))
+ def store_done(self, result, hash):
+ log.msg('Added %s to the DHT: %r' % (hash, result))
\ No newline at end of file
self.db.parent().makedirs()
self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
c = self.conn.cursor()
- c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
+ c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
c.execute("CREATE INDEX files_hash ON files(hash)")
- c.execute("CREATE INDEX files_urldir ON files(urldir)")
c.execute("CREATE INDEX files_refreshed ON files(refreshed)")
- c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)")
- c.execute("CREATE INDEX dirs_path ON dirs(path)")
c.close()
self.conn.commit()
c.close()
return res
- def storeFile(self, file, hash, directory):
- """Store or update a file in the database.
-
- @return: the urlpath to access the file, and whether a
- new url top-level directory was needed
- """
+ def storeFile(self, file, hash):
+ """Store or update a file in the database.
+
+ Calls C{file.restat()} and then records the file's path, hash, size,
+ mtime and the current time as its refreshed time. If a row for the
+ file's path already exists only that row is updated, otherwise a new
+ row is inserted. The change is committed before returning.
+
+ @param file: the file to store; its C{path} is the primary key of
+ the files table
+ @param hash: the hash of the file, wrapped in C{khash} for storage
+ """
file.restat()
c = self.conn.cursor()
- c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (file.path, ))
+ c.execute("SELECT path FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
- if row and directory == row['directory']:
- c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?",
- (khash(hash), file.getsize(), file.getmtime(), datetime.now()))
- newdir = False
- urldir = row['urldir']
+ if row:
+ # Restrict the update to this file's own row: without the WHERE
+ # clause every row in the table would be overwritten with this
+ # file's hash, size, mtime and refreshed time.
+ c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ? WHERE path = ?",
+ (khash(hash), file.getsize(), file.getmtime(), datetime.now(), file.path))
else:
- urldir, newdir = self.findDirectory(directory)
- c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?, ?, ?)",
- (file.path, khash(hash), urldir, len(directory.path), file.getsize(), file.getmtime(), datetime.now()))
+ c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?)",
+ (file.path, khash(hash), file.getsize(), file.getmtime(), datetime.now()))
self.conn.commit()
c.close()
- return '/~' + str(urldir) + file.path[len(directory.path):], newdir
def getFile(self, file):
"""Get a file from the database.
None if not in database or missing
"""
c = self.conn.cursor()
- c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, ))
+ c.execute("SELECT hash, size, mtime FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
res = None
if row:
res = {}
res['hash'] = row['hash']
res['size'] = row['size']
- res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
c.close()
return res
@return: list of dictionaries of info for the found files
"""
c = self.conn.cursor()
- c.execute("SELECT path, urldir, dirlength, size, mtime FROM files WHERE hash = ? ORDER BY urldir", (khash(hash), ))
+ c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), ))
row = c.fetchone()
files = []
while row:
res = {}
res['path'] = file
res['size'] = row['size']
- res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
files.append(res)
row = c.fetchone()
c.close()
Also removes any entries from the table that no longer exist.
- @return: dictionary with keys the hashes, values a list of url paths
+ @return: dictionary with keys the hashes, values a list of FilePaths
"""
t = datetime.now() - timedelta(seconds=expireAfter)
c = self.conn.cursor()
- c.execute("SELECT path, hash, urldir, dirlength, size, mtime FROM files WHERE refreshed < ?", (t, ))
+ c.execute("SELECT path, hash, size, mtime FROM files WHERE refreshed < ?", (t, ))
row = c.fetchone()
expired = {}
while row:
res = self._removeChanged(FilePath(row['path']), row)
if res:
- expired.setdefault(row['hash'], []).append('/~' + str(row['urldir']) + row['path'][row['dirlength']:])
+ expired.setdefault(row['hash'], []).append(FilePath(row['path']))
row = c.fetchone()
c.close()
return expired
self.conn.commit()
return removed
- def findDirectory(self, directory):
- """Store or update a directory in the database.
-
- @return: the index of the url directory, and whether it is new or not
- """
- c = self.conn.cursor()
- c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory.path, ))
- row = c.fetchone()
- c.close()
- if row['urldir']:
- return row['urldir'], False
-
- # Not found, need to add a new one
- c = self.conn.cursor()
- c.execute("INSERT INTO dirs (path) VALUES (?)", (directory.path, ))
- self.conn.commit()
- urldir = c.lastrowid
- c.close()
- return urldir, True
-
- def getAllDirectories(self):
- """Get all the current directories avaliable."""
- c = self.conn.cursor()
- c.execute("SELECT urldir, path FROM dirs")
- row = c.fetchone()
- dirs = {}
- while row:
- dirs['~' + str(row['urldir'])] = FilePath(row['path'])
- row = c.fetchone()
- c.close()
- return dirs
-
- def reconcileDirectories(self):
- """Remove any unneeded directories by checking which are used by files."""
- c = self.conn.cursor()
- c.execute('DELETE FROM dirs WHERE urldir NOT IN (SELECT DISTINCT urldir FROM files)')
- self.conn.commit()
- return bool(c.rowcount)
-
def close(self):
self.conn.close()
timeout = 5
db = FilePath('/tmp/khashmir.db')
- file = FilePath('/tmp/apt-dht/khashmir.test')
hash = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
directory = FilePath('/tmp/apt-dht/')
- urlpath = '/~1/khashmir.test'
+ file = FilePath('/tmp/apt-dht/khashmir.test')
testfile = 'tmp/khashmir.test'
dirs = [FilePath('/tmp/apt-dht/top1'),
FilePath('/tmp/apt-dht/top2/sub1'),
self.file.setContent('fgfhds')
self.file.touch()
self.store = DB(self.db)
- self.store.storeFile(self.file, self.hash, self.directory)
+ self.store.storeFile(self.file, self.hash)
def test_openExistsingDB(self):
self.store.close()
res = self.store.getFile(self.file)
self.failUnless(res)
self.failUnlessEqual(res['hash'], self.hash)
- self.failUnlessEqual(res['urlpath'], self.urlpath)
-
- def test_getAllDirectories(self):
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 1)
- self.failUnlessEqual(res.keys()[0], '~1')
- self.failUnlessEqual(res['~1'], self.directory)
def test_isUnchanged(self):
res = self.store.isUnchanged(self.file)
self.failUnlessEqual(len(res.keys()), 1)
self.failUnlessEqual(res.keys()[0], self.hash)
self.failUnlessEqual(len(res[self.hash]), 1)
- self.failUnlessEqual(res[self.hash][0], self.urlpath)
res = self.store.refreshFile(self.file)
self.failUnless(res)
res = self.store.expiredFiles(1)
file.parent().makedirs()
file.setContent(file.path)
file.touch()
- self.store.storeFile(file, self.hash, dir)
+ self.store.storeFile(file, self.hash)
def test_removeUntracked(self):
self.build_dirs()
self.failUnlessIn(self.dirs[1].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
self.failUnlessIn(self.dirs[2].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
- def test_reconcileDirectories(self):
- self.build_dirs()
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 4)
- res = self.store.reconcileDirectories()
- self.failUnlessEqual(res, False)
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 4)
- res = self.store.removeUntrackedFiles(self.dirs)
- res = self.store.reconcileDirectories()
- self.failUnlessEqual(res, True)
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 3)
- res = self.store.removeUntrackedFiles(self.dirs[:1])
- res = self.store.reconcileDirectories()
- self.failUnlessEqual(res, True)
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 1)
- res = self.store.removeUntrackedFiles([FilePath('/what')])
- res = self.store.reconcileDirectories()
- self.failUnlessEqual(res, True)
- res = self.store.getAllDirectories()
- self.failUnlessEqual(len(res.keys()), 0)
-
def tearDown(self):
self.directory.remove()
self.store.close()