from binascii import b2a_hex, a2b_hex
import sys
+from twisted.internet import threads, defer
from twisted.trial import unittest
+class HashError(ValueError):
+ """An error has occurred while hashing a file.
+
+ Raised in place of the former assertions when a hashing method is
+ misused (for example adding data after the digest was taken, or asking
+ for a digest before any hasher exists), and used as the failure when a
+ file that should be hashed cannot be found.
+ """
+
class HashObject:
"""Manages hashes and hashing for a file."""
{'name': 'md5',
'AptPkgRecord': 'MD5Hash',
'AptSrcRecord': True,
- 'AptIndexRecord': 'MD5Sum',
+ 'AptIndexRecord': 'MD5SUM',
'old_module': 'md5',
'hashlib_func': 'md5',
},
]
- def __init__(self):
+ def __init__(self, digest = None, size = None):
self.hashTypeNum = 0 # Use the first if nothing else matters
self.expHash = None
self.expHex = None
self.expSize = None
self.expNormHash = None
self.fileHasher = None
- self.fileHash = None
+ self.fileHash = digest
+ self.size = size
self.fileHex = None
self.fileNormHash = None
self.done = True
+ self.result = None
if sys.version_info < (2, 5):
# sha256 is not available in python before 2.5, remove it
for hashType in self.ORDER:
if bits is not None:
bytes = (bits - 1) // 8 + 1
else:
- assert(bytes is not None)
+ if bytes is None:
+ raise HashError, "you must specify one of bits or bytes"
if len(hashString) < bytes:
hashString = hashString + '\000'*(bytes - len(hashString))
elif len(hashString) > bytes:
return self.expNormHash
#### Methods for hashing data
- def new(self):
- """Generate a new hashing object suitable for hashing a file."""
- self.size = 0
- self.done = False
- if sys.version_info < (2, 5):
- mod = __import__(self.ORDER[self.hashTypeNum]['old_module'], globals(), locals(), [])
- self.fileHasher = mod.new()
- else:
- import hashlib
- func = getattr(hashlib, self.ORDER[self.hashTypeNum]['hashlib_func'])
- self.fileHasher = func()
- return self.fileHasher
+    def new(self, force = False):
+        """Generate a new hashing object suitable for hashing a file.
+
+        Nothing happens if a verification result has already been recorded,
+        unless C{force} is set.  Otherwise the size, the done flag, the
+        cached digests and the hasher itself are all reset.
+
+        @param force: set to True to force creating a new hasher even if
+            the hash has been verified already
+        """
+        # NOTE(review): the removed implementation returned self.fileHasher;
+        # this one returns None -- confirm that no caller uses the result.
+        if self.result is None or force:
+            self.result = None
+            self.size = 0
+            self.done = False
+            # Forget any digest cached by the constructor or by an earlier
+            # run; otherwise digest() keeps returning that stale value
+            # instead of the hash of the data that is about to be added.
+            self.fileHash = None
+            self.fileHex = None
+            self.fileNormHash = None
+            if sys.version_info < (2, 5):
+                # hashlib does not exist yet, fall back to the old modules
+                mod = __import__(self.ORDER[self.hashTypeNum]['old_module'], globals(), locals(), [])
+                self.fileHasher = mod.new()
+            else:
+                import hashlib
+                func = getattr(hashlib, self.ORDER[self.hashTypeNum]['hashlib_func'])
+                self.fileHasher = func()
def update(self, data):
"""Add more data to the file hasher."""
- assert self.done == False, "Already done, you can't add more data after calling digest() or verify()"
- assert self.fileHasher is not None, "file hasher not initialized"
- self.fileHasher.update(data)
- self.size += len(data)
+ # Data is only looked at while no verification result has been recorded.
+ # NOTE(review): the indentation of the added lines is not visible here;
+ # presumably everything below is nested under this check, so data added
+ # after a result was recorded is silently ignored -- confirm.
+ if self.result is None:
+ # HashError replaces the removed assertions for both misuse cases.
+ if self.done:
+ raise HashError, "Already done, you can't add more data after calling digest() or verify()"
+ if self.fileHasher is None:
+ raise HashError, "file hasher not initialized"
+ self.fileHasher.update(data)
+ # Track the number of bytes hashed; verify() compares it to expSize.
+ self.size += len(data)
def digest(self):
"""Get the hash of the added file data."""
+ # A digest already stored in self.fileHash is returned as it is; the
+ # hasher is only consulted while none is stored.
if self.fileHash is None:
- assert self.fileHasher is not None, "you must hash some data first"
+ # HashError replaces the removed assertion.
+ if self.fileHasher is None:
+ raise HashError, "you must hash some data first"
self.fileHash = self.fileHasher.digest()
self.done = True
return self.fileHash
def verify(self):
"""Verify that the added file data hash matches the expected hash."""
- if self.fileHash == None:
- return None
- return (self.fileHash == self.expHash and self.size == self.expSize)
+ # The comparison is made only when no result is stored yet and both a
+ # file hash and an expected hash are available; its outcome is kept in
+ # self.result.  Both the digest and the size have to match.
+ # NOTE(review): presumably the return sits outside the if, so that a
+ # stored result is returned on later calls and None is returned while
+ # either hash is still missing -- confirm.
+ if self.result is None and self.fileHash is not None and self.expHash is not None:
+ self.result = (self.fileHash == self.expHash and self.size == self.expSize)
+ return self.result
+
+    def hashInThread(self, file):
+        """Hash a file in a separate thread.
+
+        @param file: the file to hash; it must provide C{restat}, C{exists}
+            and C{open} (presumably a Twisted C{FilePath} -- confirm)
+        @return: a deferred that is called back with this object once the
+            file has been hashed, or that fails with a L{HashError} if the
+            file does not exist
+        """
+        file.restat(False)
+        if file.exists():
+            return threads.deferToThread(self._hashInThread, file)
+
+        # Nothing to hash: hand back a deferred that has already failed.
+        return defer.fail(HashError("file not found"))
+    def _hashInThread(self, file):
+        """Hash a file, returning this object as the result.
+
+        Handed to the thread pool by L{hashInThread}.  A new hasher is
+        forced first, then the file is read and hashed in 4096 byte chunks.
+
+        @param file: an object whose C{open} method returns a readable
+            file-like object
+        @return: this object, after its digest has been computed
+        """
+        f = file.open()
+        try:
+            self.new(force = True)
+            data = f.read(4096)
+            while data:
+                self.update(data)
+                data = f.read(4096)
+            self.digest()
+        finally:
+            # Always release the file handle, even if reading or hashing
+            # fails part way through.
+            f.close()
+        return self
+
#### Methods for setting the expected hash
def set(self, hashType, hashHex, size):
"""Initialize the hash object.
"""
self.hashTypeNum = self.ORDER.index(hashType) # error if not found
self.expHex = hashHex
- self.expSize = size
+ self.expSize = int(size)
self.expHash = a2b_hex(self.expHex)
def setFromIndexRecord(self, record):
def test_normalize(self):
+ # Expected hashes shorter than the requested length come back padded with
+ # NUL bytes, longer ones come back cut down to it.  The size is passed as
+ # a string throughout, relying on set() converting it with int().
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('12345678901234567890'), 0)
+ h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('12345678901234567'), 0)
+ h.set(h.ORDER[0], b2a_hex('12345678901234567'), '0')
self.failUnless(h.normexpected(bits = 160) == '12345678901234567\000\000\000')
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('1234567890123456789012345'), 0)
+ h.set(h.ORDER[0], b2a_hex('1234567890123456789012345'), '0')
self.failUnless(h.normexpected(bytes = 20) == '12345678901234567890')
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('1234567890123456789'), 0)
+ h.set(h.ORDER[0], b2a_hex('1234567890123456789'), '0')
self.failUnless(h.normexpected(bytes = 20) == '1234567890123456789\000')
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('123456789012345678901'), 0)
+ h.set(h.ORDER[0], b2a_hex('123456789012345678901'), '0')
self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
def test_failure(self):
+ # Misusing an object that only had its expected hash set must raise
+ # HashError (formerly AssertionError): normexpected() without bits or
+ # bytes, digest() and hexdigest() before any data was hashed, and
+ # update() without a preceding new().
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('12345678901234567890'), 0)
- self.failUnlessRaises(AssertionError, h.normexpected)
- self.failUnlessRaises(AssertionError, h.digest)
- self.failUnlessRaises(AssertionError, h.hexdigest)
- self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+ h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
+ self.failUnlessRaises(HashError, h.normexpected)
+ self.failUnlessRaises(HashError, h.digest)
+ self.failUnlessRaises(HashError, h.hexdigest)
+ self.failUnlessRaises(HashError, h.update, 'gfgf')
def test_sha1(self):
h = HashObject()
found = True
break
self.failUnless(found == True)
- h.set(hashType, 'c722df87e1acaa64b27aac4e174077afc3623540', 19)
+ h.set(hashType, 'c722df87e1acaa64b27aac4e174077afc3623540', '19')
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == 'c722df87e1acaa64b27aac4e174077afc3623540')
- self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+ self.failUnlessRaises(HashError, h.update, 'gfgf')
self.failUnless(h.verify() == True)
def test_md5(self):
found = True
break
self.failUnless(found == True)
- h.set(hashType, '2a586bcd1befc5082c872dcd96a01403', 19)
+ h.set(hashType, '2a586bcd1befc5082c872dcd96a01403', '19')
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == '2a586bcd1befc5082c872dcd96a01403')
- self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+ self.failUnlessRaises(HashError, h.update, 'gfgf')
self.failUnless(h.verify() == True)
def test_sha256(self):
found = True
break
self.failUnless(found == True)
- h.set(hashType, '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7', 19)
+ h.set(hashType, '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7', '19')
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7')
- self.failUnlessRaises(AssertionError, h.update, 'gfgf')
+ self.failUnlessRaises(HashError, h.update, 'gfgf')
self.failUnless(h.verify() == True)
if sys.version_info < (2, 5):
- test_sha256.skip = "SHA256 hashes are not supported on python until version 2.5"
+ test_sha256.skip = "SHA256 hashes are not supported by Python until version 2.5"