Add a final retry of all errback mirror downloads.
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var download_dir: the name of the directory to use for downloaded files
5 @var peer_dir: the name of the directory to use for peer downloads
6 """
7
8 from urllib import unquote
9
10 from twisted.internet import defer, reactor, protocol
11 from twisted.web2 import static
12 from twisted.python import log, failure
13 from twisted.python.filepath import FilePath
14
15 from apt_p2p_conf import config
16 from DHTManager import DHT
17 from PeerManager import PeerManager
18 from HTTPServer import TopLevel
19 from MirrorManager import MirrorManager
20 from CacheManager import CacheManager
21 from Hash import HashObject
22 from db import DB
23 from stats import StatsLogger
24
25 download_dir = 'cache'
26 peer_dir = 'peers'
27
28 class AptP2P(protocol.Factory):
29     """The main code object that does all of the work.
30     
31     Contains all of the sub-components that do all the low-level work, and
32     coordinates communication between them.
33     
34     @type dhtClass: L{interfaces.IDHT}
35     @ivar dhtClass: the DHT class to use
36     @type cache_dir: L{twisted.python.filepath.FilePath}
37     @ivar cache_dir: the directory to use for storing all files
38     @type db: L{db.DB}
39     @ivar db: the database to use for tracking files and hashes
40     @type dht: L{DHTManager.DHT}
41     @ivar dht: the manager for DHT requests
42     @type stats: L{stats.StatsLogger}
43     @ivar stats: the statistics logger to record sent data to
44     @type http_server: L{HTTPServer.TopLevel}
45     @ivar http_server: the web server that will handle all requests from apt
46         and from other peers
47     @type peers: L{PeerManager.PeerManager}
48     @ivar peers: the manager of all downloads from mirrors and other peers
49     @type mirrors: L{MirrorManager.MirrorManager}
50     @ivar mirrors: the manager of downloaded information about mirrors which
51         can be queried to get hashes from file names
52     @type cache: L{CacheManager.CacheManager}
53     @ivar cache: the manager of all downloaded files
54     @type my_addr: C{string}, C{int}
55     @ivar my_addr: the IP address and port of this peer
56     """
57     
58     def __init__(self, dhtClass):
59         """Initialize all the sub-components.
60         
61         @type dhtClass: L{interfaces.IDHT}
62         @param dhtClass: the DHT class to use
63         """
64         log.msg('Initializing the main apt_p2p application')
65         self.dhtClass = dhtClass
66         self.my_addr = None
67
68     #{ Factory interface
69     def startFactory(self):
70         reactor.callLater(0, self._startFactory)
71         
72     def _startFactory(self):
73         log.msg('Starting the main apt_p2p application')
74         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
75         if not self.cache_dir.child(download_dir).exists():
76             self.cache_dir.child(download_dir).makedirs()
77         if not self.cache_dir.child(peer_dir).exists():
78             self.cache_dir.child(peer_dir).makedirs()
79         self.db = DB(self.cache_dir.child('apt-p2p.db'))
80         self.dht = DHT(self.dhtClass, self.db)
81         df = self.dht.start()
82         df.addCallback(self._dhtStarted)
83         self.stats = StatsLogger(self.db)
84         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
85         self.http_server.getHTTPFactory().startFactory()
86         self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
87         self.mirrors = MirrorManager(self.cache_dir)
88         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
89     
90     def _dhtStarted(self, result):
91         """Save the returned address and start scanning the cache."""
92         self.my_addr = result
93         self.cache.scanDirectories()
94         
95     def stopFactory(self):
96         log.msg('Stoppping the main apt_p2p application')
97         self.http_server.getHTTPFactory().stopFactory()
98         self.mirrors.cleanup()
99         self.stats.save()
100         self.db.close()
101     
102     def buildProtocol(self, addr):
103         return self.http_server.getHTTPFactory().buildProtocol(addr)
104
105     #{ Other functions
106     def getStats(self):
107         """Retrieve and format the statistics for the program.
108         
109         @rtype: C{string}
110         @return: the formatted HTML page containing the statistics
111         """
112         out = '<html><body>\n\n'
113         out += self.stats.formatHTML(self.my_addr)
114         out += '\n\n'
115         out += self.dht.getStats()
116         out += '\n</body></html>\n'
117         return out
118
119     #{ Main workflow
120     def get_resp(self, req, url, orig_resp = None):
121         """Lookup a hash for the file in the local mirror info.
122         
123         Starts the process of getting a response to an apt request.
124         
125         @type req: L{twisted.web2.http.Request}
126         @param req: the initial request sent to the HTTP server by apt
127         @param url: the URI of the actual mirror request
128         @type orig_resp: L{twisted.web2.http.Response}
129         @param orig_resp: the response from the cache to be sent to apt
130             (optional, ignored if missing)
131         @rtype: L{twisted.internet.defer.Deferred}
132         @return: a deferred that will be called back with the response
133         """
134         d = defer.Deferred()
135         
136         log.msg('Trying to find hash for %s' % url)
137         findDefer = self.mirrors.findHash(unquote(url))
138         
139         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
140                                callbackArgs=(req, url, orig_resp, d),
141                                errbackArgs=(req, url, orig_resp, d))
142         return d
143     
144     def findHash_error(self, failure, req, url, orig_resp, d):
145         """Process the error in hash lookup by returning an empty L{HashObject}."""
146         log.msg('Hash lookup for %s resulted in an error: %s' %
147                 (url, failure.getErrorMessage()))
148         self.findHash_done(HashObject(), req, url, orig_resp, d)
149         
150     def findHash_done(self, hash, req, url, orig_resp, d):
151         """Use the returned hash to lookup the file in the cache.
152         
153         If the hash was not found, the workflow skips down to download from
154         the mirror (L{startDownload}), or checks the freshness of an old
155         response if there is one.
156         
157         @type hash: L{Hash.HashObject}
158         @param hash: the hash object containing the expected hash for the file
159         """
160         if hash.expected() is None:
161             log.msg('Hash for %s was not found' % url)
162             # Send the old response or get a new one
163             if orig_resp:
164                 self.check_freshness(req, url, orig_resp, d)
165             else:
166                 self.startDownload([], req, hash, url, d)
167         else:
168             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
169             
170             # Lookup hash in cache
171             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
172             self.getCachedFile(hash, req, url, d, locations)
173
174     def check_freshness(self, req, url, orig_resp, d):
175         """Send a HEAD to the mirror to check if the response from the cache is still valid.
176         
177         @type req: L{twisted.web2.http.Request}
178         @param req: the initial request sent to the HTTP server by apt
179         @param url: the URI of the actual mirror request
180         @type orig_resp: L{twisted.web2.http.Response}
181         @param orig_resp: the response from the cache to be sent to apt
182         @rtype: L{twisted.internet.defer.Deferred}
183         @return: a deferred that will be called back with the correct response
184         """
185         log.msg('Checking if %s is still fresh' % url)
186         modtime = orig_resp.headers.getHeader('Last-Modified')
187         headDefer = self.peers.get(HashObject(), url, method = "HEAD",
188                                    modtime = modtime)
189         headDefer.addCallbacks(self.check_freshness_done,
190                                self.check_freshness_error,
191                                callbackArgs = (req, url, orig_resp, d),
192                                errbackArgs = (req, url, d))
193     
194     def check_freshness_done(self, resp, req, url, orig_resp, d):
195         """Return the fresh response, if stale start to redownload.
196         
197         @type resp: L{twisted.web2.http.Response}
198         @param resp: the response from the mirror to the HEAD request
199         @type req: L{twisted.web2.http.Request}
200         @param req: the initial request sent to the HTTP server by apt
201         @param url: the URI of the actual mirror request
202         @type orig_resp: L{twisted.web2.http.Response}
203         @param orig_resp: the response from the cache to be sent to apt
204         """
205         if resp.code == 304:
206             log.msg('Still fresh, returning: %s' % url)
207             d.callback(orig_resp)
208         else:
209             log.msg('Stale, need to redownload: %s' % url)
210             self.startDownload([], req, HashObject(), url, d)
211     
212     def check_freshness_error(self, err, req, url, d):
213         """Mirror request failed, continue with download.
214         
215         @param err: the response from the mirror to the HEAD request
216         @type req: L{twisted.web2.http.Request}
217         @param req: the initial request sent to the HTTP server by apt
218         @param url: the URI of the actual mirror request
219         """
220         log.err(err)
221         self.startDownload([], req, HashObject(), url, d)
222     
223     def getCachedFile(self, hash, req, url, d, locations):
224         """Try to return the file from the cache, otherwise move on to a DHT lookup.
225         
226         @type locations: C{list} of C{dictionary}
227         @param locations: the files in the cache that match the hash,
228             the dictionary contains a key 'path' whose value is a
229             L{twisted.python.filepath.FilePath} object for the file.
230         """
231         if not locations:
232             log.msg('Failed to return file from cache: %s' % url)
233             self.lookupHash(req, hash, url, d)
234             return
235         
236         # Get the first possible location from the list
237         file = locations.pop(0)['path']
238         log.msg('Returning cached file: %s' % file.path)
239         
240         # Get it's response
241         resp = static.File(file.path).renderHTTP(req)
242         if isinstance(resp, defer.Deferred):
243             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
244         else:
245             self._getCachedFile(resp, hash, req, url, d, locations)
246         
247     def _getCachedFile(self, resp, hash, req, url, d, locations):
248         """Check the returned response to be sure it is valid."""
249         if isinstance(resp, failure.Failure):
250             log.msg('Got error trying to get cached file')
251             log.err(resp)
252             # Try the next possible location
253             self.getCachedFile(hash, req, url, d, locations)
254             return
255             
256         log.msg('Cached response: %r' % resp)
257         
258         if resp.code >= 200 and resp.code < 400:
259             d.callback(resp)
260         else:
261             # Try the next possible location
262             self.getCachedFile(hash, req, url, d, locations)
263
264     def lookupHash(self, req, hash, url, d):
265         """Lookup the hash in the DHT."""
266         log.msg('Looking up hash in DHT for file: %s' % url)
267         key = hash.expected()
268         lookupDefer = self.dht.get(key)
269         lookupDefer.addBoth(self.startDownload, req, hash, url, d)
270
271     def startDownload(self, values, req, hash, url, d):
272         """Start the download of the file.
273         
274         The download will be from peers if the DHT lookup succeeded, or
275         from the mirror otherwise.
276         
277         @type values: C{list} of C{dictionary}
278         @param values: the returned values from the DHT containing peer
279             download information
280         """
281         # Remove some headers Apt sets in the request
282         req.headers.removeHeader('If-Modified-Since')
283         req.headers.removeHeader('Range')
284         req.headers.removeHeader('If-Range')
285         
286         if not isinstance(values, list) or not values:
287             if not isinstance(values, list):
288                 log.msg('DHT lookup for %s failed with error %r' % (url, values))
289             else:
290                 log.msg('Peers for %s were not found' % url)
291             getDefer = self.peers.get(hash, url)
292             getDefer.addErrback(self.final_fallback, hash, url)
293             getDefer.addCallback(self.cache.save_file, hash, url)
294             getDefer.addErrback(self.cache.save_error, url)
295             getDefer.addCallbacks(d.callback, d.errback)
296         else:
297             log.msg('Found peers for %s: %r' % (url, values))
298             # Download from the found peers
299             getDefer = self.peers.get(hash, url, values)
300             getDefer.addCallback(self.check_response, hash, url)
301             getDefer.addCallback(self.cache.save_file, hash, url)
302             getDefer.addErrback(self.cache.save_error, url)
303             getDefer.addCallbacks(d.callback, d.errback)
304             
305     def check_response(self, response, hash, url):
306         """Check the response from peers, and download from the mirror if it is not."""
307         if response.code < 200 or response.code >= 300:
308             log.msg('Download from peers failed, going to direct download: %s' % url)
309             getDefer = self.peers.get(hash, url)
310             getDefer.addErrback(self.final_fallback, hash, url)
311             return getDefer
312         return response
313         
314     def final_fallback(self, err, hash, url):
315         """Final retry if the mirror still generated an error."""
316         log.msg('Download from mirror failed, retrying once only: %s' % url)
317         log.err(err)
318         getDefer = self.peers.get(hash, url)
319         return getDefer
320         
321     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
322         """Add a newly cached file to the mirror info and/or the DHT.
323         
324         If the file was downloaded, set url to the path it was downloaded for.
325         Doesn't add a file to the DHT unless a hash was found for it
326         (but does add it anyway if forceDHT is True).
327         
328         @type file_path: L{twisted.python.filepath.FilePath}
329         @param file_path: the location of the file in the local cache
330         @type hash: L{Hash.HashObject}
331         @param hash: the original (expected) hash object containing also the
332             hash of the downloaded file
333         @type new_hash: C{boolean}
334         @param new_hash: whether the has was new to this peer, and so should
335             be added to the DHT
336         @type url: C{string}
337         @param url: the URI of the location of the file in the mirror
338             (optional, defaults to not adding the file to the mirror info)
339         @type forceDHT: C{boolean}
340         @param forceDHT: whether to force addition of the file to the DHT
341             even if the hash was not found in a mirror
342             (optional, defaults to False)
343         """
344         if url:
345             self.mirrors.updatedFile(url, file_path)
346         
347         if self.my_addr and hash and new_hash and (hash.expected() is not None or forceDHT):
348             return self.dht.store(hash)
349         return None
350