]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht/Hash.py
Fixed some bugs in the new hashing scheme and tests.
[quix0rs-apt-p2p.git] / apt_dht / Hash.py
1
2 from binascii import b2a_hex, a2b_hex
3 import sys
4
5 from twisted.trial import unittest
6
7 class HashObject:
8     """Manages hashes and hashing for a file."""
9     
10     """The priority ordering of hashes, and how to extract them."""
11     ORDER = [ {'name': 'sha1', 
12                    'AptPkgRecord': 'SHA1Hash', 
13                    'AptSrcRecord': False, 
14                    'AptIndexRecord': 'SHA1',
15                    'old_module': 'sha',
16                    'hashlib_func': 'sha1',
17                    },
18               {'name': 'sha256',
19                    'AptPkgRecord': 'SHA256Hash', 
20                    'AptSrcRecord': False, 
21                    'AptIndexRecord': 'SHA256',
22                    'hashlib_func': 'sha256',
23                    },
24               {'name': 'md5',
25                    'AptPkgRecord': 'MD5Hash', 
26                    'AptSrcRecord': True, 
27                    'AptIndexRecord': 'MD5Sum',
28                    'old_module': 'md5',
29                    'hashlib_func': 'md5',
30                    },
31             ]
32     
33     def __init__(self):
34         self.hashTypeNum = 0    # Use the first if nothing else matters
35         self.expHash = None
36         self.expHex = None
37         self.expSize = None
38         self.expNormHash = None
39         self.fileHasher = None
40         self.fileHash = None
41         self.fileHex = None
42         self.fileNormHash = None
43         self.done = True
44         self.result = None
45         if sys.version_info < (2, 5):
46             # sha256 is not available in python before 2.5, remove it
47             for hashType in self.ORDER:
48                 if hashType['name'] == 'sha256':
49                     del self.ORDER[self.ORDER.index(hashType)]
50                     break
51         
52     def _norm_hash(self, hashString, bits=None, bytes=None):
53         if bits is not None:
54             bytes = (bits - 1) // 8 + 1
55         else:
56             assert(bytes is not None)
57         if len(hashString) < bytes:
58             hashString = hashString + '\000'*(bytes - len(hashString))
59         elif len(hashString) > bytes:
60             hashString = hashString[:bytes]
61         return hashString
62
63     #### Methods for returning the expected hash
64     def expected(self):
65         """Get the expected hash."""
66         return self.expHash
67     
68     def hexexpected(self):
69         """Get the expected hash in hex format."""
70         if self.expHex is None and self.expHash is not None:
71             self.expHex = b2a_hex(self.expHash)
72         return self.expHex
73     
74     def normexpected(self, bits=None, bytes=None):
75         """Normalize the binary hash for the given length.
76         
77         You must specify one of bits or bytes.
78         """
79         if self.expNormHash is None and self.expHash is not None:
80             self.expNormHash = self._norm_hash(self.expHash, bits, bytes)
81         return self.expNormHash
82
83     #### Methods for hashing data
84     def new(self, force = False):
85         """Generate a new hashing object suitable for hashing a file.
86         
87         @param force: set to True to force creating a new hasher even if
88             the hash has been verified already
89         """
90         if self.result is None or force == True:
91             self.result = None
92             self.size = 0
93             self.done = False
94             if sys.version_info < (2, 5):
95                 mod = __import__(self.ORDER[self.hashTypeNum]['old_module'], globals(), locals(), [])
96                 self.fileHasher = mod.new()
97             else:
98                 import hashlib
99                 func = getattr(hashlib, self.ORDER[self.hashTypeNum]['hashlib_func'])
100                 self.fileHasher = func()
101
102     def update(self, data):
103         """Add more data to the file hasher."""
104         if self.result is None:
105             assert self.done == False, "Already done, you can't add more data after calling digest() or verify()"
106             assert self.fileHasher is not None, "file hasher not initialized"
107             self.fileHasher.update(data)
108             self.size += len(data)
109         
110     def digest(self):
111         """Get the hash of the added file data."""
112         if self.fileHash is None:
113             assert self.fileHasher is not None, "you must hash some data first"
114             self.fileHash = self.fileHasher.digest()
115             self.done = True
116         return self.fileHash
117
118     def hexdigest(self):
119         """Get the hash of the added file data in hex format."""
120         if self.fileHex is None:
121             self.fileHex = b2a_hex(self.digest())
122         return self.fileHex
123         
124     def norm(self, bits=None, bytes=None):
125         """Normalize the binary hash for the given length.
126         
127         You must specify one of bits or bytes.
128         """
129         if self.fileNormHash is None:
130             self.fileNormHash = self._norm_hash(self.digest(), bits, bytes)
131         return self.fileNormHash
132
133     def verify(self):
134         """Verify that the added file data hash matches the expected hash."""
135         if self.result is None and self.fileHash is not None and self.expHash is not None:
136             self.result = (self.fileHash == self.expHash and self.size == self.expSize)
137         return self.result
138     
139     #### Methods for setting the expected hash
140     def set(self, hashType, hashHex, size):
141         """Initialize the hash object.
142         
143         @param hashType: must be one of the dictionaries from L{ORDER}
144         """
145         self.hashTypeNum = self.ORDER.index(hashType)    # error if not found
146         self.expHex = hashHex
147         self.expSize = int(size)
148         self.expHash = a2b_hex(self.expHex)
149         
150     def setFromIndexRecord(self, record):
151         """Set the hash from the cache of index file records.
152         
153         @type record: C{dictionary}
154         @param record: keys are hash types, values are tuples of (hash, size)
155         """
156         for hashType in self.ORDER:
157             result = record.get(hashType['AptIndexRecord'], None)
158             if result:
159                 self.set(hashType, result[0], result[1])
160                 return True
161         return False
162
163     def setFromPkgRecord(self, record, size):
164         """Set the hash from Apt's binary packages cache.
165         
166         @param record: whatever is returned by apt_pkg.GetPkgRecords()
167         """
168         for hashType in self.ORDER:
169             hashHex = getattr(record, hashType['AptPkgRecord'], None)
170             if hashHex:
171                 self.set(hashType, hashHex, size)
172                 return True
173         return False
174     
175     def setFromSrcRecord(self, record):
176         """Set the hash from Apt's source package records cache.
177         
178         Currently very simple since Apt only tracks MD5 hashes of source files.
179         
180         @type record: (C{string}, C{int}, C{string})
181         @param record: the hash, size and path of the source file
182         """
183         for hashType in self.ORDER:
184             if hashType['AptSrcRecord']:
185                 self.set(hashType, record[0], record[1])
186                 return True
187         return False
188
189 class TestHashObject(unittest.TestCase):
190     """Unit tests for the hash objects."""
191     
192     timeout = 5
193     if sys.version_info < (2, 4):
194         skip = "skippingme"
195     
196     def test_normalize(self):
197         h = HashObject()
198         h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
199         self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
200         h = HashObject()
201         h.set(h.ORDER[0], b2a_hex('12345678901234567'), '0')
202         self.failUnless(h.normexpected(bits = 160) == '12345678901234567\000\000\000')
203         h = HashObject()
204         h.set(h.ORDER[0], b2a_hex('1234567890123456789012345'), '0')
205         self.failUnless(h.normexpected(bytes = 20) == '12345678901234567890')
206         h = HashObject()
207         h.set(h.ORDER[0], b2a_hex('1234567890123456789'), '0')
208         self.failUnless(h.normexpected(bytes = 20) == '1234567890123456789\000')
209         h = HashObject()
210         h.set(h.ORDER[0], b2a_hex('123456789012345678901'), '0')
211         self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
212
213     def test_failure(self):
214         h = HashObject()
215         h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
216         self.failUnlessRaises(AssertionError, h.normexpected)
217         self.failUnlessRaises(AssertionError, h.digest)
218         self.failUnlessRaises(AssertionError, h.hexdigest)
219         self.failUnlessRaises(AssertionError, h.update, 'gfgf')
220     
221     def test_sha1(self):
222         h = HashObject()
223         found = False
224         for hashType in h.ORDER:
225             if hashType['name'] == 'sha1':
226                 found = True
227                 break
228         self.failUnless(found == True)
229         h.set(hashType, 'c722df87e1acaa64b27aac4e174077afc3623540', '19')
230         h.new()
231         h.update('apt-dht is the best')
232         self.failUnless(h.hexdigest() == 'c722df87e1acaa64b27aac4e174077afc3623540')
233         self.failUnlessRaises(AssertionError, h.update, 'gfgf')
234         self.failUnless(h.verify() == True)
235         
236     def test_md5(self):
237         h = HashObject()
238         found = False
239         for hashType in h.ORDER:
240             if hashType['name'] == 'md5':
241                 found = True
242                 break
243         self.failUnless(found == True)
244         h.set(hashType, '2a586bcd1befc5082c872dcd96a01403', '19')
245         h.new()
246         h.update('apt-dht is the best')
247         self.failUnless(h.hexdigest() == '2a586bcd1befc5082c872dcd96a01403')
248         self.failUnlessRaises(AssertionError, h.update, 'gfgf')
249         self.failUnless(h.verify() == True)
250         
251     def test_sha256(self):
252         h = HashObject()
253         found = False
254         for hashType in h.ORDER:
255             if hashType['name'] == 'sha256':
256                 found = True
257                 break
258         self.failUnless(found == True)
259         h.set(hashType, '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7', '19')
260         h.new()
261         h.update('apt-dht is the best')
262         self.failUnless(h.hexdigest() == '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7')
263         self.failUnlessRaises(AssertionError, h.update, 'gfgf')
264         self.failUnless(h.verify() == True)
265
266     if sys.version_info < (2, 5):
267         test_sha256.skip = "SHA256 hashes are not supported by Python until version 2.5"