2 """Serve local requests from apt and remote requests from peers."""
4 from urllib import quote_plus, unquote_plus
5 from binascii import b2a_hex
8 from twisted.python import log
9 from twisted.internet import defer
10 from twisted.web2 import server, http, resource, channel, stream
11 from twisted.web2 import static, http_headers, responsecode
12 from twisted.trial import unittest
13 from twisted.python.filepath import FilePath
15 from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
16 from apt_p2p_conf import config
17 from apt_p2p_Khashmir.bencode import bencode
class FileDownloader(static.File):
    """Modified to make it suitable for apt requests.

    Tries to find requests in the cache. Found files are first checked for
    freshness before being sent. Requests for unfound and stale files are
    forwarded to the main program for downloading.

    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program to query (may be C{None}, in which
        case the plain static-file response is returned unchanged)
    """

    def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(), processors=None, indexNames=None):
        """Remember the manager and initialize the underlying static file.

        @param path: the file system path to serve
        @param manager: the main program to query
        """
        self.manager = manager
        super(FileDownloader, self).__init__(path, defaultType, ignoredExts, processors, indexNames)

    def renderHTTP(self, req):
        """Render the cached file, then let the manager post-process it.

        @param req: the incoming request
        @return: the final response, or a deferred that fires with it
        """
        log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
        resp = super(FileDownloader, self).renderHTTP(req)
        if isinstance(resp, defer.Deferred):
            # The post-processing has to wait for the deferred to fire
            resp.addCallbacks(self._renderHTTP_done, self._renderHTTP_error,
                              callbackArgs = (req, ), errbackArgs = (req, ))
        else:
            resp = self._renderHTTP_done(resp, req)
        return resp

    def _renderHTTP_done(self, resp, req):
        """Check the freshness of a found file, or go looking for it.

        @param resp: the response produced by the static file renderer
        @param req: the incoming request
        @return: the manager's response, or C{resp} if there is no manager
        """
        log.msg('Initial response to %s: %r' % (req.uri, resp))

        # Without a manager (e.g. in the unit tests) nothing more can be done
        if not self.manager:
            return resp

        # The request URI starts with a '/', giving 'http://host/...'
        path = 'http:/' + req.uri
        if 200 <= resp.code < 400:
            return self.manager.check_freshness(req, path, resp.headers.getHeader('Last-Modified'), resp)

        log.msg('Not found, trying other methods for %s' % req.uri)
        return self.manager.get_resp(req, path)

    def _renderHTTP_error(self, err, req):
        """Fall back to the manager when the static rendering failed.

        @param err: the failure from the static file renderer
        @param req: the incoming request
        @return: the manager's response, or C{err} if there is no manager
        """
        log.msg('Failed to render %s: %r' % (req.uri, err))

        # Without a manager the failure is passed on unchanged
        if not self.manager:
            return err

        path = 'http:/' + req.uri
        return self.manager.get_resp(req, path)

    def createSimilarFile(self, path):
        """Create a new instance for C{path} that shares this one's settings."""
        return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                              self.processors, self.indexNames[:])
class FileUploaderStream(stream.FileStream):
    """Modified to make it suitable for streaming to peers.

    Streams the file in small chunks to make it easier to throttle the
    upload.

    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
    """

    # NOTE(review): the defining line is missing from this extract; 4 KiB
    # is an assumed value -- confirm against the full file.
    CHUNK_SIZE = 4*1024

    def read(self, sendfile=False):
        """Read the next chunk of the file.

        @param sendfile: accepted for interface compatibility, not used
        @return: up to C{CHUNK_SIZE} bytes of data, or C{None} once the
            stream is exhausted
        @raise RuntimeError: if the file ends before the expected length
        """
        if self.f is None:
            return None

        length = self.length
        if length == 0:
            # Everything has been sent, drop the reference to the file
            self.f = None
            return None

        # Remove the SendFileBuffer and mmap use, just use string reads and writes
        readSize = min(length, self.CHUNK_SIZE)

        self.f.seek(self.start)
        b = self.f.read(readSize)
        bytesRead = len(b)
        if not bytesRead:
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))

        self.length -= bytesRead
        self.start += bytesRead
        return b
class FileUploader(static.File):
    """Modified to make it suitable for peer requests.

    Uses the modified L{FileUploaderStream} to stream the file for throttling,
    and doesn't do any listing of directory contents.
    """

    def render(self, req):
        """Build a response that streams the file to the peer.

        @param req: the incoming request
        @return: the response, or an error response code if the file is
            missing, is a directory, or can not be opened
        """
        # errno is not imported at module level in the visible imports
        import errno

        if not self.fp.exists():
            return responsecode.NOT_FOUND

        if self.fp.isdir():
            # Don't try to render a directory listing
            return responsecode.NOT_FOUND

        try:
            f = self.fp.open()
        except IOError as e:
            if e.errno == errno.EACCES:
                return responsecode.FORBIDDEN
            elif e.errno == errno.ENOENT:
                return responsecode.NOT_FOUND
            else:
                raise

        response = http.Response()
        # Use the modified FileStream
        response.stream = FileUploaderStream(f, 0, self.fp.getsize())

        for (header, value) in (
            ("content-type", self.contentType()),
            ("content-encoding", self.contentEncoding()),
        ):
            if value is not None:
                response.headers.setHeader(header, value)

        return response
