"""Serve local requests from apt and remote requests from peers."""
from urllib import quote_plus, unquote_plus
from binascii import b2a_hex

from twisted.python import log
from twisted.internet import defer
from twisted.web2 import server, http, resource, channel, stream
from twisted.web2 import static, http_headers, responsecode
from twisted.trial import unittest
from twisted.python.filepath import FilePath

from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
from apt_p2p_conf import config
from apt_p2p_Khashmir.bencode import bencode

class FileDownloader(static.File):
    """Modified to make it suitable for apt requests.

    Tries to find requests in the cache. Found files are first checked for
    freshness before being sent. Requests for unfound and stale files are
    forwarded to the main program for downloading.

    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program to query
    """

    # NOTE(review): restored from a numbered listing that had lost its
    # indentation and its shortest lines. The else/return lines, the
    # log.err() call and the "if self.manager:" guards are inferred from the
    # surrounding code (the unit tests build the server without a manager);
    # confirm against the upstream file.

    def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(), processors=None, indexNames=None):
        """Remember the manager, then initialize the static file resource.

        @param path: the file system path to serve files from
        @param manager: the main program to query, or None to only serve
            what is found on disk
        @param defaultType: passed unchanged to the parent class
        @param ignoredExts: passed unchanged to the parent class
        @param processors: passed unchanged to the parent class
        @param indexNames: passed unchanged to the parent class
        """
        self.manager = manager
        super(FileDownloader, self).__init__(path, defaultType, ignoredExts, processors, indexNames)

    def renderHTTP(self, req):
        """Render the file from disk, then post-process the response.

        The parent class may answer immediately or with a deferred; in both
        cases the result is passed through L{_renderHTTP_done} (or
        L{_renderHTTP_error} if the deferred fails).

        @param req: the request being served
        @return: the final response, or a deferred that will fire with it
        """
        log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
        resp = super(FileDownloader, self).renderHTTP(req)
        if isinstance(resp, defer.Deferred):
            resp.addCallbacks(self._renderHTTP_done, self._renderHTTP_error,
                              callbackArgs=(req, ), errbackArgs=(req, ))
        else:
            resp = self._renderHTTP_done(resp, req)
        return resp

    def _renderHTTP_done(self, resp, req):
        """Check the freshness of a found file, or go looking for a missing one.

        @param resp: the response produced by the static file resource
        @param req: the request being served
        @return: whatever the manager returns, or C{resp} unchanged when
            there is no manager
        """
        log.msg('Initial response to %s: %r' % (req.uri, resp))

        if self.manager:
            # The request URI already starts with a slash, so this yields
            # 'http://host/path'.
            path = 'http:/' + req.uri
            if 200 <= resp.code < 400:
                return self.manager.check_freshness(req, path, resp.headers.getHeader('Last-Modified'), resp)

            log.msg('Not found, trying other methods for %s' % req.uri)
            return self.manager.get_resp(req, path)

        return resp

    def _renderHTTP_error(self, err, req):
        """Fall back to the manager when rendering from disk failed.

        @param err: the failure raised while rendering
        @param req: the request being served
        @return: whatever the manager returns, or C{err} unchanged when
            there is no manager
        """
        log.msg('Failed to render %s: %r' % (req.uri, err))
        log.err(err)

        if self.manager:
            path = 'http:/' + req.uri
            return self.manager.get_resp(req, path)

        return err

    def createSimilarFile(self, path):
        """Create a resource like this one (same manager and settings) for C{path}."""
        return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                              self.processors, self.indexNames[:])

# NOTE(review): only the docstring survived in the listing; the class header
# is restored from the uses of UploadStream as a base class and in the
# isinstance() check of the throttling protocol.
class UploadStream:
    """Identifier for streams that are uploaded to peers."""

class FileUploaderStream(stream.FileStream, UploadStream):
    """Modified to make it suitable for streaming to peers.

    Streams the file in small chunks to make it easier to throttle the
    streaming to peers.

    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
    """

    # NOTE(review): the listing dropped this assignment and the short
    # guard/return lines of read(); the value below and those lines are
    # restored from the upstream project and twisted.web2's FileStream.read()
    # -- confirm before relying on them.
    CHUNK_SIZE = 4*1024

    def read(self, sendfile=False):
        """Read the next chunk of at most L{CHUNK_SIZE} bytes from the file.

        @param sendfile: accepted for interface compatibility, not used
        @return: the data read, or None once the stream is exhausted
        @raise RuntimeError: if the file ends before C{self.length} bytes
            have been read
        """
        if self.f is None:
            return None

        length = self.length
        if length == 0:
            # Everything was sent, drop the file reference
            self.f = None
            return None

        # Remove the SendFileBuffer and mmap use, just use string reads and writes

        readSize = min(length, self.CHUNK_SIZE)

        self.f.seek(self.start)
        b = self.f.read(readSize)
        bytesRead = len(b)
        if not bytesRead:
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))

        # Advance the window over the part of the file still to be sent
        self.length -= bytesRead
        self.start += bytesRead
        return b

class PiecesUploaderStream(stream.MemoryStream, UploadStream):
    """Modified to identify it for streaming to peers.

    Behaves exactly like the in-memory stream it derives from; the extra
    L{UploadStream} base only tags it as an upload.
    """

class PiecesUploader(static.Data):
    """Modified to identify it for peer requests.

    Uses the modified L{PiecesUploaderStream} to stream the pieces for throttling.
    """

    def render(self, req):
        """Send the stored data to the peer.

        @param req: the request being served (not inspected)
        @return: an OK response whose body is streamed from C{self.data}
        """
        return http.Response(responsecode.OK,
                             http_headers.Headers({'content-type': self.contentType()}),
                             stream=PiecesUploaderStream(self.data))

class FileUploader(static.File):
    """Modified to make it suitable for peer requests.

    Uses the modified L{FileUploaderStream} to stream the file for throttling,
    and doesn't do any listing of directory contents.
    """

    # NOTE(review): the listing dropped the isdir() test, the try/open/except
    # lines, the re-raise and the final return; they are restored from the
    # surviving comments and branches -- confirm against the upstream file.

    def render(self, req):
        """Send the file to the peer.

        @param req: the request being served (not inspected)
        @return: a response streaming the file, or a NOT_FOUND / FORBIDDEN
            response code when the file is missing, is a directory or
            cannot be opened
        """
        if not self.fp.exists():
            return responsecode.NOT_FOUND

        if self.fp.isdir():
            # Don't try to render a directory listing
            return responsecode.NOT_FOUND

        try:
            f = self.fp.open()
        except IOError as e:
            # errno is not imported at module level in this file
            import errno
            if e.errno == errno.EACCES:
                return responsecode.FORBIDDEN
            elif e.errno == errno.ENOENT:
                return responsecode.NOT_FOUND
            else:
                raise

        response = http.Response()
        # Use the modified FileStream
        response.stream = FileUploaderStream(f, 0, self.fp.getsize())

        # Only set the headers that could actually be determined
        for (header, value) in (
            ("content-type", self.contentType()),
            ("content-encoding", self.contentEncoding()),
        ):
            if value is not None:
                response.headers.setHeader(header, value)

        return response

class UploadThrottlingProtocol(ThrottlingProtocol):
    """Protocol for throttling uploads.

    Determines whether or not to throttle the upload based on the type of stream.
    Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemoryStream},
    apt uses L{CacheManager.ProxyFileStream} or L{twisted.web2.stream.FileStream}.

    @ivar throttle: whether the data currently being written is an upload
        to a peer, and so is rate limited and counted
    @ivar stats: the statistics collector to report sent bytes to, if any
    """

    # NOTE(review): the "stats = None" default, the "if self.throttle:" /
    # "else:" structure and the "if self.stats:" guards were dropped by the
    # listing and are inferred from the surviving calls -- confirm against
    # the upstream file.
    stats = None

    def __init__(self, factory, wrappedProtocol):
        """Start out unthrottled until an upload stream is registered."""
        ThrottlingProtocol.__init__(self, factory, wrappedProtocol)
        self.throttle = False

    def write(self, data):
        """Write the data, throttled and counted only if it is an upload."""
        if self.throttle:
            ThrottlingProtocol.write(self, data)
            if self.stats:
                self.stats.sentBytes(len(data))
        else:
            # Skip the throttling layer entirely
            ProtocolWrapper.write(self, data)

    def writeSequence(self, seq):
        """Write the sequence, throttled and counted only if it is an upload."""
        if self.throttle:
            ThrottlingProtocol.writeSequence(self, seq)
            if self.stats:
                # sum() needs no extra imports and copes with an empty sequence
                self.stats.sentBytes(sum(len(chunk) for chunk in seq))
        else:
            ProtocolWrapper.writeSequence(self, seq)

    def registerProducer(self, producer, streaming):
        """Register the producer and decide whether its data is throttled."""
        ThrottlingProtocol.registerProducer(self, producer, streaming)
        streamType = getattr(producer, 'stream', None)
        if isinstance(streamType, UploadStream):
            self.throttle = True

class TopLevel(resource.Resource):
    """The HTTP server for all requests, both from peers and apt.

    @type directory: L{twisted.python.filepath.FilePath}
    @ivar directory: the directory to check for cached files
    @type db: L{db.DB}
    @ivar db: the database to use for looking up files and hashes
    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program object to send requests to
    @type factory: L{twisted.web2.channel.HTTPFactory} or L{policies.ThrottlingFactory}
    @ivar factory: the factory to use to serve HTTP requests
    """

    # NOTE(review): restored from a numbered listing that had lost its
    # indentation and its shortest lines. The "@type db" doc lines, the
    # assignments of self.db and self.factory, the branch heads
    # ("if name == '~':", "if files:", "if len(name) > 1:", ...) and the
    # "return None, ()" / "return self, ()" lines are inferred from the
    # surviving code, comments and unit tests -- confirm against upstream.

    # NOTE(review): presumably set upstream (one short class attribute was
    # dropped here); verify.
    addSlash = True

    def __init__(self, directory, db, manager):
        """Initialize the instance.

        @type directory: L{twisted.python.filepath.FilePath}
        @param directory: the directory to check for cached files
        @type db: L{db.DB}
        @param db: the database to use for looking up files and hashes
        @type manager: L{apt_p2p.AptP2P}
        @param manager: the main program object to send requests to
        """
        self.directory = directory
        self.db = db
        self.manager = manager
        # The limit is configured in KiB/s, the factory wants bytes/s
        self.uploadLimit = None
        limit = config.getint('DEFAULT', 'UPLOAD_LIMIT')
        if limit > 0:
            self.uploadLimit = int(limit*1024)
        self.factory = None

    def getHTTPFactory(self):
        """Initialize and get the factory for this HTTP server."""
        if self.factory is None:
            self.factory = channel.HTTPFactory(server.Site(self),
                                               **{'maxPipeline': 10,
                                                  'betweenRequestsTimeOut': 60})
            self.factory = ThrottlingFactory(self.factory, writeLimit=self.uploadLimit)
            self.factory.protocol = UploadThrottlingProtocol
            if self.manager:
                # Set on the protocol class, so every connection shares it
                self.factory.protocol.stats = self.manager.stats
        return self.factory

    def render(self, ctx):
        """Render a web page with descriptive statistics."""
        if self.manager:
            body = self.manager.getStats()
        else:
            body = '<html><body><p>Some Statistics</body></html>'
        return http.Response(
            responsecode.OK,
            {'content-type': http_headers.MimeType('text', 'html')},
            body)

    def locateChild(self, request, segments):
        """Process the incoming request.

        @param request: the incoming request
        @param segments: the unquoted path segments still to be resolved
        @return: a tuple of the resource to render and the remaining
            segments, or C{(None, ())} if the request is refused
        """
        log.msg('Got HTTP request for %s from %s' % (request.uri, request.remoteAddr))
        name = segments[0]

        # If the request is for a shared file (from a peer)
        if name == '~':
            if len(segments) != 2:
                log.msg('Got a malformed request from %s' % request.remoteAddr)
                return None, ()

            # Find the file in the database
            # Have to unquote_plus the uri, because the segments are unquoted by twisted
            key = unquote_plus(request.uri[3:])
            files = self.db.lookupHash(key)
            if files:
                # If it is a file, return it
                if 'path' in files[0]:
                    log.msg('Sharing %s with %s' % (files[0]['path'].path, request.remoteAddr))
                    return FileUploader(files[0]['path'].path), ()
                else:
                    # It's not for a file, but for a piece string, so return that
                    log.msg('Sending torrent string %s to %s' % (b2a_hex(key), request.remoteAddr))
                    return PiecesUploader(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), ()
            else:
                log.msg('Hash could not be found in database: %r' % key)
                return None, ()

        if len(name) > 1:
            # It's a request from apt

            # Only local requests (apt) get past this point
            if request.remoteAddr.host != "127.0.0.1":
                log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
                return None, ()

            # Block access to index .diff files (for now)
            if 'Packages.diff' in segments or 'Sources.diff' in segments or name == 'favicon.ico':
                return None, ()

            return FileDownloader(self.directory.path, self.manager), segments[0:]

        # Will render the statistics page
        if len(name) == 0:
            # Only local requests for stats are allowed
            if not config.getboolean('DEFAULT', 'REMOTE_STATS') and request.remoteAddr.host != "127.0.0.1":
                log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
                return None, ()

            return self, ()

        log.msg('Got a malformed request for "%s" from %s' % (request.uri, request.remoteAddr))
        return None, ()

