2 from bz2 import BZ2Decompressor
3 from zlib import decompressobj, MAX_WBITS
4 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
5 from urlparse import urlparse
6 from binascii import a2b_hex
9 from twisted.python import log, filepath
10 from twisted.internet import defer
11 from twisted.trial import unittest
12 from twisted.web2 import stream
13 from twisted.web2.http import splitHostPort
15 from AptPackages import AptPackages
# File extensions of compressed index files that should also be saved
# in decompressed form (handled by ProxyFileStream below).
DECOMPRESS_EXTS = ['.gz', '.bz2']
# Lower-cased base names of the apt index files that get decompressed.
DECOMPRESS_FILES = ['release', 'sources', 'packages']
class MirrorError(Exception):
    """Raised when a mirror request cannot be satisfied."""
25 class ProxyFileStream(stream.SimpleStream):
26 """Saves a stream to a file while providing a new stream."""
28 def __init__(self, stream, outFile, decompress = None, decFile = None):
29 """Initializes the proxy.
31 @type stream: C{twisted.web2.stream.IByteStream}
32 @param stream: the input stream to read from
33 @type outFile: C{twisted.python.filepath.FilePath}
34 @param outFile: the file to write to
35 @type decompress: C{string}
36 @param decompress: also decompress the file as this type
37 (currently only '.gz' and '.bz2' are supported)
38 @type decFile: C{twisted.python.filepath.FilePath}
39 @param decFile: the file to write the decompressed data to
42 self.outFile = outFile.open('w')
45 if decompress == ".gz":
47 self.gzfile = decFile.open('w')
48 self.gzdec = decompressobj(-MAX_WBITS)
49 elif decompress == ".bz2":
50 self.bz2file = decFile.open('w')
51 self.bz2dec = BZ2Decompressor()
52 self.length = self.stream.length
54 self.doneDefer = defer.Deferred()
57 """Close the output file."""
58 if not self.outFile.closed:
61 data_dec = self.gzdec.flush()
62 self.gzfile.write(data_dec)
69 self.doneDefer.callback(1)
72 """Read some data from the stream."""
73 if self.outFile.closed:
76 data = self.stream.read()
77 if isinstance(data, defer.Deferred):
78 data.addCallbacks(self._write, self._done)
84 def _write(self, data):
85 """Write the stream data to the file and return it for others to use."""
90 self.outFile.write(data)
94 new_data = self._remove_gzip_header(data)
95 dec_data = self.gzdec.decompress(new_data)
97 dec_data = self.gzdec.decompress(data)
98 self.gzfile.write(dec_data)
100 dec_data = self.bz2dec.decompress(data)
101 self.bz2file.write(dec_data)
104 def _remove_gzip_header(self, data):
105 if data[:2] != '\037\213':
106 raise IOError, 'Not a gzipped file'
107 if ord(data[2]) != 8:
108 raise IOError, 'Unknown compression method'
110 # modtime = self.fileobj.read(4)
111 # extraflag = self.fileobj.read(1)
112 # os = self.fileobj.read(1)
116 # Read & discard the extra field, if present
118 xlen = xlen + 256*ord(data[11])
119 skip = skip + 2 + xlen
121 # Read and discard a null-terminated string containing the filename
123 if not data[skip] or data[skip] == '\000':
128 # Read and discard a null-terminated string containing a comment
130 if not data[skip] or data[skip] == '\000':
135 skip += 2 # Read & discard the 16-bit header CRC
139 """Clean everything up and return None to future reads."""
145 """Manages all requests for mirror objects."""
147 def __init__(self, cache_dir):
148 self.cache_dir = cache_dir
149 self.cache = filepath.FilePath(self.cache_dir)
152 def extractPath(self, url):
153 parsed = urlparse(url)
154 host, port = splitHostPort(parsed[0], parsed[1])
155 site = host + ":" + str(port)
158 i = max(path.rfind('/dists/'), path.rfind('/pool/'))
163 # Uh oh, this is not good
164 log.msg("Couldn't find a good base directory for path: %s" % (site + path))
166 if site in self.apt_caches:
168 for base in self.apt_caches[site]:
170 for dirs in path.split('/'):
171 if base.startswith(base_match + '/' + dirs):
172 base_match += '/' + dirs
175 if len(base_match) > longest_match:
176 longest_match = len(base_match)
178 log.msg("Settled on baseDir: %s" % baseDir)
180 return site, baseDir, path
182 def init(self, site, baseDir):
183 if site not in self.apt_caches:
184 self.apt_caches[site] = {}
186 if baseDir not in self.apt_caches[site]:
187 site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_'))
188 self.apt_caches[site][baseDir] = AptPackages(site_cache)
190 def findHash(self, url):
191 site, baseDir, path = self.extractPath(url)
192 if site in self.apt_caches and baseDir in self.apt_caches[site]:
193 d = self.apt_caches[site][baseDir].findHash(path)
194 d.addCallback(self.translateHash)
197 d.errback(MirrorError("Site Not Found"))
200 def translateHash(self, (hash, size)):
201 """Translate a hash from apt's hex encoding to a string."""
206 def save_file(self, response, hash, size, url):
207 """Save a downloaded file to the cache and stream it."""
208 log.msg('Returning file: %s' % url)
210 parsed = urlparse(url)
211 destFile = self.cache.preauthChild(parsed[1] + parsed[2])
212 log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
214 if destFile.exists():
215 log.msg('File already exists, removing: %s' % destFile.path)
218 destFile.parent().makedirs()
220 root, ext = os.path.splitext(destFile.basename())
221 if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
223 decFile = destFile.sibling(root)
224 log.msg('Decompressing to: %s' % decFile.path)
226 log.msg('File already exists, removing: %s' % decFile.path)
232 orig_stream = response.stream
233 response.stream = ProxyFileStream(orig_stream, destFile, ext, decFile)
234 response.stream.doneDefer.addCallback(self.save_complete, url, destFile,
235 response.headers.getHeader('Last-Modified'),
237 response.stream.doneDefer.addErrback(self.save_error, url)
240 def save_complete(self, result, url, destFile, modtime = None, ext = None, decFile = None):
241 """Update the modification time and AptPackages."""
243 os.utime(destFile.path, (modtime, modtime))
245 os.utime(decFile.path, (modtime, modtime))
247 site, baseDir, path = self.extractPath(url)
248 self.init(site, baseDir)
249 self.apt_caches[site][baseDir].file_updated(path, destFile.path)
251 self.apt_caches[site][baseDir].file_updated(path[:-len(ext)], decFile.path)
253 def save_error(self, failure, url):
254 """An error has occurred in downloadign or saving the file."""
255 log.msg('Error occurred downloading %s' % url)
259 class TestMirrorManager(unittest.TestCase):
260 """Unit tests for the mirror manager."""
267 self.client = MirrorManager('/tmp')
269 def test_extractPath(self):
270 site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org/debian/dists/unstable/Release')
271 self.failUnless(site == "ftp.us.debian.org:80", "no match: %s" % site)
272 self.failUnless(baseDir == "/debian", "no match: %s" % baseDir)
273 self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path)
275 site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org:16999/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz')
276 self.failUnless(site == "ftp.us.debian.org:16999", "no match: %s" % site)
277 self.failUnless(baseDir == "/debian", "no match: %s" % baseDir)
278 self.failUnless(path == "/pool/d/dpkg/dpkg_1.2.1-1.tar.gz", "no match: %s" % path)
280 site, baseDir, path = self.client.extractPath('http://debian.camrdale.org/dists/unstable/Release')
281 self.failUnless(site == "debian.camrdale.org:80", "no match: %s" % site)
282 self.failUnless(baseDir == "", "no match: %s" % baseDir)
283 self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path)
285 def verifyHash(self, found_hash, path, true_hash):
286 self.failUnless(found_hash[0] == true_hash,
287 "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash))
289 def test_findHash(self):
290 self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Packages$" | tail -n 1').read().rstrip('\n')
291 self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Sources$" | tail -n 1').read().rstrip('\n')
292 for f in os.walk('/var/lib/apt/lists').next()[2]:
293 if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]):
297 self.client.updatedFile('http://' + self.releaseFile.replace('_','/'),
298 '/var/lib/apt/lists/' + self.releaseFile)
299 self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
300 self.packagesFile[self.packagesFile.find('_dists_')+1:].replace('_','/'),
301 '/var/lib/apt/lists/' + self.packagesFile)
302 self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
303 self.sourcesFile[self.sourcesFile.find('_dists_')+1:].replace('_','/'),
304 '/var/lib/apt/lists/' + self.sourcesFile)
306 lastDefer = defer.Deferred()
308 idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
309 '/var/lib/apt/lists/' + self.releaseFile +
310 ' | grep -E " main/binary-i386/Packages.bz2$"'
311 ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
312 idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'
314 d = self.client.findHash(idx_path)
315 d.addCallback(self.verifyHash, idx_path, a2b_hex(idx_hash))
317 pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
318 '/var/lib/apt/lists/' + self.packagesFile +
319 ' | grep -E "^SHA1:" | head -n 1' +
320 ' | cut -d\ -f 2').read().rstrip('\n')
321 pkg_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + \
322 os.popen('grep -A 30 -E "^Package: dpkg$" ' +
323 '/var/lib/apt/lists/' + self.packagesFile +
324 ' | grep -E "^Filename:" | head -n 1' +
325 ' | cut -d\ -f 2').read().rstrip('\n')
327 d = self.client.findHash(pkg_path)
328 d.addCallback(self.verifyHash, pkg_path, a2b_hex(pkg_hash))
330 src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
331 '/var/lib/apt/lists/' + self.sourcesFile +
332 ' | grep -E "^Directory:" | head -n 1' +
333 ' | cut -d\ -f 2').read().rstrip('\n')
334 src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
335 '/var/lib/apt/lists/' + self.sourcesFile +
336 ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
337 ' | cut -d\ -f 2').read().split('\n')[:-1]
338 src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
339 '/var/lib/apt/lists/' + self.sourcesFile +
340 ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
341 ' | cut -d\ -f 4').read().split('\n')[:-1]
343 for i in range(len(src_hashes)):
344 src_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + src_dir + '/' + src_paths[i]
345 d = self.client.findHash(src_path)
346 d.addCallback(self.verifyHash, src_path, a2b_hex(src_hashes[i]))
348 idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
349 '/var/lib/apt/lists/' + self.releaseFile +
350 ' | grep -E " main/source/Sources.bz2$"'
351 ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
352 idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/source/Sources.bz2'
354 d = self.client.findHash(idx_path)
355 d.addCallback(self.verifyHash, idx_path, a2b_hex(idx_hash))
357 d.addBoth(lastDefer.callback)
361 for p in self.pending_calls: