2 from bz2 import BZ2Decompressor
3 from zlib import decompressobj, MAX_WBITS
4 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
5 from urlparse import urlparse
8 from twisted.python import log
9 from twisted.python.filepath import FilePath
10 from twisted.internet import defer
11 from twisted.trial import unittest
12 from twisted.web2 import stream
13 from twisted.web2.http import splitHostPort
15 from AptPackages import AptPackages
# Directory name used by AptPackages for its package cache files.
aptpkg_dir = 'apt-packages'

# Compressed-file extensions that can be decompressed on the fly.
DECOMPRESS_EXTS = ['.gz', '.bz2']

# Base file names (compared lower-cased) that are worth decompressing
# when they arrive compressed.
DECOMPRESS_FILES = ['release', 'sources', 'packages']
class ProxyFileStream(stream.SimpleStream):
    """Saves a stream to a file while providing a new stream.

    Wraps an input C{twisted.web2.stream} so that every chunk read from
    it is hashed and written to a cache file (and optionally decompressed
    to a second file) while still being passed through to the reader.
    """

    def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
        """Initializes the proxy.

        @type stream: C{twisted.web2.stream.IByteStream}
        @param stream: the input stream to read from
        @type outFile: C{twisted.python.FilePath}
        @param outFile: the file to write to
        @type hash: L{Hash.HashObject}
        @param hash: the hash object to use for the file
        @type decompress: C{string}
        @param decompress: also decompress the file as this type
            (currently only '.gz' and '.bz2' are supported)
        @type decFile: C{twisted.python.FilePath}
        @param decFile: the file to write the decompressed data to
        """
        # NOTE(review): several assignments (e.g. self.stream = stream,
        # self.hash = hash, defaulting self.gzfile/self.bz2file to None)
        # appear to be missing from this excerpt -- self.stream and
        # self.hash are referenced below and in other methods.
        self.outFile = outFile.open('w')  # cache file receiving the raw bytes
        if decompress == ".gz":
            self.gzfile = decFile.open('w')
            # Negative wbits tells zlib to expect a raw deflate stream
            # with no header, as found inside a gzip file after the gzip
            # header is stripped (see _remove_gzip_header).
            self.gzdec = decompressobj(-MAX_WBITS)
        elif decompress == ".bz2":
            self.bz2file = decFile.open('w')
            self.bz2dec = BZ2Decompressor()
        # Mirror the wrapped stream's advertised length for consumers.
        self.length = self.stream.length
        # Fired with the hash object once the stream is fully read.
        self.doneDefer = defer.Deferred()

    # NOTE(review): the 'def' line for the following method is missing
    # from this excerpt; the body closes the output file(s) and fires
    # doneDefer.
        """Close the output file."""
        if not self.outFile.closed:
            # NOTE(review): the lines that actually close self.outFile
            # appear to be missing from this excerpt.
            # Flush any data still buffered inside the gzip decompressor.
            data_dec = self.gzdec.flush()
            self.gzfile.write(data_dec)
            # NOTE(review): further cleanup lines appear to be missing
            # from this excerpt.
            # Hand the completed hash object to whoever is waiting.
            self.doneDefer.callback(self.hash)

    # NOTE(review): the 'def read(...)' line is missing from this
    # excerpt.
        """Read some data from the stream."""
        if self.outFile.closed:
            # NOTE(review): the early return for an already-closed file
            # appears to be missing from this excerpt.
        data = self.stream.read()
        if isinstance(data, defer.Deferred):
            # Data is not ready yet: hash/write it (or clean up on
            # failure) when it arrives.
            data.addCallbacks(self._write, self._done)

    def _write(self, data):
        """Write the stream data to the file and return it for others to use."""
        # NOTE(review): the end-of-stream guard (data is None) appears to
        # be missing from this excerpt.
        self.outFile.write(data)
        self.hash.update(data)
        # NOTE(review): the conditionals selecting gzip vs. bz2 handling,
        # and the first-chunk check that decides whether the gzip header
        # still needs stripping, appear to be missing from this excerpt.
            # First chunk of a gzip file: strip the header before feeding
            # raw deflate data to the decompressor.
            new_data = self._remove_gzip_header(data)
            dec_data = self.gzdec.decompress(new_data)
            dec_data = self.gzdec.decompress(data)
            self.gzfile.write(dec_data)
            dec_data = self.bz2dec.decompress(data)
            self.bz2file.write(dec_data)

    def _remove_gzip_header(self, data):
        """Strip the gzip header from the first chunk of downloaded data.

        Follows the gzip file format (RFC 1952): magic number,
        compression method, flags, then optional extra / name / comment /
        header-CRC fields, returning the raw deflate payload.
        """
        # The gzip magic number is the two bytes \037\213.
        if data[:2] != '\037\213':
            raise IOError, 'Not a gzipped file'
        # Method 8 (deflate) is the only compression method gzip defines.
        if ord(data[2]) != 8:
            raise IOError, 'Unknown compression method'
        # NOTE(review): reading the flag byte (data[3]) and initializing
        # 'skip' appear to be missing from this excerpt.
        # modtime = self.fileobj.read(4)
        # extraflag = self.fileobj.read(1)
        # os = self.fileobj.read(1)

        # Read & discard the extra field, if present
        # NOTE(review): the FEXTRA flag test and the low byte of xlen
        # appear to be missing from this excerpt.
        xlen = xlen + 256*ord(data[11])  # xlen is a little-endian 16-bit length
        skip = skip + 2 + xlen

        # Read and discard a null-terminated string containing the filename
        # NOTE(review): the FNAME flag test and the loop surrounding this
        # terminator check appear to be missing from this excerpt.
        if not data[skip] or data[skip] == '\000':

        # Read and discard a null-terminated string containing a comment
        # NOTE(review): the FCOMMENT flag test and surrounding loop appear
        # to be missing from this excerpt.
        if not data[skip] or data[skip] == '\000':

        # NOTE(review): the FHCRC flag test and the trailing
        # 'return data[skip:]' appear to be missing from this excerpt.
        skip += 2     # Read & discard the 16-bit header CRC

    # NOTE(review): the enclosing 'def' line for the following docstring
    # is missing from this excerpt.
        """Clean everything up and return None to future reads."""
148 """Manages all requests for cached objects."""
150 def __init__(self, cache_dir, db, manager = None):
151 self.cache_dir = cache_dir
153 self.manager = manager
    def save_file(self, response, hash, url):
        """Save a downloaded file to the cache and stream it.

        @param response: the C{twisted.web2} response whose body stream
            will be cached (the stream is replaced by a ProxyFileStream)
        @param hash: the hash object to update as data is streamed
        @param url: the URL the file was downloaded from
        """
        if response.code != 200:
            # Only 200 responses carry a cacheable body.
            log.msg('File was not found (%r): %s' % (response, url))
            # NOTE(review): the early 'return response' appears to be
            # missing from this excerpt.

        log.msg('Returning file: %s' % url)

        # Map the URL's host and path into a path under the cache dir.
        parsed = urlparse(url)
        destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
        log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))

        if destFile.exists():
            log.msg('File already exists, removing: %s' % destFile.path)
            # NOTE(review): the destFile.remove() call appears to be
            # missing from this excerpt.
        elif not destFile.parent().exists():
            destFile.parent().makedirs()

        # Decide whether the file should also be decompressed on the fly.
        root, ext = os.path.splitext(destFile.basename())
        if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
            decFile = destFile.sibling(root)
            log.msg('Decompressing to: %s' % decFile.path)
            # NOTE(review): the decFile.exists() test around this log
            # line, the decFile.remove() call, and the else-branch that
            # sets ext/decFile to None all appear to be missing from this
            # excerpt.
            log.msg('File already exists, removing: %s' % decFile.path)

        # Replace the response's stream with a proxy that tees everything
        # into the cache file (and decompressed file) as it is read.
        orig_stream = response.stream
        response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
        response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
                                              response.headers.getHeader('Last-Modified'),
        # NOTE(review): the remaining arguments (ext, decFile) and the
        # closing parenthesis appear to be missing from this excerpt.
        response.stream.doneDefer.addErrback(self.save_error, url)
        # NOTE(review): the trailing 'return response' appears to be
        # missing from this excerpt.
    def _save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
        """Update the modification time and AptPackages.

        Called from the ProxyFileStream's doneDefer once the download has
        been fully streamed into the cache.

        @param hash: the now-complete hash object for the saved file
        @param url: the URL the file came from
        @param destFile: the cache file the data was saved to
        @param modtime: the Last-Modified time to stamp on the file(s)
        @param ext: the compressed extension that was stripped, if any
        @param decFile: the decompressed copy of the file, if any
        """
        # NOTE(review): the 'if modtime:' (and nested 'if decFile:')
        # guards around the following utime calls appear to be missing
        # from this excerpt.
        os.utime(destFile.path, (modtime, modtime))
        os.utime(decFile.path, (modtime, modtime))

        # verify() returns None when there was no expected hash to check
        # against; treat that the same as a successful match.
        result = hash.verify()
        if result or result is None:
            # NOTE(review): the 'if result:' / 'else:' split between the
            # next two log messages appears to be missing from this
            # excerpt.
            log.msg('Hashes match: %s' % url)
            log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))

            # Record the file in the database; newdir is true when a new
            # web-served directory had to be created for it.
            urlpath, newdir = self.db.storeFile(destFile, hash.digest(), self.cache_dir)
            log.msg('now avaliable at %s: %s' % (urlpath, url))
            if newdir and self.manager:
                log.msg('A new web directory was created, so enable it')
                self.manager.setDirectories(self.db.getAllDirectories())
            # NOTE(review): the 'if self.manager:' and 'if ext:' guards
            # around the following notifications appear to be missing
            # from this excerpt.
            self.manager.new_cached_file(url, destFile, hash, urlpath)
            # The decompressed copy is announced under the URL without
            # its compressed extension, with no hash of its own.
            self.manager.new_cached_file(url[:-len(ext)], decFile, None, urlpath)
        # NOTE(review): the 'else:' introducing the mismatch branch (and
        # the cleanup that removes the bad files) appears to be missing
        # from this excerpt.
            log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
    def save_error(self, failure, url):
        """An error has occurred in downloading or saving the file.

        @param failure: the twisted Failure describing what went wrong
        @param url: the URL whose download or save failed
        """
        log.msg('Error occurred downloading %s' % url)
        # NOTE(review): this excerpt may be truncated -- the original
        # likely logs/returns the failure so errback propagation
        # continues; confirm against the full file.
class TestMirrorManager(unittest.TestCase):
    """Unit tests for the mirror manager."""

    # NOTE(review): class attributes and the 'def setUp(self):' line
    # appear to be missing from this excerpt.
        # NOTE(review): CacheManager is constructed here without the 'db'
        # argument its __init__ signature requires -- likely a truncated
        # call; confirm against the full file.
        self.client = CacheManager(FilePath('/tmp/.apt-dht'))

    # NOTE(review): the 'def tearDown(self):' line appears to be missing
    # from this excerpt; the loop below cancels outstanding delayed calls.
        for p in self.pending_calls: