self.failUnless(indexhash == idx_hash, "Hashes don't match: %s != %s" % (indexhash, idx_hash))
def verifyHash(self, found_hash, path, true_hash):
- self.failUnless(found_hash[0] == true_hash,
- "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash))
+ self.failUnless(found_hash.hexexpected() == true_hash,
+ "%s hashes don't match: %s != %s" % (path, found_hash.hexexpected(), true_hash))
def test_findIndexHash(self):
lastDefer = defer.Deferred()
"""
self.hashTypeNum = self.ORDER.index(hashType) # error if not found
self.expHex = hashHex
- self.expSize = size
+ self.expSize = int(size)
self.expHash = a2b_hex(self.expHex)
def setFromIndexRecord(self, record):
def test_normalize(self):
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('12345678901234567890'), 0)
+ h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('12345678901234567'), 0)
+ h.set(h.ORDER[0], b2a_hex('12345678901234567'), '0')
self.failUnless(h.normexpected(bits = 160) == '12345678901234567\000\000\000')
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('1234567890123456789012345'), 0)
+ h.set(h.ORDER[0], b2a_hex('1234567890123456789012345'), '0')
self.failUnless(h.normexpected(bytes = 20) == '12345678901234567890')
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('1234567890123456789'), 0)
+ h.set(h.ORDER[0], b2a_hex('1234567890123456789'), '0')
self.failUnless(h.normexpected(bytes = 20) == '1234567890123456789\000')
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('123456789012345678901'), 0)
+ h.set(h.ORDER[0], b2a_hex('123456789012345678901'), '0')
self.failUnless(h.normexpected(bits = 160) == '12345678901234567890')
def test_failure(self):
h = HashObject()
- h.set(h.ORDER[0], b2a_hex('12345678901234567890'), 0)
+ h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0')
self.failUnlessRaises(AssertionError, h.normexpected)
self.failUnlessRaises(AssertionError, h.digest)
self.failUnlessRaises(AssertionError, h.hexdigest)
found = True
break
self.failUnless(found == True)
- h.set(hashType, 'c722df87e1acaa64b27aac4e174077afc3623540', 19)
+ h.set(hashType, 'c722df87e1acaa64b27aac4e174077afc3623540', '19')
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == 'c722df87e1acaa64b27aac4e174077afc3623540')
found = True
break
self.failUnless(found == True)
- h.set(hashType, '2a586bcd1befc5082c872dcd96a01403', 19)
+ h.set(hashType, '2a586bcd1befc5082c872dcd96a01403', '19')
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == '2a586bcd1befc5082c872dcd96a01403')
found = True
break
self.failUnless(found == True)
- h.set(hashType, '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7', 19)
+ h.set(hashType, '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7', '19')
h.new()
h.update('apt-dht is the best')
self.failUnless(h.hexdigest() == '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7')
from zlib import decompressobj, MAX_WBITS
from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
from urlparse import urlparse
-from binascii import a2b_hex, b2a_hex
-import os, sha, md5
+import os
from twisted.python import log, filepath
from twisted.internet import defer
if result:
log.msg('Hashes match: %s' % url)
else:
- log.msg('Hashed file to %s: %s' % (b2a_hex(result), url))
+ log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
self.updatedFile(url, destFile.path)
if ext:
def verifyHash(self, found_hash, path, true_hash):
self.failUnless(found_hash.hexexpected() == true_hash,
- "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash))
+ "%s hashes don't match: %s != %s" % (path, found_hash.hexexpected(), true_hash))
def test_findHash(self):
self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "_main_.*Packages$" | tail -n 1').read().rstrip('\n')
idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'
d = self.client.findHash(idx_path)
- d.addCallback(self.verifyHash, idx_path, a2b_hex(idx_hash))
+ d.addCallback(self.verifyHash, idx_path, idx_hash)
pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
'/var/lib/apt/lists/' + self.packagesFile +
' | cut -d\ -f 2').read().rstrip('\n')
d = self.client.findHash(pkg_path)
- d.addCallback(self.verifyHash, pkg_path, a2b_hex(pkg_hash))
+ d.addCallback(self.verifyHash, pkg_path, pkg_hash)
src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
'/var/lib/apt/lists/' + self.sourcesFile +
for i in range(len(src_hashes)):
src_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + src_dir + '/' + src_paths[i]
d = self.client.findHash(src_path)
- d.addCallback(self.verifyHash, src_path, a2b_hex(src_hashes[i]))
+ d.addCallback(self.verifyHash, src_path, src_hashes[i])
idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
'/var/lib/apt/lists/' + self.releaseFile +
idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/source/Sources.bz2'
d = self.client.findHash(idx_path)
- d.addCallback(self.verifyHash, idx_path, a2b_hex(idx_hash))
+ d.addCallback(self.verifyHash, idx_path, idx_hash)
d.addBoth(lastDefer.callback)
return lastDefer
from PeerManager import PeerManager
from HTTPServer import TopLevel
from MirrorManager import MirrorManager
+from Hash import HashObject
class AptDHT:
def __init__(self, dht):
log.msg('Trying to find hash for %s' % path)
findDefer = self.mirrors.findHash(path)
- findDefer.addCallback(self.findHash_done, path, d)
- findDefer.addErrback(self.findHash_error, path, d)
+ findDefer.addCallbacks(self.findHash_done, self.findHash_error,
+ callbackArgs=(path, d), errbackArgs=(path, d))
+ findDefer.addErrback(log.err)
return d
def findHash_error(self, failure, path, d):
log.err(failure)
- self.findHash_done((None, None), path, d)
+ self.findHash_done(HashObject(), path, d)
def findHash_done(self, hash, path, d):
if hash.expected() is None: