]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/HTTPServer.py
Move all streams to new Streams module and replace ProxyFileStream with GrowingFileSt...
[quix0rs-apt-p2p.git] / apt_p2p / HTTPServer.py
index d6c12bf3ce402a949700b53887a4fd94c5a06d2a..652198f990132a8041b6f8ca37be30a55ad34a1d 100644 (file)
@@ -3,6 +3,7 @@
 
 from urllib import quote_plus, unquote_plus
 from binascii import b2a_hex
 
 from urllib import quote_plus, unquote_plus
 from binascii import b2a_hex
+import operator
 
 from twisted.python import log
 from twisted.internet import defer
 
 from twisted.python import log
 from twisted.internet import defer
@@ -12,6 +13,7 @@ from twisted.trial import unittest
 from twisted.python.filepath import FilePath
 
 from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
 from twisted.python.filepath import FilePath
 
 from policies import ThrottlingFactory, ThrottlingProtocol, ProtocolWrapper
+from Streams import UploadStream, FileUploadStream, PiecesUploadStream
 from apt_p2p_conf import config
 from apt_p2p_Khashmir.bencode import bencode
 
 from apt_p2p_conf import config
 from apt_p2p_Khashmir.bencode import bencode
 
@@ -29,12 +31,31 @@ class FileDownloader(static.File):
     def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(), processors=None, indexNames=None):
         self.manager = manager
         super(FileDownloader, self).__init__(path, defaultType, ignoredExts, processors, indexNames)
     def __init__(self, path, manager, defaultType="text/plain", ignoredExts=(), processors=None, indexNames=None):
         self.manager = manager
         super(FileDownloader, self).__init__(path, defaultType, ignoredExts, processors, indexNames)
-        
+    
+    def locateChild(self, req, segments):
+        child, segments = super(FileDownloader, self).locateChild(req, segments)
+        # Make sure we always call renderHTTP()
+        if isinstance(child, FileDownloader):
+            return child, segments
+        else:
+            return self, server.StopTraversal
+            
     def renderHTTP(self, req):
         log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
     def renderHTTP(self, req):
         log.msg('Got request for %s from %s' % (req.uri, req.remoteAddr))
+        
+        # Make sure the file is in the DB and unchanged
+        if self.manager and not self.manager.db.isUnchanged(self.fp):
+            if self.fp.exists() and self.fp.isfile():
+                self.fp.remove()
+            return self._renderHTTP_done(http.Response(404,
+                        {'content-type': http_headers.MimeType('text', 'html')},
+                        '<html><body><p>File found but it has changed.</body></html>'),
+                        req)
+            
         resp = super(FileDownloader, self).renderHTTP(req)
         if isinstance(resp, defer.Deferred):
         resp = super(FileDownloader, self).renderHTTP(req)
         if isinstance(resp, defer.Deferred):
-            resp.addCallback(self._renderHTTP_done, req)
+            resp.addCallbacks(self._renderHTTP_done, self._renderHTTP_error,
+                              callbackArgs = (req, ), errbackArgs = (req, ))
         else:
             resp = self._renderHTTP_done(resp, req)
         return resp
         else:
             resp = self._renderHTTP_done(resp, req)
         return resp
@@ -45,56 +66,42 @@ class FileDownloader(static.File):
         if self.manager:
             path = 'http:/' + req.uri
             if resp.code >= 200 and resp.code < 400:
         if self.manager:
             path = 'http:/' + req.uri
             if resp.code >= 200 and resp.code < 400:
-                return self.manager.check_freshness(req, path, resp.headers.getHeader('Last-Modified'), resp)
+                return self.manager.get_resp(req, path, resp)
             
             log.msg('Not found, trying other methods for %s' % req.uri)
             return self.manager.get_resp(req, path)
         
         return resp
 
             
             log.msg('Not found, trying other methods for %s' % req.uri)
             return self.manager.get_resp(req, path)
         
         return resp
 
+    def _renderHTTP_error(self, err, req):
+        log.msg('Failed to render %s: %r' % (req.uri, err))
+        log.err(err)
+        
+        if self.manager:
+            path = 'http:/' + req.uri
+            return self.manager.get_resp(req, path)
+        
+        return err
+
     def createSimilarFile(self, path):
         return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                               self.processors, self.indexNames[:])
         
     def createSimilarFile(self, path):
         return self.__class__(path, self.manager, self.defaultType, self.ignoredExts,
                               self.processors, self.indexNames[:])
         
-class FileUploaderStream(stream.FileStream):
-    """Modified to make it suitable for streaming to peers.
-    
-    Streams the file in small chunks to make it easier to throttle the
-    streaming to peers.
+class PiecesUploader(static.Data):
+    """Modified to identify it for peer requests.
     
     
-    @ivar CHUNK_SIZE: the size of chunks of data to send at a time
+    Uses the modified L{Streams.PieceUploadStream} to stream the pieces for throttling.
     """
 
     """
 
-    CHUNK_SIZE = 4*1024
-    
-    def read(self, sendfile=False):
-        if self.f is None:
-            return None
-
-        length = self.length
-        if length == 0:
-            self.f = None
-            return None
+    def render(self, req):
+        return http.Response(responsecode.OK,
+                             http_headers.Headers({'content-type': self.contentType()}),
+                             stream=PiecesUploadStream(self.data))
         
         
-        # Remove the SendFileBuffer and mmap use, just use string reads and writes
-
-        readSize = min(length, self.CHUNK_SIZE)
-
-        self.f.seek(self.start)
-        b = self.f.read(readSize)
-        bytesRead = len(b)
-        if not bytesRead:
-            raise RuntimeError("Ran out of data reading file %r, expected %d more bytes" % (self.f, length))
-        else:
-            self.length -= bytesRead
-            self.start += bytesRead
-            return b
-
-
 class FileUploader(static.File):
     """Modified to make it suitable for peer requests.
     
 class FileUploader(static.File):
     """Modified to make it suitable for peer requests.
     
-    Uses the modified L{FileUploaderStream} to stream the file for throttling,
+    Uses the modified L{Streams.FileUploadStream} to stream the file for throttling,
     and doesn't do any listing of directory contents.
     """
 
     and doesn't do any listing of directory contents.
     """
 
@@ -119,7 +126,7 @@ class FileUploader(static.File):
 
         response = http.Response()
         # Use the modified FileStream
 
         response = http.Response()
         # Use the modified FileStream
-        response.stream = FileUploaderStream(f, 0, self.fp.getsize())
+        response.stream = FileUploadStream(f, 0, self.fp.getsize())
 
         for (header, value) in (
             ("content-type", self.contentType()),
 
         for (header, value) in (
             ("content-type", self.contentType()),
@@ -134,9 +141,10 @@ class UploadThrottlingProtocol(ThrottlingProtocol):
     """Protocol for throttling uploads.
     
     Determines whether or not to throttle the upload based on the type of stream.
     """Protocol for throttling uploads.
     
     Determines whether or not to throttle the upload based on the type of stream.
-    Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemorySTream},
-    apt uses L{CacheManager.ProxyFileStream} or L{twisted.web.stream.FileStream}.
+    Uploads use instances of L{Streams.UploadStream}.
     """
     """
+    
+    stats = None
 
     def __init__(self, factory, wrappedProtocol):
         ThrottlingProtocol.__init__(self, factory, wrappedProtocol)
 
     def __init__(self, factory, wrappedProtocol):
         ThrottlingProtocol.__init__(self, factory, wrappedProtocol)
@@ -145,13 +153,23 @@ class UploadThrottlingProtocol(ThrottlingProtocol):
     def write(self, data):
         if self.throttle:
             ThrottlingProtocol.write(self, data)
     def write(self, data):
         if self.throttle:
             ThrottlingProtocol.write(self, data)
+            if self.stats:
+                self.stats.sentBytes(len(data))
         else:
             ProtocolWrapper.write(self, data)
 
         else:
             ProtocolWrapper.write(self, data)
 
+    def writeSequence(self, seq):
+        if self.throttle:
+            ThrottlingProtocol.writeSequence(self, seq)
+            if self.stats:
+                self.stats.sentBytes(reduce(operator.add, map(len, seq)))
+        else:
+            ProtocolWrapper.writeSequence(self, seq)
+
     def registerProducer(self, producer, streaming):
         ThrottlingProtocol.registerProducer(self, producer, streaming)
         streamType = getattr(producer, 'stream', None)
     def registerProducer(self, producer, streaming):
         ThrottlingProtocol.registerProducer(self, producer, streaming)
         streamType = getattr(producer, 'stream', None)
-        if isinstance(streamType, FileUploaderStream) or isinstance(streamType, stream.MemoryStream):
+        if isinstance(streamType, UploadStream):
             self.throttle = True
 
 
             self.throttle = True
 
 
@@ -196,6 +214,8 @@ class TopLevel(resource.Resource):
                                                   'betweenRequestsTimeOut': 60})
             self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit)
             self.factory.protocol = UploadThrottlingProtocol
                                                   'betweenRequestsTimeOut': 60})
             self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit)
             self.factory.protocol = UploadThrottlingProtocol
+            if self.manager:
+                self.factory.protocol.stats = self.manager.stats
         return self.factory
 
     def render(self, ctx):
         return self.factory
 
     def render(self, ctx):
@@ -234,24 +254,32 @@ class TopLevel(resource.Resource):
                 else:
                     # It's not for a file, but for a piece string, so return that
                     log.msg('Sending torrent string %s to %s' % (b2a_hex(hash), request.remoteAddr))
                 else:
                     # It's not for a file, but for a piece string, so return that
                     log.msg('Sending torrent string %s to %s' % (b2a_hex(hash), request.remoteAddr))
-                    return static.Data(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), ()
+                    return PiecesUploader(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), ()
             else:
                 log.msg('Hash could not be found in database: %r' % hash)
             else:
                 log.msg('Hash could not be found in database: %r' % hash)
+                return None, ()
 
 
-        # Only local requests (apt) get past this point
-        if request.remoteAddr.host != "127.0.0.1":
-            log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
-            return None, ()
-        
-        # Block access to index .diff files (for now)
-        if 'Packages.diff' in segments or 'Sources.diff' in segments:
-            return None, ()
-         
         if len(name) > 1:
             # It's a request from apt
         if len(name) > 1:
             # It's a request from apt
+
+            # Only local requests (apt) get past this point
+            if request.remoteAddr.host != "127.0.0.1":
+                log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
+                return None, ()
+
+            # Block access to index .diff files (for now)
+            if 'Packages.diff' in segments or 'Sources.diff' in segments or name == 'favicon.ico':
+                return None, ()
+             
             return FileDownloader(self.directory.path, self.manager), segments[0:]
         else:
             # Will render the statistics page
             return FileDownloader(self.directory.path, self.manager), segments[0:]
         else:
             # Will render the statistics page
+
+            # Only local requests for stats are allowed
+            if not config.getboolean('DEFAULT', 'REMOTE_STATS') and request.remoteAddr.host != "127.0.0.1":
+                log.msg('Blocked illegal access to %s from %s' % (request.uri, request.remoteAddr))
+                return None, ()
+
             return self, ()
         
         log.msg('Got a malformed request for "%s" from %s' % (request.uri, request.remoteAddr))
             return self, ()
         
         log.msg('Got a malformed request for "%s" from %s' % (request.uri, request.remoteAddr))
@@ -367,7 +395,7 @@ if __name__ == '__builtin__':
                 return [{'pieces': 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'}]
             return [{'path': FilePath(os.path.expanduser('~/school/optout'))}]
     
                 return [{'pieces': 'abcdefghij0123456789\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'}]
             return [{'path': FilePath(os.path.expanduser('~/school/optout'))}]
     
-    t = TopLevel(FilePath(os.path.expanduser('~')), DB(), None, 0)
+    t = TopLevel(FilePath(os.path.expanduser('~')), DB(), None)
     factory = t.getHTTPFactory()
     
     # Standard twisted application Boilerplate
     factory = t.getHTTPFactory()
     
     # Standard twisted application Boilerplate