2 from bz2 import BZ2Decompressor
3 from zlib import decompressobj, MAX_WBITS
4 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
5 from urlparse import urlparse
6 from binascii import a2b_hex, b2a_hex
9 from twisted.python import log, filepath
10 from twisted.internet import defer
11 from twisted.trial import unittest
12 from twisted.web2 import stream
13 from twisted.web2.http import splitHostPort
15 from AptPackages import AptPackages
# File extensions that can be decompressed on the fly while saving
# ('.gz' via zlib, '.bz2' via bz2 — see ProxyFileStream).
DECOMPRESS_EXTS = ['.gz', '.bz2']
# Lower-cased base names of apt index files that should also be saved in
# decompressed form next to the compressed download (see save_file).
DECOMPRESS_FILES = ['release', 'sources', 'packages']
class MirrorError(Exception):
    """Signals a problem while handling a request for a mirror object."""
class ProxyFileStream(stream.SimpleStream):
    """Saves a stream to a file while providing a new stream.

    NOTE(review): several lines of this class are elided in this excerpt
    (parts of __init__, the 'def' lines of the close/read handlers, and the
    gzip-header flag handling), so comments below describe only the visible
    code and hedge where behavior cannot be confirmed.
    """

    def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
        """Initializes the proxy.

        @type stream: C{twisted.web2.stream.IByteStream}
        @param stream: the input stream to read from
        @type outFile: C{twisted.python.filepath.FilePath}
        @param outFile: the file to write to
        @type hash: L{Hash.HashObject}
        @param hash: the hash object to use for the file
        @type decompress: C{string}
        @param decompress: also decompress the file as this type
            (currently only '.gz' and '.bz2' are supported)
        @type decFile: C{twisted.python.filepath.FilePath}
        @param decFile: the file to write the decompressed data to
        """
        self.outFile = outFile.open('w')
        # Optional on-the-fly decompression of the tee'd data.
        if decompress == ".gz":
            self.gzfile = decFile.open('w')
            # Negative wbits = raw deflate stream: the gzip header is stripped
            # by hand (see _remove_gzip_header) before data reaches zlib.
            self.gzdec = decompressobj(-MAX_WBITS)
        elif decompress == ".bz2":
            self.bz2file = decFile.open('w')
            self.bz2dec = BZ2Decompressor()
        # Expose the same length as the wrapped stream.
        # (self.stream is assigned in lines elided from this excerpt.)
        self.length = self.stream.length
        # Fired with the hash object once the stream has been fully saved.
        self.doneDefer = defer.Deferred()

    # NOTE(review): the method definition enclosing the lines below is elided;
    # from the docstring it is the handler that closes the output file(s).
        """Close the output file."""
        if not self.outFile.closed:
            # Flush whatever is still buffered in the gzip decompressor.
                data_dec = self.gzdec.flush()
                self.gzfile.write(data_dec)
            # Hand the completed hash to whoever waits on doneDefer.
            self.doneDefer.callback(self.hash)

    # NOTE(review): method definition elided — this is the stream's read
    # handler per the docstring below.
        """Read some data from the stream."""
        if self.outFile.closed:
        # (elided) — presumably returns early once closed; confirm upstream.
        data = self.stream.read()
        if isinstance(data, defer.Deferred):
            # Data arrives asynchronously: tee it on success, finish on error.
            data.addCallbacks(self._write, self._done)

    def _write(self, data):
        """Write the stream data to the file and return it for others to use."""
        # Tee the raw bytes into the cache file and the running hash.
        self.outFile.write(data)
        self.hash.update(data)
        # (elided) gzip branch: the first chunk has its gzip header stripped
        # before being handed to the raw-deflate decompressor ...
            new_data = self._remove_gzip_header(data)
            dec_data = self.gzdec.decompress(new_data)
            # ... later chunks are fed to the decompressor directly.
            dec_data = self.gzdec.decompress(data)
            self.gzfile.write(dec_data)
            # bz2 branch: BZ2Decompressor handles its own header.
            dec_data = self.bz2dec.decompress(data)
            self.bz2file.write(dec_data)

    def _remove_gzip_header(self, data):
        # Validate the 2-byte gzip magic number (\037\213, per RFC 1952).
        if data[:2] != '\037\213':
            raise IOError, 'Not a gzipped file'
        # Byte 2 is the compression method; 8 (deflate) is the only value
        # the gzip format defines.
        if ord(data[2]) != 8:
            raise IOError, 'Unknown compression method'
        # modtime = self.fileobj.read(4)
        # extraflag = self.fileobj.read(1)
        # os = self.fileobj.read(1)

        # Read & discard the extra field, if present
        # NOTE(review): initialization of xlen/skip and the FEXTRA/FNAME/
        # FCOMMENT/FHCRC flag tests are elided from this excerpt.
        xlen = xlen + 256*ord(data[11])
        skip = skip + 2 + xlen

        # Read and discard a null-terminated string containing the filename
        if not data[skip] or data[skip] == '\000':
        # Read and discard a null-terminated string containing a comment
        if not data[skip] or data[skip] == '\000':
        skip += 2     # Read & discard the 16-bit header CRC

    # NOTE(review): method definition elided — cleanup handler per docstring.
        """Clean everything up and return None to future reads."""
