Added a new Hash Object for tracking lots of hash details.
[quix0rs-apt-p2p.git] / apt_dht / Hash.py
1
2 from binascii import b2a_hex, a2b_hex
3 import sys
4
5 from twisted.trial import unittest
6
7 class HashObject:
8     """Manages hashes and hashing for a file."""
9     
10     """The priority ordering of hashes, and how to extract them."""
11     ORDER = [ {'name': 'sha1', 
12                    'AptPkgRecord': 'SHA1Hash', 
13                    'AptSrcRecord': False, 
14                    'AptIndexRecord': 'SHA1',
15                    'old_module': 'sha',
16                    'hashlib_func': 'sha1',
17                    },
18               {'name': 'sha256',
19                    'AptPkgRecord': 'SHA256Hash', 
20                    'AptSrcRecord': False, 
21                    'AptIndexRecord': 'SHA256',
22                    'hashlib_func': 'sha256',
23                    },
24               {'name': 'md5',
25                    'AptPkgRecord': 'MD5Hash', 
26                    'AptSrcRecord': True, 
27                    'AptIndexRecord': 'MD5Sum',
28                    'old_module': 'md5',
29                    'hashlib_func': 'md5',
30                    },
31             ]
32     
33     def __init__(self):
34         self.hashTypeNum = 0    # Use the first if nothing else matters
35         self.expHash = None
36         self.expHex = None
37         self.expSize = None
38         self.expNormHash = None
39         self.fileHasher = None
40         self.fileHash = None
41         self.fileHex = None
42         self.fileNormHash = None
43         self.done = True
44         if sys.version_info < (2, 5):
45             # sha256 is not available in python before 2.5, remove it
46             for hashType in self.ORDER:
47                 if hashType['name'] == 'sha256':
48                     del self.ORDER[self.ORDER.index(hashType)]
49                     break
50         
51     def _norm_hash(self, hashString, bits=None, bytes=None):
52         if bits is not None:
53             bytes = (bits - 1) // 8 + 1
54         else:
55             assert(bytes is not None)
56         if len(hashString) < bytes:
57             hashString = hashString + '\000'*(bytes - len(hashString))
58         elif len(hashString) > bytes:
59             hashString = hashString[:bytes]
60         return hashString
61
62     #### Methods for returning the expected hash
63     def expected(self):
64         """Get the expected hash."""
65         return self.expHash
66     
67     def hexexpected(self):
68         """Get the expected hash in hex format."""
69         if self.expHex is None and self.expHash is not None:
70             self.expHex = b2a_hex(self.expHash)
71         return self.expHex
72     
73     def normexpected(self, bits=None, bytes=None):
74         """Normalize the binary hash for the given length.
75         
76         You must specify one of bits or bytes.
77         """
78         if self.expNormHash is None and self.expHash is not None:
79             self.expNormHash = self._norm_hash(self.expHash, bits, bytes)
80         return self.expNormHash
81
82     #### Methods for hashing data
83     def new(self):
84         """Generate a new hashing object suitable for hashing a file."""
85         self.size = 0
86         self.done = False
87         if sys.version_info < (2, 5):
88             mod = __import__(self.ORDER[self.hashTypeNum]['old_module'], globals(), locals(), [])
89             self.fileHasher = mod.new()
90         else:
91             import hashlib
92             func = getattr(hashlib, self.ORDER[self.hashTypeNum]['hashlib_func'])
93             self.fileHasher = func()
94         return self.fileHasher
95
96     def update(self, data):
97         """Add more data to the file hasher."""
98         assert self.done == False, "Already done, you can't add more data after calling digest() or verify()"
99         assert self.fileHasher is not None, "file hasher not initialized"
100         self.fileHasher.update(data)
101         self.size += len(data)
102         
103     def digest(self):
104         """Get the hash of the added file data."""
105         if self.fileHash is None:
106             assert self.fileHasher is not None, "you must hash some data first"
107             self.fileHash = self.fileHasher.digest()
108             self.done = True
109         return self.fileHash
110
111     def hexdigest(self):
112         """Get the hash of the added file data in hex format."""
113         if self.fileHex is None:
114             self.fileHex = b2a_hex(self.digest())
115         return self.fileHex
116         
117     def norm(self, bits=None, bytes=None):
118         """Normalize the binary hash for the given length.
119         
120         You must specify one of bits or bytes.
121         """
122         if self.fileNormHash is None:
123             self.fileNormHash = self._norm_hash(self.digest(), bits, bytes)
124         return self.fileNormHash
125
126     def verify(self):
127         """Verify that the added file data hash matches the expected hash."""
128         if self.fileHash == None:
129             return None
130         return (self.fileHash == self.expHash and self.size == self.expSize)
131     
132     #### Methods for setting the expected hash
133     def set(self, hashType, hashHex, size):
134         """Initialize the hash object.
135         
136         @param hashType: must be one of the dictionaries from L{ORDER}
137         """
138         self.hashTypeNum = self.ORDER.index(hashType)    # error if not found
139         self.expHex = hashHex
140         self.expSize = size
141         self.expHash = a2b_hex(self.expHex)
142         
143     def setFromIndexRecord(self, record):
144         """Set the hash from the cache of index file records.
145         
146         @type record: C{dictionary}
147         @param record: keys are hash types, values are tuples of (hash, size)
148         """
149         for hashType in self.ORDER:
150             result = record.get(hashType['AptIndexRecord'], None)
151             if result:
152                 self.set(hashType, result[0], result[1])
153                 return True
154         return False
155
156     def setFromPkgRecord(self, record, size):
157         """Set the hash from Apt's binary packages cache.
158         
159         @param record: whatever is returned by apt_pkg.GetPkgRecords()
160         """
161         for hashType in self.ORDER:
162             hashHex = getattr(record, hashType['AptPkgRecord'], None)
163             if hashHex:
164                 self.set(hashType, hashHex, size)
165                 return True
166         return False
167     
168     def setFromSrcRecord(self, record):
169         """Set the hash from Apt's source package records cache.
170         
171         Currently very simple since Apt only tracks MD5 hashes of source files.
172         
173         @type record: (C{string}, C{int}, C{string})
174         @param record: the hash, size and path of the source file
175         """
176         for hashType in self.ORDER:
177             if hashType['AptSrcRecord']:
178                 self.set(hashType, record[0], record[1])
179                 return True
180         return False
181
182 class TestHashObject(unittest.TestCase):
183     """Unit tests for the hash objects."""
184     
185     timeout = 5
186     if sys.version_info < (2, 4):
187         skip = "skippingme"
188     
189     def test_normalize(self):
190         h = HashObject()
191         h.set(h.ORDER[0], b2a_hex('12345678901234567890'), 0)
192         self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
193         h = HashObject()
194         h.set(h.ORDER[0], b2a_hex('12345678901234567'), 0)
195         self.failUnless(h.normexpected(bits = 160) == '12345678901234567\000\000\000')
196         h = HashObject()
197         h.set(h.ORDER[0], b2a_hex('1234567890123456789012345'), 0)
198         self.failUnless(h.normexpected(bytes = 20) == '12345678901234567890')
199         h = HashObject()
200         h.set(h.ORDER[0], b2a_hex('1234567890123456789'), 0)
201         self.failUnless(h.normexpected(bytes = 20) == '1234567890123456789\000')
202         h = HashObject()
203         h.set(h.ORDER[0], b2a_hex('123456789012345678901'), 0)
204         self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
205
206     def test_failure(self):
207         h = HashObject()
208         h.set(h.ORDER[0], b2a_hex('12345678901234567890'), 0)
209         self.failUnlessRaises(AssertionError, h.normexpected)
210         self.failUnlessRaises(AssertionError, h.digest)
211         self.failUnlessRaises(AssertionError, h.hexdigest)
212         self.failUnlessRaises(AssertionError, h.update, 'gfgf')
213     
214     def test_sha1(self):
215         h = HashObject()
216         found = False
217         for hashType in h.ORDER:
218             if hashType['name'] == 'sha1':
219                 found = True
220                 break
221         self.failUnless(found == True)
222         h.set(hashType, 'c722df87e1acaa64b27aac4e174077afc3623540', 19)
223         h.new()
224         h.update('apt-dht is the best')
225         self.failUnless(h.hexdigest() == 'c722df87e1acaa64b27aac4e174077afc3623540')
226         self.failUnlessRaises(AssertionError, h.update, 'gfgf')
227         self.failUnless(h.verify() == True)
228         
229     def test_md5(self):
230         h = HashObject()
231         found = False
232         for hashType in h.ORDER:
233             if hashType['name'] == 'md5':
234                 found = True
235                 break
236         self.failUnless(found == True)
237         h.set(hashType, '2a586bcd1befc5082c872dcd96a01403', 19)
238         h.new()
239         h.update('apt-dht is the best')
240         self.failUnless(h.hexdigest() == '2a586bcd1befc5082c872dcd96a01403')
241         self.failUnlessRaises(AssertionError, h.update, 'gfgf')
242         self.failUnless(h.verify() == True)
243         
244     def test_sha256(self):
245         h = HashObject()
246         found = False
247         for hashType in h.ORDER:
248             if hashType['name'] == 'sha256':
249                 found = True
250                 break
251         self.failUnless(found == True)
252         h.set(hashType, '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7', 19)
253         h.new()
254         h.update('apt-dht is the best')
255         self.failUnless(h.hexdigest() == '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7')
256         self.failUnlessRaises(AssertionError, h.update, 'gfgf')
257         self.failUnless(h.verify() == True)
258
259     if sys.version_info < (2, 5):
260         test_sha256.skip = "SHA256 hashes are not supported on python until version 2.5"