2 """Manage a cache of downloaded files.
4 @var DECOMPRESS_EXTS: a list of file extensions that need to be decompressed
5 @var DECOMPRESS_FILES: a list of file names that need to be decompressed
8 from bz2 import BZ2Decompressor
9 from zlib import decompressobj, MAX_WBITS
10 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
11 from urlparse import urlparse
14 from twisted.python import log
15 from twisted.python.filepath import FilePath
16 from twisted.internet import defer, reactor
17 from twisted.trial import unittest
18 from twisted.web2 import stream
19 from twisted.web2.http import splitHostPort
21 from Hash import HashObject
22 from apt_p2p_conf import config
# File extensions and (lower-cased) base names that save_file() will also
# decompress on the fly while caching (handled by ProxyFileStream).
24 DECOMPRESS_EXTS = ['.gz', '.bz2']
25 DECOMPRESS_FILES = ['release', 'sources', 'packages']
# Raised when a download into the cache fails, e.g. by ProxyFileStream's
# close() when the stream ends before all expected bytes arrived.
27 class CacheError(Exception):
28 """Error occurred downloading a file to the cache."""
# NOTE(review): this listing is a partial view -- the embedded original line
# numbers jump (e.g. 77 -> 82, 90 -> 93, 113 -> 115, 196 -> 198), so some
# statements and even some 'def' lines are elided below.  Only comments are
# added here; the visible code is untouched.
30 class ProxyFileStream(stream.SimpleStream):
31 """Saves a stream to a file while providing a new stream.
33 Also optionally decompresses the file while it is being downloaded.
35 @type stream: L{twisted.web2.stream.IByteStream}
36 @ivar stream: the input stream being read
37 @type outFile: L{twisted.python.filepath.FilePath}
38 @ivar outFile: the file being written
39 @type hash: L{Hash.HashObject}
40 @ivar hash: the hash object for the file
42 @ivar gzfile: the open file to write decompressed gzip data to
43 @type gzdec: L{zlib.decompressobj}
44 @ivar gzdec: the decompressor to use for the compressed gzip data
45 @type gzheader: C{boolean}
46 @ivar gzheader: whether the gzip header still needs to be removed from
47 the zlib compressed data
48 @type bz2file: C{file}
49 @ivar bz2file: the open file to write decompressed bz2 data to
50 @type bz2dec: L{bz2.BZ2Decompressor}
51 @ivar bz2dec: the decompressor to use for the compressed bz2 data
53 @ivar length: the length of the original (compressed) file
54 @type doneDefer: L{twisted.internet.defer.Deferred}
55 @ivar doneDefer: the deferred that will fire when done streaming
57 @group Stream implementation: read, close
61 def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
62 """Initializes the proxy.
64 @type stream: L{twisted.web2.stream.IByteStream}
65 @param stream: the input stream to read from
66 @type outFile: L{twisted.python.filepath.FilePath}
67 @param outFile: the file to write to
68 @type hash: L{Hash.HashObject}
69 @param hash: the hash object to use for the file
70 @type decompress: C{string}
71 @param decompress: also decompress the file as this type
72 (currently only '.gz' and '.bz2' are supported)
73 @type decFile: C{twisted.python.FilePath}
74 @param decFile: the file to write the decompressed data to
# (elided lines 75-81: presumably 'self.stream = stream', 'self.hash = hash'
#  and default None/True inits for the gz/bz2 attributes -- TODO confirm,
#  since self.stream is read on line 89 and self.hash in _write)
77 self.outFile = outFile.open('w')
82 if decompress == ".gz":
# Negative wbits = raw deflate stream with no zlib header/trailer; the
# gzip wrapper is stripped by hand in _remove_gzip_header() instead.
84 self.gzfile = decFile.open('w')
85 self.gzdec = decompressobj(-MAX_WBITS)
86 elif decompress == ".bz2":
87 self.bz2file = decFile.open('w')
88 self.bz2dec = BZ2Decompressor()
89 self.length = self.stream.length
90 self.doneDefer = defer.Deferred()
# (elided: the 'def' line of the method below -- presumably a _done()
#  helper that read()/_write() call at end-of-stream -- TODO confirm)
93 """Close all the output files, return the result."""
94 if not self.outFile.closed:
# (elided lines 95-97: presumably closes outFile before flushing)
98 # Finish the decompression
# flush() drains whatever the zlib decompressor still buffers.
99 data_dec = self.gzdec.flush()
100 self.gzfile.write(data_dec)
# (elided lines 101-106: presumably closes gzfile/bz2file similarly)
107 def _error(self, err):
108 """Close all the output files, return the error."""
109 if not self.outFile.closed:
# (elided lines 110-111: file-closing statements, presumably)
112 self.doneDefer.errback(err)
# (elided: the 'def read(self):' line for the stream-API read method)
115 """Read some data from the stream."""
# A closed output file means this proxy is finished; the elided lines
# presumably return None to signal end-of-stream to the consumer.
116 if self.outFile.closed:
119 # Read data from the stream, deal with the possible deferred
120 data = self.stream.read()
121 if isinstance(data, defer.Deferred):
# Asynchronous chunk: hash/write it via _write when it arrives,
# propagate failures through _error.
122 data.addCallbacks(self._write, self._error)
128 def _write(self, data):
129 """Write the stream data to the file and return it for others to use.
131 Also optionally decompresses it.
# data presumably None at end-of-stream, which is when doneDefer fires
# with the completed hash (line 136) -- TODO confirm elided guard.
134 if not self.outFile.closed:
136 self.doneDefer.callback(self.hash)
139 # Write and hash the streamed data
140 self.outFile.write(data)
141 self.hash.update(data)
144 # Decompress the zlib portion of the file
# (elided: the 'if self.gzfile:' / 'if self.gzheader:' guards, judging
#  by the gzheader docs above -- only the first chunk has the header)
146 # Remove the gzip header junk
147 self.gzheader = False
148 new_data = self._remove_gzip_header(data)
149 dec_data = self.gzdec.decompress(new_data)
# Subsequent chunks are fed to the raw-deflate decompressor directly.
151 dec_data = self.gzdec.decompress(data)
152 self.gzfile.write(dec_data)
154 # Decompress the bz2 file
155 dec_data = self.bz2dec.decompress(data)
156 self.bz2file.write(dec_data)
# (elided: presumably 'return data' so consumers see the original bytes)
160 def _remove_gzip_header(self, data):
# Parses the gzip member header (RFC 1952) by hand and returns the raw
# deflate payload.  Assumes the whole header fits in this first chunk --
# NOTE(review): a header split across chunks would break this; confirm
# upstream chunk sizes make that impossible.
161 """Remove the gzip header from the zlib compressed data."""
162 # Read, check & discard the header fields
# '\037\213' is the two-byte gzip magic number.
163 if data[:2] != '\037\213':
164 raise IOError, 'Not a gzipped file'
# Compression method 8 == deflate, the only method gzip defines.
165 if ord(data[2]) != 8:
166 raise IOError, 'Unknown compression method'
168 # modtime = self.fileobj.read(4)
169 # extraflag = self.fileobj.read(1)
170 # os = self.fileobj.read(1)
174 # Read & discard the extra field
# XLEN is little-endian: low byte (elided line) + 256 * high byte.
176 xlen = xlen + 256*ord(data[11])
177 skip = skip + 2 + xlen
179 # Read and discard a null-terminated string containing the filename
181 if not data[skip] or data[skip] == '\000':
186 # Read and discard a null-terminated string containing a comment
188 if not data[skip] or data[skip] == '\000':
193 skip += 2 # Read & discard the 16-bit header CRC
# (elided: presumably 'return data[skip:]' plus the 'def close(self):'
#  line of the stream-API close method below)
198 """Clean everything up and return None to future reads."""
# NOTE(review): as listed, this log.msg runs before the size check and
# so would fire even on a clean close -- verify against the full source
# whether it belongs under the 'if' on the next line.
199 log.msg('ProxyFileStream was prematurely closed after only %d/%d bytes' % (self.hash.size, self.length))
200 if self.hash.size < self.length:
# Short read: report the failure through doneDefer's errback chain.
201 self._error(CacheError('Prematurely closed, all data was not written'))
202 elif not self.outFile.closed:
# All bytes arrived but files still open: finish normally.
204 self.doneDefer.callback(self.hash)
209 """Manages all downloaded files and requests for cached objects.
211 @type cache_dir: L{twisted.python.filepath.FilePath}
212 @ivar cache_dir: the directory to use for storing all files
213 @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
214 @ivar other_dirs: the other directories that have shared files in them
215 @type all_dirs: C{list} of L{twisted.python.filepath.FilePath}
216 @ivar all_dirs: all the directories that have cached files in them
218 @ivar db: the database to use for tracking files and hashes
219 @type manager: L{apt_p2p.AptP2P}
220 @ivar manager: the main program object to send requests to
221 @type scanning: C{list} of L{twisted.python.filepath.FilePath}
222 @ivar scanning: all the directories that are currently being scanned or waiting to be scanned
225 def __init__(self, cache_dir, db, manager = None):
226 """Initialize the instance and remove any untracked files from the DB.
228 @type cache_dir: L{twisted.python.filepath.FilePath}
229 @param cache_dir: the directory to use for storing all files
231 @param db: the database to use for tracking files and hashes
232 @type manager: L{apt_p2p.AptP2P}
233 @param manager: the main program object to send requests to
234 (optional, defaults to not calling back with cached files)
236 self.cache_dir = cache_dir
# Extra shared directories come from the config; cache_dir is inserted
# first so scans visit the download cache before the other shared dirs.
237 self.other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
238 self.all_dirs = self.other_dirs[:]
239 self.all_dirs.insert(0, self.cache_dir)
# (elided line 240: presumably 'self.db = db' -- self.db is used on
#  line 245 and throughout the class; TODO confirm)
241 self.manager = manager
# (elided lines 242-243: presumably 'self.scanning = []' -- the assert
#  in scanDirectories() reads self.scanning; TODO confirm)
244 # Init the database, remove old files
245 self.db.removeUntrackedFiles(self.all_dirs)
247 #{ Scanning directories
248 def scanDirectories(self):
249 """Scan the cache directories, hashing new and rehashing changed files."""
# Refuse to start a second concurrent scan; _scanDirectories() works
# through the self.scanning list asynchronously via the reactor.
250 assert not self.scanning, "a directory scan is already under way"
# Copy all_dirs so the scan queue can be consumed without mutating it.
251 self.scanning = self.all_dirs[:]
252 self._scanDirectories()
254 def _scanDirectories(self, result = None, walker = None):
255 """Walk each directory looking for cached files.
257 @param result: the result of a DHT store request, not used (optional)
258 @param walker: the walker to use to traverse the current directory
259 (optional, defaults to creating a new walker from the first
260 directory in the L{CacheManager.scanning} list)
# NOTE(review): several guard/branch lines are elided in this listing
# (original numbering jumps 262->264, 273->275, 277->279, etc.); the
# visible calls are annotated, elisions marked where they matter.
262 # Need to start walking a new directory
264 # If there are any left, get them
266 log.msg('started scanning directory: %s' % self.scanning[0].path)
267 walker = self.scanning[0].walk()
# All directories done: the scan queue is empty.
269 log.msg('cache directory scan complete')
273 # Get the next file in the directory
# (elided: presumably 'try: file = walker.next()' -- the StopIteration
#  below comes from advancing the walker; TODO confirm)
275 except StopIteration:
276 # No files left, go to the next directory
277 log.msg('done scanning directory: %s' % self.scanning[0].path)
# (elided: presumably 'self.scanning.pop(0)' before rescheduling)
# Reschedule via the reactor so a large tree never blocks the loop.
279 reactor.callLater(0, self._scanDirectories)
282 # If it's not a file ignore it
283 if not file.isfile():
284 log.msg('entering directory: %s' % file.path)
285 reactor.callLater(0, self._scanDirectories, None, walker)
288 # If it's already properly in the DB, ignore it
# isUnchanged() presumably returns True/False/None; only True skips here.
289 db_status = self.db.isUnchanged(file)
291 log.msg('file is unchanged: %s' % file.path)
292 reactor.callLater(0, self._scanDirectories, None, walker)
295 # Don't hash files in the cache that are not in the DB
296 if self.scanning[0] == self.cache_dir:
297 if db_status is None:
298 log.msg('ignoring unknown cache file: %s' % file.path)
# Changed cache file: drop it rather than rehash (removal line elided).
300 log.msg('removing changed cache file: %s' % file.path)
302 reactor.callLater(0, self._scanDirectories, None, walker)
306 log.msg('start hash checking file: %s' % file.path)
# (elided: presumably 'hash = HashObject()' -- TODO confirm)
# Hash in a worker thread; the scan continues from _doneHashing.
308 df = hash.hashInThread(file)
309 df.addBoth(self._doneHashing, file, walker)
310 df.addErrback(log.err)
312 def _doneHashing(self, result, file, walker):
313 """If successful, add the hashed file to the DB and inform the main program."""
# addBoth delivered us here, so result is either a HashObject (success)
# or a Failure (error) -- hence the isinstance dispatch.
314 if isinstance(result, HashObject):
315 log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
317 # Only set a URL if this is a downloaded file
# (elided: presumably 'url = None' default before the guard below)
319 if self.scanning[0] == self.cache_dir:
# Mirror files are cached as cache_dir/<host><path>; stripping the
# cache_dir prefix leaves '/<host>/<path>', so prepending 'http:/'
# rebuilds 'http://<host>/<path>'.
320 url = 'http:/' + file.path[len(self.cache_dir.path):]
322 # Store the hashed file in the database
323 new_hash = self.db.storeFile(file, result.digest(),
324 ''.join(result.pieceDigests()))
326 # Tell the main program to handle the new cache file
327 df = self.manager.new_cached_file(file, result, new_hash, url, True)
# (elided: presumably an 'if df is None:' split -- continue the scan
#  directly, or chain it onto df when the manager works asynchronously)
329 reactor.callLater(0, self._scanDirectories, None, walker)
331 df.addBoth(self._scanDirectories, walker)
333 # Must have returned an error
334 log.msg('hash check of %s failed' % file.path)
# Keep scanning even though this file failed to hash.
336 reactor.callLater(0, self._scanDirectories, None, walker)
339 def save_file(self, response, hash, url):
340 """Save a downloaded file to the cache and stream it.
342 @type response: L{twisted.web2.http.Response}
343 @param response: the response from the download
344 @type hash: L{Hash.HashObject}
345 @param hash: the hash object containing the expected hash for the file
346 @param url: the URI of the actual mirror request
347 @rtype: L{twisted.web2.http.Response}
348 @return: the final response from the download
# Pass non-200 responses straight through without caching them.
350 if response.code != 200:
351 log.msg('File was not found (%r): %s' % (response, url))
# (elided: presumably 'return response' here)
354 log.msg('Returning file: %s' % url)
356 # Set the destination path for the file
357 parsed = urlparse(url)
# parsed[1] is the netloc, parsed[2] the path: cache as <host><path>.
358 destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
359 log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
361 # Make sure there's a free place for the file
362 if destFile.exists():
363 log.msg('File already exists, removing: %s' % destFile.path)
# (elided: presumably 'destFile.remove()')
365 if not destFile.parent().exists():
366 destFile.parent().makedirs()
368 # Determine whether it needs to be decompressed and how
369 root, ext = os.path.splitext(destFile.basename())
# e.g. 'Packages.gz' -> decompressed sibling 'Packages'.
370 if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
372 decFile = destFile.sibling(root)
373 log.msg('Decompressing to: %s' % decFile.path)
375 log.msg('File already exists, removing: %s' % decFile.path)
# (elided: presumably an else branch setting ext/decFile to None so
#  ProxyFileStream below skips decompression -- TODO confirm)
381 # Create the new stream from the old one.
382 orig_stream = response.stream
# Tee the body: hash it and write it to the cache while the caller
# keeps streaming the response unchanged.
383 response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
384 response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
385 response.headers.getHeader('Last-Modified'),
387 response.stream.doneDefer.addErrback(self._save_error, url, destFile, decFile)
389 # Return the modified response with the new stream
# (elided: presumably 'return response')
392 def _save_complete(self, hash, url, destFile, modtime = None, decFile = None):
393 """Update the modification time and inform the main program.
395 @type hash: L{Hash.HashObject}
396 @param hash: the hash object containing the expected hash for the file
397 @param url: the URI of the actual mirror request
398 @type destFile: C{twisted.python.FilePath}
399 @param destFile: the file where the download was written to
400 @type modtime: C{int}
401 @param modtime: the modified time of the cached file (seconds since epoch)
402 (optional, defaults to not setting the modification time of the file)
403 @type decFile: C{twisted.python.FilePath}
404 @param decFile: the file where the decompressed download was written to
405 (optional, defaults to the file not having been compressed)
# (elided: presumably 'if modtime:' and 'if decFile:' guards around the
#  utime calls -- TODO confirm; utime with modtime=None would raise)
408 os.utime(destFile.path, (modtime, modtime))
410 os.utime(decFile.path, (modtime, modtime))
# verify() returning None means no expected hash was known; that is
# treated like a match and the file is stored anyway.
412 result = hash.verify()
413 if result or result is None:
415 log.msg('Hashes match: %s' % url)
417 log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
419 new_hash = self.db.storeFile(destFile, hash.digest(),
420 ''.join(hash.pieceDigests()))
# NOTE(review): 'avaliable' typo lives in a runtime log string, so it
# is deliberately left untouched in this comments-only pass.
421 log.msg('now avaliable: %s' % (url))
# (elided: presumably 'if self.manager:' guard, matching the optional
#  manager documented in __init__)
424 self.manager.new_cached_file(destFile, hash, new_hash, url)
# Also announce the decompressed copy under the URL minus its
# compression extension (e.g. .../Packages.gz -> .../Packages).
426 ext_len = len(destFile.path) - len(decFile.path)
427 self.manager.new_cached_file(decFile, None, False, url[:-ext_len])
429 log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
# (elided: presumably removal of the mismatching files)
434 def _save_error(self, failure, url, destFile, decFile = None):
435 """Remove the destination files."""
436 log.msg('Error occurred downloading %s' % url)
# restat(False) refreshes the cached stat without raising if the file
# has vanished, so the exists() check below is accurate.
438 destFile.restat(False)
439 if destFile.exists():
440 log.msg('Removing the incomplete file: %s' % destFile.path)
# (elided: presumably 'destFile.remove()')
443 decFile.restat(False)
445 log.msg('Removing the incomplete file: %s' % decFile.path)
# (elided: presumably 'decFile.remove()' and returning the failure so
#  it keeps propagating down the errback chain -- TODO confirm)
448 def save_error(self, failure, url):
449 """An error has occurred in downloading or saving the file."""
450 log.msg('Error occurred downloading %s' % url)
# (elided: presumably returns/re-raises the failure for the caller)
454 class TestMirrorManager(unittest.TestCase):
455 """Unit tests for the mirror manager."""
462 self.client = CacheManager(FilePath('/tmp/.apt-p2p'))
465 for p in self.pending_calls: