X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht%2Fapt_dht.py;h=94260bd4c396a4c216e2952c9094ae2a1cdaf0cb;hb=d900237088b7832d2554c31b7436977bc5669348;hp=fd7b73d81ce5ba8053431578491dbc31a5424222;hpb=5f57d64912b39b546c724f79eb0a3a15ecce7b0a;p=quix0rs-apt-p2p.git diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py index fd7b73d..94260bd 100644 --- a/apt_dht/apt_dht.py +++ b/apt_dht/apt_dht.py @@ -1,9 +1,9 @@ from binascii import b2a_hex from urlparse import urlunparse -import os, re +import os, re, sha -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.web2 import server, http, http_headers, static from twisted.python import log, failure from twisted.python.filepath import FilePath @@ -15,7 +15,10 @@ from MirrorManager import MirrorManager from CacheManager import CacheManager from Hash import HashObject from db import DB -from util import findMyIPAddr +from util import findMyIPAddr, compact + +DHT_PIECES = 4 +TORRENT_PIECES = 70 download_dir = 'cache' @@ -32,24 +35,48 @@ class AptDHT: self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self) self.getHTTPFactory = self.http_server.getHTTPFactory self.peers = PeerManager() - self.mirrors = MirrorManager(self.cache_dir) + self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE')) other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')] self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self) - self.my_addr = None + self.my_contact = None def joinComplete(self, result): - self.my_addr = findMyIPAddr(result, - config.getint(config.get('DEFAULT', 'DHT'), 'PORT'), - config.getboolean('DEFAULT', 'LOCAL_OK')) - if not self.my_addr: + my_addr = findMyIPAddr(result, + config.getint(config.get('DEFAULT', 'DHT'), 'PORT'), + config.getboolean('DEFAULT', 'LOCAL_OK')) + if not my_addr: raise RuntimeError, "IP address for this machine could not be found" + self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT')) self.cache.scanDirectories() + reactor.callLater(60, self.refreshFiles) def joinError(self, failure): log.msg("joining DHT failed miserably") log.err(failure) raise RuntimeError, "IP address for this machine could not be found" + def refreshFiles(self): + """Refresh any files in the DHT that are about to expire.""" + expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') + hashes = self.db.expiredHashes(expireAfter) + if len(hashes.keys()) > 0: + log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) + self._refreshFiles(None, hashes) + + def _refreshFiles(self, result, hashes): + if result is not None: + log.msg('Storage resulted in: %r' % result) + + if hashes: + raw_hash = hashes.keys()[0] + self.db.refreshHash(raw_hash) + hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces']) + del hashes[raw_hash] + storeDefer = self.store(hash) + storeDefer.addBoth(self._refreshFiles, hashes) + else: + reactor.callLater(60, self.refreshFiles) + def check_freshness(self, req, path, modtime, resp): log.msg('Checking if %s is still fresh' % path) d = self.peers.get('', path, method = "HEAD", modtime = modtime) @@ -87,7 +114,7 @@ class AptDHT: log.msg('Found hash %s for %s' % (hash.hexexpected(), path)) # Lookup hash in cache - locations = self.db.lookupHash(hash.expected()) + locations = self.db.lookupHash(hash.expected(), filesOnly = True) self.getCachedFile(hash, req, path, d, locations) def getCachedFile(self, hash, req, path, d, locations): @@ -129,17 +156,17 @@ class AptDHT: lookupDefer = self.dht.getValue(key) lookupDefer.addCallback(self.lookupHash_done, hash, path, d) - def lookupHash_done(self, locations, hash, path, d): - if not locations: + def lookupHash_done(self, values, hash, path, d): + if not values: log.msg('Peers for %s were not found' % path) getDefer = self.peers.get(hash, path) getDefer.addCallback(self.cache.save_file, hash, path) getDefer.addErrback(self.cache.save_error, path) getDefer.addCallbacks(d.callback, d.errback) else: - log.msg('Found peers for %s: %r' % (path, locations)) + log.msg('Found peers for %s: %r' % (path, values)) # Download from the found peers - getDefer = self.peers.get(hash, path, locations) + getDefer = self.peers.get(hash, path, values) getDefer.addCallback(self.check_response, hash, path) getDefer.addCallback(self.cache.save_file, hash, path) getDefer.addErrback(self.cache.save_error, path) @@ -152,24 +179,52 @@ class AptDHT: return getDefer return response - def new_cached_file(self, file_path, hash, url = None, forceDHT = False): - """Add a newly cached file to the DHT. + def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False): + """Add a newly cached file to the appropriate places. If the file was downloaded, set url to the path it was downloaded for. - Don't add a file to the DHT unless a hash was found for it - (but do add it anyway if forceDHT is True). + Doesn't add a file to the DHT unless a hash was found for it + (but does add it anyway if forceDHT is True). """ if url: self.mirrors.updatedFile(url, file_path) - if self.my_addr and hash and (hash.expected() is not None or forceDHT): - site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT')) - key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH')) - storeDefer = self.dht.storeValue(key, site) - storeDefer.addCallback(self.store_done, hash) - return storeDefer + if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT): + return self.store(hash) return None + + def store(self, hash): + """Add a file to the DHT.""" + key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH')) + value = {'c': self.my_contact} + pieces = hash.pieceDigests() + if len(pieces) <= 1: + pass + elif len(pieces) <= DHT_PIECES: + value['t'] = {'t': ''.join(pieces)} + elif len(pieces) <= TORRENT_PIECES: + s = sha.new().update(''.join(pieces)) + value['h'] = s.digest() + else: + s = sha.new().update(''.join(pieces)) + value['l'] = s.digest() + storeDefer = self.dht.storeValue(key, value) + storeDefer.addCallback(self.store_done, hash) + return storeDefer def store_done(self, result, hash): - log.msg('Added %s to the DHT: %r' % (hash, result)) - \ No newline at end of file + log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result)) + pieces = hash.pieceDigests() + if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES: + s = sha.new().update(''.join(pieces)) + key = s.digest() + value = {'t': ''.join(pieces)} + storeDefer = self.dht.storeValue(key, value) + storeDefer.addCallback(self.store_torrent_done, key) + return storeDefer + return result + + def store_torrent_done(self, result, key): + log.msg('Added torrent string %s to the DHT: %r' % (b2ahex(key), result)) + return result + \ No newline at end of file