151 """Manages all requests for mirror objects."""
    def __init__(self, cache_dir):
        """Initialize the manager.

        @type cache_dir: C{string}
        @param cache_dir: the directory to use for caching mirrored files
        """
        self.cache_dir = cache_dir
        self.cache = filepath.FilePath(self.cache_dir)
        # NOTE(review): other methods read self.apt_caches, but its
        # initialization is elided from this excerpt — confirm it is set here.
    def extractPath(self, url):
        """Split a mirror URL into (site, baseDir, path).

        @param url: the URL of a file on a mirror
        @return: a (site, baseDir, path) tuple where site is 'host:port'
        NOTE(review): several lines are elided here, including the
        assignments of path, base_match, longest_match and baseDir.
        """
        parsed = urlparse(url)
        # splitHostPort is given the scheme and netloc; presumably the scheme
        # supplies the default port when the URL omits one — confirm.
        host, port = splitHostPort(parsed[0], parsed[1])
        site = host + ":" + str(port)
        # An apt-style layout ends the base directory where /dists/ or /pool/
        # begins; rfind returns -1 when neither is present.
        i = max(path.rfind('/dists/'), path.rfind('/pool/'))
        # Uh oh, this is not good
        log.msg("Couldn't find a good base directory for path: %s" % (site + path))
        # Fall back to the longest prefix match against base directories
        # already registered for this site.
        if site in self.apt_caches:
            for base in self.apt_caches[site]:
                for dirs in path.split('/'):
                    if base.startswith(base_match + '/' + dirs):
                        base_match += '/' + dirs
                if len(base_match) > longest_match:
                    longest_match = len(base_match)
        log.msg("Settled on baseDir: %s" % baseDir)
        return site, baseDir, path
