2 from binascii import b2a_hex, a2b_hex
5 from twisted.internet import threads, defer
6 from twisted.trial import unittest
class HashError(ValueError):
    """An error has occurred while hashing a file.

    Raised by the hashing helpers in this module; it derives from
    ValueError, so callers that catch ValueError also catch it.
    """
12 """Manages hashes and hashing for a file."""
# The priority ordering of hashes, and how to extract them.
#
# Each entry describes one hash type:
#   name           - short name of the hash type
#   AptPkgRecord   - attribute read with getattr() from an Apt package record
#   AptSrcRecord   - true if source records carry this hash type
#   AptIndexRecord - key used to look the hash up in an index record
#   old_module     - module imported (and its new() called) on Python < 2.5
#   hashlib_func   - name of the hashlib constructor used otherwise
#
# NOTE(review): this table was rebuilt from a damaged listing.  The 'name'
# values, the md5 'AptSrcRecord' flag and the 'old_module' values were not
# visible; they are inferred from how the rest of the file reads this table
# and from the Python 2 stdlib module names ('sha', 'md5') -- confirm against
# the pristine file.
ORDER = [
    {'name': 'sha1',
     'AptPkgRecord': 'SHA1Hash',
     'AptSrcRecord': False,
     'AptIndexRecord': 'SHA1',
     'old_module': 'sha',
     'hashlib_func': 'sha1',
     },
    {'name': 'sha256',
     'AptPkgRecord': 'SHA256Hash',
     'AptSrcRecord': False,
     'AptIndexRecord': 'SHA256',
     'hashlib_func': 'sha256',
     },
    {'name': 'md5',
     'AptPkgRecord': 'MD5Hash',
     'AptSrcRecord': True,
     'AptIndexRecord': 'MD5SUM',
     'old_module': 'md5',
     'hashlib_func': 'md5',
     },
]
# Constructor.
#
# NOTE(review): this listing is damaged -- every line carries a fused listing
# number, indentation is stripped, and the numbering skips 39-41, 45-46 and
# 48-49, so some statements of the original body are not visible here.
#
# Visible behaviour: selects hash type 0, clears the normalized expected
# hash, the hasher and the normalized file hash, and stores `digest` as the
# file hash.  `size` is not used by any visible line.
#
# NOTE(review): on Python < 2.5 the loop deletes from self.ORDER while
# iterating over that same list; ORDER also looks like a list shared by all
# instances, so the removal would be permanent -- confirm.
37 def __init__(self, digest = None, size = None):
38 self.hashTypeNum = 0 # Use the first if nothing else matters
42 self.expNormHash = None
43 self.fileHasher = None
44 self.fileHash = digest
47 self.fileNormHash = None
50 if sys.version_info < (2, 5):
51 # sha256 is not available in python before 2.5, remove it
52 for hashType in self.ORDER:
53 if hashType['name'] == 'sha256':
54 del self.ORDER[self.ORDER.index(hashType)]
57 def _norm_hash(self, hashString, bits=None, bytes=None):
59 bytes = (bits - 1) // 8 + 1
62 raise HashError, "you must specify one of bits or bytes"
63 if len(hashString) < bytes:
64 hashString = hashString + '\000'*(bytes - len(hashString))
65 elif len(hashString) > bytes:
66 hashString = hashString[:bytes]
69 #### Methods for returning the expected hash
71 """Get the expected hash."""
def hexexpected(self):
    """Get the expected hash in hex format.

    The hex form is derived from the binary expected hash the first time
    it is needed and then kept in self.expHex.

    @return: the hex encoded expected hash, or None if no expected hash
        has been set
    """
    if self.expHex is None and self.expHash is not None:
        self.expHex = b2a_hex(self.expHash)
    return self.expHex
def normexpected(self, bits=None, bytes=None):
    """Normalize the binary expected hash for the given length.

    You must specify one of bits or bytes.

    The normalized value is cached in self.expNormHash by the first call
    that finds an expected hash; later calls return the cached value and
    do not look at bits or bytes again.

    @param bits: the target length in bits
    @param bytes: the target length in bytes
    @return: the normalized expected hash, or None if no expected hash
        has been set
    """
    if self.expNormHash is None and self.expHash is not None:
        self.expNormHash = self._norm_hash(self.expHash, bits, bytes)
    return self.expNormHash
89 #### Methods for hashing data
# Create the hasher object used to hash a file.
#
# NOTE(review): this listing is damaged -- fused listing numbers, stripped
# indentation, and the numbering skips 92, 95, 97-99, 103-104 and 107, so the
# docstring terminator and several statements are not visible here.
#
# Visible behaviour: acts only when no verification result exists yet or
# `force` is true.  On Python < 2.5 it imports the module named by the
# current hash type's 'old_module' entry and calls its new(); the other
# visible path looks up the 'hashlib_func' name on hashlib and calls it.
# Either way the hasher is stored in self.fileHasher.
90 def new(self, force = False):
91 """Generate a new hashing object suitable for hashing a file.
93 @param force: set to True to force creating a new hasher even if
94 the hash has been verified already
96 if self.result is None or force == True:
100 if sys.version_info < (2, 5):
101 mod = __import__(self.ORDER[self.hashTypeNum]['old_module'], globals(), locals(), [])
102 self.fileHasher = mod.new()
105 func = getattr(hashlib, self.ORDER[self.hashTypeNum]['hashlib_func'])
106 self.fileHasher = func()
# Feed more data into the file hasher and add its length to self.size.
#
# NOTE(review): this listing is damaged -- fused listing numbers, stripped
# indentation, and listing line 111 is missing, so the condition guarding
# the "Already done" error is not visible here.
#
# Raises HashError with "file hasher not initialized" when self.fileHasher
# is None.  Note the Python-2-only `raise Class, "message"` syntax.
108 def update(self, data):
109 """Add more data to the file hasher."""
110 if self.result is None:
112 raise HashError, "Already done, you can't add more data after calling digest() or verify()"
113 if self.fileHasher is None:
114 raise HashError, "file hasher not initialized"
115 self.fileHasher.update(data)
116 self.size += len(data)
# NOTE(review): damaged fragment -- the `def` line (listing 118) and the
# lines after 123 are not visible.  Presumably this is the body of digest(),
# which other methods call as self.digest(); confirm.
#
# Visible behaviour: when no file hash is stored yet, takes it from the
# hasher's digest(); raises HashError if there is no hasher either.
119 """Get the hash of the added file data."""
120 if self.fileHash is None:
121 if self.fileHasher is None:
122 raise HashError, "you must hash some data first"
123 self.fileHash = self.fileHasher.digest()
# NOTE(review): damaged fragment -- the `def` line (listing 127) and the
# line after 130 are not visible.  Presumably this is the body of
# hexdigest(), which the unit tests call; confirm.
#
# Visible behaviour: hex-encodes self.digest() into self.fileHex the first
# time it is needed.
128 """Get the hash of the added file data in hex format."""
129 if self.fileHex is None:
130 self.fileHex = b2a_hex(self.digest())
def norm(self, bits=None, bytes=None):
    """Normalize the binary hash of the file data for the given length.

    You must specify one of bits or bytes.

    The normalized value is cached in self.fileNormHash by the first
    call; later calls return the cached value and do not look at bits or
    bytes again.

    @param bits: the target length in bits
    @param bytes: the target length in bytes
    @return: the normalized hash of the added file data
    """
    if self.fileNormHash is None:
        self.fileNormHash = self._norm_hash(self.digest(), bits, bytes)
    return self.fileNormHash
# NOTE(review): damaged fragment -- the `def` line (listing 142) and the
# line after 145 are not visible.  Presumably this is the body of verify(),
# which the unit tests call; confirm.
#
# Visible behaviour: computes the result only once, and only when both a
# file hash and an expected hash are present.  The result is true when the
# two hashes are equal and self.size equals self.expSize.
143 """Verify that the added file data hash matches the expected hash."""
144 if self.result is None and self.fileHash is not None and self.expHash is not None:
145 self.result = (self.fileHash == self.expHash and self.size == self.expSize)
# Hash a file in a worker thread.
#
# NOTE(review): this listing is damaged -- fused listing numbers, stripped
# indentation, and the numbering skips 150, 154-155 and 157, so one statement
# and the return statements are not visible here.
#
# Visible behaviour: if file.exists() is false, a Deferred is created and
# immediately failed with HashError("file not found"); otherwise
# self._hashInThread(file) is handed to threads.deferToThread.
# `file` must offer an exists() method -- presumably a Twisted FilePath;
# verify against callers.
148 def hashInThread(self, file):
149 """Hashes a file in a separate thread, callback with the result."""
151 if not file.exists():
152 df = defer.Deferred()
153 df.errback(HashError("file not found"))
156 df = threads.deferToThread(self._hashInThread, file)
# Worker passed to threads.deferToThread by hashInThread().
#
# NOTE(review): this listing is damaged -- only listing lines 159, 160 and
# 162 are visible; the rest of the body is missing.  The one visible
# statement forces creation of a fresh hasher via self.new(force = True).
159 def _hashInThread(self, file):
160 """Hashes a file, returning itself as the result."""
162 self.new(force = True)
170 #### Methods for setting the expected hash
def set(self, hashType, hashHex, size):
    """Initialize the hash object with the expected hash and size.

    All arguments are converted before anything is stored, so a rejected
    argument leaves the object unchanged.  Any normalized expected hash
    cached for a previous expected hash is discarded.

    @param hashType: must be one of the dictionaries from L{ORDER}
    @param hashHex: the expected hash, hex encoded
    @param size: the expected size, anything accepted by int()
    @raise ValueError: if hashType is not found in ORDER; errors raised
        by int() or a2b_hex() on bad input also propagate
    """
    typeNum = self.ORDER.index(hashType)    # error if not found
    expSize = int(size)
    expHash = a2b_hex(hashHex)

    self.hashTypeNum = typeNum
    self.expHex = hashHex
    self.expSize = expSize
    self.expHash = expHash
    # The cached normalized form belongs to the old expected hash.
    self.expNormHash = None
# Set the expected hash from an index file record.
#
# NOTE(review): this listing is damaged -- fused listing numbers, stripped
# indentation, and the numbering skips 183, 186, 189 and 191-192, so the
# docstring terminator, the check on `result` and whatever follows the
# set() call (including any return value) are not visible here.
#
# Visible behaviour: walks ORDER in priority order, looks each hash type up
# in `record` under its 'AptIndexRecord' key, and passes the first two items
# of the found value to set() as hash and size.
181 def setFromIndexRecord(self, record):
182 """Set the hash from the cache of index file records.
184 @type record: C{dictionary}
185 @param record: keys are hash types, values are tuples of (hash, size)
187 for hashType in self.ORDER:
188 result = record.get(hashType['AptIndexRecord'], None)
190 self.set(hashType, result[0], result[1])
# Set the expected hash from an Apt binary package record.
#
# NOTE(review): this listing is damaged -- fused listing numbers, stripped
# indentation, and the numbering skips 196, 198, 201 and 203-204, so the
# docstring terminator, the check on `hashHex` and whatever follows the
# set() call (including any return value) are not visible here.
#
# Visible behaviour: walks ORDER in priority order, reads the attribute
# named by each hash type's 'AptPkgRecord' entry from `record` (None when
# absent), and passes it to set() together with `size`.
194 def setFromPkgRecord(self, record, size):
195 """Set the hash from Apt's binary packages cache.
197 @param record: whatever is returned by apt_pkg.GetPkgRecords()
199 for hashType in self.ORDER:
200 hashHex = getattr(record, hashType['AptPkgRecord'], None)
202 self.set(hashType, hashHex, size)
# Set the expected hash from an Apt source package record.
#
# NOTE(review): this listing is damaged -- fused listing numbers, stripped
# indentation, and the numbering skips 208, 210, 213 and 217-218, so the
# docstring terminator and whatever follows the set() call (including any
# return value) are not visible here.
#
# Visible behaviour: walks ORDER in priority order and, for a hash type
# whose 'AptSrcRecord' entry is true, calls set() with record[0] as the hash
# and record[1] as the size.
206 def setFromSrcRecord(self, record):
207 """Set the hash from Apt's source package records cache.
209 Currently very simple since Apt only tracks MD5 hashes of source files.
211 @type record: (C{string}, C{int}, C{string})
212 @param record: the hash, size and path of the source file
214 for hashType in self.ORDER:
215 if hashType['AptSrcRecord']:
216 self.set(hashType, record[0], record[1])
# Twisted trial test case for the hash object.
#
# NOTE(review): this listing is damaged -- every line carries a fused
# listing number, indentation is stripped, and many listing lines are
# missing.  Not visible: the statements creating `h` and `found` in each
# test, the bodies of the version checks and of the `if` inside each search
# loop, and the `def` lines of two tests.
220 class TestHashObject(unittest.TestCase):
221 """Unit tests for the hash objects."""
# Version guard; its body (listing 225) is not visible.
224 if sys.version_info < (2, 4):
# Checks normexpected(): short hashes are NUL-padded and long ones truncated
# to 20 bytes, whether the length is given as bits=160 or bytes=20.
227 def test_normalize(self):
229 h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
230 self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
232 h.set(h.ORDER[0], b2a_hex('12345678901234567'), '0')
233 self.failUnless(h.normexpected(bits = 160) == '12345678901234567\000\000\000')
235 h.set(h.ORDER[0], b2a_hex('1234567890123456789012345'), '0')
236 self.failUnless(h.normexpected(bytes = 20) == '12345678901234567890')
238 h.set(h.ORDER[0], b2a_hex('1234567890123456789'), '0')
239 self.failUnless(h.normexpected(bytes = 20) == '1234567890123456789\000')
241 h.set(h.ORDER[0], b2a_hex('123456789012345678901'), '0')
242 self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
# Checks that HashError is raised by normexpected() without a length and by
# digest(), hexdigest() and update() when no data has been hashed.
244 def test_failure(self):
246 h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
247 self.failUnlessRaises(HashError, h.normexpected)
248 self.failUnlessRaises(HashError, h.digest)
249 self.failUnlessRaises(HashError, h.hexdigest)
250 self.failUnlessRaises(HashError, h.update, 'gfgf')
# NOTE(review): the `def` line of this test (listing 252) is not visible;
# presumably the SHA-1 test.  It hashes a 19-byte string, compares the hex
# digest, checks that update() fails afterwards and that verify() is True.
255 for hashType in h.ORDER:
256 if hashType['name'] == 'sha1':
259 self.failUnless(found == True)
260 h.set(hashType, 'c722df87e1acaa64b27aac4e174077afc3623540', '19')
262 h.update('apt-dht is the best')
263 self.failUnless(h.hexdigest() == 'c722df87e1acaa64b27aac4e174077afc3623540')
264 self.failUnlessRaises(HashError, h.update, 'gfgf')
265 self.failUnless(h.verify() == True)
# NOTE(review): the `def` line of this test (listing 267) is not visible;
# presumably the MD5 test.  Same sequence of checks as the test above.
270 for hashType in h.ORDER:
271 if hashType['name'] == 'md5':
274 self.failUnless(found == True)
275 h.set(hashType, '2a586bcd1befc5082c872dcd96a01403', '19')
277 h.update('apt-dht is the best')
278 self.failUnless(h.hexdigest() == '2a586bcd1befc5082c872dcd96a01403')
279 self.failUnlessRaises(HashError, h.update, 'gfgf')
280 self.failUnless(h.verify() == True)
# Same sequence of checks for SHA-256.
282 def test_sha256(self):
285 for hashType in h.ORDER:
286 if hashType['name'] == 'sha256':
289 self.failUnless(found == True)
290 h.set(hashType, '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7', '19')
292 h.update('apt-dht is the best')
293 self.failUnless(h.hexdigest() == '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7')
294 self.failUnlessRaises(HashError, h.update, 'gfgf')
295 self.failUnless(h.verify() == True)
# trial skips a test whose `skip` attribute is set; used here because
# SHA-256 is unavailable before Python 2.5.
297 if sys.version_info < (2, 5):
298 test_sha256.skip = "SHA256 hashes are not supported by Python until version 2.5"