+"""Hash and store hash information for a file.
+
+@var PIECE_SIZE: the piece size to use for hashing pieces of files
+
+"""
+
from binascii import b2a_hex, a2b_hex
import sys
"""An error has occurred while hashing a file."""
class HashObject:
- """Manages hashes and hashing for a file."""
+ """Manages hashes and hashing for a file.
- """The priority ordering of hashes, and how to extract them."""
+ @ivar ORDER: the priority ordering of hashes, and how to extract them
+
+ """
+
ORDER = [ {'name': 'sha1',
'length': 20,
'AptPkgRecord': 'SHA1Hash',
]
def __init__(self, digest = None, size = None, pieces = ''):
+ """Initialize the hash object."""
self.hashTypeNum = 0 # Use the first if nothing else matters
if sys.version_info < (2, 5):
# sha256 is not available in python before 2.5, remove it
self.done = True
self.result = None
- #### Methods for returning the expected hash
- def expected(self):
- """Get the expected hash."""
- return self.expHash
-
- def hexexpected(self):
- """Get the expected hash in hex format."""
- if self.expHex is None and self.expHash is not None:
- self.expHex = b2a_hex(self.expHash)
- return self.expHex
-
- #### Methods for hashing data
+ #{ Hashing data
def new(self, force = False):
"""Generate a new hashing object suitable for hashing a file.
- @param force: set to True to force creating a new hasher even if
+ @param force: set to True to force creating a new object even if
the hash has been verified already
"""
- if self.result is None or force == True:
+ if self.result is None or force:
self.result = None
self.done = False
self.fileHasher = self._new()
self.fileHasher.update(data)
self.size += len(data)
+ def hashInThread(self, file):
+ """Hashes a file in a separate thread, returning a deferred that will callback with the result."""
+ file.restat(False)
+ if not file.exists():
+ df = defer.Deferred()
+ df.errback(HashError("file not found"))
+ return df
+
+ df = threads.deferToThread(self._hashInThread, file)
+ return df
+
+ def _hashInThread(self, file):
+ """Hashes a file, returning itself as the result."""
+ f = file.open()
+ self.new(force = True)
+ data = f.read(4096)
+ while data:
+ self.update(data)
+ data = f.read(4096)
+ self.digest()
+ return self
+
+ #{ Checking hashes of data
def pieceDigests(self):
"""Get the piece hashes of the added file data."""
self.digest()
self.result = (self.fileHash == self.expHash and self.size == self.expSize)
return self.result
- def hashInThread(self, file):
- """Hashes a file in a separate thread, callback with the result."""
- file.restat(False)
- if not file.exists():
- df = defer.Deferred()
- df.errback(HashError("file not found"))
- return df
-
- df = threads.deferToThread(self._hashInThread, file)
- return df
+ #{ Expected hash
+ def expected(self):
+ """Get the expected hash."""
+ return self.expHash
- def _hashInThread(self, file):
- """Hashes a file, returning itself as the result."""
- f = file.open()
- self.new(force = True)
- data = f.read(4096)
- while data:
- self.update(data)
- data = f.read(4096)
- self.digest()
- return self
-
- #### Methods for setting the expected hash
+ def hexexpected(self):
+ """Get the expected hash in hex format."""
+ if self.expHex is None and self.expHash is not None:
+ self.expHex = b2a_hex(self.expHash)
+ return self.expHex
+
+ #{ Setting the expected hash
def set(self, hashType, hashHex, size):
"""Initialize the hash object.
skip = "skippingme"
def test_failure(self):
+ """Tests that the hash object fails when treated badly."""
h = HashObject()
h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
self.failUnlessRaises(HashError, h.digest)
self.failUnlessRaises(HashError, h.update, 'gfgf')
def test_pieces(self):
+ """Tests the hashing of large files into pieces."""
h = HashObject()
h.new()
h.update('1234567890'*120*1024)
self.failUnless(pieces[2] == 'M[\xbf\xee\xaa+\x19\xbaV\xf699\r\x17o\xcb\x8e\xcfP\x19')
def test_sha1(self):
+ """Test hashing using the SHA1 hash."""
h = HashObject()
found = False
for hashType in h.ORDER:
self.failUnless(h.verify() == True)
def test_md5(self):
+ """Test hashing using the MD5 hash."""
h = HashObject()
found = False
for hashType in h.ORDER:
self.failUnless(h.verify() == True)
def test_sha256(self):
+ """Test hashing using the SHA256 hash."""
h = HashObject()
found = False
for hashType in h.ORDER: