"""The main program code.

@var download_dir: the name of the directory to use for downloaded files
@var peer_dir: the name of the directory to use for peer downloads
"""
8 from urllib import unquote
10 from twisted.internet import defer, reactor, protocol
11 from twisted.web2 import static
12 from twisted.python import log, failure
13 from twisted.python.filepath import FilePath
15 from apt_p2p_conf import config
16 from DHTManager import DHT
17 from PeerManager import PeerManager
18 from HTTPServer import TopLevel
19 from MirrorManager import MirrorManager
20 from CacheManager import CacheManager
21 from Hash import HashObject
23 from stats import StatsLogger
# Name of the sub-directory of the cache directory used for downloaded files.
download_dir = 'cache'
# Name of the sub-directory of the cache directory used for peer downloads.
# AptP2P._startFactory needs this name, so it must be defined at module level.
peer_dir = 'peers'
class AptP2P(protocol.Factory):
    """The main code object that does all of the work.

    Contains all of the sub-components that do all the low-level work, and
    coordinates communication between them.

    An apt request enters through L{get_resp}. The file's hash is first
    looked up in the mirror info. If a hash is known, the local cache is
    tried, then the DHT is searched for peers, and finally the file is
    downloaded from those peers or from the mirror.

    @type dhtClass: L{interfaces.IDHT}
    @ivar dhtClass: the DHT class to use
    @type cache_dir: L{twisted.python.filepath.FilePath}
    @ivar cache_dir: the directory to use for storing all files
    @ivar db: the database to use for tracking files and hashes
    @type dht: L{DHTManager.DHT}
    @ivar dht: the manager for DHT requests
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics logger to record sent data to
    @type http_server: L{HTTPServer.TopLevel}
    @ivar http_server: the web server that will handle all requests from apt
    @type peers: L{PeerManager.PeerManager}
    @ivar peers: the manager of all downloads from mirrors and other peers
    @type mirrors: L{MirrorManager.MirrorManager}
    @ivar mirrors: the manager of downloaded information about mirrors which
        can be queried to get hashes from file names
    @type cache: L{CacheManager.CacheManager}
    @ivar cache: the manager of all downloaded files
    @type my_addr: C{string}, C{int}
    @ivar my_addr: the IP address and port of this peer, or C{None} until
        the DHT has finished starting
    """

    def __init__(self, dhtClass):
        """Remember the DHT class; the sub-components are built in L{_startFactory}.

        @type dhtClass: L{interfaces.IDHT}
        @param dhtClass: the DHT class to use
        """
        log.msg('Initializing the main apt_p2p application')
        self.dhtClass = dhtClass
        # Unknown until the DHT has started (set in _dhtStarted). getStats and
        # new_cached_file read it, so it must exist from the beginning.
        self.my_addr = None

    def startFactory(self):
        """Schedule L{_startFactory} on the reactor instead of starting inline."""
        reactor.callLater(0, self._startFactory)

    def _startFactory(self):
        """Create the cache directories and sub-components, then start the DHT.

        The DHT is started only after every sub-component exists. The start-up
        callback L{_dhtStarted} uses C{self.cache}, and this ordering keeps it
        safe even if the DHT's start-up deferred fires synchronously.
        """
        log.msg('Starting the main apt_p2p application')
        self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
        for subdir in (download_dir, peer_dir):
            child = self.cache_dir.child(subdir)
            if not child.exists():
                child.makedirs()
        self.db = DB(self.cache_dir.child('apt-p2p.db'))
        self.dht = DHT(self.dhtClass, self.db)
        self.stats = StatsLogger(self.db)
        self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
        self.http_server.getHTTPFactory().startFactory()
        self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
        self.mirrors = MirrorManager(self.cache_dir)
        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)

        df = self.dht.start()
        df.addCallback(self._dhtStarted)

    def _dhtStarted(self, result):
        """Save the returned address and start scanning the cache.

        @param result: the address of this peer produced by the DHT start-up
        """
        self.my_addr = result
        self.cache.scanDirectories()

    def stopFactory(self):
        """Stop the HTTP server's factory and clean up the mirror information."""
        log.msg('Stopping the main apt_p2p application')
        self.http_server.getHTTPFactory().stopFactory()
        self.mirrors.cleanup()
        # TODO(review): self.db (opened in _startFactory) is never closed and
        # self.stats is never flushed here; confirm whether DB/StatsLogger
        # provide close/save hooks that should be called on shutdown.

    def buildProtocol(self, addr):
        """Delegate protocol creation to the HTTP server's factory."""
        return self.http_server.getHTTPFactory().buildProtocol(addr)

    def getStats(self):
        """Retrieve and format the statistics for the program.

        @rtype: C{string}
        @return: the formatted HTML page containing the statistics
        """
        parts = [
            '<html><body>\n\n',
            self.stats.formatHTML(self.my_addr),
            '\n\n',
            self.dht.getStats(),
            '\n</body></html>\n',
        ]
        return ''.join(parts)

    def get_resp(self, req, url, orig_resp=None):
        """Lookup a hash for the file in the local mirror info.

        Starts the process of getting a response to an apt request.

        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type orig_resp: L{twisted.web2.http.Response}
        @param orig_resp: the response from the cache to be sent to apt
            (optional, ignored if missing)
        @rtype: L{twisted.internet.defer.Deferred}
        @return: a deferred that will be called back with the response
        """
        d = defer.Deferred()

        log.msg('Trying to find hash for %s' % url)
        findDefer = self.mirrors.findHash(unquote(url))

        findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                               callbackArgs=(req, url, orig_resp, d),
                               errbackArgs=(req, url, orig_resp, d))
        return d

    def findHash_error(self, failure, req, url, orig_resp, d):
        """Process the error in hash lookup by returning an empty L{HashObject}.

        An empty hash sends the workflow down the "hash not found" path in
        L{findHash_done}. Inside this method the C{failure} parameter shadows
        the C{failure} module; the method only uses the parameter.
        """
        log.msg('Hash lookup for %s resulted in an error: %s' %
                (url, failure.getErrorMessage()))
        self.findHash_done(HashObject(), req, url, orig_resp, d)

    def findHash_done(self, hash, req, url, orig_resp, d):
        """Use the returned hash to lookup the file in the cache.

        If the hash was not found, the workflow skips down to download from
        the mirror (L{startDownload}), or checks the freshness of an old
        response if there is one.

        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        @param d: the deferred to call back with the final response
        """
        if hash.expected() is None:
            log.msg('Hash for %s was not found' % url)
            # Send the old response or get a new one
            if orig_resp:
                self.check_freshness(req, url, orig_resp, d)
            else:
                self.startDownload([], req, hash, url, d)
        else:
            log.msg('Found hash %s for %s' % (hash.hexexpected(), url))

            # Lookup hash in cache
            locations = self.db.lookupHash(hash.expected(), filesOnly=True)
            self.getCachedFile(hash, req, url, d, locations)

    def check_freshness(self, req, url, orig_resp, d):
        """Send a HEAD to the mirror to check if the response from the cache is still valid.

        The cached response's C{Last-Modified} header is passed along with the
        HEAD request. The outcome is handled by L{check_freshness_done} or
        L{check_freshness_error}. This method returns nothing; the result is
        delivered by calling back C{d}.

        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type orig_resp: L{twisted.web2.http.Response}
        @param orig_resp: the response from the cache to be sent to apt
        @type d: L{twisted.internet.defer.Deferred}
        @param d: the deferred to call back with the correct response
        """
        log.msg('Checking if %s is still fresh' % url)
        modtime = orig_resp.headers.getHeader('Last-Modified')
        headDefer = self.peers.get(HashObject(), url, method="HEAD",
                                   modtime=modtime)
        headDefer.addCallbacks(self.check_freshness_done,
                               self.check_freshness_error,
                               callbackArgs=(req, url, orig_resp, d),
                               errbackArgs=(req, url, d))

    def check_freshness_done(self, resp, req, url, orig_resp, d):
        """Return the fresh response, if stale start to redownload.

        A 304 (Not Modified) answer means the cached response is still valid.

        @type resp: L{twisted.web2.http.Response}
        @param resp: the response from the mirror to the HEAD request
        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type orig_resp: L{twisted.web2.http.Response}
        @param orig_resp: the response from the cache to be sent to apt
        """
        if resp.code == 304:
            log.msg('Still fresh, returning: %s' % url)
            d.callback(orig_resp)
        else:
            log.msg('Stale, need to redownload: %s' % url)
            self.startDownload([], req, HashObject(), url, d)

    def check_freshness_error(self, err, req, url, d):
        """Mirror request failed, continue with download.

        @param err: the failure from the mirror's HEAD request
        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        """
        log.err(err)
        self.startDownload([], req, HashObject(), url, d)

    def getCachedFile(self, hash, req, url, d, locations):
        """Try to return the file from the cache, otherwise move on to a DHT lookup.

        @type locations: C{list} of C{dictionary}
        @param locations: the files in the cache that match the hash,
            the dictionary contains a key 'path' whose value is a
            L{twisted.python.filepath.FilePath} object for the file.
            The list is consumed from the front as locations are tried.
        """
        if not locations:
            log.msg('Failed to return file from cache: %s' % url)
            self.lookupHash(req, hash, url, d)
            return

        # Get the first possible location from the list; popping it makes the
        # retry in _getCachedFile move on to the next one.
        cached = locations.pop(0)['path']
        log.msg('Returning cached file: %s' % cached.path)

        # renderHTTP may answer synchronously or with a deferred
        resp = static.File(cached.path).renderHTTP(req)
        if isinstance(resp, defer.Deferred):
            resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
        else:
            self._getCachedFile(resp, hash, req, url, d, locations)

    def _getCachedFile(self, resp, hash, req, url, d, locations):
        """Check the returned response to be sure it is valid.

        A 2xx/3xx response is sent back through C{d}. Anything else, including
        a failure, moves on to the next possible cached location.
        """
        if isinstance(resp, failure.Failure):
            log.msg('Got error trying to get cached file')
            log.err(resp)
            # Try the next possible location; a Failure has no response code
            self.getCachedFile(hash, req, url, d, locations)
            return

        log.msg('Cached response: %r' % resp)

        if 200 <= resp.code < 400:
            d.callback(resp)
        else:
            # Try the next possible location
            self.getCachedFile(hash, req, url, d, locations)

    def lookupHash(self, req, hash, url, d):
        """Lookup the hash in the DHT.

        The lookup's result or failure is passed to L{startDownload} as
        C{values} (via addBoth).
        """
        log.msg('Looking up hash in DHT for file: %s' % url)
        key = hash.expected()
        lookupDefer = self.dht.get(key)
        lookupDefer.addBoth(self.startDownload, req, hash, url, d)

    def startDownload(self, values, req, hash, url, d):
        """Start the download of the file.

        The download will be from peers if the DHT lookup succeeded, or
        from the mirror otherwise. Either way the result is saved to the
        cache and then used to fire C{d}.

        @type values: C{list} of C{dictionary}
        @param values: the returned values from the DHT containing peer
            information; any non-list (e.g. a failure) means the lookup failed
        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @type hash: L{Hash.HashObject}
        @param hash: the hash object for the file
        @param url: the URI of the actual mirror request
        @type d: L{twisted.internet.defer.Deferred}
        @param d: the deferred to call back with the final response
        """
        # Remove some headers Apt sets in the request
        for header in ('If-Modified-Since', 'Range', 'If-Range'):
            req.headers.removeHeader(header)

        if isinstance(values, list) and values:
            log.msg('Found peers for %s: %r' % (url, values))
            # Download from the found peers, falling back to the mirror
            getDefer = self.peers.get(hash, url, values)
            getDefer.addCallback(self.check_response, hash, url)
        else:
            if not isinstance(values, list):
                log.msg('DHT lookup for %s failed with error %r' % (url, values))
            else:
                log.msg('Peers for %s were not found' % url)
            getDefer = self.peers.get(hash, url)

        getDefer.addCallback(self.cache.save_file, hash, url)
        getDefer.addErrback(self.cache.save_error, url)
        getDefer.addCallbacks(d.callback, d.errback)

    def check_response(self, response, hash, url):
        """Check the response from peers, and download from the mirror if it is not successful.

        @return: the peer response if its code is 2xx, otherwise a deferred
            for a direct download from the mirror (chained by the caller)
        """
        if response.code < 200 or response.code >= 300:
            log.msg('Download from peers failed, going to direct download: %s' % url)
            getDefer = self.peers.get(hash, url)
            return getDefer
        return response

    def new_cached_file(self, file_path, hash, new_hash, url=None, forceDHT=False):
        """Add a newly cached file to the mirror info and/or the DHT.

        If the file was downloaded, set url to the path it was downloaded for.
        Doesn't add a file to the DHT unless a hash was found for it
        (but does add it anyway if forceDHT is True). Nothing is added to the
        DHT before this peer's address is known.

        @type file_path: L{twisted.python.filepath.FilePath}
        @param file_path: the location of the file in the local cache
        @type hash: L{Hash.HashObject}
        @param hash: the original (expected) hash object containing also the
            hash of the downloaded file
        @type new_hash: C{boolean}
        @param new_hash: whether the hash was new to this peer, and so should
            be added to the DHT
        @param url: the URI of the location of the file in the mirror
            (optional, defaults to not adding the file to the mirror info)
        @type forceDHT: C{boolean}
        @param forceDHT: whether to force addition of the file to the DHT
            even if the hash was not found in a mirror
            (optional, defaults to False)
        @return: the result of storing the hash in the DHT, or C{None} if it
            was not stored
        """
        if url:
            self.mirrors.updatedFile(url, file_path)

        if self.my_addr and hash and new_hash and (hash.expected() is not None or forceDHT):
            return self.dht.store(hash)
        return None