2 """Serve local requests from apt and remote requests from peers."""
4 from urllib import quote_plus, unquote_plus
5 from binascii import b2a_hex
7 from twisted.python import log
8 from twisted.internet import defer
9 from twisted.web2 import server, http, resource, channel, stream
10 from twisted.web2 import static, http_headers, responsecode
11 from twisted.trial import unittest
12 from twisted.python.filepath import FilePath
14 from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
15 from apt_p2p_conf import config
16 from apt_p2p_Khashmir.bencode import bencode
class FileDownloader(static.File):
    """Modified to make it suitable for apt requests.

    Tries to find requests in the cache. Found files are first checked for
    freshness before being sent. Requests for unfound and stale files are
    forwarded to the main program for downloading.

    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program to query; when it is C{None} the cache's
        own response is returned unchanged
    """

    def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(),
                 processors=None, indexNames=None):
        self.manager = manager
        super(FileDownloader, self).__init__(path, defaultType, ignoredExts,
                                             processors, indexNames)

    def renderHTTP(self, req):
        """Render from the cache, then post-process with L{_renderHTTP_done}.

        The parent's C{renderHTTP} may hand back either a response or a
        Deferred that fires with one; both cases are handled and the
        (possibly deferred) final response is returned.
        """
        log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
        resp = super(FileDownloader, self).renderHTTP(req)
        if isinstance(resp, defer.Deferred):
            resp.addCallback(self._renderHTTP_done, req)
        else:
            resp = self._renderHTTP_done(resp, req)
        return resp

    def _renderHTTP_done(self, resp, req):
        """Decide what to send once the cache lookup produced C{resp}.

        @param resp: the response generated from the local cache
        @param req: the apt request being served
        @return: C{resp} itself when there is no manager; otherwise whatever
            the manager's C{check_freshness} (cache hit) or C{get_resp}
            (cache miss) returns
        """
        log.msg('Initial response to %s: %r' % (req.uri, resp))

        if not self.manager:
            # Nothing else to ask (e.g. the unit tests pass no manager),
            # so the cache's answer is final.
            return resp

        # req.uri looks like '/<host>/<path>', so this rebuilds 'http://<host>/<path>'
        path = 'http:/' + req.uri
        if 200 <= resp.code < 400:
            # Found in the cache, but it may be stale
            return self.manager.check_freshness(req, path, resp.headers.getHeader('Last-Modified'), resp)

        log.msg('Not found, trying other methods for %s' % req.uri)
        return self.manager.get_resp(req, path)

    def createSimilarFile(self, path):
        """Create a child resource for C{path} that shares this one's settings.

        The index name list is copied so children never share the list object.
        """
        return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                              self.processors, self.indexNames[:])
class FileUploaderStream(stream.FileStream):
    """Modified to make it suitable for streaming to peers.

    Streams the file in small chunks to make it easier to throttle the
    streaming to peers.

    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
    """

    # NOTE(review): the assignment is not visible in the mangled listing;
    # 4 KiB is an assumed "small chunk" size -- confirm the intended value.
    CHUNK_SIZE = 4*1024

    def read(self, sendfile=False):
        """Return the next chunk of at most C{CHUNK_SIZE} bytes.

        @param sendfile: accepted for interface compatibility and ignored;
            data is always returned as a plain string
        @return: the next chunk, or C{None} once no data remains
        @raise RuntimeError: if the file ends before C{self.length} bytes
            have been read
        """
        if self.f is None:
            return None

        length = self.length
        if length == 0:
            # Everything has been sent; drop the file and report no more data
            self.f = None
            return None

        # Remove the SendFileBuffer and mmap use, just use string reads and writes
        readSize = min(length, self.CHUNK_SIZE)

        self.f.seek(self.start)
        b = self.f.read(readSize)
        bytesRead = len(b)
        if not bytesRead:
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))

        self.length -= bytesRead
        self.start += bytesRead
        return b
class FileUploader(static.File):
    """Modified to make it suitable for peer requests.

    Uses the modified L{FileUploaderStream} to stream the file for throttling,
    and doesn't do any listing of directory contents.
    """

    def render(self, req):
        """Serve the file at C{self.fp} through a L{FileUploaderStream}.

        @return: C{NOT_FOUND} for missing paths and directories, C{FORBIDDEN}
            when the file cannot be opened for lack of permission, otherwise
            a response streaming the whole file
        @raise IOError: for any other error while opening the file
        """
        import errno

        if not self.fp.exists():
            return responsecode.NOT_FOUND

        if self.fp.isdir():
            # Don't try to render a directory listing
            return responsecode.NOT_FOUND

        try:
            f = self.fp.open()
        except IOError as e:
            if e.errno == errno.EACCES:
                return responsecode.FORBIDDEN
            elif e.errno == errno.ENOENT:
                return responsecode.NOT_FOUND
            raise

        response = http.Response()
        # Use the modified FileStream
        response.stream = FileUploaderStream(f, 0, self.fp.getsize())

        for (header, value) in (
            ("content-type", self.contentType()),
            ("content-encoding", self.contentEncoding()),
        ):
            if value is not None:
                response.headers.setHeader(header, value)

        return response
class UploadThrottlingProtocol(ThrottlingProtocol):
    """Protocol for throttling uploads.

    Determines whether or not to throttle the upload based on the type of stream.
    Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemoryStream},
    apt uses L{CacheManager.ProxyFileStream} or L{twisted.web.stream.FileStream}.

    @ivar throttle: whether writes go through the throttling layer
    """

    def __init__(self, factory, wrappedProtocol):
        ThrottlingProtocol.__init__(self, factory, wrappedProtocol)
        self.throttle = False

    def write(self, data):
        """Write C{data}, throttled only if an upload stream was registered."""
        if self.throttle:
            ThrottlingProtocol.write(self, data)
        else:
            # Bypass the throttling layer entirely
            ProtocolWrapper.write(self, data)

    def registerProducer(self, producer, streaming):
        """Register C{producer} and enable throttling for upload streams.

        NOTE(review): the flag is only ever set to True here, so once an
        upload stream is registered every later write on this connection is
        throttled -- confirm that is intended.
        """
        ThrottlingProtocol.registerProducer(self, producer, streaming)
        producerStream = getattr(producer, 'stream', None)
        if isinstance(producerStream, (FileUploaderStream, stream.MemoryStream)):
            self.throttle = True
class TopLevel(resource.Resource):
    """The HTTP server for all requests, both from peers and apt.

    @type directory: L{twisted.python.filepath.FilePath}
    @ivar directory: the directory to check for cached files
    @ivar db: the database to use for looking up files and hashes
    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program object to send requests to
    @type factory: L{twisted.web2.channel.HTTPFactory} or L{policies.ThrottlingFactory}
    @ivar factory: the factory to use to serve HTTP requests
    """

    def __init__(self, directory, db, manager):
        """Initialize the instance.

        @type directory: L{twisted.python.filepath.FilePath}
        @param directory: the directory to check for cached files
        @param db: the database to use for looking up files and hashes
        @type manager: L{apt_p2p.AptP2P}
        @param manager: the main program object to send requests to
        """
        self.directory = directory
        self.db = db
        self.manager = manager
        # The configured value is multiplied by 1024 (presumably KiB/s ->
        # bytes/s); values <= 0 leave uploadLimit as None.
        limit = config.getint('DEFAULT', 'UPLOAD_LIMIT')
        self.uploadLimit = limit * 1024 if limit > 0 else None
        # Built lazily by getHTTPFactory
        self.factory = None

    def getHTTPFactory(self):
        """Initialize and get the factory for this HTTP server.

        The factory is created on first use and cached: a web2 HTTPFactory
        serving this resource, wrapped in a ThrottlingFactory whose protocol
        is L{UploadThrottlingProtocol}.
        """
        if self.factory is None:
            httpFactory = channel.HTTPFactory(server.Site(self),
                                              maxPipeline=10,
                                              betweenRequestsTimeOut=60)
            self.factory = ThrottlingFactory(httpFactory, writeLimit=self.uploadLimit)
            self.factory.protocol = UploadThrottlingProtocol
        return self.factory

    def render(self, ctx):
        """Render a web page with descriptive statistics.

        Without a manager a static placeholder page is served instead.
        """
        if self.manager:
            body = self.manager.getStats()
        else:
            body = '<html><body><p>Some Statistics</body></html>'
        return http.Response(
            200,
            {'content-type': http_headers.MimeType('text', 'html')},
            body)

    def locateChild(self, request, segments):
        """Process the incoming request.

        Peer requests look like C{/~/<quoted hash>} and are answered with the
        shared file or its bencoded piece string. Everything else is only
        honoured from 127.0.0.1 (apt): C{/} renders the statistics page and
        C{/<host>/<path>} is handed to a L{FileDownloader}.

        @return: a (child resource, remaining segments) tuple; a C{None}
            child rejects the request
        """
        log.msg('Got HTTP request for %s from %s' % (request.uri, request.remoteAddr))
        name = segments[0] if segments else ''

        # If the request is for a shared file (from a peer)
        if name == '~':
            if len(segments) != 2:
                log.msg('Got a malformed request from %s' % request.remoteAddr)
                return None, ()

            # Find the file in the database
            # Have to unquote_plus the uri, because the segments are unquoted by twisted
            hash_key = unquote_plus(request.uri[3:])
            files = self.db.lookupHash(hash_key)
            if files:
                # If it is a file, return it
                if 'path' in files[0]:
                    log.msg('Sharing %s with %s' % (files[0]['path'].path, request.remoteAddr))
                    return FileUploader(files[0]['path'].path), ()

                # It's not for a file, but for a piece string, so return that
                log.msg('Sending torrent string %s to %s' % (b2a_hex(hash_key), request.remoteAddr))
                return static.Data(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), ()

            # Unknown hashes fall through to the access check below
            log.msg('Hash could not be found in database: %r' % hash_key)

        # Only local requests (apt) get past this point
        if request.remoteAddr.host != "127.0.0.1":
            log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
            return None, ()

        # Block access to index .diff files (for now)
        if 'Packages.diff' in segments or 'Sources.diff' in segments:
            return None, ()

        if not name:
            # Will render the statistics page
            return self, ()

        if name != '~':
            # It's a request from apt
            return FileDownloader(self.directory.path, self.manager), segments

        # A local '~' request whose hash was malformed or unknown
        log.msg('Got a malformed request for "%s" from %s' % (request.uri, request.remoteAddr))
        return None, ()
class TestTopLevel(unittest.TestCase):
    """Unit tests for the HTTP Server.

    The test case itself is passed to L{TopLevel} as the database (see
    L{lookupHash}); no manager is supplied.
    """

    client = None
    torrent_hash = '\xca \xb8\x0c\x00\xe7\x07\xf8~])+\x9d\xe5_B\xff\x1a\xc4!'
    torrent = 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
    file_hash = '\xf8~])+\x9d\xe5_B\xff\x1a\xc4!\xca \xb8\x0c\x00\xe7\x07'

    def setUp(self):
        # Per-test list, so state never leaks between tests via the class
        self.pending_calls = []
        self.client = TopLevel(FilePath('/boot'), self, None)

    def lookupHash(self, hash):
        """Database stub: resolve the two known hashes, C{None} otherwise."""
        if hash == self.torrent_hash:
            return [{'pieces': self.torrent}]
        elif hash == self.file_hash:
            return [{'path': FilePath('/boot/grub/stage2')}]
        return None

    def create_request(self, host, path):
        """Build a GET request for C{path} that appears to come from C{host}."""
        req = server.Request(None, 'GET', path, (1,1), 0, http_headers.Headers())

        class addr(object):
            host = ''
            port = 0

        req.remoteAddr = addr()
        req.remoteAddr.host = host
        req.remoteAddr.port = 23456
        server.Request._parseURL(req)
        return req

    def test_unauthorized(self):
        req = self.create_request('128.0.0.1', '/foo/bar')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Packages_diff(self):
        req = self.create_request('127.0.0.1',
                                  '/ftp.us.debian.org/debian/dists/unstable/main/binary-i386/Packages.diff/Index')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Statistics(self):
        req = self.create_request('127.0.0.1', '/')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        # Returning the Deferred makes trial wait for the check
        return df

    def test_apt_download(self):
        req = self.create_request('127.0.0.1',
                                  '/ftp.us.debian.org/debian/dists/stable/Release')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileDownloader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 404)
        return df

    def test_torrent_upload(self):
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.torrent_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, static.Data))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_file_upload(self):
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.file_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileUploader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_missing_hash(self):
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus('foobar'))
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def check_resp(self, resp, code):
        """Assert the response status and pass the response along the chain."""
        self.failUnlessEqual(resp.code, code)
        return resp

    def tearDown(self):
        # NOTE(review): assumes pending_calls holds DelayedCall-like objects
        # (active()/cancel()); nothing visible appends to it yet.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        self.client = None
if __name__ == '__builtin__':
    # Running from twistd -ny HTTPServer.py
    # Then test with:
    # wget -S 'http://localhost:18080/~/whatever'
    # wget -S 'http://localhost:18080/~/pieces'

    import os.path

    class DB(object):
        """Database stub: 'pieces' maps to a piece string, anything else to a file."""

        def lookupHash(self, hash):
            if hash == 'pieces':
                return [{'pieces': 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'}]
            return [{'path': FilePath(os.path.expanduser('~/school/optout'))}]

    # TopLevel takes exactly (directory, db, manager)
    t = TopLevel(FilePath(os.path.expanduser('~')), DB(), None)
    factory = t.getHTTPFactory()

    # Standard twisted application Boilerplate
    from twisted.application import service, strports
    application = service.Application("demoserver")
    s = strports.service('tcp:18080', factory)
    s.setServiceParent(application)