class UploadThrottlingProtocol(ThrottlingProtocol):
    """Protocol for throttling uploads.

    Determines whether or not to throttle the upload based on the type of stream.
    Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemoryStream},
    apt uses L{CacheManager.ProxyFileStream} or L{twisted.web2.stream.FileStream}.

    @cvar stats: the statistics object that sent bytes are reported to
        (set by L{TopLevel.getHTTPFactory}, C{None} if there is none)
    @ivar throttle: whether the current connection is being throttled
    """

    stats = None

    def __init__(self, factory, wrappedProtocol):
        """Start out unthrottled until a producer is registered."""
        ThrottlingProtocol.__init__(self, factory, wrappedProtocol)
        self.throttle = False

    def write(self, data):
        """Write the data, throttled and counted only for uploads."""
        if self.throttle:
            ThrottlingProtocol.write(self, data)
            if self.stats:
                self.stats.sentBytes(len(data))
        else:
            ProtocolWrapper.write(self, data)

    def writeSequence(self, seq):
        """Write the sequence, throttled and counted only for uploads."""
        if self.throttle:
            ThrottlingProtocol.writeSequence(self, seq)
            if self.stats:
                # sum() also copes with an empty sequence
                self.stats.sentBytes(sum(len(chunk) for chunk in seq))
        else:
            ProtocolWrapper.writeSequence(self, seq)

    def registerProducer(self, producer, streaming):
        """Register the producer, throttling it if it is an upload stream."""
        ThrottlingProtocol.registerProducer(self, producer, streaming)
        streamType = getattr(producer, 'stream', None)
        if isinstance(streamType, (FileUploaderStream, stream.MemoryStream)):
            self.throttle = True
class TopLevel(resource.Resource):
    """The HTTP server for all requests, both from peers and apt.

    @type directory: L{twisted.python.filepath.FilePath}
    @ivar directory: the directory to check for cached files
    @ivar db: the database to use for looking up files and hashes
    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program object to send requests to
    @type factory: L{twisted.web2.channel.HTTPFactory} or L{policies.ThrottlingFactory}
    @ivar factory: the factory to use to serve HTTP requests
    @ivar uploadLimit: the upload limit passed to the throttling factory as
        its write limit, or C{None} if uploads are not limited
    """

    # NOTE(review): a class-level line is missing here in this extract
    # (presumably C{addSlash = True}); confirm against the full file.
    addSlash = True

    def __init__(self, directory, db, manager):
        """Initialize the instance.

        @type directory: L{twisted.python.filepath.FilePath}
        @param directory: the directory to check for cached files
        @param db: the database to use for looking up files and hashes
        @type manager: L{apt_p2p.AptP2P}
        @param manager: the main program object to send requests to
        """
        self.directory = directory
        self.db = db
        self.manager = manager
        self.uploadLimit = None
        limit = config.getint('DEFAULT', 'UPLOAD_LIMIT')
        if limit > 0:
            # The configured value is multiplied by 1024 before use
            self.uploadLimit = int(limit*1024)
        self.factory = None

    def getHTTPFactory(self):
        """Initialize and get the factory for this HTTP server."""
        if self.factory is None:
            self.factory = channel.HTTPFactory(server.Site(self),
                                               **{'maxPipeline': 10,
                                                  'betweenRequestsTimeOut': 60})
            self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit)
            self.factory.protocol = UploadThrottlingProtocol
            # There is no manager when running the tests or the demo server
            if self.manager:
                self.factory.protocol.stats = self.manager.stats
        return self.factory

    def render(self, ctx):
        """Render a web page with descriptive statistics."""
        if self.manager:
            return http.Response(
                200,
                {'content-type': http_headers.MimeType('text', 'html')},
                self.manager.getStats())
        else:
            return http.Response(
                200,
                {'content-type': http_headers.MimeType('text', 'html')},
                '<html><body><p>Some Statistics</body></html>')

    def locateChild(self, request, segments):
        """Process the incoming request.

        @param request: the incoming request
        @param segments: the unquoted path segments of the request
        @return: a tuple of the resource to render and the remaining
            segments, with C{None} as the resource for refused requests
        """
        log.msg('Got HTTP request for %s from %s' % (request.uri, request.remoteAddr))
        name = segments[0]

        # If the request is for a shared file (from a peer)
        if name == '~':
            if len(segments) != 2:
                log.msg('Got a malformed request from %s' % request.remoteAddr)
                return None, ()

            # Find the file in the database
            # Have to unquote_plus the uri, because the segments are unquoted by twisted
            hash = unquote_plus(request.uri[3:])
            files = self.db.lookupHash(hash)
            if files:
                # If it is a file, return it
                if 'path' in files[0]:
                    log.msg('Sharing %s with %s' % (files[0]['path'].path, request.remoteAddr))
                    return FileUploader(files[0]['path'].path), ()
                else:
                    # It's not for a file, but for a piece string, so return that
                    log.msg('Sending torrent string %s to %s' % (b2a_hex(hash), request.remoteAddr))
                    return static.Data(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), ()
            else:
                log.msg('Hash could not be found in database: %r' % hash)

        # Only local requests (apt) get past this point
        if request.remoteAddr.host != "127.0.0.1":
            log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
            return None, ()

        # Block access to index .diff files (for now)
        if 'Packages.diff' in segments or 'Sources.diff' in segments:
            return None, ()

        if len(name) > 1:
            # It's a request from apt
            return FileDownloader(self.directory.path, self.manager), segments[0:]

        # Will render the statistics page
        return self, ()
class TestTopLevel(unittest.TestCase):
    """Unit tests for the HTTP Server.

    The test case passes itself to L{TopLevel} as the database, so it
    provides the C{lookupHash} method the server calls.
    """

    client = None
    pending_calls = []
    torrent_hash = '\xca \xb8\x0c\x00\xe7\x07\xf8~])+\x9d\xe5_B\xff\x1a\xc4!'
    torrent = 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
    file_hash = '\xf8~])+\x9d\xe5_B\xff\x1a\xc4!\xca \xb8\x0c\x00\xe7\x07'

    def setUp(self):
        """Create the server, with this test case standing in for the DB."""
        self.client = TopLevel(FilePath('/boot'), self, None)

    def lookupHash(self, hash):
        """Look up the two known hashes, anything else is not found."""
        if hash == self.torrent_hash:
            return [{'pieces': self.torrent}]
        elif hash == self.file_hash:
            return [{'path': FilePath('/boot/grub/stage2')}]
        else:
            return []

    def create_request(self, host, path):
        """Create a GET request for C{path} that appears to come from C{host}."""
        req = server.Request(None, 'GET', path, (1,1), 0, http_headers.Headers())

        # A minimal stand-in for the address object of a real connection
        class addr:
            host = ''
            port = 0

        req.remoteAddr = addr()
        req.remoteAddr.host = host
        req.remoteAddr.port = 23456
        server.Request._parseURL(req)
        return req

    def test_unauthorized(self):
        """Non-local requests for anything but shared files are refused."""
        req = self.create_request('128.0.0.1', '/foo/bar')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Packages_diff(self):
        """Requests for index .diff files are refused."""
        req = self.create_request('127.0.0.1',
                '/ftp.us.debian.org/debian/dists/unstable/main/binary-i386/Packages.diff/Index')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Statistics(self):
        """A local request for the root gets the statistics page."""
        req = self.create_request('127.0.0.1', '/')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_apt_download(self):
        """A local request for an uncached file is not found."""
        req = self.create_request('127.0.0.1',
                '/ftp.us.debian.org/debian/dists/stable/Release')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileDownloader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 404)
        return df

    def test_torrent_upload(self):
        """A peer request for a piece string hash gets the bencoded data."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.torrent_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, static.Data))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_file_upload(self):
        """A peer request for a file hash gets the file uploader."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.file_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileUploader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_missing_hash(self):
        """A peer request for an unknown hash is refused."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus('foobar'))
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def check_resp(self, resp, code):
        """Check the response code, passing the response on down the chain."""
        self.failUnlessEqual(resp.code, code)
        return resp

    def tearDown(self):
        """Clean up after the test."""
        # NOTE(review): the loop body is missing from this extract; this
        # assumes the pending calls are delayed calls -- confirm.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        self.client = None
if __name__ == '__builtin__':
    # Running from twistd -ny HTTPServer.py
    # Then test with:
    #   wget -S 'http://localhost:18080/~/whatever'
    #   wget -S 'http://localhost:18080/~/pieces'

    import os.path
    from twisted.python.filepath import FilePath

    class DB:
        """A fake database that knows a single piece string and a single file."""

        def lookupHash(self, hash):
            # NOTE(review): the condition line is missing from this extract;
            # 'pieces' is taken from the wget example above -- confirm.
            if hash == 'pieces':
                return [{'pieces': 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'}]
            return [{'path': FilePath(os.path.expanduser('~/school/optout'))}]

    # TopLevel takes only the directory, the database and the manager
    t = TopLevel(FilePath(os.path.expanduser('~')), DB(), None)
    factory = t.getHTTPFactory()

    # Standard twisted application Boilerplate
    from twisted.application import service, strports
    application = service.Application("demoserver")
    s = strports.service('tcp:18080', factory)
    s.setServiceParent(application)