self.fileHasher = None
self.pieceHasher = None
self.fileHash = digest
- self.pieceHash = [pieces[x:x+self.ORDER[self.hashTypeNum]['length']]
- for x in xrange(0, len(pieces), self.ORDER[self.hashTypeNum]['length'])]
+ self.pieceHash = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
self.size = size
self.fileHex = None
self.fileNormHash = None
self.result = None
self.done = False
self.fileHasher = self._new()
- self.pieceHasher = None
+ if self.ORDER[self.hashTypeNum]['name'] == 'sha1':
+ self.pieceHasher = None
+ else:
+ self.pieceHasher = self._newSHA1()
+ self.pieceSize = 0
self.fileHash = None
self.pieceHash = []
self.size = 0
func = getattr(hashlib, self.ORDER[self.hashTypeNum]['hashlib_func'])
return func()
+ def _newSHA1(self):
+ """Create a new SHA1 hashing object."""
+ if sys.version_info < (2, 5):
+ import sha
+ return sha.new()
+ else:
+ import hashlib
+ return hashlib.sha1()
+
def update(self, data):
"""Add more data to the file hasher."""
if self.result is None:
# Save the first piece digest and initialize a new piece hasher
self.pieceHash.append(self.fileHasher.digest())
- self.pieceHasher = self._new()
+ self.pieceHasher = self._newSHA1()
if self.pieceHasher:
# Loop in case the data contains multiple pieces
# Save the piece hash and start a new one
self.pieceHasher.update(data[:(PIECE_SIZE - self.pieceSize)])
self.pieceHash.append(self.pieceHasher.digest())
- self.pieceHasher = self._new()
+ self.pieceHasher = self._newSHA1()
# Don't forget to hash the data normally
self.fileHasher.update(data[:(PIECE_SIZE - self.pieceSize)])
timeout = 5
if sys.version_info < (2, 4):
skip = "skippingme"
-
+
def test_failure(self):
"""Tests that the hash object fails when treated badly."""
h = HashObject()
self.failUnlessRaises(HashError, h.update, 'gfgf')
def test_pieces(self):
- """Tests the hashing of large files into pieces."""
+ """Tests updating of pieces a little at a time."""
+ h = HashObject()
+ h.new(True)
+ for i in xrange(120*1024):
+ h.update('1234567890')
+ pieces = h.pieceDigests()
+ self.failUnless(h.digest() == '1(j\xd2q\x0b\n\x91\xd2\x13\x90\x15\xa3E\xcc\xb0\x8d.\xc3\xc5')
+ self.failUnless(len(pieces) == 3)
+ self.failUnless(pieces[0] == ',G \xd8\xbbPl\xf1\xa3\xa0\x0cW\n\xe6\xe6a\xc9\x95/\xe5')
+ self.failUnless(pieces[1] == '\xf6V\xeb/\xa8\xad[\x07Z\xf9\x87\xa4\xf5w\xdf\xe1|\x00\x8e\x93')
+ self.failUnless(pieces[2] == 'M[\xbf\xee\xaa+\x19\xbaV\xf699\r\x17o\xcb\x8e\xcfP\x19')
+
+ def test_pieces_at_once(self):
+ """Tests the updating of multiple pieces at once."""
h = HashObject()
h.new()
h.update('1234567890'*120*1024)
self.failUnless(pieces[0] == ',G \xd8\xbbPl\xf1\xa3\xa0\x0cW\n\xe6\xe6a\xc9\x95/\xe5')
self.failUnless(pieces[1] == '\xf6V\xeb/\xa8\xad[\x07Z\xf9\x87\xa4\xf5w\xdf\xe1|\x00\x8e\x93')
self.failUnless(pieces[2] == 'M[\xbf\xee\xaa+\x19\xbaV\xf699\r\x17o\xcb\x8e\xcfP\x19')
+
+ def test_pieces_boundaries(self):
+ """Tests the updating exactly to piece boundaries."""
+ h = HashObject()
h.new(True)
- for i in xrange(120*1024):
- h.update('1234567890')
+ h.update('1234567890'*52428)
+ h.update('12345678')
+ assert h.size % PIECE_SIZE == 0
+ h.update('90')
+ h.update('1234567890'*52428)
+ h.update('123456')
+ assert h.size % PIECE_SIZE == 0
+ h.update('7890')
+ h.update('1234567890'*18022)
+ assert h.size == 10*120*1024
pieces = h.pieceDigests()
self.failUnless(h.digest() == '1(j\xd2q\x0b\n\x91\xd2\x13\x90\x15\xa3E\xcc\xb0\x8d.\xc3\xc5')
self.failUnless(len(pieces) == 3)
self.failUnless(pieces[1] == '\xf6V\xeb/\xa8\xad[\x07Z\xf9\x87\xa4\xf5w\xdf\xe1|\x00\x8e\x93')
self.failUnless(pieces[2] == 'M[\xbf\xee\xaa+\x19\xbaV\xf699\r\x17o\xcb\x8e\xcfP\x19')
+ def test_pieces_other_hashes(self):
+ """Tests updating of pieces a little at a time."""
+ h = HashObject()
+ for hashType in h.ORDER:
+ if hashType['name'] != 'sha1':
+ h.hashTypeNum = h.ORDER.index(hashType)
+ break
+ assert h.ORDER[h.hashTypeNum]['name'] != 'sha1'
+ h.new(True)
+ for i in xrange(120*1024):
+ h.update('1234567890')
+ pieces = h.pieceDigests()
+ self.failUnless(len(pieces) == 3)
+ self.failUnless(pieces[0] == ',G \xd8\xbbPl\xf1\xa3\xa0\x0cW\n\xe6\xe6a\xc9\x95/\xe5')
+ self.failUnless(pieces[1] == '\xf6V\xeb/\xa8\xad[\x07Z\xf9\x87\xa4\xf5w\xdf\xe1|\x00\x8e\x93')
+ self.failUnless(pieces[2] == 'M[\xbf\xee\xaa+\x19\xbaV\xf699\r\x17o\xcb\x8e\xcfP\x19')
+
def test_sha1(self):
"""Test hashing using the SHA1 hash."""
h = HashObject()