from os.path import dirname, basename
import re, shelve, shutil, fcntl
from twisted.internet import process
-import apt_proxy, copy, UserDict
-from misc import log
+from twisted.python import log
+import copy, UserDict
+from twisted.trial import unittest
-aptpkg_dir='.apt-proxy'
-apt_pkg.InitSystem()
+aptpkg_dir='.apt-dht'
+apt_pkg.init()
class AptDpkgInfo(UserDict.UserDict):
"""
# Make sure that file is always closed.
filehandle.close()
except SystemError:
- log.debug("Had problems reading: %s"%(filename), 'AptDpkgInfo')
+ log.msg("Had problems reading: %s"%(filename))
raise
for line in self.control.split('\n'):
if line.find(': ') != -1:
key, value = line.split(': ', 1)
self.data[key] = value
-class PackageFileList:
+class PackageFileList(UserDict.DictMixin):
"""
Manages a list of package files belonging to a backend
"""
def __init__(self, backendName, cache_dir):
self.cache_dir = cache_dir
- self.packagedb_dir = cache_dir+'/'+ apt_proxy.status_dir + \
+ self.packagedb_dir = cache_dir+'/'+ aptpkg_dir + \
'/backends/' + backendName
if not os.path.exists(self.packagedb_dir):
os.makedirs(self.packagedb_dir)
def open(self):
if self.packages is None:
self.packages = shelve.open(self.packagedb_dir+'/packages.db')
+
def close(self):
if self.packages is not None:
self.packages.close()
- def update_file(self, entry):
+ def update_file(self, filename, cache_path, file_path):
"""
Called from apt_proxy.py when files get updated so we can update our
fake lists/ directory and sources.list.
-
- @param entry CacheEntry for cached file
"""
- if entry.filename=="Packages" or entry.filename=="Release":
- log.msg("Registering package file: "+entry.cache_path, 'apt_pkg', 4)
- stat_result = os.stat(entry.file_path)
- self.packages[entry.cache_path] = stat_result
+ if filename=="Packages" or filename=="Release":
+ log.msg("Registering package file: "+cache_path)
+ self.packages[cache_path] = file_path
+ return True
+ return False
- def get_files(self):
+ def check_files(self):
"""
- Get list of files in database. Each file will be checked that it exists
+ Check all files in the database to make sure it exists.
"""
files = self.packages.keys()
#print self.packages.keys()
for f in files:
- if not os.path.exists(self.cache_dir + os.sep + f):
- log.debug("File in packages database has been deleted: "+f, 'apt_pkg')
- del files[files.index(f)]
+ if not os.path.exists(self.packages[f]):
+ log.msg("File in packages database has been deleted: "+f)
del self.packages[f]
- return files
+
+ def __getitem__(self, key): return self.packages[key]
+ def __setitem__(self, key, item): self.packages[key] = item
+ def __delitem__(self, key): del self.packages[key]
+ def keys(self): return self.packages.keys()
class AptPackages:
"""
"""
DEFAULT_APT_CONFIG = {
#'APT' : '',
- 'APT::Architecture' : 'i386', # TODO: Fix this, see bug #436011 and #285360
+ #'APT::Architecture' : 'amd64', # TODO: Fix this, see bug #436011 and #285360
#'APT::Default-Release' : 'unstable',
'Dir':'.', # /
#print "start aptPackages [%s] %s " % (self.backendName, self.cache_dir)
self.packages.close()
#print "Deleted aptPackages [%s] %s " % (self.backendName, self.cache_dir)
- def file_updated(self, entry):
+ def file_updated(self, filename, cache_path, file_path):
"""
A file in the backend has changed. If this affects us, unload our apt database
"""
- if self.packages.update_file(entry):
+ if self.packages.update_file(filename, cache_path, file_path):
self.unload()
def __save_stdout(self):
sources_filename = self.status_dir+'/'+'apt/etc/sources.list'
sources = open(sources_filename, 'w')
sources_count = 0
- for file in self.packages.get_files():
+ self.packages.check_files()
+ for f in self.packages:
# we should probably clear old entries from self.packages and
# take into account the recorded mtime as optimization
- filepath = self.cache_dir + file
- fake_uri='http://apt-proxy/'+file
+ filepath = self.packages[f]
+ fake_uri='http://apt-dht/'+f
source_line='deb '+dirname(fake_uri)+'/ /'
listpath=(self.status_dir+'/apt/lists/'
+apt_pkg.URItoFileName(fake_uri))
sources.write(source_line+'\n')
- log.debug("Sources line: " + source_line, 'apt_pkg')
+ log.msg("Sources line: " + source_line)
sources_count = sources_count + 1
try:
os.unlink(listpath)
except:
pass
- os.symlink('../../../../../'+file, listpath)
+ os.symlink(self.packages[f], listpath)
sources.close()
if sources_count == 0:
- log.msg("No Packages files available for %s backend"%(self.backendName), 'apt_pkg')
+ log.msg("No Packages files available for %s backend"%(self.backendName))
return False
- log.msg("Loading Packages database for "+self.status_dir,'apt_pkg')
+ log.msg("Loading Packages database for "+self.status_dir)
#apt_pkg.Config = apt_pkg.newConfiguration(); #-- this causes unit tests to fail!
for key, value in self.apt_config.items():
apt_pkg.Config[key] = value
# for I in apt_pkg.Config.keys():
# print "%s \"%s\";"%(I,apt_pkg.Config[I]);
- if log.isEnabled('apt'):
+ # apt_pkg prints progress messages to stdout, disable
+ self.__save_stdout()
+ try:
self.cache = apt_pkg.GetCache()
- else:
- # apt_pkg prints progress messages to stdout, disable
- self.__save_stdout()
- try:
- self.cache = apt_pkg.GetCache()
- finally:
- self.__restore_stdout()
+ finally:
+ self.__restore_stdout()
self.records = apt_pkg.GetPkgRecords(self.cache)
#for p in self.cache.Packages:
imported_count = 0
if not os.path.exists(dir):
- log.err('Directory ' + dir + ' does not exist', 'import')
+ log.err('Directory ' + dir + ' does not exist')
return
if recursive:
- log.msg("Importing packages from directory tree: " + dir, 'import',3)
+ log.msg("Importing packages from directory tree: " + dir)
for root, dirs, files in os.walk(dir):
for file in files:
imported_count += import_file(factory, root, file)
else:
- log.debug("Importing packages from directory: " + dir, 'import',3)
+ log.msg("Importing packages from directory: " + dir)
for file in os.listdir(dir):
mode = os.stat(dir + '/' + file)[stat.ST_MODE]
if not stat.S_ISDIR(mode):
Import a .deb or .udeb into cache from given filename
"""
if file[-4:]!='.deb' and file[-5:]!='.udeb':
- log.msg("Ignoring (unknown file type):"+ file, 'import')
+ log.msg("Ignoring (unknown file type):"+ file)
return 0
- log.debug("considering: " + dir + '/' + file, 'import')
+ log.msg("considering: " + dir + '/' + file)
try:
paths = get_mirror_path(factory, dir+'/'+file)
except SystemError:
- log.msg(file + ' skipped - wrong format or corrupted', 'import')
+ log.msg(file + ' skipped - wrong format or corrupted')
return 0
if paths:
if len(paths) != 1:
- log.debug("WARNING: multiple ocurrences", 'import')
- log.debug(str(paths), 'import')
+ log.msg("WARNING: multiple ocurrences")
+ log.msg(str(paths), 'import')
cache_path = paths[0]
else:
- log.debug("Not found, trying to guess", 'import')
+ log.msg("Not found, trying to guess")
info = AptDpkgInfo(dir+'/'+file)
cache_path = closest_match(info,
get_mirror_versions(factory, info['Package']))
if cache_path:
- log.debug("MIRROR_PATH:"+ cache_path, 'import')
+ log.msg("MIRROR_PATH:"+ cache_path)
src_path = dir+'/'+file
dest_path = factory.config.cache_dir+cache_path
if not os.path.exists(dest_path):
- log.debug("IMPORTING:" + src_path, 'import')
+ log.msg("IMPORTING:" + src_path)
dest_path = re.sub(r'/\./', '/', dest_path)
if not os.path.exists(dirname(dest_path)):
os.makedirs(dirname(dest_path))
if hasattr(factory, 'access_times'):
atime = os.stat(src_path)[stat.ST_ATIME]
factory.access_times[cache_path] = atime
- log.msg(file + ' imported', 'import')
+ log.msg(file + ' imported')
return 1
else:
- log.msg(file + ' skipped - already in cache', 'import')
+ log.msg(file + ' skipped - already in cache')
return 0
else:
- log.msg(file + ' skipped - no suitable backend found', 'import')
+ log.msg(file + ' skipped - no suitable backend found')
return 0
-def test(factory, file):
- "Just for testing purposes, this should probably go to hell soon."
- for backend in factory.backends:
- backend.get_packages_db().load()
+class TestAptPackages(unittest.TestCase):
+ """Unit tests for the AptPackages cache."""
+
+ pending_calls = []
- info = AptDpkgInfo(file)
- path = get_mirror_path(factory, file)
- print "Exact Match:"
- print "\t%s:%s"%(info['Version'], path)
-
- vers = get_mirror_versions(factory, info['Package'])
- print "Other Versions:"
- for ver in vers:
- print "\t%s:%s"%(ver)
- print "Guess:"
- print "\t%s:%s"%(info['Version'], closest_match(info, vers))
-if __name__ == '__main__':
- from apt_proxy_conf import factoryConfig
- class DummyFactory:
- def debug(self, msg):
- pass
- factory = DummyFactory()
- factoryConfig(factory)
- test(factory,
- '/home/ranty/work/apt-proxy/related/tools/galeon_1.2.5-1_i386.deb')
- test(factory,
- '/storage/apt-proxy/debian/dists/potato/main/binary-i386/base/'
- +'libstdc++2.10_2.95.2-13.deb')
+ def test_sha1(self):
+ a = AptPackages('whatever', '/tmp')
+
+ packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | tail -n 1').read().rstrip('\n')
+ for f in os.walk('/var/lib/apt/lists').next()[2]:
+ if f[-7:] == "Release" and packagesFile.startswith(f[:-7]):
+ releaseFile = f
+ break
+
+ a.file_updated('Release', releaseFile[releaseFile.find('_debian_')+1:].replace('_','/'), '/var/lib/apt/lists/' + releaseFile)
+ a.file_updated('Packages', packagesFile[packagesFile.find('_debian_')+1:].replace('_','/'), '/var/lib/apt/lists/' + packagesFile)
+
+ a.load()
+
+ a.records.Lookup(a.cache['dpkg'].VersionList[0].FileList[0])
+
+ pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + '/var/lib/apt/lists/' + packagesFile + ' | grep -E "^SHA1:" | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
- cleanup(factory)
+ self.failUnless(a.records.SHA1Hash == pkg_hash)
+ def tearDown(self):
+ for p in self.pending_calls:
+ if p.active():
+ p.cancel()
+ self.pending_calls = []