class TestTopLevel(unittest.TestCase):
    """Unit tests for the HTTP Server."""

    # NOTE(review): the listing dropped the class attributes "client" and
    # "pending_calls", the setUp/tearDown headers, the "addr" stub class and
    # several short return lines; they are inferred from the surviving code
    # -- confirm against the upstream file.

    client = None
    pending_calls = []
    torrent_hash = '\xca \xb8\x0c\x00\xe7\x07\xf8~])+\x9d\xe5_B\xff\x1a\xc4!'
    torrent = 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
    file_hash = '\xf8~])+\x9d\xe5_B\xff\x1a\xc4!\xca \xb8\x0c\x00\xe7\x07'

    def setUp(self):
        """Create the server, with this test case standing in for the database."""
        self.client = TopLevel(FilePath('/boot'), self, None)

    def lookupHash(self, hash):
        """Fake database lookup that knows one torrent and one file hash."""
        if hash == self.torrent_hash:
            return [{'pieces': self.torrent}]
        elif hash == self.file_hash:
            return [{'path': FilePath('/boot/grub/stage2')}]
        else:
            return []

    def create_request(self, host, path):
        """Build a GET request for C{path} that appears to come from C{host}."""
        req = server.Request(None, 'GET', path, (1,1), 0, http_headers.Headers())

        class addr:
            """Minimal stand-in for the remote address of a request."""
            host = ''
            port = 0

        req.remoteAddr = addr()
        req.remoteAddr.host = host
        req.remoteAddr.port = 23456
        server.Request._parseURL(req)
        return req

    def test_unauthorized(self):
        """Requests for apt files from non-local hosts are refused."""
        req = self.create_request('128.0.0.1', '/foo/bar')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Packages_diff(self):
        """Requests for Packages.diff index files are refused."""
        req = self.create_request('127.0.0.1',
                '/ftp.us.debian.org/debian/dists/unstable/main/binary-i386/Packages.diff/Index')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Statistics(self):
        """A local request for the root renders the statistics page."""
        req = self.create_request('127.0.0.1', '/')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_apt_download(self):
        """A local request for an uncached file is handled by the downloader."""
        req = self.create_request('127.0.0.1',
                '/ftp.us.debian.org/debian/dists/stable/Release')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileDownloader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 404)
        return df

    def test_torrent_upload(self):
        """A peer request for a torrent hash returns the piece string."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.torrent_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, static.Data))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_file_upload(self):
        """A peer request for a file hash returns the file."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.file_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileUploader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_missing_hash(self):
        """A peer request for an unknown hash is refused."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus('foobar'))
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def check_resp(self, resp, code):
        """Check the response code, passing the response on unchanged."""
        self.failUnlessEqual(resp.code, code)
        return resp

    def tearDown(self):
        """Cancel any delayed calls still pending and drop the server."""
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        if self.client:
            self.client = None

if __name__ == '__builtin__':
    # Running from twistd -ny HTTPServer.py
    # Then test with:
    #   wget -S 'http://localhost:18080/~/whatever'
    #   wget -S 'http://localhost:18080/~/pieces'

    # NOTE(review): "import os.path", the DB class header and the
    # "if hash == 'pieces':" test were dropped by the listing and are
    # inferred from the surviving lines and the wget examples above.
    import os.path
    from twisted.python.filepath import FilePath

    class DB:
        """Stand-in database: 'pieces' is a torrent, anything else a file."""

        def lookupHash(self, hash):
            if hash == 'pieces':
                return [{'pieces': 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'}]
            return [{'path': FilePath(os.path.expanduser('~/school/optout'))}]

    t = TopLevel(FilePath(os.path.expanduser('~')), DB(), None)
    factory = t.getHTTPFactory()

    # Standard twisted application Boilerplate
    from twisted.application import service, strports
    application = service.Application("demoserver")
    s = strports.service('tcp:18080', factory)
    s.setServiceParent(application)