Fix some documentation errors.
[quix0rs-apt-p2p.git] / apt_p2p / CacheManager.py
1
2 """Manage a cache of downloaded files.
3
4 @var DECOMPRESS_EXTS: a list of file extensions that need to be decompressed
5 @var DECOMPRESS_FILES: a list of file names that need to be decompressed
6 """
7
8 from urlparse import urlparse
9 import os
10
11 from twisted.python import log
12 from twisted.python.filepath import FilePath
13 from twisted.internet import defer, reactor
14 from twisted.trial import unittest
15 from twisted.web2.http import splitHostPort
16
17 from Streams import GrowingFileStream, StreamToFile
18 from Hash import HashObject
19 from apt_p2p_conf import config
20
21 DECOMPRESS_EXTS = ['.gz', '.bz2']
22 DECOMPRESS_FILES = ['release', 'sources', 'packages']
23
24 class CacheError(Exception):
25     """Error occurred downloading a file to the cache."""
26
27 class CacheManager:
28     """Manages all downloaded files and requests for cached objects.
29     
30     @type cache_dir: L{twisted.python.filepath.FilePath}
31     @ivar cache_dir: the directory to use for storing all files
32     @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
33     @ivar other_dirs: the other directories that have shared files in them
34     @type all_dirs: C{list} of L{twisted.python.filepath.FilePath}
35     @ivar all_dirs: all the directories that have cached files in them
36     @type db: L{db.DB}
37     @ivar db: the database to use for tracking files and hashes
38     @type manager: L{apt_p2p.AptP2P}
39     @ivar manager: the main program object to send requests to
40     @type scanning: C{list} of L{twisted.python.filepath.FilePath}
41     @ivar scanning: all the directories that are currectly being scanned or waiting to be scanned
42     """
43     
44     def __init__(self, cache_dir, db, manager = None):
45         """Initialize the instance and remove any untracked files from the DB..
46         
47         @type cache_dir: L{twisted.python.filepath.FilePath}
48         @param cache_dir: the directory to use for storing all files
49         @type db: L{db.DB}
50         @param db: the database to use for tracking files and hashes
51         @type manager: L{apt_p2p.AptP2P}
52         @param manager: the main program object to send requests to
53             (optional, defaults to not calling back with cached files)
54         """
55         self.cache_dir = cache_dir
56         self.other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
57         self.all_dirs = self.other_dirs[:]
58         self.all_dirs.insert(0, self.cache_dir)
59         self.db = db
60         self.manager = manager
61         self.scanning = []
62         
63         # Init the database, remove old files
64         self.db.removeUntrackedFiles(self.all_dirs)
65         
66     #{ Scanning directories
67     def scanDirectories(self, result = None):
68         """Scan the cache directories, hashing new and rehashing changed files."""
69         assert not self.scanning, "a directory scan is already under way"
70         self.scanning = self.all_dirs[:]
71         self._scanDirectories()
72
73     def _scanDirectories(self, result = None, walker = None):
74         """Walk each directory looking for cached files.
75         
76         @param result: the result of a DHT store request, not used (optional)
77         @param walker: the walker to use to traverse the current directory
78             (optional, defaults to creating a new walker from the first
79             directory in the L{CacheManager.scanning} list)
80         """
81         # Need to start walking a new directory
82         if walker is None:
83             # If there are any left, get them
84             if self.scanning:
85                 log.msg('started scanning directory: %s' % self.scanning[0].path)
86                 walker = self.scanning[0].walk()
87             else:
88                 log.msg('cache directory scan complete')
89                 return
90             
91         try:
92             # Get the next file in the directory
93             file = walker.next()
94         except StopIteration:
95             # No files left, go to the next directory
96             log.msg('done scanning directory: %s' % self.scanning[0].path)
97             self.scanning.pop(0)
98             reactor.callLater(0, self._scanDirectories)
99             return
100
101         # If it's not a file ignore it
102         if not file.isfile():
103             reactor.callLater(0, self._scanDirectories, None, walker)
104             return
105
106         # If it's already properly in the DB, ignore it
107         db_status = self.db.isUnchanged(file)
108         if db_status:
109             reactor.callLater(0, self._scanDirectories, None, walker)
110             return
111         
112         # Don't hash files in the cache that are not in the DB
113         if self.scanning[0] == self.cache_dir:
114             if db_status is None:
115                 log.msg('ignoring unknown cache file: %s' % file.path)
116             else:
117                 log.msg('removing changed cache file: %s' % file.path)
118                 file.remove()
119             reactor.callLater(0, self._scanDirectories, None, walker)
120             return
121
122         # Otherwise hash it
123         log.msg('start hash checking file: %s' % file.path)
124         hash = HashObject()
125         df = hash.hashInThread(file)
126         df.addBoth(self._doneHashing, file, walker)
127     
128     def _doneHashing(self, result, file, walker):
129         """If successful, add the hashed file to the DB and inform the main program."""
130         if isinstance(result, HashObject):
131             log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
132             
133             # Only set a URL if this is a downloaded file
134             url = None
135             if self.scanning[0] == self.cache_dir:
136                 url = 'http:/' + file.path[len(self.cache_dir.path):]
137                 
138             # Store the hashed file in the database
139             new_hash = self.db.storeFile(file, result.digest(), True,
140                                          ''.join(result.pieceDigests()))
141             
142             # Tell the main program to handle the new cache file
143             df = self.manager.new_cached_file(file, result, new_hash, url, True)
144             if df is None:
145                 reactor.callLater(0, self._scanDirectories, None, walker)
146             else:
147                 df.addBoth(self._scanDirectories, walker)
148         else:
149             # Must have returned an error
150             log.msg('hash check of %s failed' % file.path)
151             log.err(result)
152             reactor.callLater(0, self._scanDirectories, None, walker)
153
154     #{ Downloading files
155     def save_file(self, response, hash, url):
156         """Save a downloaded file to the cache and stream it.
157         
158         @type response: L{twisted.web2.http.Response}
159         @param response: the response from the download
160         @type hash: L{Hash.HashObject}
161         @param hash: the hash object containing the expected hash for the file
162         @param url: the URI of the actual mirror request
163         @rtype: L{twisted.web2.http.Response}
164         @return: the final response from the download
165         """
166         if response.code != 200:
167             log.msg('File was not found (%r): %s' % (response, url))
168             return response
169         
170         log.msg('Returning file: %s' % url)
171
172         # Set the destination path for the file
173         parsed = urlparse(url)
174         destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
175         log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
176         
177         # Make sure there's a free place for the file
178         if destFile.exists():
179             log.msg('File already exists, removing: %s' % destFile.path)
180             destFile.remove()
181         if not destFile.parent().exists():
182             destFile.parent().makedirs()
183
184         # Determine whether it needs to be decompressed and how
185         root, ext = os.path.splitext(destFile.basename())
186         if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
187             ext = ext.lower()
188             decFile = destFile.sibling(root)
189             log.msg('Decompressing to: %s' % decFile.path)
190             if decFile.exists():
191                 log.msg('File already exists, removing: %s' % decFile.path)
192                 decFile.remove()
193         else:
194             ext = None
195             decFile = None
196             
197         # Create the new stream from the old one.
198         orig_stream = response.stream
199         f = destFile.open('w+')
200         new_stream = GrowingFileStream(f, orig_stream.length)
201         hash.new()
202         df = StreamToFile(hash, orig_stream, f, notify = new_stream.updateAvailable,
203                           decompress = ext, decFile = decFile).run()
204         df.addCallback(self._save_complete, url, destFile, new_stream,
205                        response.headers.getHeader('Last-Modified'), decFile)
206         df.addErrback(self._save_error, url, destFile, new_stream, decFile)
207         response.stream = new_stream
208
209         # Return the modified response with the new stream
210         return response
211
212     def _save_complete(self, hash, url, destFile, destStream = None,
213                        modtime = None, decFile = None):
214         """Update the modification time and inform the main program.
215         
216         @type hash: L{Hash.HashObject}
217         @param hash: the hash object containing the expected hash for the file
218         @param url: the URI of the actual mirror request
219         @type destFile: C{twisted.python.FilePath}
220         @param destFile: the file where the download was written to
221         @type destStream: L{Streams.GrowingFileStream}
222         @param destStream: the stream to notify that all data is available
223         @type modtime: C{int}
224         @param modtime: the modified time of the cached file (seconds since epoch)
225             (optional, defaults to not setting the modification time of the file)
226         @type decFile: C{twisted.python.FilePath}
227         @param decFile: the file where the decompressed download was written to
228             (optional, defaults to the file not having been compressed)
229         """
230         result = hash.verify()
231         if result or result is None:
232             if destStream:
233                 destStream.allAvailable()
234             if modtime:
235                 os.utime(destFile.path, (modtime, modtime))
236             
237             if result:
238                 log.msg('Hashes match: %s' % url)
239                 dht = True
240             else:
241                 log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
242                 dht = False
243                 
244             new_hash = self.db.storeFile(destFile, hash.digest(), dht,
245                                          ''.join(hash.pieceDigests()))
246
247             if self.manager:
248                 self.manager.new_cached_file(destFile, hash, new_hash, url)
249
250             if decFile:
251                 # Hash the decompressed file and add it to the DB
252                 decHash = HashObject()
253                 ext_len = len(destFile.path) - len(decFile.path)
254                 df = decHash.hashInThread(decFile)
255                 df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime = modtime)
256                 df.addErrback(self._save_error, url[:-ext_len], decFile)
257         else:
258             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
259             if destStream:
260                 destStream.allAvailable(remove = True)
261             if decFile:
262                 decFile.remove()
263
264     def _save_error(self, failure, url, destFile, destStream = None, decFile = None):
265         """Remove the destination files."""
266         log.msg('Error occurred downloading %s' % url)
267         log.err(failure)
268         if destStream:
269             destStream.allAvailable(remove = True)
270         else:
271             destFile.restat(False)
272             if destFile.exists():
273                 log.msg('Removing the incomplete file: %s' % destFile.path)
274                 destFile.remove()
275         if decFile:
276             decFile.restat(False)
277             if decFile.exists():
278                 log.msg('Removing the incomplete file: %s' % decFile.path)
279                 decFile.remove()
280
281     def save_error(self, failure, url):
282         """An error has occurred in downloading or saving the file"""
283         log.msg('Error occurred downloading %s' % url)
284         log.err(failure)
285         return failure
286
287 class TestMirrorManager(unittest.TestCase):
288     """Unit tests for the mirror manager."""
289     
290     timeout = 20
291     pending_calls = []
292     client = None
293     
294     def setUp(self):
295         self.client = CacheManager(FilePath('/tmp/.apt-p2p'))
296         
297     def tearDown(self):
298         for p in self.pending_calls:
299             if p.active():
300                 p.cancel()
301         self.client = None
302