188 def init(self, site, baseDir):
189 if site not in self.apt_caches:
190 self.apt_caches[site] = {}
192 if baseDir not in self.apt_caches[site]:
193 site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_'))
194 self.apt_caches[site][baseDir] = AptPackages(site_cache)
196 def updatedFile(self, url, file_path):
197 site, baseDir, path = self.extractPath(url)
198 self.init(site, baseDir)
199 self.apt_caches[site][baseDir].file_updated(path, file_path)
    def findHash(self, url):
        """Find the hash expected for a URL of a file on a mirror.

        @return: a deferred that fires with the found hash, or errbacks
            with L{MirrorError} when the site is unknown
        """
        site, baseDir, path = self.extractPath(url)
        if site in self.apt_caches and baseDir in self.apt_caches[site]:
            return self.apt_caches[site][baseDir].findHash(path)
        # NOTE(review): the creation and return of the deferred 'd' are
        # elided from this excerpt.
        d.errback(MirrorError("Site Not Found"))
    def save_file(self, response, hash, url):
        """Save a downloaded file to the cache and stream it.

        NOTE(review): several lines are elided here (file removal, the else
        branch around makedirs, and the trailing addCallback arguments).
        """
        log.msg('Returning file: %s' % url)
        # Cache location is built from the URL's netloc + path components.
        parsed = urlparse(url)
        destFile = self.cache.preauthChild(parsed[1] + parsed[2])
        log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
        if destFile.exists():
            log.msg('File already exists, removing: %s' % destFile.path)
            # (elided) — presumably removes the stale file here; confirm.
        destFile.parent().makedirs()
        # Compressed apt index files also get saved decompressed alongside.
        root, ext = os.path.splitext(destFile.basename())
        if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
            decFile = destFile.sibling(root)
            log.msg('Decompressing to: %s' % decFile.path)
            log.msg('File already exists, removing: %s' % decFile.path)
        # Swap the response stream for a proxy that tees data to the cache
        # file (and optional decompressed file) while hashing it.
        orig_stream = response.stream
        response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
        response.stream.doneDefer.addCallback(self.save_complete, url, destFile,
                                              response.headers.getHeader('Last-Modified'),
        # NOTE(review): the remaining addCallback arguments are elided.
        response.stream.doneDefer.addErrback(self.save_error, url)
    def save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
        """Update the modification time and AptPackages.

        NOTE(review): the guard conditions around the utime/updatedFile calls
        and the else of the verification branch are elided from this excerpt.
        """
        # Preserve the server's Last-Modified time on the cached file(s).
        os.utime(destFile.path, (modtime, modtime))
        os.utime(decFile.path, (modtime, modtime))
        # hash.verify() presumably returns True/False against an expected
        # hash, or None when there was nothing to check against — confirm.
        result = hash.verify()
        if result or result is None:
            log.msg('Hashes match: %s' % url)
            log.msg('Hashed file to %s: %s' % (b2a_hex(result), url))
            # Register the new file (and its decompressed twin) with the
            # AptPackages caches.
            self.updatedFile(url, destFile.path)
            self.updatedFile(url[:-len(ext)], decFile.path)
        log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
    def save_error(self, failure, url):
        """An error has occurred in downloading or saving the file."""
        log.msg('Error occurred downloading %s' % url)
        # NOTE(review): propagation of the failure (log.err / return) is
        # elided from this excerpt.
269 class TestMirrorManager(unittest.TestCase):
270 """Unit tests for the mirror manager."""
277 self.client = MirrorManager('/tmp')
279 def test_extractPath(self):
280 site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org/debian/dists/unstable/Release')
281 self.failUnless(site == "ftp.us.debian.org:80", "no match: %s" % site)
282 self.failUnless(baseDir == "/debian", "no match: %s" % baseDir)
283 self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path)
285 site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org:16999/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz')
286 self.failUnless(site == "ftp.us.debian.org:16999", "no match: %s" % site)
287 self.failUnless(baseDir == "/debian", "no match: %s" % baseDir)
288 self.failUnless(path == "/pool/d/dpkg/dpkg_1.2.1-1.tar.gz", "no match: %s" % path)
290 site, baseDir, path = self.client.extractPath('http://debian.camrdale.org/dists/unstable/Release')
291 self.failUnless(site == "debian.camrdale.org:80", "no match: %s" % site)
292 self.failUnless(baseDir == "", "no match: %s" % baseDir)
293 self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path)
295 def verifyHash(self, found_hash, path, true_hash):
296 self.failUnless(found_hash.hexexpected() == true_hash,
297 "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash))
    def test_findHash(self):
        """Verify findHash against the hashes apt itself publishes.

        Reads the machine's local apt lists in /var/lib/apt/lists, feeds the
        Release/Packages/Sources indexes into the MirrorManager, then checks
        that findHash reports the SHA1 values those indexes declare.
        NOTE(review): some lines of this test (recording self.releaseFile,
        deferred bookkeeping around the hash checks) are elided here.
        """
        # Pick the largest main Packages/Sources lists as representatives.
        self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Packages$" | tail -n 1').read().rstrip('\n')
        self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Sources$" | tail -n 1').read().rstrip('\n')
        # Find the Release file covering the chosen Packages file.
        for f in os.walk('/var/lib/apt/lists').next()[2]:
            if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]):
        # (elided) — presumably records f as self.releaseFile; confirm.
        # Register the three indexes, rebuilding URLs from the flattened
        # ('_'-separated) apt list-file names.
        self.client.updatedFile('http://' + self.releaseFile.replace('_','/'),
                                '/var/lib/apt/lists/' + self.releaseFile)
        self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
                                self.packagesFile[self.packagesFile.find('_dists_')+1:].replace('_','/'),
                                '/var/lib/apt/lists/' + self.packagesFile)
        self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
                                self.sourcesFile[self.sourcesFile.find('_dists_')+1:].replace('_','/'),
                                '/var/lib/apt/lists/' + self.sourcesFile)
        lastDefer = defer.Deferred()
        # Expected hash of the Packages index, taken from the Release file.
        idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
                            '/var/lib/apt/lists/' + self.releaseFile +
                            ' | grep -E " main/binary-i386/Packages.bz2$"'
                            ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
        idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'
        d = self.client.findHash(idx_path)
        d.addCallback(self.verifyHash, idx_path, a2b_hex(idx_hash))
        # Expected hash of the dpkg package, taken from the Packages file.
        pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
                            '/var/lib/apt/lists/' + self.packagesFile +
                            ' | grep -E "^SHA1:" | head -n 1' +
                            ' | cut -d\ -f 2').read().rstrip('\n')
        pkg_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + \
                   os.popen('grep -A 30 -E "^Package: dpkg$" ' +
                            '/var/lib/apt/lists/' + self.packagesFile +
                            ' | grep -E "^Filename:" | head -n 1' +
                            ' | cut -d\ -f 2').read().rstrip('\n')
        d = self.client.findHash(pkg_path)
        d.addCallback(self.verifyHash, pkg_path, a2b_hex(pkg_hash))
        # Expected hashes of the dpkg source files, from the Sources file.
        src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
                           '/var/lib/apt/lists/' + self.sourcesFile +
                           ' | grep -E "^Directory:" | head -n 1' +
                           ' | cut -d\ -f 2').read().rstrip('\n')
        src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
                              '/var/lib/apt/lists/' + self.sourcesFile +
                              ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
                              ' | cut -d\ -f 2').read().split('\n')[:-1]
        src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
                             '/var/lib/apt/lists/' + self.sourcesFile +
                             ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
                             ' | cut -d\ -f 4').read().split('\n')[:-1]
        for i in range(len(src_hashes)):
            src_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + src_dir + '/' + src_paths[i]
            d = self.client.findHash(src_path)
            d.addCallback(self.verifyHash, src_path, a2b_hex(src_hashes[i]))
        # Expected hash of the Sources index, taken from the Release file.
        idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
                            '/var/lib/apt/lists/' + self.releaseFile +
                            ' | grep -E " main/source/Sources.bz2$"'
                            ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
        idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/source/Sources.bz2'
        d = self.client.findHash(idx_path)
        d.addCallback(self.verifyHash, idx_path, a2b_hex(idx_hash))
        # Finish the test once the last deferred chain completes.
        d.addBoth(lastDefer.callback)
371 for p in self.pending_calls: