from bz2 import BZ2Decompressor
from zlib import decompressobj, MAX_WBITS
from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
from urlparse import urlparse
from binascii import a2b_hex
import os

from twisted.python import log, filepath
from twisted.internet import defer
from twisted.trial import unittest
from twisted.web2 import stream
from twisted.web2.http import splitHostPort

from AptPackages import AptPackages
DECOMPRESS_EXTS = ['.gz', '.bz2']
DECOMPRESS_FILES = ['release', 'sources', 'packages']
class MirrorError(Exception):
    """Exception raised when there's a problem with the mirror."""
class ProxyFileStream(stream.SimpleStream):
    """Saves a stream to a file while providing a new stream."""

    def __init__(self, stream, outFile, decompress = None, decFile = None):
        """Initializes the proxy.

        @type stream: C{twisted.web2.stream.IByteStream}
        @param stream: the input stream to read from
        @type outFile: C{twisted.python.filepath.FilePath}
        @param outFile: the file to write to
        @type decompress: C{string}
        @param decompress: also decompress the file as this type
            (currently only '.gz' and '.bz2' are supported)
        @type decFile: C{twisted.python.filepath.FilePath}
        @param decFile: the file to write the decompressed data to
        """
        self.stream = stream
        self.outFile = outFile.open('w')
        self.gzfile = None
        self.bz2file = None
        if decompress == ".gz":
            self.gzheader = True
            self.gzfile = decFile.open('w')
            # Negative wbits tells zlib to expect a raw deflate stream,
            # since the gzip header is stripped by _remove_gzip_header().
            self.gzdec = decompressobj(-MAX_WBITS)
        elif decompress == ".bz2":
            self.bz2file = decFile.open('w')
            self.bz2dec = BZ2Decompressor()
        self.length = self.stream.length
        self.doneDefer = defer.Deferred()
57 """Close the output file."""
58 if not self.outFile.closed:
61 data_dec = self.gzdec.flush()
62 self.gzfile.write(data_dec)
69 self.doneDefer.callback(1)
72 """Read some data from the stream."""
73 if self.outFile.closed:
76 data = self.stream.read()
77 if isinstance(data, defer.Deferred):
78 data.addCallbacks(self._write, self._done)
    def _write(self, data):
        """Write the stream data to the file and return it for others to use."""
        self.outFile.write(data)
        if self.gzfile:
            if self.gzheader:
                # Only the first chunk carries the gzip header, which the raw
                # deflate decompressor cannot handle, so strip it off first.
                self.gzheader = False
                new_data = self._remove_gzip_header(data)
                dec_data = self.gzdec.decompress(new_data)
            else:
                dec_data = self.gzdec.decompress(data)
            self.gzfile.write(dec_data)
        if self.bz2file:
            dec_data = self.bz2dec.decompress(data)
            self.bz2file.write(dec_data)
        return data
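    # A gzip member starts with a fixed 10-byte header (RFC 1952): the two
    # magic bytes '\037\213', the compression method (8 = deflate), a flag
    # byte, a 4-byte modification time, an extra-flags byte and an OS byte.
    # Optional fields (extra data, original file name, comment, header CRC)
    # follow only when the corresponding FEXTRA/FNAME/FCOMMENT/FHCRC flag bit
    # is set, which is why the skipping below starts at offset 10 and then
    # steps over each optional field that is present.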
    def _remove_gzip_header(self, data):
        if data[:2] != '\037\213':
            raise IOError('Not a gzipped file')
        if ord(data[2]) != 8:
            raise IOError('Unknown compression method')
        flag = ord(data[3])
        # modtime = self.fileobj.read(4)
        # extraflag = self.fileobj.read(1)
        # os = self.fileobj.read(1)
        skip = 10
        if flag & FEXTRA:
            # Read & discard the extra field, if present
            xlen = ord(data[10])
            xlen = xlen + 256*ord(data[11])
            skip = skip + 2 + xlen
        if flag & FNAME:
            # Read and discard a null-terminated string containing the filename
            while True:
                if not data[skip] or data[skip] == '\000':
                    break
                skip += 1
            skip += 1
        if flag & FCOMMENT:
            # Read and discard a null-terminated string containing a comment
            while True:
                if not data[skip] or data[skip] == '\000':
                    break
                skip += 1
            skip += 1
        if flag & FHCRC:
            skip += 2     # Read & discard the 16-bit header CRC
        return data[skip:]
139 """Clean everything up and return None to future reads."""
145 """Manages all requests for mirror objects."""
147 def __init__(self, cache_dir):
148 self.cache_dir = cache_dir
149 self.cache = filepath.FilePath(self.cache_dir)
    def extractPath(self, url):
        parsed = urlparse(url)
        host, port = splitHostPort(parsed[0], parsed[1])
        site = host + ":" + str(port)
        path = parsed[2]

        # The mirror's base directory is everything before '/dists/' or '/pool/'.
        i = max(path.rfind('/dists/'), path.rfind('/pool/'))
        if i >= 0:
            baseDir = path[:i]
            path = path[i:]
        else:
            # Uh oh, this is not good
            log.msg("Couldn't find a good base directory for path: %s" % (site + path))
            baseDir = ''
            if site in self.apt_caches:
                longest_match = 0
                for base in self.apt_caches[site]:
                    base_match = ''
                    for dirs in path.split('/'):
                        if base.startswith(base_match + '/' + dirs):
                            base_match += '/' + dirs
                        else:
                            break
                    if len(base_match) > longest_match:
                        longest_match = len(base_match)
                        baseDir = base_match
            log.msg("Settled on baseDir: %s" % baseDir)

        return site, baseDir, path
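    # For example (test_extractPath below exercises these cases):
    #   'http://ftp.us.debian.org/debian/dists/unstable/Release'
    #   -> ('ftp.us.debian.org:80', '/debian', '/dists/unstable/Release')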
    def init(self, site, baseDir):
        if site not in self.apt_caches:
            self.apt_caches[site] = {}

        if baseDir not in self.apt_caches[site]:
            site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_'))
            self.apt_caches[site][baseDir] = AptPackages(site_cache)
    def updatedFile(self, url, file_path):
        site, baseDir, path = self.extractPath(url)
        self.init(site, baseDir)
        self.apt_caches[site][baseDir].file_updated(path, file_path)
    def findHash(self, url):
        site, baseDir, path = self.extractPath(url)
        if site in self.apt_caches and baseDir in self.apt_caches[site]:
            d = self.apt_caches[site][baseDir].findHash(path)
            d.addCallback(self.translateHash)
            return d
        d = defer.Deferred()
        d.errback(MirrorError("Site Not Found"))
        return d
    def translateHash(self, (hash, size)):
        """Translate a hash from apt's hex encoding to a string."""
    def save_file(self, response, hash, size, url):
        """Save a downloaded file to the cache and stream it."""
        log.msg('Returning file: %s' % url)

        parsed = urlparse(url)
        destFile = self.cache.preauthChild(parsed[1] + parsed[2])
        log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))

        if destFile.exists():
            log.msg('File already exists, removing: %s' % destFile.path)
            destFile.remove()
        else:
            destFile.parent().makedirs()

        root, ext = os.path.splitext(destFile.basename())
        if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
            decFile = destFile.sibling(root)
            log.msg('Decompressing to: %s' % decFile.path)
            if decFile.exists():
                log.msg('File already exists, removing: %s' % decFile.path)
                decFile.remove()
        else:
            ext = None
            decFile = None

        orig_stream = response.stream
        response.stream = ProxyFileStream(orig_stream, destFile, ext, decFile)
        response.stream.doneDefer.addCallback(self.save_complete, url, destFile,
                                              response.headers.getHeader('Last-Modified'),
                                              ext, decFile)
        response.stream.doneDefer.addErrback(self.save_error, url)
    def save_complete(self, result, url, destFile, modtime = None, ext = None, decFile = None):
        """Update the modification time and AptPackages."""
        if modtime:
            os.utime(destFile.path, (modtime, modtime))
            if decFile:
                os.utime(decFile.path, (modtime, modtime))

        self.updatedFile(url, destFile.path)
        if decFile:
            self.updatedFile(url[:-len(ext)], decFile.path)
    def save_error(self, failure, url):
        """An error has occurred in downloading or saving the file."""
        log.msg('Error occurred downloading %s' % url)
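# Typical flow (an illustrative sketch; the cache directory and URL are just
# examples, and fetching the file itself happens elsewhere):
#
#     manager = MirrorManager('/tmp')
#     # After an index file (Release/Packages/Sources) has been downloaded:
#     manager.updatedFile(url, local_file_path)
#     # Look up the expected hash and size for a file on that mirror:
#     d = manager.findHash('http://ftp.us.debian.org/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz')
#     d.addCallback(lambda (hash, size): log.msg('expected hash: %r' % hash))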
class TestMirrorManager(unittest.TestCase):
    """Unit tests for the mirror manager."""

    def setUp(self):
        self.client = MirrorManager('/tmp')
    def test_extractPath(self):
        site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org/debian/dists/unstable/Release')
        self.failUnless(site == "ftp.us.debian.org:80", "no match: %s" % site)
        self.failUnless(baseDir == "/debian", "no match: %s" % baseDir)
        self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path)

        site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org:16999/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz')
        self.failUnless(site == "ftp.us.debian.org:16999", "no match: %s" % site)
        self.failUnless(baseDir == "/debian", "no match: %s" % baseDir)
        self.failUnless(path == "/pool/d/dpkg/dpkg_1.2.1-1.tar.gz", "no match: %s" % path)

        site, baseDir, path = self.client.extractPath('http://debian.camrdale.org/dists/unstable/Release')
        self.failUnless(site == "debian.camrdale.org:80", "no match: %s" % site)
        self.failUnless(baseDir == "", "no match: %s" % baseDir)
        self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path)
    def verifyHash(self, found_hash, path, true_hash):
        self.failUnless(found_hash[0] == true_hash,
                        "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash))
    def test_findHash(self):
        # Use the apt indexes already on the local system: pick the largest
        # Packages and Sources files from /var/lib/apt/lists/ and the Release
        # file that they belong to.
        self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Packages$" | tail -n 1').read().rstrip('\n')
        self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Sources$" | tail -n 1').read().rstrip('\n')
        for f in os.walk('/var/lib/apt/lists').next()[2]:
            if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]):
                self.releaseFile = f
                break

        self.client.updatedFile('http://' + self.releaseFile.replace('_','/'),
                                '/var/lib/apt/lists/' + self.releaseFile)
        self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
                                self.packagesFile[self.packagesFile.find('_dists_')+1:].replace('_','/'),
                                '/var/lib/apt/lists/' + self.packagesFile)
        self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') +
                                self.sourcesFile[self.sourcesFile.find('_dists_')+1:].replace('_','/'),
                                '/var/lib/apt/lists/' + self.sourcesFile)
        lastDefer = defer.Deferred()

        idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
                            '/var/lib/apt/lists/' + self.releaseFile +
                            ' | grep -E " main/binary-i386/Packages.bz2$"'
                            ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
        idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'

        d = self.client.findHash(idx_path)
        d.addCallback(self.verifyHash, idx_path, a2b_hex(idx_hash))
        pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
                            '/var/lib/apt/lists/' + self.packagesFile +
                            ' | grep -E "^SHA1:" | head -n 1' +
                            ' | cut -d\ -f 2').read().rstrip('\n')
        pkg_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + \
                   os.popen('grep -A 30 -E "^Package: dpkg$" ' +
                            '/var/lib/apt/lists/' + self.packagesFile +
                            ' | grep -E "^Filename:" | head -n 1' +
                            ' | cut -d\ -f 2').read().rstrip('\n')

        d = self.client.findHash(pkg_path)
        d.addCallback(self.verifyHash, pkg_path, a2b_hex(pkg_hash))
        src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
                           '/var/lib/apt/lists/' + self.sourcesFile +
                           ' | grep -E "^Directory:" | head -n 1' +
                           ' | cut -d\ -f 2').read().rstrip('\n')
        src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
                              '/var/lib/apt/lists/' + self.sourcesFile +
                              ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
                              ' | cut -d\ -f 2').read().split('\n')[:-1]
        src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
                             '/var/lib/apt/lists/' + self.sourcesFile +
                             ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
                             ' | cut -d\ -f 4').read().split('\n')[:-1]

        for i in range(len(src_hashes)):
            src_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + src_dir + '/' + src_paths[i]
            d = self.client.findHash(src_path)
            d.addCallback(self.verifyHash, src_path, a2b_hex(src_hashes[i]))
        idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
                            '/var/lib/apt/lists/' + self.releaseFile +
                            ' | grep -E " main/source/Sources.bz2$"'
                            ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
        idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/source/Sources.bz2'

        d = self.client.findHash(idx_path)
        d.addCallback(self.verifyHash, idx_path, a2b_hex(idx_hash))

        d.addBoth(lastDefer.callback)
        return lastDefer

    def tearDown(self):
        for p in self.pending_calls: