2 """Manage a cache of downloaded files.
4 @var DECOMPRESS_EXTS: a list of file extensions that need to be decompressed
5 @var DECOMPRESS_FILES: a list of file names that need to be decompressed
8 from urlparse import urlparse
11 from twisted.python import log
12 from twisted.python.filepath import FilePath
13 from twisted.internet import defer, reactor
14 from twisted.trial import unittest
15 from twisted.web2.http import splitHostPort
17 from Streams import GrowingFileStream, StreamToFile
18 from Hash import HashObject
19 from apt_p2p_conf import config
21 DECOMPRESS_EXTS = ['.gz', '.bz2']
22 DECOMPRESS_FILES = ['release', 'sources', 'packages']
class CacheError(Exception):
    """Error occurred downloading a file to the cache."""
28 """Manages all downloaded files and requests for cached objects.
30 @type cache_dir: L{twisted.python.filepath.FilePath}
31 @ivar cache_dir: the directory to use for storing all files
32 @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
33 @ivar other_dirs: the other directories that have shared files in them
34 @type all_dirs: C{list} of L{twisted.python.filepath.FilePath}
35 @ivar all_dirs: all the directories that have cached files in them
37 @ivar db: the database to use for tracking files and hashes
38 @type manager: L{apt_p2p.AptP2P}
39 @ivar manager: the main program object to send requests to
40 @type scanning: C{list} of L{twisted.python.filepath.FilePath}
@ivar scanning: all the directories that are currently being scanned or waiting to be scanned
44 def __init__(self, cache_dir, db, manager = None):
45 """Initialize the instance and remove any untracked files from the DB..
47 @type cache_dir: L{twisted.python.filepath.FilePath}
48 @param cache_dir: the directory to use for storing all files
50 @param db: the database to use for tracking files and hashes
51 @type manager: L{apt_p2p.AptP2P}
52 @param manager: the main program object to send requests to
53 (optional, defaults to not calling back with cached files)
55 self.cache_dir = cache_dir
56 self.other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
57 self.all_dirs = self.other_dirs[:]
58 self.all_dirs.insert(0, self.cache_dir)
60 self.manager = manager
63 # Init the database, remove old files
64 self.db.removeUntrackedFiles(self.all_dirs)
66 #{ Scanning directories
67 def scanDirectories(self, result = None):
68 """Scan the cache directories, hashing new and rehashing changed files."""
69 assert not self.scanning, "a directory scan is already under way"
70 self.scanning = self.all_dirs[:]
71 self._scanDirectories()
    def _scanDirectories(self, result = None, walker = None):
        """Walk each directory looking for cached files.

        Processes one file per invocation, rescheduling itself via
        C{reactor.callLater} so the event loop is never blocked for long.

        @param result: the result of a DHT store request, not used (optional)
        @param walker: the walker to use to traverse the current directory
            (optional, defaults to creating a new walker from the first
            directory in the L{CacheManager.scanning} list)
        """
        # NOTE(review): several control-flow lines (guards, else-branches and
        # returns) appear to have been elided from this chunk; the structure
        # below is a best-effort reconstruction of the surviving statements.
        # Need to start walking a new directory
        # If there are any left, get them
        log.msg('started scanning directory: %s' % self.scanning[0].path)
        walker = self.scanning[0].walk()
        # presumably reached only when self.scanning is empty — confirm
        log.msg('cache directory scan complete')
        # Get the next file in the directory
        # No files left, go to the next directory
        log.msg('done scanning directory: %s' % self.scanning[0].path)
        # Reschedule rather than recurse, to yield to the reactor
        reactor.callLater(0, self._scanDirectories)
        # If it's not a file ignore it
        if not file.isfile():
            reactor.callLater(0, self._scanDirectories, None, walker)
        # If it's already properly in the DB, ignore it
        db_status = self.db.isUnchanged(file)
        reactor.callLater(0, self._scanDirectories, None, walker)
        # Don't hash files in the cache that are not in the DB
        if self.scanning[0] == self.cache_dir:
            if db_status is None:
                log.msg('ignoring unknown cache file: %s' % file.path)
            # NOTE(review): this looks like the else-branch for a changed
            # (db_status False) cache file — the removal itself is not visible
            log.msg('removing changed cache file: %s' % file.path)
            reactor.callLater(0, self._scanDirectories, None, walker)
        # Otherwise hash the file in a thread and continue when it's done
        log.msg('start hash checking file: %s' % file.path)
        df = hash.hashInThread(file)
        df.addBoth(self._doneHashing, file, walker)
    def _doneHashing(self, result, file, walker):
        """If successful, add the hashed file to the DB and inform the main program.

        @param result: the L{Hash.HashObject} on success, or a failure
            (anything else) if the hashing thread errored
        @param file: the L{twisted.python.filepath.FilePath} that was hashed
        @param walker: the directory walker to resume the scan with
        """
        # NOTE(review): some guard lines (e.g. a url default and a check that
        # self.manager is set) appear elided from this chunk — verify against
        # the full file before relying on the exact flow below.
        if isinstance(result, HashObject):
            log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
            # Only set a URL if this is a downloaded file
            if self.scanning[0] == self.cache_dir:
                # Map the cache-relative path back to its download URL
                url = 'http:/' + file.path[len(self.cache_dir.path):]
            # Store the hashed file in the database
            new_hash = self.db.storeFile(file, result.digest(), True,
                                         ''.join(result.pieceDigests()))
            # Tell the main program to handle the new cache file
            df = self.manager.new_cached_file(file, result, new_hash, url, True)
            # Resume the directory scan on the next reactor iteration
            reactor.callLater(0, self._scanDirectories, None, walker)
            # ... or resume only after the manager's deferred fires
            df.addBoth(self._scanDirectories, walker)
        # Must have returned an error
        log.msg('hash check of %s failed' % file.path)
        reactor.callLater(0, self._scanDirectories, None, walker)
    def save_file(self, response, hash, url):
        """Save a downloaded file to the cache and stream it.

        The response's stream is replaced with a L{Streams.GrowingFileStream}
        backed by the cache file, so the download is written to disk and
        re-served to the requester at the same time.

        @type response: L{twisted.web2.http.Response}
        @param response: the response from the download
        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param url: the URI of the actual mirror request
        @rtype: L{twisted.web2.http.Response}
        @return: the final response from the download
        """
        # NOTE(review): a few lines (early return for non-200, file removals,
        # and the else-branch for files needing no decompression) appear
        # elided from this chunk.
        if response.code != 200:
            log.msg('File was not found (%r): %s' % (response, url))
        log.msg('Returning file: %s' % url)

        # Set the destination path for the file
        parsed = urlparse(url)
        # cache path is <cache_dir>/<host>/<path> — host+path from the URL
        destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
        log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))

        # Make sure there's a free place for the file
        if destFile.exists():
            log.msg('File already exists, removing: %s' % destFile.path)
        if not destFile.parent().exists():
            destFile.parent().makedirs()

        # Determine whether it needs to be decompressed and how
        root, ext = os.path.splitext(destFile.basename())
        if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
            # Decompressed copy lives next to the compressed one, minus ext
            decFile = destFile.sibling(root)
            log.msg('Decompressing to: %s' % decFile.path)
            log.msg('File already exists, removing: %s' % decFile.path)

        # Create the new stream from the old one.
        orig_stream = response.stream
        f = destFile.open('w+')
        new_stream = GrowingFileStream(f, orig_stream.length)
        # Tee the mirror's stream into the cache file, notifying the growing
        # stream as bytes arrive; decompress on the fly when requested
        df = StreamToFile(hash, orig_stream, f, notify = new_stream.updateAvailable,
                          decompress = ext, decFile = decFile).run()
        df.addCallback(self._save_complete, url, destFile, new_stream,
                       response.headers.getHeader('Last-Modified'), decFile)
        df.addErrback(self._save_error, url, destFile, new_stream, decFile)
        response.stream = new_stream

        # Return the modified response with the new stream
    def _save_complete(self, hash, url, destFile, destStream = None,
                       modtime = None, decFile = None):
        """Update the modification time and inform the main program.

        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param url: the URI of the actual mirror request
        @type destFile: C{twisted.python.FilePath}
        @param destFile: the file where the download was written to
        @type destStream: L{Streams.GrowingFileStream}
        @param destStream: the stream to notify that all data is available
        @type modtime: C{int}
        @param modtime: the modified time of the cached file (seconds since epoch)
            (optional, defaults to not setting the modification time of the file)
        @type decFile: C{twisted.python.FilePath}
        @param decFile: the file where the decompressed download was written to
            (optional, defaults to the file not having been compressed)
        """
        # NOTE(review): several guard lines (e.g. checks on modtime, result,
        # self.manager and decFile, the `dht` assignment, and the failing
        # else-branch) appear elided from this chunk.
        # verify() returns None when no expected hash was known — treat that
        # the same as a successful match
        result = hash.verify()
        if result or result is None:
            # Unblock readers of the cache-backed stream: all data is on disk
            destStream.allAvailable()
            # Propagate the mirror's Last-Modified time to the cache file
            os.utime(destFile.path, (modtime, modtime))
            log.msg('Hashes match: %s' % url)
            log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
            # NOTE(review): `dht` is not defined in the visible lines —
            # presumably set by an elided branch above; confirm in full file
            new_hash = self.db.storeFile(destFile, hash.digest(), dht,
                                         ''.join(hash.pieceDigests()))
            self.manager.new_cached_file(destFile, hash, new_hash, url)

            # Hash the decompressed file and add it to the DB
            decHash = HashObject()
            # Length of the stripped compression extension (e.g. '.gz')
            ext_len = len(destFile.path) - len(decFile.path)
            df = decHash.hashInThread(decFile)
            # Recurse for the decompressed copy, with the extension-less URL
            df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime = modtime)
            df.addErrback(self._save_error, url[:-ext_len], decFile)
        log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
        # Tell waiting readers the stream is done and the file is being removed
        destStream.allAvailable(remove = True)
    def _save_error(self, failure, url, destFile, destStream = None, decFile = None):
        """Remove the destination files.

        Errback for a failed download/save: unblocks any waiting stream
        readers and cleans up the partially-written cache files.

        @param failure: the failure that aborted the save
        @param url: the URI of the actual mirror request
        @param destFile: the cache file the download was being written to
        @param destStream: the stream to notify of the abort (optional)
        @param decFile: the decompressed copy, if any (optional)
        """
        # NOTE(review): the actual remove() calls and the decFile guard
        # appear elided from this chunk.
        log.msg('Error occurred downloading %s' % url)
        # Wake readers and mark the underlying file as being removed
        destStream.allAvailable(remove = True)
        # Refresh cached stat info before checking existence
        destFile.restat(False)
        if destFile.exists():
            log.msg('Removing the incomplete file: %s' % destFile.path)
        decFile.restat(False)
        log.msg('Removing the incomplete file: %s' % decFile.path)
    def save_error(self, failure, url):
        """An error has occurred in downloading or saving the file.

        @param failure: the failure describing what went wrong
        @param url: the URI of the actual mirror request
        """
        # NOTE(review): the tail of this method (presumably re-raising or
        # returning the failure) is not visible in this chunk.
        log.msg('Error occurred downloading %s' % url)
287 class TestMirrorManager(unittest.TestCase):
288 """Unit tests for the mirror manager."""
295 self.client = CacheManager(FilePath('/tmp/.apt-p2p'))
298 for p in self.pending_calls: