"""Serve local requests from apt and remote requests from peers."""
import errno
from urllib import quote_plus, unquote_plus
from binascii import b2a_hex

from twisted.python import log
from twisted.internet import defer
from twisted.web2 import server, http, resource, channel, stream
from twisted.web2 import static, http_headers, responsecode
from twisted.trial import unittest
from twisted.python.filepath import FilePath

from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
from apt_p2p_Khashmir.bencode import bencode
class FileDownloader(static.File):
    """Modified to make it suitable for apt requests.

    Tries to find requests in the cache. Found files are first checked for
    freshness before being sent. Requests for unfound and stale files are
    forwarded to the main program for downloading.

    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program to query
    """

    def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(), processors=None, indexNames=None):
        """Initialize the instance.

        @param path: the path of the cached file or directory to serve
        @type manager: L{apt_p2p.AptP2P}
        @param manager: the main program to query
        @param defaultType: passed through to L{static.File}
        @param ignoredExts: passed through to L{static.File}
        @param processors: passed through to L{static.File}
        @param indexNames: passed through to L{static.File}
        """
        self.manager = manager
        super(FileDownloader, self).__init__(path, defaultType, ignoredExts, processors, indexNames)

    def renderHTTP(self, req):
        """Render the cached file, then post-process the initial response.

        @param req: the HTTP request being served
        @return: the final response, or a deferred that fires with it when
            the parent class rendered asynchronously
        """
        log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
        resp = super(FileDownloader, self).renderHTTP(req)
        if isinstance(resp, defer.Deferred):
            # Asynchronous render: post-process once the response arrives
            resp.addCallback(self._renderHTTP_done, req)
        else:
            resp = self._renderHTTP_done(resp, req)
        return resp

    def _renderHTTP_done(self, resp, req):
        """Check the freshness of a found file, or go looking for a missing one.

        @param resp: the initial response from the cache lookup
        @param req: the HTTP request being served
        @return: whatever the manager returns for the request, or the
            unmodified initial response when there is no manager
        """
        log.msg('Initial response to %s: %r' % (req.uri, resp))

        # Without a manager (as in the unit tests) the cache response is final
        if self.manager:
            path = 'http:/' + req.uri
            if 200 <= resp.code < 400:
                return self.manager.check_freshness(req, path, resp.headers.getHeader('Last-Modified'), resp)

            log.msg('Not found, trying other methods for %s' % req.uri)
            return self.manager.get_resp(req, path)

        return resp

    def createSimilarFile(self, path):
        """Create a resource for a child path that shares this one's settings.

        @param path: the path of the new resource
        @return: a new instance of this class using the same manager
        """
        return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                              self.processors, self.indexNames[:])
class FileUploaderStream(stream.FileStream):
    """Modified to make it suitable for streaming to peers.

    Streams the file in small chunks to make it easier to throttle the
    streaming to peers.

    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
    """

    # NOTE(review): the assignment was lost from this copy of the file;
    # 4 KiB is assumed as the "small chunk" size -- confirm against upstream.
    CHUNK_SIZE = 4*1024

    def read(self, sendfile=False):
        """Read the next chunk of the file.

        @param sendfile: accepted for interface compatibility, but unused
        @return: the next chunk of data, or None once the stream is exhausted
        @raise RuntimeError: if the file ends before the expected length
            has been read
        """
        # NOTE(review): the exhaustion checks below were reconstructed to
        # mirror the parent FileStream.read -- confirm against upstream.
        if self.f is None:
            return None

        length = self.length
        if length == 0:
            self.f = None
            return None

        # Remove the SendFileBuffer and mmap use, just use string reads and writes

        readSize = min(length, self.CHUNK_SIZE)

        self.f.seek(self.start)
        b = self.f.read(readSize)
        bytesRead = len(b)
        if not bytesRead:
            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))

        # Advance the window so the next read continues where this one ended
        self.length -= bytesRead
        self.start += bytesRead
        return b
class FileUploader(static.File):
    """Modified to make it suitable for peer requests.

    Uses the modified L{FileUploaderStream} to stream the file for throttling,
    and doesn't do any listing of directory contents.
    """

    def render(self, req):
        """Build the response that streams the file to a peer.

        @param req: the HTTP request being served
        @return: a response streaming the file, or a bare response code
            (NOT_FOUND or FORBIDDEN) when the file can not be served
        """
        if not self.fp.exists():
            return responsecode.NOT_FOUND

        if self.fp.isdir():
            # Don't try to render a directory listing
            return responsecode.NOT_FOUND

        try:
            f = self.fp.open()
        except IOError as e:
            if e.errno == errno.EACCES:
                return responsecode.FORBIDDEN
            elif e.errno == errno.ENOENT:
                return responsecode.NOT_FOUND
            # Any other failure to open the file is unexpected
            raise

        response = http.Response()
        # Use the modified FileStream
        response.stream = FileUploaderStream(f, 0, self.fp.getsize())

        for (header, value) in (
            ("content-type", self.contentType()),
            ("content-encoding", self.contentEncoding()),
        ):
            if value is not None:
                response.headers.setHeader(header, value)

        return response
class UploadThrottlingProtocol(ThrottlingProtocol):
    """Protocol for throttling uploads.

    Determines whether or not to throttle the upload based on the type of stream.
    Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemoryStream},
    apt uses L{CacheManager.ProxyFileStream} or L{twisted.web2.stream.FileStream}.

    @ivar throttle: whether writes on this connection are being throttled
    """

    def __init__(self, factory, wrappedProtocol):
        """Initialize the protocol with throttling switched off.

        @param factory: the throttling factory that created this protocol
        @param wrappedProtocol: the protocol being wrapped
        """
        ThrottlingProtocol.__init__(self, factory, wrappedProtocol)
        self.throttle = False

    def write(self, data):
        """Write the data, going through the throttle only for uploads.

        @param data: the data to write to the transport
        """
        if self.throttle:
            ThrottlingProtocol.write(self, data)
        else:
            # Skip the throttling layer entirely for non-upload streams
            ProtocolWrapper.write(self, data)

    def registerProducer(self, producer, streaming):
        """Register the producer and decide whether to throttle it.

        @param producer: the producer that will write to this protocol
        @param streaming: passed through to the parent class
        """
        ThrottlingProtocol.registerProducer(self, producer, streaming)
        streamType = getattr(producer, 'stream', None)
        if isinstance(streamType, (FileUploaderStream, stream.MemoryStream)):
            self.throttle = True
class TopLevel(resource.Resource):
    """The HTTP server for all requests, both from peers and apt.

    @type directory: L{twisted.python.filepath.FilePath}
    @ivar directory: the directory to check for cached files
    @type db: L{db.DB}
    @ivar db: the database to use for looking up files and hashes
    @type manager: L{apt_p2p.AptP2P}
    @ivar manager: the main program object to send requests to
    @type factory: L{twisted.web2.channel.HTTPFactory} or L{policies.ThrottlingFactory}
    @ivar factory: the factory to use to serve HTTP requests
    @ivar uploadLimit: the upload limit passed to the throttling factory,
        or None when no positive limit was given
    """

    # NOTE(review): class attribute reconstructed from upstream -- confirm.
    addSlash = True

    def __init__(self, directory, db, manager, uploadLimit):
        """Initialize the instance.

        @type directory: L{twisted.python.filepath.FilePath}
        @param directory: the directory to check for cached files
        @type db: L{db.DB}
        @param db: the database to use for looking up files and hashes
        @type manager: L{apt_p2p.AptP2P}
        @param manager: the main program object to send requests to
        @param uploadLimit: the upload limit, multiplied by 1024 before
            use; values that are not positive disable the limit
        """
        self.directory = directory
        self.db = db
        self.manager = manager
        self.uploadLimit = None
        if uploadLimit > 0:
            self.uploadLimit = int(uploadLimit*1024)
        self.factory = None

    def getHTTPFactory(self):
        """Initialize and get the factory for this HTTP server."""
        if self.factory is None:
            self.factory = channel.HTTPFactory(server.Site(self),
                                               **{'maxPipeline': 10,
                                                  'betweenRequestsTimeOut': 60})
            self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit)
            self.factory.protocol = UploadThrottlingProtocol
        return self.factory

    def render(self, ctx):
        """Render a web page with descriptive statistics."""
        if self.manager:
            body = self.manager.getStats()
        else:
            # No main program to query (e.g. in the unit tests)
            body = '<html><body><p>Some Statistics</body></html>'
        return http.Response(
            200,
            {'content-type': http_headers.MimeType('text', 'html')},
            body)

    def locateChild(self, request, segments):
        """Process the incoming request.

        @param request: the HTTP request being served
        @param segments: the unquoted path segments of the request
        @return: a (resource, remaining segments) tuple; the resource is
            None when the request is malformed or not allowed
        """
        log.msg('Got HTTP request for %s from %s' % (request.uri, request.remoteAddr))
        name = segments[0]

        # If the request is for a shared file (from a peer)
        if name == '~':
            if len(segments) != 2:
                log.msg('Got a malformed request from %s' % request.remoteAddr)
                return None, ()

            # Find the file in the database
            # Have to unquote_plus the uri, because the segments are unquoted by twisted
            key = unquote_plus(request.uri[3:])
            files = self.db.lookupHash(key)
            if files:
                # If it is a file, return it
                if 'path' in files[0]:
                    log.msg('Sharing %s with %s' % (files[0]['path'].path, request.remoteAddr))
                    return FileUploader(files[0]['path'].path), ()
                else:
                    # It's not for a file, but for a piece string, so return that
                    log.msg('Sending torrent string %s to %s' % (b2a_hex(key), request.remoteAddr))
                    return static.Data(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), ()
            else:
                # Fall through: remote peers are rejected by the check below
                log.msg('Hash could not be found in database: %r' % key)

        # Only local requests (apt) get past this point
        if request.remoteAddr.host != "127.0.0.1":
            log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
            return None, ()

        # Block access to index .diff files (for now)
        if 'Packages.diff' in segments or 'Sources.diff' in segments:
            return None, ()

        if len(name) > 1:
            # It's a request from apt
            return FileDownloader(self.directory.path, self.manager), segments[0:]
        else:
            # Will render the statistics page
            return self, ()

        # NOTE(review): unreachable as reconstructed (both branches above
        # return); kept from the original pending confirmation.
        log.msg('Got a malformed request for "%s" from %s' % (request.uri, request.remoteAddr))
        return None, ()
class TestTopLevel(unittest.TestCase):
    """Unit tests for the HTTP Server.

    The test case doubles as the database handed to L{TopLevel}, which is
    why it provides its own L{lookupHash}.
    """

    client = None
    torrent_hash = '\xca \xb8\x0c\x00\xe7\x07\xf8~])+\x9d\xe5_B\xff\x1a\xc4!'
    torrent = 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
    file_hash = '\xf8~])+\x9d\xe5_B\xff\x1a\xc4!\xca \xb8\x0c\x00\xe7\x07'

    def setUp(self):
        """Create a server with no manager and no upload limit."""
        # Per-instance list, so pending calls are never shared between tests
        self.pending_calls = []
        self.client = TopLevel(FilePath('/boot'), self, None, 0)

    def lookupHash(self, hash):
        """Stand in for the database's hash lookup.

        @param hash: the hash being looked up
        @return: a single-entry list for the known torrent or file hash,
            otherwise an empty list
        """
        if hash == self.torrent_hash:
            return [{'pieces': self.torrent}]
        elif hash == self.file_hash:
            return [{'path': FilePath('/boot/initrd')}]
        else:
            return []

    def create_request(self, host, path):
        """Build a GET request for the path that appears to come from the host.

        @param host: the address to report as the remote host
        @param path: the URI path to request
        @return: the parsed request
        """
        req = server.Request(None, 'GET', path, (1,1), 0, http_headers.Headers())

        # NOTE(review): minimal stand-in for a transport address,
        # reconstructed from how remoteAddr is used below -- confirm.
        class addr:
            host = ''
            port = 0

        req.remoteAddr = addr()
        req.remoteAddr.host = host
        req.remoteAddr.port = 23456
        server.Request._parseURL(req)
        return req

    def test_unauthorized(self):
        """Non-local requests for anything but shared hashes are rejected."""
        req = self.create_request('128.0.0.1', '/foo/bar')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Packages_diff(self):
        """Requests for Packages.diff index files are blocked."""
        req = self.create_request('127.0.0.1',
                '/ftp.us.debian.org/debian/dists/unstable/main/binary-i386/Packages.diff/Index')
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def test_Statistics(self):
        """A local request for the root renders the statistics page."""
        req = self.create_request('127.0.0.1', '/')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_apt_download(self):
        """A local request for an uncached file is a 404 without a manager."""
        req = self.create_request('127.0.0.1',
                '/ftp.us.debian.org/debian/dists/stable/Release')
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileDownloader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 404)
        return df

    def test_torrent_upload(self):
        """A peer request for a known torrent hash returns the piece string."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.torrent_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, static.Data))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_file_upload(self):
        """A peer request for a known file hash returns the file."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus(self.file_hash))
        res = req._getChild(None, self.client, req.postpath)
        self.failIfEqual(res, None)
        self.failUnless(isinstance(res, FileUploader))
        df = defer.maybeDeferred(res.renderHTTP, req)
        df.addCallback(self.check_resp, 200)
        return df

    def test_missing_hash(self):
        """A peer request for an unknown hash is rejected."""
        req = self.create_request('123.45.67.89',
                                  '/~/' + quote_plus('foobar'))
        self.failUnlessRaises(http.HTTPError, req._getChild, None, self.client, req.postpath)

    def check_resp(self, resp, code):
        """Check the response code, passing the response down the chain."""
        self.failUnlessEqual(resp.code, code)
        return resp

    def tearDown(self):
        """Cancel any outstanding delayed calls and drop the server."""
        # NOTE(review): the loop body and the client reset were lost from
        # this copy; reconstructed as the usual delayed-call cleanup.
        for p in self.pending_calls:
            if p.active():
                p.cancel()
        self.pending_calls = []
        self.client = None
if __name__ == '__builtin__':
    # Running from twistd -ny HTTPServer.py
    # Then test with:
    #   wget -S 'http://localhost:18080/~/whatever'
    #   wget -S 'http://localhost:18080/~/pieces'

    import os.path
    from twisted.python.filepath import FilePath

    class DB:
        """Stub database that answers every lookup, for manual testing."""

        def lookupHash(self, hash):
            """Return a piece string for 'pieces', a file for anything else."""
            # NOTE(review): the condition was lost from this copy; 'pieces'
            # is taken from the wget example above -- confirm.
            if hash == 'pieces':
                return [{'pieces': 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'}]
            return [{'path': FilePath(os.path.expanduser('~/school/optout'))}]

    t = TopLevel(FilePath(os.path.expanduser('~')), DB(), None, 0)
    factory = t.getHTTPFactory()

    # Standard twisted application Boilerplate
    from twisted.application import service, strports
    application = service.Application("demoserver")
    s = strports.service('tcp:18080', factory)
    s.setServiceParent(application)