From c2be1eee6c0157ddcb1dc188c96711eaa21c7897 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Fri, 22 Feb 2008 15:30:57 -0800 Subject: [PATCH] Refresh DHT values just before they are due to expire. --- TODO | 7 --- apt-dht.conf | 4 ++ apt_dht/CacheManager.py | 10 ++-- apt_dht/Hash.py | 5 +- apt_dht/apt_dht.py | 36 +++++++++++---- apt_dht/apt_dht_conf.py | 4 ++ apt_dht/db.py | 99 +++++++++++++++++++++++++++++----------- debian/apt-dht.conf.sgml | 8 ++++ test.py | 4 ++ 9 files changed, 127 insertions(+), 50 deletions(-) diff --git a/TODO b/TODO index b204ce7..ce33a77 100644 --- a/TODO +++ b/TODO @@ -93,10 +93,3 @@ key. Once a querying node has found enough values (or all of them), then it would send the "get_value" method to the nodes that have the most values. The "get_value" query could also have a new parameter "number", which is the maximum number of values to return. - - -Missing Kademlia implementation details are needed. - -The current implementation is missing some important features, mostly -focussed on storing values: - - values need to be republished (every hour?) diff --git a/apt-dht.conf b/apt-dht.conf index 709a0d3..8982669 100644 --- a/apt-dht.conf +++ b/apt-dht.conf @@ -34,6 +34,10 @@ LOCAL_OK = no # to reload when a new request arrives. UNLOAD_PACKAGES_CACHE = 5m +# Refresh the DHT keys after this much time has passed. +# This should be a time slightly less than the DHT's KEY_EXPIRE value. +KEY_REFRESH = 57m + # Which DHT implementation to use. # It must be possile to do "from .DHT import DHT" to get a class that # implements the IDHT interface. 
There should also be a similarly named diff --git a/apt_dht/CacheManager.py b/apt_dht/CacheManager.py index c70cee2..9bdf75b 100644 --- a/apt_dht/CacheManager.py +++ b/apt_dht/CacheManager.py @@ -210,8 +210,8 @@ class CacheManager: url = None if self.scanning[0] == self.cache_dir: url = 'http:/' + file.path[len(self.cache_dir.path):] - self.db.storeFile(file, result.digest()) - df = self.manager.new_cached_file(file, result, url, True) + new_hash = self.db.storeFile(file, result.digest()) + df = self.manager.new_cached_file(file, result, new_hash, url, True) if df is None: reactor.callLater(0, self._scanDirectories, None, walker) else: @@ -273,13 +273,13 @@ class CacheManager: else: log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url)) - self.db.storeFile(destFile, hash.digest()) + new_hash = self.db.storeFile(destFile, hash.digest()) log.msg('now avaliable: %s' % (url)) if self.manager: - self.manager.new_cached_file(destFile, hash, url) + self.manager.new_cached_file(destFile, hash, new_hash, url) if ext: - self.manager.new_cached_file(decFile, None, url[:-len(ext)]) + self.manager.new_cached_file(decFile, None, False, url[:-len(ext)]) else: log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url)) destFile.remove() diff --git a/apt_dht/Hash.py b/apt_dht/Hash.py index 933fdb5..d55375f 100644 --- a/apt_dht/Hash.py +++ b/apt_dht/Hash.py @@ -34,14 +34,15 @@ class HashObject: }, ] - def __init__(self): + def __init__(self, digest = None, size = None): self.hashTypeNum = 0 # Use the first if nothing else matters self.expHash = None self.expHex = None self.expSize = None self.expNormHash = None self.fileHasher = None - self.fileHash = None + self.fileHash = digest + self.size = size self.fileHex = None self.fileNormHash = None self.done = True diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py index eb103bd..1896494 100644 --- a/apt_dht/apt_dht.py +++ b/apt_dht/apt_dht.py @@ -3,7 +3,7 @@ from binascii import b2a_hex from urlparse 
import urlunparse import os, re -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.web2 import server, http, http_headers, static from twisted.python import log, failure from twisted.python.filepath import FilePath @@ -35,21 +35,38 @@ class AptDHT: self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE')) other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')] self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self) - self.my_addr = None + self.my_contact = None def joinComplete(self, result): - self.my_addr = findMyIPAddr(result, - config.getint(config.get('DEFAULT', 'DHT'), 'PORT'), - config.getboolean('DEFAULT', 'LOCAL_OK')) - if not self.my_addr: + my_addr = findMyIPAddr(result, + config.getint(config.get('DEFAULT', 'DHT'), 'PORT'), + config.getboolean('DEFAULT', 'LOCAL_OK')) + if not my_addr: raise RuntimeError, "IP address for this machine could not be found" + self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT')) self.cache.scanDirectories() + reactor.callLater(60, self.refreshFiles) def joinError(self, failure): log.msg("joining DHT failed miserably") log.err(failure) raise RuntimeError, "IP address for this machine could not be found" + def refreshFiles(self): + """Refresh any files in the DHT that are about to expire.""" + expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') + hashes = self.db.expiredFiles(expireAfter) + if len(hashes.keys()) > 0: + log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) + for raw_hash in hashes: + self.db.refreshHash(raw_hash) + hash = HashObject(raw_hash) + key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH')) + value = {'c': self.my_contact} + storeDefer = self.dht.storeValue(key, value) + storeDefer.addCallback(self.store_done, hash) + reactor.callLater(60, self.refreshFiles) + def check_freshness(self, req, path, modtime, 
resp): log.msg('Checking if %s is still fresh' % path) d = self.peers.get('', path, method = "HEAD", modtime = modtime) @@ -152,7 +169,7 @@ class AptDHT: return getDefer return response - def new_cached_file(self, file_path, hash, url = None, forceDHT = False): + def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False): """Add a newly cached file to the DHT. If the file was downloaded, set url to the path it was downloaded for. @@ -162,10 +179,9 @@ class AptDHT: if url: self.mirrors.updatedFile(url, file_path) - if self.my_addr and hash and (hash.expected() is not None or forceDHT): - contact = compact(self.my_addr, config.getint('DEFAULT', 'PORT')) - value = {'c': contact} + if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT): key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH')) + value = {'c': self.my_contact} storeDefer = self.dht.storeValue(key, value) storeDefer.addCallback(self.store_done, hash) return storeDefer diff --git a/apt_dht/apt_dht_conf.py b/apt_dht/apt_dht_conf.py index 931d46c..3fad625 100644 --- a/apt_dht/apt_dht_conf.py +++ b/apt_dht/apt_dht_conf.py @@ -43,6 +43,10 @@ DEFAULTS = { # to reload when a new request arrives. 'UNLOAD_PACKAGES_CACHE': '5m', + # Refresh the DHT keys after this much time has passed. + # This should be a time slightly less than the DHT's KEY_EXPIRE value. + 'KEY_REFRESH': '57m', + # Which DHT implementation to use. # It must be possile to do "from .DHT import DHT" to get a class that # implements the IDHT interface. diff --git a/apt_dht/db.py b/apt_dht/db.py index 7b225a1..1ab2b37 100644 --- a/apt_dht/db.py +++ b/apt_dht/db.py @@ -66,20 +66,36 @@ class DB: return res def storeFile(self, file, hash): - """Store or update a file in the database.""" + """Store or update a file in the database. 
+ + @return: True if the hash was not in the database before + (so it needs to be added to the DHT) + """ + new_hash = True + refreshTime = datetime.now() + c = self.conn.cursor() + c.execute("SELECT MAX(refreshed) AS max_refresh FROM files WHERE hash = ?", (khash(hash), )) + row = c.fetchone() + if row and row['max_refresh']: + new_hash = False + refreshTime = row['max_refresh'] + c.close() + file.restat() c = self.conn.cursor() c.execute("SELECT path FROM files WHERE path = ?", (file.path, )) row = c.fetchone() if row: c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?", - (khash(hash), file.getsize(), file.getmtime(), datetime.now())) + (khash(hash), file.getsize(), file.getmtime(), refreshTime)) else: c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?)", - (file.path, khash(hash), file.getsize(), file.getmtime(), datetime.now())) + (file.path, khash(hash), file.getsize(), file.getmtime(), refreshTime)) self.conn.commit() c.close() + return new_hash + def getFile(self, file): """Get a file from the database. @@ -110,7 +126,7 @@ class DB: @return: list of dictionaries of info for the found files """ c = self.conn.cursor() - c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), )) + c.execute("SELECT path, size, mtime, refreshed FROM files WHERE hash = ?", (khash(hash), )) row = c.fetchone() files = [] while row: @@ -120,6 +136,7 @@ class DB: res = {} res['path'] = file res['size'] = row['size'] + res['refreshed'] = row['refreshed'] files.append(res) row = c.fetchone() c.close() @@ -137,41 +154,47 @@ class DB: row = c.fetchone() return self._removeChanged(file, row) - def refreshFile(self, file): - """Refresh the publishing time of a file. - - If it has changed or is missing, it is removed from the table. 
- - @return: True if unchanged, False if changed, None if not in database - """ + def refreshHash(self, hash): + """Refresh the publishing time of all files with a hash.""" + refreshTime = datetime.now() c = self.conn.cursor() - c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, )) - row = c.fetchone() - res = None - if row: - res = self._removeChanged(file, row) - if res: - c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path)) - return res + c.execute("UPDATE files SET refreshed = ? WHERE hash = ?", (refreshTime, khash(hash))) + c.close() def expiredFiles(self, expireAfter): """Find files that need refreshing after expireAfter seconds. - Also removes any entries from the table that no longer exist. + For each hash that needs refreshing, finds all the files with that hash. + If the file has changed or is missing, it is removed from the table. @return: dictionary with keys the hashes, values a list of FilePaths """ t = datetime.now() - timedelta(seconds=expireAfter) + + # First find the hashes that need refreshing c = self.conn.cursor() - c.execute("SELECT path, hash, size, mtime FROM files WHERE refreshed < ?", (t, )) + c.execute("SELECT DISTINCT hash FROM files WHERE refreshed < ?", (t, )) row = c.fetchone() expired = {} while row: - res = self._removeChanged(FilePath(row['path']), row) - if res: - expired.setdefault(row['hash'], []).append(FilePath(row['path'])) + expired.setdefault(row['hash'], []) row = c.fetchone() c.close() + + # Now find the files for each hash + for hash in expired.keys(): + c = self.conn.cursor() + c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), )) + row = c.fetchone() + while row: + res = self._removeChanged(FilePath(row['path']), row) + if res: + expired[hash].append(FilePath(row['path'])) + row = c.fetchone() + if len(expired[hash]) == 0: + del expired[hash] + c.close() + return expired def removeUntrackedFiles(self, dirs): @@ -239,6 +262,12 @@ class
TestDB(unittest.TestCase): self.failUnless(res) self.failUnlessEqual(res['hash'], self.hash) + def test_lookupHash(self): + res = self.store.lookupHash(self.hash) + self.failUnless(res) + self.failUnlessEqual(len(res), 1) + self.failUnlessEqual(res[0]['path'].path, self.file.path) + def test_isUnchanged(self): res = self.store.isUnchanged(self.file) self.failUnless(res) @@ -258,8 +287,7 @@ class TestDB(unittest.TestCase): self.failUnlessEqual(len(res.keys()), 1) self.failUnlessEqual(res.keys()[0], self.hash) self.failUnlessEqual(len(res[self.hash]), 1) - res = self.store.refreshFile(self.file) - self.failUnless(res) + self.store.refreshHash(self.hash) res = self.store.expiredFiles(1) self.failUnlessEqual(len(res.keys()), 0) @@ -272,6 +300,25 @@ class TestDB(unittest.TestCase): file.touch() self.store.storeFile(file, self.hash) + def test_multipleHashes(self): + self.build_dirs() + res = self.store.expiredFiles(1) + self.failUnlessEqual(len(res.keys()), 0) + res = self.store.lookupHash(self.hash) + self.failUnless(res) + self.failUnlessEqual(len(res), 4) + self.failUnlessEqual(res[0]['refreshed'], res[1]['refreshed']) + self.failUnlessEqual(res[0]['refreshed'], res[2]['refreshed']) + self.failUnlessEqual(res[0]['refreshed'], res[3]['refreshed']) + sleep(2) + res = self.store.expiredFiles(1) + self.failUnlessEqual(len(res.keys()), 1) + self.failUnlessEqual(res.keys()[0], self.hash) + self.failUnlessEqual(len(res[self.hash]), 4) + self.store.refreshHash(self.hash) + res = self.store.expiredFiles(1) + self.failUnlessEqual(len(res.keys()), 0) + def test_removeUntracked(self): self.build_dirs() res = self.store.removeUntrackedFiles(self.dirs) diff --git a/debian/apt-dht.conf.sgml b/debian/apt-dht.conf.sgml index b481193..6ded80c 100644 --- a/debian/apt-dht.conf.sgml +++ b/debian/apt-dht.conf.sgml @@ -123,6 +123,14 @@ to reload when a new request arrives. (Default is 5 minutes.) + + + + The time after which to refresh DHT keys. 
+ This should be a time slightly less than the DHT's KEY_EXPIRE value. + (Default is 57 minutes.) + + diff --git a/test.py b/test.py index 66f0fc6..5468a1d 100755 --- a/test.py +++ b/test.py @@ -337,6 +337,10 @@ LOCAL_OK = yes # to reload when a new request arrives. UNLOAD_PACKAGES_CACHE = 5m +# Refresh the DHT keys after this much time has passed. +# This should be a time slightly less than the DHT's KEY_EXPIRE value. +KEY_REFRESH = 57m + # Which DHT implementation to use. # It must be possile to do "from .DHT import DHT" to get a class that # implements the IDHT interface. -- 2.39.5