2 from bz2 import BZ2Decompressor
3 from zlib import decompressobj, MAX_WBITS
4 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
5 from urlparse import urlparse
8 from twisted.python import log
9 from twisted.python.filepath import FilePath
10 from twisted.internet import defer, reactor
11 from twisted.trial import unittest
12 from twisted.web2 import stream
13 from twisted.web2.http import splitHostPort
15 from Hash import HashObject
# Directory (relative to the cache dir) where apt package files are kept.
17 aptpkg_dir='apt-packages'
# File extensions this module knows how to decompress on the fly.
19 DECOMPRESS_EXTS = ['.gz', '.bz2']
# Base file names (compared lower-cased) whose downloads also get a
# decompressed copy saved next to them (see save_file below).
20 DECOMPRESS_FILES = ['release', 'sources', 'packages']
# NOTE(review): the numbers fused to the start of each line below are original
# source line numbers from a garbled listing; gaps in them mean source lines
# are missing from this view, so the bodies below are incomplete fragments.
# Do not assume the visible statements are consecutive.
22 class ProxyFileStream(stream.SimpleStream):
23 """Saves a stream to a file while providing a new stream."""
# Constructor: wraps an input stream, tee-ing everything read from it into
# outFile (and, for '.gz'/'.bz2' index downloads, a decompressed copy into
# decFile) while feeding every chunk to the supplied hash object.
25 def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
26 """Initializes the proxy.
28 @type stream: C{twisted.web2.stream.IByteStream}
29 @param stream: the input stream to read from
30 @type outFile: C{twisted.python.FilePath}
31 @param outFile: the file to write to
32 @type hash: L{Hash.HashObject}
33 @param hash: the hash object to use for the file
34 @type decompress: C{string}
35 @param decompress: also decompress the file as this type
36 (currently only '.gz' and '.bz2' are supported)
37 @type decFile: C{twisted.python.FilePath}
38 @param decFile: the file to write the decompressed data to
# (closing docstring quotes and several attribute assignments -- original
# lines 39-45 -- are missing from this view)
41 self.outFile = outFile.open('w')
# -MAX_WBITS tells zlib to expect a raw DEFLATE stream with no gzip/zlib
# header, which is why _remove_gzip_header strips the gzip header by hand.
46 if decompress == ".gz":
48 self.gzfile = decFile.open('w')
49 self.gzdec = decompressobj(-MAX_WBITS)
50 elif decompress == ".bz2":
51 self.bz2file = decFile.open('w')
52 self.bz2dec = BZ2Decompressor()
53 self.length = self.stream.length
# Fired with the hash object once the whole stream has been read and saved.
55 self.doneDefer = defer.Deferred()
# (the def line of this close/cleanup helper is missing from this view)
58 """Close the output file."""
59 if not self.outFile.closed:
# Flush any data still buffered inside the zlib decompressor before closing.
63 data_dec = self.gzdec.flush()
64 self.gzfile.write(data_dec)
# Completion: hand the finished hash object to whoever awaits doneDefer.
71 self.doneDefer.callback(self.hash)
# (the read() method's def line is missing from this view)
74 """Read some data from the stream."""
75 if self.outFile.closed:
78 data = self.stream.read()
# The wrapped stream may return a Deferred; in that case write/finish happen
# asynchronously via the callbacks below.
79 if isinstance(data, defer.Deferred):
80 data.addCallbacks(self._write, self._done)
86 def _write(self, data):
87 """Write the stream data to the file and return it for others to use."""
92 self.outFile.write(data)
93 self.hash.update(data)
# First chunk of a gzip download: strip the gzip header before inflating,
# since the decompressor was created with -MAX_WBITS (raw DEFLATE).
97 new_data = self._remove_gzip_header(data)
98 dec_data = self.gzdec.decompress(new_data)
# Subsequent chunks are fed to the decompressor as-is.
100 dec_data = self.gzdec.decompress(data)
101 self.gzfile.write(dec_data)
103 dec_data = self.bz2dec.decompress(data)
104 self.bz2file.write(dec_data)
# Parses and skips the variable-length gzip header (RFC 1952) at the start of
# data; presumably returns the remaining raw-DEFLATE payload (the return
# statement is outside this view).
107 def _remove_gzip_header(self, data):
# '\037\213' is the two-byte gzip magic number.
108 if data[:2] != '\037\213':
109 raise IOError, 'Not a gzipped file'
# Compression method 8 is DEFLATE, the only method gzip defines.
110 if ord(data[2]) != 8:
111 raise IOError, 'Unknown compression method'
113 # modtime = self.fileobj.read(4)
114 # extraflag = self.fileobj.read(1)
115 # os = self.fileobj.read(1)
119 # Read & discard the extra field, if present
# xlen is the little-endian 16-bit length of the optional FEXTRA field.
121 xlen = xlen + 256*ord(data[11])
122 skip = skip + 2 + xlen
124 # Read and discard a null-terminated string containing the filename
126 if not data[skip] or data[skip] == '\000':
131 # Read and discard a null-terminated string containing a comment
133 if not data[skip] or data[skip] == '\000':
138 skip += 2 # Read & discard the 16-bit header CRC
142 """Clean everything up and return None to future reads."""
148 """Manages all requests for cached objects."""
# Constructor: record the cache directory, database, extra served directories
# and the (optional) manager, then reconcile the database with the disk.
# NOTE(review): other_dirs has a mutable default argument ([]) -- the shared
# list is only safe as long as no caller ever mutates it.
150 def __init__(self, cache_dir, db, other_dirs = [], manager = None):
151 self.cache_dir = cache_dir
152 self.other_dirs = other_dirs
# all_dirs is every directory this object manages, with the cache dir first
# (scan/DB-cleanup code below treats position 0 specially).
153 self.all_dirs = self.other_dirs[:]
154 self.all_dirs.insert(0, self.cache_dir)
156 self.manager = manager
159 # Init the database, remove old files, init the HTTP dirs
160 self.db.removeUntrackedFiles(self.all_dirs)
161 self.db.reconcileDirectories()
162 self.manager.setDirectories(self.db.getAllDirectories())
165 def scanDirectories(self):
166 """Scan the cache directories, hashing new and rehashing changed files."""
# self.scanning doubles as the scan-in-progress flag: a working copy of the
# directories still to walk. NOTE(review): assert is stripped under -O, so
# this guard disappears in optimized runs.
167 assert not self.scanning, "a directory scan is already under way"
168 self.scanning = self.all_dirs[:]
169 self._scanDirectories()
# Walker-driven incremental scan: processes one directory entry per reactor
# tick (via reactor.callLater(0, ...)) so the event loop never blocks.
171 def _scanDirectories(self, result = None, walker = None):
172 # Need to start walking a new directory
174 # If there are any left, get them
176 log.msg('started scanning directory: %s' % self.scanning[0].path)
177 walker = self.scanning[0].walk()
179 # Done, just check if the HTTP directories need updating
180 log.msg('cache directory scan complete')
181 if self.db.reconcileDirectories():
182 self.manager.setDirectories(self.db.getAllDirectories())
186 # Get the next file in the directory
188 except StopIteration:
189 # No files left, go to the next directory
190 log.msg('done scanning directory: %s' % self.scanning[0].path)
# Reschedule instead of recursing so the reactor stays responsive.
192 reactor.callLater(0, self._scanDirectories)
195 # If it's not a file, or it's already properly in the DB, ignore it
196 if not file.isfile() or self.db.isUnchanged(file):
197 if not file.isfile():
198 log.msg('entering directory: %s' % file.path)
200 log.msg('file is unchanged: %s' % file.path)
201 reactor.callLater(0, self._scanDirectories, None, walker)
# Hash the file in a worker thread; _doneHashing resumes the scan afterwards.
205 log.msg('start hash checking file: %s' % file.path)
207 df = hash.hashInThread(file)
208 df.addBoth(self._doneHashing, file, walker)
209 df.addErrback(log.err)
# Called with either the HashObject (success) or a Failure once a file's hash
# has been computed in the thread; stores the file in the DB, notifies the
# manager, and resumes the directory scan.
211 def _doneHashing(self, result, file, walker):
213 if isinstance(result, HashObject):
214 log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
# Files under the cache dir are mirrored downloads: store them under the
# per-mirror subdirectory (first path component below the cache dir) and
# reconstruct their original URL from the relative path.
215 if self.scanning[0] == self.cache_dir:
216 mirror_dir = self.cache_dir.child(file.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
217 urlpath, newdir = self.db.storeFile(file, result.digest(), mirror_dir)
# 'http:/' + '/host/path' yields 'http://host/path'.
218 url = 'http:/' + file.path[len(self.cache_dir.path):]
# Other managed directories are stored relative to themselves.
220 urlpath, newdir = self.db.storeFile(file, result.digest(), self.scanning[0])
223 self.manager.setDirectories(self.db.getAllDirectories())
224 df = self.manager.new_cached_file(file, result, urlpath, url)
# Continue the scan either immediately (no deferred work pending) or once
# the manager's deferred fires.
226 reactor.callLater(0, self._scanDirectories, None, walker)
228 df.addBoth(self._scanDirectories, walker)
230 log.msg('hash check of %s failed' % file.path)
232 reactor.callLater(0, self._scanDirectories, None, walker)
# Tee a successful HTTP response into the cache while streaming it onward to
# the requester; non-200 responses are passed through without caching.
234 def save_file(self, response, hash, url):
235 """Save a downloaded file to the cache and stream it."""
236 if response.code != 200:
237 log.msg('File was not found (%r): %s' % (response, url))
240 log.msg('Returning file: %s' % url)
# The cache path mirrors the URL: <cache_dir>/<host>/<path>.
242 parsed = urlparse(url)
243 destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
244 log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
246 if destFile.exists():
247 log.msg('File already exists, removing: %s' % destFile.path)
249 elif not destFile.parent().exists():
250 destFile.parent().makedirs()
# Known compressed index files (Release/Sources/Packages .gz/.bz2) also get
# a decompressed sibling copy written as they download.
252 root, ext = os.path.splitext(destFile.basename())
253 if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
255 decFile = destFile.sibling(root)
256 log.msg('Decompressing to: %s' % decFile.path)
258 log.msg('File already exists, removing: %s' % decFile.path)
# Swap the response's stream for a proxy that saves and hashes every chunk
# as the requester reads it; completion/failure route to the handlers below.
264 orig_stream = response.stream
265 response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
266 response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
267 response.headers.getHeader('Last-Modified'),
269 response.stream.doneDefer.addErrback(self.save_error, url)
# Callback fired by ProxyFileStream.doneDefer with the completed hash object
# once the download finished saving to disk.
272 def _save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
273 """Update the modification time and AptPackages."""
# Propagate the server's Last-Modified time to the cached copies.
275 os.utime(destFile.path, (modtime, modtime))
277 os.utime(decFile.path, (modtime, modtime))
# NOTE(review): presumably verify() returns None when no expected hash was
# known, and the None case is deliberately treated as success -- confirm
# against Hash.HashObject.
279 result = hash.verify()
280 if result or result is None:
282 log.msg('Hashes match: %s' % url)
284 log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
# Mirror dir is the first path component under the cache dir (the host).
286 mirror_dir = self.cache_dir.child(destFile.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
287 urlpath, newdir = self.db.storeFile(destFile, hash.digest(), mirror_dir)
# NOTE(review): 'avaliable' typo in the runtime log string below; left
# untouched here since this edit changes comments only.
288 log.msg('now avaliable at %s: %s' % (urlpath, url))
292 log.msg('A new web directory was created, so enable it')
293 self.manager.setDirectories(self.db.getAllDirectories())
295 self.manager.new_cached_file(destFile, hash, urlpath, url)
# The decompressed copy is announced under the URL minus its extension.
297 self.manager.new_cached_file(decFile, None, urlpath, url[:-len(ext)])
299 log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
# Errback for the download/save pipeline (wired up in save_file): logs the
# failure; any cleanup/propagation is on lines missing from this view.
304 def save_error(self, failure, url):
305 """An error has occurred in downloading or saving the file."""
306 log.msg('Error occurred downloading %s' % url)
310 class TestMirrorManager(unittest.TestCase):
311 """Unit tests for the mirror manager."""
318 self.client = CacheManager(FilePath('/tmp/.apt-dht'))
321 for p in self.pending_calls: