2 """Serve local requests from apt and remote requests from peers."""
4 from urllib import quote_plus, unquote_plus
5 from binascii import b2a_hex
8 from twisted.python import log
9 from twisted.internet import defer
10 from twisted.web2 import server, http, resource, channel, stream
11 from twisted.web2 import static, http_headers, responsecode
12 from twisted.trial import unittest
13 from twisted.python.filepath import FilePath
15 from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
16 from apt_p2p_conf import config
17 from apt_p2p_Khashmir.bencode import bencode
class FileDownloader(static.File):
    """Modified to make it suitable for apt requests.

    Tries to find requests in the cache. Found files are first checked for
    freshness before being sent. Requests for unfound and stale files are
    forwarded to the main program for downloading.

    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program to query
    """

    def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(), processors=None, indexNames=None):
        """Initialize the instance.

        @param path: the file system path to serve, passed to the parent class
        @param manager: the main program to query, may be None
        @param defaultType: passed unchanged to the parent class
        @param ignoredExts: passed unchanged to the parent class
        @param processors: passed unchanged to the parent class
        @param indexNames: passed unchanged to the parent class
        """
        self.manager = manager
        super(FileDownloader, self).__init__(path, defaultType, ignoredExts, processors, indexNames)

    def renderHTTP(self, req):
        """Render the cached file and hand the result to the manager.

        @param req: the request being served
        @return: the final response, or a deferred that will fire with it
            when the parent class rendered asynchronously
        """
        log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
        resp = super(FileDownloader, self).renderHTTP(req)
        if isinstance(resp, defer.Deferred):
            resp.addCallbacks(self._renderHTTP_done, self._renderHTTP_error,
                              callbackArgs = (req, ), errbackArgs = (req, ))
        else:
            resp = self._renderHTTP_done(resp, req)
        return resp

    def _renderHTTP_done(self, resp, req):
        """Decide what to do with the initial response from the cache.

        @param resp: the response produced by the parent class
        @param req: the request being served
        @return: whatever the manager returns for the request, or C{resp}
            unchanged when there is no manager
        """
        log.msg('Initial response to %s: %r' % (req.uri, resp))

        # The server can be created without a manager (the unit tests do so),
        # in which case the cache's own answer is all there is.
        if self.manager:
            # A request URI such as '/ftp.us.debian.org/debian/...' becomes
            # 'http://ftp.us.debian.org/debian/...'
            path = 'http:/' + req.uri
            if 200 <= resp.code < 400:
                return self.manager.check_freshness(req, path, resp.headers.getHeader('Last-Modified'), resp)

            log.msg('Not found, trying other methods for %s' % req.uri)
            return self.manager.get_resp(req, path)

        return resp

    def _renderHTTP_error(self, err, req):
        """Fall back to the manager when rendering from the cache failed.

        @param err: the failure raised while rendering
        @param req: the request being served
        @return: whatever the manager returns for the request, or C{err}
            unchanged when there is no manager
        """
        log.msg('Failed to render %s: %r' % (req.uri, err))
        # NOTE(review): this call is not visible in the reviewed listing;
        # it records the full failure, confirm it is wanted.
        log.err(err)

        if self.manager:
            path = 'http:/' + req.uri
            return self.manager.get_resp(req, path)

        return err

    def createSimilarFile(self, path):
        """Create another instance for C{path} sharing this one's settings.

        @param path: the file system path for the new instance
        @return: a new instance of this class with the same manager
        """
        # Copy the index names so the new instance cannot alter this one's list
        return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                              self.processors, self.indexNames[:])
class FileUploaderStream(stream.FileStream):
    """Modified to make it suitable for streaming to peers.

    Streams the file in small chunks to make it easier to throttle the
    streaming to peers.

    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
    """

    # NOTE(review): the value is not visible in the reviewed listing,
    # 4 KiB is assumed here -- confirm.
    CHUNK_SIZE = 4*1024

    def read(self, sendfile=False):
        """Read the next chunk of the file.

        @param sendfile: unused, accepted for compatibility with callers
        @return: the next chunk of at most L{CHUNK_SIZE} bytes, or None once
            the stream is finished
        @raise RuntimeError: if the file ends before C{self.length} bytes
            have been read
        """
        if self.f is None:
            return None

        length = self.length
        if length == 0:
            # Everything has been sent, drop the file reference
            self.f = None
            return None

        # Remove the SendFileBuffer and mmap use, just use string reads and writes

        readSize = min(length, self.CHUNK_SIZE)

        # Always seek first, the position is tracked in self.start
        self.f.seek(self.start)
        b = self.f.read(readSize)
        bytesRead = len(b)
        if not bytesRead:
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))

        self.length -= bytesRead
        self.start += bytesRead
        return b
class FileUploader(static.File):
    """Modified to make it suitable for peer requests.

    Uses the modified L{FileUploaderStream} to stream the file for throttling,
    and doesn't do any listing of directory contents.
    """

    def render(self, req):
        """Build the response that streams the file to a peer.

        @param req: the request being served
        @return: a response streaming the file, or a NOT_FOUND / FORBIDDEN
            response code when the file is missing, is a directory, or
            can not be opened
        @raise IOError: if opening the file fails for any reason other than
            a permission or a missing file error
        """
        if not self.fp.exists():
            return responsecode.NOT_FOUND

        if self.fp.isdir():
            # Don't try to render a directory listing
            return responsecode.NOT_FOUND

        try:
            f = self.fp.open()
        except IOError as e:
            import errno
            if e.errno == errno.EACCES:
                return responsecode.FORBIDDEN
            elif e.errno == errno.ENOENT:
                # The file vanished between the exists() check and the open
                return responsecode.NOT_FOUND
            else:
                raise

        response = http.Response()
        # Use the modified FileStream
        response.stream = FileUploaderStream(f, 0, self.fp.getsize())

        # Only set the headers that could be determined
        for (header, value) in (
            ("content-type", self.contentType()),
            ("content-encoding", self.contentEncoding()),
        ):
            if value is not None:
                response.headers.setHeader(header, value)

        return response
class UploadThrottlingProtocol(ThrottlingProtocol):
    """Protocol for throttling uploads.

    Determines whether or not to throttle the upload based on the type of stream.
    Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemoryStream},
    apt uses L{CacheManager.ProxyFileStream} or L{twisted.web2.stream.FileStream}.

    @ivar stats: the object to report sent bytes to via C{sentBytes}, or
        None to not report anything (assigned on the class by the HTTP server)
    @type throttle: C{boolean}
    @ivar throttle: whether the data written to this connection is throttled
    """

    stats = None

    def __init__(self, factory, wrappedProtocol):
        """Initialize the protocol, unthrottled until a producer registers."""
        ThrottlingProtocol.__init__(self, factory, wrappedProtocol)
        self.throttle = False

    def write(self, data):
        """Write the data, throttled and counted only for uploads to peers.

        @param data: the data to write
        """
        if self.throttle:
            ThrottlingProtocol.write(self, data)
            if self.stats:
                self.stats.sentBytes(len(data))
        else:
            # Skip the throttling layer entirely
            ProtocolWrapper.write(self, data)

    def writeSequence(self, seq):
        """Write the sequence, throttled and counted only for uploads to peers.

        @param seq: the sequence of data strings to write
        """
        if self.throttle:
            ThrottlingProtocol.writeSequence(self, seq)
            if self.stats:
                self.stats.sentBytes(sum(len(data) for data in seq))
        else:
            # Skip the throttling layer entirely
            ProtocolWrapper.writeSequence(self, seq)

    def registerProducer(self, producer, streaming):
        """Register the producer and decide whether it needs to be throttled.

        @param producer: the producer to register, its C{stream} attribute
            (if any) determines whether this is an upload to a peer
        @param streaming: passed unchanged to the parent class
        """
        ThrottlingProtocol.registerProducer(self, producer, streaming)
        streamType = getattr(producer, 'stream', None)
        if isinstance(streamType, (FileUploaderStream, stream.MemoryStream)):
            self.throttle = True
class TopLevel(resource.Resource):
    """The HTTP server for all requests, both from peers and apt.

    @type directory: L{twisted.python.filepath.FilePath}
    @ivar directory: the directory to check for cached files
    @ivar db: the database to use for looking up files and hashes
    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program object to send requests to
    @type factory: L{twisted.web2.channel.HTTPFactory} or L{policies.ThrottlingFactory}
    @ivar factory: the factory to use to serve HTTP requests
    @ivar uploadLimit: the write limit to give to the throttling factory,
        or None when no positive UPLOAD_LIMIT is configured
    """

    # NOTE(review): this attribute is not visible in the reviewed listing,
    # it is assumed from the gap in the class header -- confirm.
    addSlash = True

    def __init__(self, directory, db, manager):
        """Initialize the instance.

        @type directory: L{twisted.python.filepath.FilePath}
        @param directory: the directory to check for cached files
        @param db: the database to use for looking up files and hashes
        @type manager: L{apt_p2p.AptP2P}
        @param manager: the main program object to send requests to
        """
        self.directory = directory
        self.db = db
        self.manager = manager
        self.uploadLimit = None
        limit = config.getint('DEFAULT', 'UPLOAD_LIMIT')
        if limit > 0:
            self.uploadLimit = int(limit*1024)
        self.factory = None

    def getHTTPFactory(self):
        """Initialize and get the factory for this HTTP server."""
        if self.factory is None:
            self.factory = channel.HTTPFactory(server.Site(self),
                                               **{'maxPipeline': 10,
                                                  'betweenRequestsTimeOut': 60})
            self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit)
            self.factory.protocol = UploadThrottlingProtocol
            # The server can be created without a manager, so only ask it
            # for the statistics object when there is one
            if self.manager:
                self.factory.protocol.stats = self.manager.stats
            else:
                self.factory.protocol.stats = None
        return self.factory

    def render(self, ctx):
        """Render a web page with descriptive statistics."""
        if self.manager:
            return http.Response(
                200,
                {'content-type': http_headers.MimeType('text', 'html')},
                self.manager.getStats())
        else:
            # No manager to ask, send a placeholder page
            return http.Response(
                200,
                {'content-type': http_headers.MimeType('text', 'html')},
                '<html><body><p>Some Statistics</body></html>')

    def locateChild(self, request, segments):
        """Process the incoming request.

        @param request: the request, its C{uri} and C{remoteAddr} are used
        @param segments: the unquoted path segments of the request
        @return: a tuple of the resource to use and the segments remaining
            for it to process, the resource is None for rejected requests
        """
        log.msg('Got HTTP request for %s from %s' % (request.uri, request.remoteAddr))
        name = segments[0]

        # If the request is for a shared file (from a peer)
        if name == '~':
            if len(segments) != 2:
                log.msg('Got a malformed request from %s' % request.remoteAddr)
                return None, ()

            # Find the file in the database
            # Have to unquote_plus the uri, because the segments are unquoted by twisted
            # (the first 3 characters of the uri are the '/~/' prefix)
            key = unquote_plus(request.uri[3:])
            files = self.db.lookupHash(key)
            if files:
                # If it is a file, return it
                if 'path' in files[0]:
                    log.msg('Sharing %s with %s' % (files[0]['path'].path, request.remoteAddr))
                    return FileUploader(files[0]['path'].path), ()
                else:
                    # It's not for a file, but for a piece string, so return that
                    log.msg('Sending torrent string %s to %s' % (b2a_hex(key), request.remoteAddr))
                    return static.Data(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), ()
            else:
                log.msg('Hash could not be found in database: %r' % key)

        # Only local requests (apt) get past this point
        if request.remoteAddr.host != "127.0.0.1":
            log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
            return None, ()

        # Block access to index .diff files (for now)
        if 'Packages.diff' in segments or 'Sources.diff' in segments:
            return None, ()

        # NOTE(review): the condition below is not visible in the reviewed
        # listing, only its two commented branches are -- confirm.
        if len(name) > 1:
            # It's a request from apt
            return FileDownloader(self.directory.path, self.manager), segments[0:]
        else:
            # Will render the statistics page
            return self, ()

        # NOTE(review): with the branches above both returning, this can not
        # be reached; kept so the message is not lost -- confirm and remove.
        log.msg('Got a malformed request for "%s" from %s' % (request.uri, request.remoteAddr))
        return None, ()
class TestTopLevel(unittest.TestCase):
    """Unit tests for the HTTP Server.

    The test case also stands in for the database: it is passed to
    L{TopLevel} as the C{db} and answers its C{lookupHash} calls.
    """

    # NOTE(review): these two class attributes are not visible in the
    # reviewed listing, they are implied by setUp and tearDown -- confirm.
    client = None
    pending_calls = []
    torrent_hash = '\xca \xb8\x0c\x00\xe7\x07\xf8~])+\x9d\xe5_B\xff\x1a\xc4!'
    torrent = 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
    file_hash = '\xf8~])+\x9d\xe5_B\xff\x1a\xc4!\xca \xb8\x0c\x00\xe7\x07'

    def setUp(self):
        """Create the server with this test case as the db and no manager."""
        self.client = TopLevel(FilePath('/boot'), self, None)

    def lookupHash(self, hash):
        """Stand-in for the database lookup, knows only the two test hashes."""
        if hash == self.torrent_hash:
            return [{'pieces': self.torrent}]
        elif hash == self.file_hash:
            return [{'path': FilePath('/boot/grub/stage2')}]
        else:
            return []

    def create_request(self, host, path):
        """Create a GET request for C{path} appearing to come from C{host}."""
        req = server.Request(None, 'GET', path, (1,1), 0, http_headers.Headers())
        # Minimal stand-in for the address object of a real connection
        class addr:
            host = ''
            port = 0
        req.remoteAddr = addr()
        req.remoteAddr.host = host
        req.remoteAddr.port = 23456
        server.Request._parseURL(req)
        return req

    def test_unauthorized(self):
        """Non-local hosts can not make apt requests."""
        req = self.create_request('128.0.0.1', '/foo/bar')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Packages_diff(self):
        """Requests for index .diff files are blocked."""
        req = self.create_request('127.0.0.1',
                '/ftp.us.debian.org/debian/dists/unstable/main/binary-i386/Packages.diff/Index')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Statistics(self):
        """A local request for the root gets the statistics page."""
        req = self.create_request('127.0.0.1', '/')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_apt_download(self):
        """A local request for an uncached file is not found without a manager."""
        req = self.create_request('127.0.0.1',
                '/ftp.us.debian.org/debian/dists/stable/Release')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileDownloader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 404)
        return df

    def test_torrent_upload(self):
        """A peer request for a hash with pieces gets the piece string."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.torrent_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, static.Data))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_file_upload(self):
        """A peer request for a hash with a path gets the file."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.file_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileUploader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_missing_hash(self):
        """A peer request for an unknown hash is rejected."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus('foobar'))
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def check_resp(self, resp, code):
        """Check that the response has the expected response code."""
        self.failUnlessEqual(resp.code, code)
        return resp

    def tearDown(self):
        """Cancel any calls still pending and forget the server."""
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        self.client = None
if __name__ == '__builtin__':
    # Running from twistd -ny HTTPServer.py
    # Then test with:
    #   wget -S 'http://localhost:18080/~/whatever'
    #   wget -S 'http://localhost:18080/~/pieces'

    import os.path
    from twisted.python.filepath import FilePath

    class DB:
        """Stand-in database: 'pieces' gets a piece string, all else a file."""
        def lookupHash(self, hash):
            if hash == 'pieces':
                return [{'pieces': 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'}]
            return [{'path': FilePath(os.path.expanduser('~/school/optout'))}]

    # TopLevel takes (directory, db, manager): the extra trailing argument
    # that used to be passed here made the call fail.
    # NOTE(review): getHTTPFactory reads the manager's stats, so running with
    # no manager relies on that method tolerating None -- confirm.
    t = TopLevel(FilePath(os.path.expanduser('~')), DB(), None)
    factory = t.getHTTPFactory()

    # Standard twisted application Boilerplate
    from twisted.application import service, strports
    application = service.Application("demoserver")
    s = strports.service('tcp:18080', factory)
    s.setServiceParent(application)