- def get_mirror_path(self, name, version):
- "Find the path for version 'version' of package 'name'"
- if not self.load(): return None
- try:
- for pack_vers in self.cache[name].VersionList:
- if(pack_vers.VerStr == version):
- file, index = pack_vers.FileList[0]
- self.records.Lookup((file,index))
- path = self.records.FileName
- if len(path)>2 and path[0:2] == './':
- path = path[2:] # Remove any leading './'
- return path
-
- except KeyError:
- pass
- return None
-
-
- def get_mirror_versions(self, package_name):
- """
- Find the available versions of the package name given
- @type package_name: string
- @param package_name: package name to search for e.g. ;apt'
- @return: A list of mirror versions available
-
- """
- vers = []
- if not self.load(): return vers
- try:
- for pack_vers in self.cache[package_name].VersionList:
- vers.append(pack_vers.VerStr)
- except KeyError:
- pass
- return vers
-
-
-def cleanup(factory):
- for backend in factory.backends.values():
- backend.get_packages_db().cleanup()
-
-def get_mirror_path(factory, file):
- """
- Look for the path of 'file' in all backends.
- """
- info = AptDpkgInfo(file)
- paths = []
- for backend in factory.backends.values():
- path = backend.get_packages_db().get_mirror_path(info['Package'],
- info['Version'])
- if path:
- paths.append('/'+backend.base+'/'+path)
- return paths
-
-def get_mirror_versions(factory, package):
- """
- Look for the available version of a package in all backends, given
- an existing package name
- """
- all_vers = []
- for backend in factory.backends.values():
- vers = backend.get_packages_db().get_mirror_versions(package)
- for ver in vers:
- path = backend.get_packages_db().get_mirror_path(package, ver)
- all_vers.append((ver, "%s/%s"%(backend.base,path)))
- return all_vers
-
-def closest_match(info, others):
- def compare(a, b):
- return apt_pkg.VersionCompare(a[0], b[0])
-
- others.sort(compare)
- version = info['Version']
- match = None
- for ver,path in others:
- if version <= ver:
- match = path
- break
- if not match:
- if not others:
- return None
- match = others[-1][1]
-
- dirname=re.sub(r'/[^/]*$', '', match)
- version=re.sub(r'^[^:]*:', '', info['Version'])
- if dirname.find('/pool/') != -1:
- return "/%s/%s_%s_%s.deb"%(dirname, info['Package'],
- version, info['Architecture'])
- else:
- return "/%s/%s_%s.deb"%(dirname, info['Package'], version)
-
-def import_directory(factory, dir, recursive=0):
- """
- Import all files in a given directory into the cache
- This is used by apt-proxy-import to import new files
- into the cache
- """
- imported_count = 0
-
- if not os.path.exists(dir):
- log.err('Directory ' + dir + ' does not exist')
- return
-
- if recursive:
- log.msg("Importing packages from directory tree: " + dir)
- for root, dirs, files in os.walk(dir):
- for file in files:
- imported_count += import_file(factory, root, file)
- else:
- log.msg("Importing packages from directory: " + dir)
- for file in os.listdir(dir):
- mode = os.stat(dir + '/' + file)[stat.ST_MODE]
- if not stat.S_ISDIR(mode):
- imported_count += import_file(factory, dir, file)
-
- for backend in factory.backends.values():
- backend.get_packages_db().unload()
-
- log.msg("Imported %s files" % (imported_count))
- return imported_count
-
-def import_file(factory, dir, file):
- """
- Import a .deb or .udeb into cache from given filename
- """
- if file[-4:]!='.deb' and file[-5:]!='.udeb':
- log.msg("Ignoring (unknown file type):"+ file)
- return 0
-
- log.msg("considering: " + dir + '/' + file)
- try:
- paths = get_mirror_path(factory, dir+'/'+file)
- except SystemError:
- log.msg(file + ' skipped - wrong format or corrupted')
- return 0
- if paths:
- if len(paths) != 1:
- log.msg("WARNING: multiple ocurrences")
- log.msg(str(paths), 'import')
- cache_path = paths[0]
- else:
- log.msg("Not found, trying to guess")
- info = AptDpkgInfo(dir+'/'+file)
- cache_path = closest_match(info,
- get_mirror_versions(factory, info['Package']))
- if cache_path:
- log.msg("MIRROR_PATH:"+ cache_path)
- src_path = dir+'/'+file
- dest_path = factory.config.cache_dir+cache_path
-
- if not os.path.exists(dest_path):
- log.msg("IMPORTING:" + src_path)
- dest_path = re.sub(r'/\./', '/', dest_path)
- if not os.path.exists(dirname(dest_path)):
- os.makedirs(dirname(dest_path))
- f = open(dest_path, 'w')
- fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
- f.truncate(0)
- shutil.copy2(src_path, dest_path)
- f.close()
- if hasattr(factory, 'access_times'):
- atime = os.stat(src_path)[stat.ST_ATIME]
- factory.access_times[cache_path] = atime
- log.msg(file + ' imported')
- return 1
- else:
- log.msg(file + ' skipped - already in cache')
- return 0
-
- else:
- log.msg(file + ' skipped - no suitable backend found')
- return 0
-