summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config.py3
-rw-r--r--filter.py165
-rw-r--r--pato2.py339
-rw-r--r--test/blacklist_sample2
-rw-r--r--test/core.db.tar.gzbin0 -> 1345 bytes
-rw-r--r--test/depends4
-rw-r--r--test/desc39
-rw-r--r--test/rsync_output_sample14
-rw-r--r--test/test1.py66
-rw-r--r--test/test_filter.py192
10 files changed, 547 insertions, 277 deletions
diff --git a/config.py b/config.py
index 56a7c81..39f512a 100644
--- a/config.py
+++ b/config.py
@@ -32,6 +32,9 @@ for var in boolvars:
else:
print('%s is not True or False' % var)
+# Rsync commands
+rsync_list_command="rsync -a --no-motd --list-only "
+
# Classes and Exceptions
class NonValidFile(ValueError): pass
class NonValidDir(ValueError): pass
diff --git a/filter.py b/filter.py
index ffc0965..668822b 100644
--- a/filter.py
+++ b/filter.py
@@ -1,56 +1,66 @@
-#! /usr/bin/python
+ #! /usr/bin/python
#-*- encoding: utf-8 -*-
-import commands
-import os
-import re
+from glob import glob
from repm.config import *
from repm.pato2 import *
-rsync_list_command="rsync -a --no-motd --list-only "
+def pkginfo_from_filename(filename):
+ """ Generates a Package object with info from a filename,
+ filename can be relative or absolute
+
+ Parameters:
+ ----------
+ filename -> str Must contain .pkg.tar.
-def generate_rsync_command(base_command, dir_list, destdir=repodir, mirror_name=mirror,
- mirror_path=mirrorpath, blacklist_file=False):
- """ Generates an rsync command for executing it by combining all parameters.
+ Returns:
+ ----------
+ pkg -> Package object"""
+ if ".pkg.tar." not in filename:
+ raise NonValidFile
+ pkg = Package()
+ pkg["location"] = filename
+ fileattrs = os.path.basename(filename).split("-")
+ pkg["arch"] = fileattrs.pop(-1).split(".")[0]
+ pkg["release"] = fileattrs.pop(-1)
+ pkg["version"] = fileattrs.pop(-1)
+ pkg["name"] = "-".join(fileattrs)
+ return pkg
+
+def pkginfo_from_desc(filename):
+ """ Returns pkginfo from desc file.
Parameters:
----------
- base_command -> str
- mirror_name -> str
- mirror_path -> str
- dir_list -> list or tuple
- destdir -> str Path to dir, dir must exist.
- blacklist_file -> False or str Path to file, file must exist.
+ filename -> str File must exist
- Return:
+ Returns:
----------
- rsync_command -> str """
- from os.path import isfile, isdir
-
- if blacklist_file and not isfile(blacklist_file):
- print(blacklist_file + " is not a file")
+ pkg -> Package object"""
+ if not os.path.isfile(filename):
raise NonValidFile
-
- if not os.path.isdir(destdir):
- print(destdir + " is not a directory")
- raise NonValidDir
-
- dir_list="{" + ",".join(dir_list) + "}"
-
- if blacklist_file:
- return " ".join((base_command, "--exclude-from-file="+blacklist_file,
- mirror_name + mirror_path + dir_list, destdir))
- return " ".join((base_command, mirror_name + mirror_path + dir_list, destdir))
-
-def run_rsync(base_for_rsync=rsync_list_command, dir_list_for_rsync=(repo_list + dir_list),
- debug=verbose):
- """ Runs rsync and gets returns it's output """
- cmd = str(generate_rsync_command(rsync_list_command, (repo_list + dir_list)))
- if debug:
- printf("rsync_command" + cmd)
- return commands.getoutput(cmd)
-
-def get_file_list_from_rsync_output(rsync_output):
- """ Generates a list of packages and versions from an rsync output using --list-only --no-motd.
+ try:
+ f=open(filename)
+ info=f.read().rsplit()
+ finally:
+ f.close()
+ pkg = Package()
+ info_map={"name" :("%NAME%" , None),
+ "version" :("%VERSION%" , 0 ),
+ "release" :("%VERSION%" , 1 ),
+ "arch" :("%ARCH%" , None),
+ "license" :("%LICENSE%" , None),
+ "location":("%FILENAME%", None),}
+
+ for key in info_map.keys():
+ field,pos=info_map[key]
+ pkg[key]=info[info.index(field)+1]
+ if pos is not None:
+ pkg[key]=pkg[key].split("-")[pos]
+ return pkg
+
+def pkginfo_from_rsync_output(rsync_output):
+ """ Generates a list of packages and versions from an rsync output
+ wich uses --list-only and --no-motd options.
Parameters:
----------
@@ -59,34 +69,54 @@ def get_file_list_from_rsync_output(rsync_output):
Returns:
----------
package_list -> tuple Contains Package objects. """
- a=list()
-
- def directory(line):
- pass
def package_or_link(line):
""" Take info out of filename """
location_field = 4
- pkg = Package()
- pkg["location"] = line.rsplit()[location_field]
- fileattrs = pkg["location"].split("/")[-1].split("-")
- pkg["arch"] = fileattrs.pop(-1).split(".")[0]
- pkg["release"] = fileattrs.pop(-1)
- pkg["version"] = fileattrs.pop(-1)
- pkg["name"] = "-".join(fileattrs)
- return pkg
-
- options = { "d": directory,
+ return pkginfo_from_filename(line.rsplit()[location_field])
+
+ def do_nothing():
+ pass
+
+ options = { "d": do_nothing,
"l": package_or_link,
- "-": package_or_link}
+ "-": package_or_link,
+ " ": do_nothing}
+
+ package_list=list()
+
+ lines=[x for x in rsync_output.split("\n") if ".pkg.tar" in x]
+
+ for line in lines:
+ pkginfo=options[line[0]](line)
+ if pkginfo:
+ package_list.append(pkginfo)
+
+ return tuple(package_list)
+
+def pkginfo_from_files_in_dir(directory):
+ """ Returns pkginfo from filenames of packages in dir
+ wich has .pkg.tar. on them
+
+ Parameters:
+ ----------
+ directory -> str Directory must exist
- for line in rsync_output.split("\n"):
- if ".pkg.tar" in line:
- pkginfo=options[line[0]](line)
- if pkginfo:
- a.append(pkginfo)
+ Returns:
+ ----------
+ package_list -> tuple Contains Package objects """
+ package_list=list()
- return tuple(a)
+ if not os.path.isdir(directory):
+ raise NonValidDir
+
+ for filename in glob(os.path.join(directory,"*")):
+ if ".pkg.tar." in filename:
+ package_list.append(pkginfo_from_filename(filename))
+ return tuple(package_list)
+
+def pkginfo_from_db(path_to_db):
+ """ """
def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names,
exclude_file=rsync_blacklist, debug=verbose):
@@ -111,8 +141,7 @@ def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names,
a.append(package["location"])
if debug:
- printf(a)
-
+ return a
try:
fsock = open(exclude_file,"w")
try:
@@ -121,4 +150,8 @@ def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names,
fsock.close()
except IOError:
printf("%s wasnt written" % blacklist_file)
-
+
+if __name__ == "__main__":
+ a=run_rsync(rsync_list_command)
+ packages=pkginfo_from_rsync_output(a)
+ generate_exclude_list_from_blacklist(packages,listado(blacklist))
diff --git a/pato2.py b/pato2.py
index cfdd859..0d77d6b 100644
--- a/pato2.py
+++ b/pato2.py
@@ -25,175 +25,224 @@
"""
from repm.config import *
-
+from repm.filter import *
import tarfile
from glob import glob
from os.path import isdir, isfile
def printf(text,output_=output):
- """Guarda el texto en la variable log y puede imprimir en pantalla."""
- log_file = open(logname, 'a')
- log_file.write("\n" + str(text) + "\n")
- log_file.close()
- if output_: print (str(text) + "\n")
+ """Guarda el texto en la variable log y puede imprimir en pantalla."""
+ log_file = open(logname, 'a')
+ log_file.write("\n" + str(text) + "\n")
+ log_file.close()
+ if output_: print (str(text) + "\n")
def listado(filename_):
- """Obtiene una lista de paquetes de un archivo."""
- archivo = open(filename_,"r")
- lista = archivo.read().split("\n")
- archivo.close()
- return [pkg.split(":")[0] for pkg in lista if pkg]
+ """Obtiene una lista de paquetes de un archivo."""
+ archivo = open(filename_,"r")
+ lista = archivo.read().split("\n")
+ archivo.close()
+ return [pkg.split(":")[0].rstrip() for pkg in lista if pkg]
def db(repo_,arch_):
- """Construye un nombre para sincronizar una base de datos."""
- return "/%s/os/%s/%s.db.tar.gz" % (repo_, arch_, repo_)
+ """Construye un nombre para sincronizar una base de datos."""
+ return "/%s/os/%s/%s.db.tar.gz" % (repo_, arch_, repo_)
def packages(repo_, arch_, expr="*"):
- """ Get packages on a repo, arch folder """
- return tuple( glob( repodir + "/" + repo_ + "/os/" + arch_ + "/" + expr ) )
-
-def sync_all_repo(verbose_=verbose):
- folders = ",".join(repo_list + dir_list)
- cmd_ = "rsync -av --delete-after --delay-updates " + mirror + mirrorpath + "/{" + folders + "} " + repodir
- printf(cmd_)
- a=commands.getoutput(cmd_)
- if verbose_: printf(a)
+ """ Get packages on a repo, arch folder """
+ return tuple( glob( repodir + "/" + repo_ + "/os/" + arch_ + "/" + expr ) )
+
+def sync_all_repo(debug=verbose):
+ cmd=generate_rsync_command(rsync_list_command)
+ rsout=run_rsync(cmd)
+ pkgs=pkginfo_from_rsync_output(rsout)
+ generate_exclude_list_from_blacklist(pkgs,listado(blacklist),debug=False)
+ cmd=generate_rsync_command(rsync_update_command,blacklist_file=rsync_blacklist)
+ a=run_rsync(cmd)
+ cmd=generate_rsync_command(rsync_post_command,blacklist_file=rsync_blacklist)
+ b=run_rsync(cmd)
+ if debug:
+ printf(a)
+ printf(b)
def get_from_desc(desc, var,db_tar_file=False):
- """ Get a var from desc file """
- desc = desc.split("\n")
- return desc[desc.index(var)+1]
+ """ Get a var from desc file """
+ desc = desc.split("\n")
+ return desc[desc.index(var)+1]
def get_info(repo_,arch_,db_tar_file=False,verbose_=verbose):
- """ Makes a list of package name, file and license """
- info=list()
- # Extract DB tar.gz
- commands.getoutput("mkdir -p " + archdb)
- if not db_tar_file:
- db_tar_file = repodir + db(repo_,arch_)
- if isfile(db_tar_file):
- try:
- db_open_tar = tarfile.open(db_tar_file, 'r:gz')
- except tarfile.ReadError:
- printf("No valid db_file %s" % db_tar_file)
- return(tuple())
- else:
- printf("No db_file %s" % db_tar_file)
- return(tuple())
- for file in db_open_tar.getmembers():
- db_open_tar.extract(file, archdb)
- db_open_tar.close()
- # Get info from file
- for dir_ in glob(archdb + "/*"):
- if isdir(dir_) and isfile(dir_ + "/desc"):
- pkg_desc_file = open(dir_ + "/desc", "r")
- desc = pkg_desc_file.read()
- pkg_desc_file.close()
- info.append(( get_from_desc(desc,"%NAME%"),
- dir_.split("/")[-1],
- get_from_desc(desc,"%LICENSE%") ))
- if verbose_: printf(info)
- commands.getoutput("rm -r %s/*" % archdb)
- return tuple(info)
+ """ Makes a list of package name, file and license """
+ info=list()
+ # Extract DB tar.gz
+ commands.getoutput("mkdir -p " + archdb)
+ if not db_tar_file:
+ db_tar_file = repodir + db(repo_,arch_)
+ if isfile(db_tar_file):
+ try:
+ db_open_tar = tarfile.open(db_tar_file, 'r:gz')
+ except tarfile.ReadError:
+ printf("No valid db_file %s" % db_tar_file)
+ return(tuple())
+ else:
+ printf("No db_file %s" % db_tar_file)
+ return(tuple())
+ for file in db_open_tar.getmembers():
+ db_open_tar.extract(file, archdb)
+ db_open_tar.close()
+ # Get info from file
+ for dir_ in glob(archdb + "/*"):
+ if isdir(dir_) and isfile(dir_ + "/desc"):
+ pkg_desc_file = open(dir_ + "/desc", "r")
+ desc = pkg_desc_file.read()
+ pkg_desc_file.close()
+ info.append(( get_from_desc(desc,"%NAME%"),
+ dir_.split("/")[-1],
+ get_from_desc(desc,"%LICENSE%") ))
+ if verbose_: printf(info)
+ commands.getoutput("rm -r %s/*" % archdb)
+ return tuple(info)
def make_pending(repo_,arch_,info_):
- """ Si los paquetes no están en blacklist ni whitelist y la licencia contiene "custom" los agrega a pending"""
- search = tuple( listado(blacklist) + listado (whitelist) )
- if verbose: printf("blaclist + whitelist= " + str(search) )
- lista_=list()
- for (name,pkg_,license_) in info_:
- if "custom" in license_:
- if name not in search:
- lista_.append( (name, license_ ) )
- elif not name:
- printf( pkg_ + " package has no %NAME% attibute " )
- if verbose: printf( lista_ )
- a=open( pending + "-" + repo_ + ".txt", "w" ).write(
- "\n".join([name + ":" + license_ for (name,license_) in lista_]) )
+ """ Si los paquetes no están en blacklist ni whitelist y la licencia contiene "custom" los agrega a pending"""
+ search = tuple( listado(blacklist) + listado (whitelist) )
+ if verbose: printf("blaclist + whitelist= " + str(search) )
+ lista_=list()
+ for (name,pkg_,license_) in info_:
+ if "custom" in license_:
+ if name not in search:
+ lista_.append( (name, license_ ) )
+ elif not name:
+ printf( pkg_ + " package has no %NAME% attibute " )
+ if verbose: printf( lista_ )
+ a=open( pending + "-" + repo_ + ".txt", "w" ).write(
+ "\n".join([name + ":" + license_ for (name,license_) in lista_]) + "\n")
def remove_from_blacklist(repo_,arch_,info_,blacklist_):
- """ Check the blacklist and remove packages on the db"""
- lista_=list()
- pack_=list()
- for (name_, pkg_, license_) in info_:
- if name_ in blacklist_:
- lista_.append(name_)
- for p in packages(repo_,arch_,pkg_ + "*"):
- pack_.append(p)
- if lista_:
- lista_=" ".join(lista_)
- com_ = "repo-remove " + repodir + db(repo_,arch_) + " " + lista_
- printf(com_)
- a = commands.getoutput(com_)
- if verbose: printf(a)
- if pack_:
- pack_=" ".join(pack_)
- com_="chmod a-r " + pack_
- printf(com_)
- a=commands.getoutput(com_)
- if verbose: printf(a)
+ """ Check the blacklist and remove packages on the db"""
+ lista_=list()
+ pack_=list()
+ for (name_, pkg_, license_) in info_:
+ if name_ in blacklist_:
+ lista_.append(name_)
+ for p in packages(repo_,arch_,pkg_ + "*"):
+ pack_.append(p)
+ if lista_:
+ lista_=" ".join(lista_)
+ com_ = "repo-remove " + repodir + db(repo_,arch_) + " " + lista_
+ printf(com_)
+ a = commands.getoutput(com_)
+ if verbose: printf(a)
+
+def cleanup_nonfree_in_dir(directory,blacklisted_names):
+ pkgs=pkginfo_from_files_in_dir(directory)
+ for package in pkgs:
+ if package["name"] in blacklisted_names:
+ os.remove(package["location"])
def link(repo_,arch_,file_):
- """ Makes a link in the repo for the package """
- cmd_="ln -f " + file_ + " " + repodir + "/" + repo_ + "/os/" + arch_
- a=commands.getoutput(cmd_)
- if verbose:
- printf(cmd_ + a)
+ """ Makes a link in the repo for the package """
+ cmd_="ln -f " + file_ + " " + repodir + "/" + repo_ + "/os/" + arch_
+ a=commands.getoutput(cmd_)
+ if verbose:
+ printf(cmd_ + a)
def add_free_repo(verbose_=verbose):
- for repo_ in repo_list:
- for arch_ in arch_list:
- lista_=list()
- for file_ in glob(free_path + repo_ + "/os/" + arch_ + "/*.pkg.tar.*"):
- lista_.append(file_)
- link(repo_,arch_,file_)
- for dir_ in other:
- for file_ in glob(free_path + repo_ + "/os/" + dir_ + "/*.pkg.tar.*"):
- lista_.append(file_)
- link(repo_,arch_,file_)
- if lista_:
- lista_=" ".join(lista_)
- if verbose: printf(lista_)
- cmd_="repo-add " + repodir + db(repo_,arch_) + " " + lista_
- printf(cmd_)
- a=commands.getoutput(cmd_)
- if verbose: printf(a)
+ cmd_=os.path.join(home,"/usr/bin/sync-free")
+ printf(cmd_)
+ a=commands.getoutput(cmd_)
+ if verbose_: printf(a)
+ for repo_ in repo_list:
+ for arch_ in arch_list:
+ lista_=list()
+ for file_ in glob(freedir + repo_ + "/os/" + arch_ + "/*.pkg.tar.*"):
+ lista_.append(file_)
+ for dir_ in other:
+ for file_ in glob(freedir + repo_ + "/os/" + dir_ + "/*.pkg.tar.*"):
+ lista_.append(file_)
+
+ printf(lista_)
+
+ if lista_:
+ lista_=" ".join(lista_)
+ if verbose: printf(lista_)
+ cmd_="repo-add " + repodir + db(repo_,arch_) + " " + lista_
+ printf(cmd_)
+ a=commands.getoutput(cmd_)
+ if verbose: printf(a)
def get_licenses(verbose_=verbose):
- """ Extract the license from packages in repo_,arch_ and in pending_ file"""
- cmd_=home + "/usr/bin/get_license.sh"
- printf(cmd_)
- a=commands.getoutput(cmd_)
- if verbose_: printf(a)
+ """ Extract the license from packages in repo_,arch_ and in pending_ file"""
+ cmd_=home + "/usr/bin/get_license.sh"
+ printf(cmd_)
+ a=commands.getoutput(cmd_)
+ if verbose_: printf(a)
+
+def generate_rsync_command(base_command, dir_list=(repo_list + dir_list), destdir=repodir,
+ source=mirror+mirrorpath, blacklist_file=False):
+ """ Generates an rsync command for executing it by combining all parameters.
+
+ Parameters:
+ ----------
+ base_command -> str
+ dir_list -> list or tuple
+ destdir -> str Path to dir, dir must exist.
+ source -> str The source for rsync
+ blacklist_file -> False or str Path to file, file must exist.
+
+ Return:
+ ----------
+ rsync_command -> str """
+ from os.path import isfile, isdir
+
+ if blacklist_file and not isfile(blacklist_file):
+ print(blacklist_file + " is not a file")
+ raise NonValidFile
+
+ if not os.path.isdir(destdir):
+ print(destdir + " is not a directory")
+ raise NonValidDir
+
+ dir_list="{" + ",".join(dir_list) + "}"
+
+ if blacklist_file:
+ return " ".join((base_command, "--exclude-from="+blacklist_file,
+ os.path.join(source, dir_list), destdir))
+ return " ".join((base_command, os.path.join(source, dir_list), destdir))
+
+def run_rsync(command,debug=verbose):
+ """ Runs rsync and gets returns it's output """
+ if debug:
+ printf("rsync_command: " + command)
+ return commands.getoutput(command)
if __name__ == "__main__":
- from time import time
- start_time = time()
- def minute():
- return str(round((time() - start_time)/60, 1))
-
- printf(" Cleaning %s folder " % (tmp) )
- commands.getoutput("rm -r %s/*" % tmp)
- printf(" Syncing repo")
- sync_all_repo(True)
-
- printf(" Updating databases and pending files lists: minute %s \n" % minute() )
- for repo in repo_list:
- for arch in arch_list:
- printf( "\n" + repo + "-" + arch + "\n" )
- printf( "Get info: minute %s " % minute() )
- info=get_info(repo,arch)
- printf( "Make pending: minute %s" % minute() )
- make_pending(repo,arch,info)
- printf( "Update DB: minute %s" % minute() )
- remove_from_blacklist(
- repo, arch, info, tuple( listado(blacklist) + listado(pending + "-" + repo + ".txt") ) )
-
- printf("Adding Parabola Packages: minute %s\n" % minute() )
- add_free_repo(True)
-
- printf("Extracting licenses in pending: minute %s" % minute() )
- get_licenses()
-
- printf("\n\nDelay: %s minutes \n" % minute())
-
+ from time import time
+ start_time = time()
+ def minute():
+ return str(round((time() - start_time)/60, 1))
+
+ printf(" Cleaning %s folder " % (tmp) )
+ commands.getoutput("rm -r %s/*" % tmp)
+ printf(" Syncing repo")
+ sync_all_repo(True)
+
+ printf(" Updating databases and pending files lists: minute %s \n" % minute() )
+ for repo in repo_list:
+ for arch in arch_list:
+ printf( "\n" + repo + "-" + arch + "\n" )
+ printf( "Get info: minute %s " % minute() )
+ info=get_info(repo,arch)
+ printf( "Make pending: minute %s" % minute() )
+ make_pending(repo,arch,info)
+ printf( "Update DB: minute %s" % minute() )
+ remove_from_blacklist(
+ repo, arch, info, tuple( listado(blacklist) + listado(pending + "-" + repo + ".txt") ) )
+
+ printf("Adding Parabola Packages: minute %s\n" % minute() )
+ add_free_repo(True)
+
+ printf("Extracting licenses in pending: minute %s" % minute() )
+ get_licenses()
+
+ printf("\n\nDelay: %s minutes \n" % minute())
+
diff --git a/test/blacklist_sample b/test/blacklist_sample
new file mode 100644
index 0000000..2a02af6
--- /dev/null
+++ b/test/blacklist_sample
@@ -0,0 +1,2 @@
+alex:alex-libre: Aquí va un comentario
+gmime22 ::Non free dependencies \ No newline at end of file
diff --git a/test/core.db.tar.gz b/test/core.db.tar.gz
new file mode 100644
index 0000000..5eb2081
--- /dev/null
+++ b/test/core.db.tar.gz
Binary files differ
diff --git a/test/depends b/test/depends
new file mode 100644
index 0000000..7ff3ad4
--- /dev/null
+++ b/test/depends
@@ -0,0 +1,4 @@
+%DEPENDS%
+glibc>=2.13
+zlib
+
diff --git a/test/desc b/test/desc
new file mode 100644
index 0000000..abba644
--- /dev/null
+++ b/test/desc
@@ -0,0 +1,39 @@
+%FILENAME%
+binutils-2.21-4-x86_64.pkg.tar.xz
+
+%NAME%
+binutils
+
+%VERSION%
+2.21-4
+
+%DESC%
+A set of programs to assemble and manipulate binary and object files
+
+%GROUPS%
+base
+
+%CSIZE%
+3412892
+
+%ISIZE%
+17571840
+
+%MD5SUM%
+4e666f87c78998f4839f33dc06d2043a
+
+%URL%
+http://www.gnu.org/software/binutils/
+
+%LICENSE%
+GPL
+
+%ARCH%
+x86_64
+
+%BUILDDATE%
+1297240369
+
+%PACKAGER%
+Allan McRae <allan@archlinux.org>
+
diff --git a/test/rsync_output_sample b/test/rsync_output_sample
new file mode 100644
index 0000000..72d9cd0
--- /dev/null
+++ b/test/rsync_output_sample
@@ -0,0 +1,14 @@
+dr-xr-sr-x 4096 2010/09/11 11:37:10 .
+-rw-r--r-- 11 2011/02/08 00:00:01 lastsync
+drwxrwxr-x 15 2010/09/11 11:28:50 community-staging
+drwxrwxr-x 30 2010/09/11 11:28:50 community-staging/os
+drwxrwxr-x 8192 2011/02/07 17:00:01 community-staging/os/i686
+lrwxrwxrwx 52 2010/12/23 16:51:01 community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz -> ../../../pool/community/alex-2.3.4-1-i686.pkg.tar.xz
+lrwxrwxrwx 27 2011/02/07 14:02:54 community-staging/os/i686/community-staging.db -> community-staging.db.tar.gz
+-rw-rw-r-- 2237 2011/02/07 14:02:54 community-staging/os/i686/community-staging.db.tar.gz
+-rw-rw-r-- 3209 2011/02/07 14:00:13 community-staging/os/i686/community-staging.db.tar.gz.old
+drwxrwxr-x 15 2009/07/22 15:07:56 community
+drwxrwxr-x 40 2009/08/04 15:57:42 community/os
+drwxrwsr-x 36864 2011/02/03 05:00:01 community/os/any
+-rw-rw-r-- 303336 2010/07/16 10:06:28 community/os/any/any2dvd-0.34-4-any.pkg.tar.xz
+-rw-rw-r-- 221664 2010/03/28 15:55:48 community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz
diff --git a/test/test1.py b/test/test1.py
deleted file mode 100644
index 13b5660..0000000
--- a/test/test1.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# -*- encoding: utf-8 -*-
-""" """
-
-__author__ = "Joshua Ismael Haase Hernández <hahj87@gmail.com>"
-__version__ = "$Revision: 1.1 $"
-__date__ = "$Date: 2011/02/08 $"
-__copyright__ = "Copyright (c) 2011 Joshua Ismael Haase Hernández"
-__license__ = "GPL3+"
-
-from repm.config import *
-from repm.filter import *
-import unittest
-
-class KnownValues(unittest.TestCase):
- directory_list=("drwxrwxr-x 15 2010/09/11 11:28:50 community-staging",
- "drwxrwxr-x 30 2010/09/11 11:28:50 community-staging/os",
- 'dr-xr-sr-x 4096 2010/09/11 11:37:10 .')
- # (rsync_out, name, version, arch, release, location)
- examples=(
- ("lrwxrwxrwx 53 2011/01/31 01:52:06 community-testing/os/i686/apvlv-0.1.0-2-i686.pkg.tar.xz -> ../../../pool/community/apvlv-0.1.0-2-i686.pkg.tar.xz", "apvlv","0.1.0","i686", "2", "community-testing/os/i686/apvlv-0.1.0-2-i686.pkg.tar.xz"),
- ("lrwxrwxrwx 56 2011/02/04 14:34:08 community-testing/os/i686/calibre-0.7.44-2-i686.pkg.tar.xz -> ../../../pool/community/calibre-0.7.44-2-i686.pkg.tar.xz","calibre","0.7.44","i686", "2", "community-testing/os/i686/calibre-0.7.44-2-i686.pkg.tar.xz"),
- ("-rw-rw-r-- 5846249 2010/11/13 10:54:25 pool/community/abuse-0.7.1-1-x86_64.pkg.tar.gz",
- "abuse","0.7.1","x86_64","1","pool/community/abuse-0.7.1-1-x86_64.pkg.tar.gz"),
- ("-rw-rw-r-- 982768 2011/02/05 14:38:17 pool/community/acetoneiso2-2.3-2-i686.pkg.tar.xz",
- "acetoneiso2","2.3","i686", "2", "pool/community/acetoneiso2-2.3-2-i686.pkg.tar.xz"),
- ("-rw-rw-r-- 982764 2011/02/05 14:38:40 pool/community/acetoneiso2-2.3-2-x86_64.pkg.tar.xz",
- "acetoneiso2","2.3","x86_64","2","pool/community/acetoneiso2-2.3-2-x86_64.pkg.tar.xz")
- )
-
- def generate_results(self, example_tuple, attr):
- rsync_out, name, version, arch, release, location = example_tuple
- return get_file_list_from_rsync_output(rsync_out)[0][attr], locals()[attr]
-
- def testDirectoryOutput(self):
- """get_file_list_from_rsync_output should ignore directories"""
- rsync_out="\n".join(self.directory_list)
- result=get_file_list_from_rsync_output(rsync_out)
- self.assertEqual(tuple(), result)
-
- def testNames(self):
- for i in self.examples:
- k,v = self.generate_results(example_tuple=i,attr="name")
- self.assertEqual(k, v)
-
- def testVersions(self):
- for i in self.examples:
- k,v = self.generate_results(example_tuple=i,attr="version")
- self.assertEqual(k, v)
-
- def testArchs(self):
- for i in self.examples:
- k,v = self.generate_results(example_tuple=i,attr="arch")
- self.assertEqual(k, v)
-
- def testReleases(self):
- for i in self.examples:
- k,v = self.generate_results(example_tuple=i,attr="release")
- self.assertEqual(k, v)
-
- def testLocations(self):
- for i in self.examples:
- k,v = self.generate_results(example_tuple=i,attr="location")
- self.assertEqual(k, v)
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/test/test_filter.py b/test/test_filter.py
new file mode 100644
index 0000000..1906b87
--- /dev/null
+++ b/test/test_filter.py
@@ -0,0 +1,192 @@
+# -*- encoding: utf-8 -*-
+""" """
+
+__author__ = "Joshua Ismael Haase Hernández <hahj87@gmail.com>"
+__version__ = "$Revision: 1.1 $"
+__date__ = "$Date: 2011/02/08 $"
+__copyright__ = "Copyright (c) 2011 Joshua Ismael Haase Hernández"
+__license__ = "GPL3+"
+
+from repm.config import *
+from repm.filter import *
+import unittest
+
+class pkginfo_from_file_KnownValues(unittest.TestCase):
+ # (filename, name, version, release, arch)
+ # filename is location
+ known=(
+ ("community-testing/os/i686/inputattach-1.24-3-i686.pkg.tar.xz","inputattach","1.24","3","i686"),
+ ("community-testing/os/i686/ngspice-22-1-i686.pkg.tar.xz","ngspice","22","1","i686"),
+ ("community-testing/os/i686/tmux-1.4-2-i686.pkg.tar.xz","tmux","1.4","2","i686"),
+ ("community-testing/os/i686/tor-0.2.1.29-2-i686.pkg.tar.xz","tor","0.2.1.29","2","i686"),
+ ("../../../pool/community/tor-0.2.1.29-2-i686.pkg.tar.xz","tor","0.2.1.29","2","i686"),
+ ("community-testing/os/x86_64/inputattach-1.24-3-x86_64.pkg.tar.xz","inputattach","1.24","3","x86_64"),
+ ("../../../pool/community/inputattach-1.24-3-x86_64.pkg.tar.xz","inputattach","1.24","3","x86_64"),
+ ("tor-0.2.1.29-2-x86_64.pkg.tar.xz","tor","0.2.1.29","2","x86_64"),
+ )
+
+ def generate_results(self, example_tuple, attr):
+ location, name, version, release, arch = example_tuple
+ return pkginfo_from_filename(location)[attr], locals()[attr]
+
+ def testReturnPackageObject(self):
+ for i in self.known:
+ location, name, version, release, arch = i
+ self.assertIsInstance(pkginfo_from_filename(location),Package)
+
+ def testNames(self):
+ for i in self.known:
+ k,v = self.generate_results(example_tuple=i,attr="name")
+ self.assertEqual(k, v)
+
+ def testVersions(self):
+ for i in self.known:
+ k,v = self.generate_results(example_tuple=i,attr="version")
+ self.assertEqual(k, v)
+
+ def testArchs(self):
+ for i in self.known:
+ k,v = self.generate_results(example_tuple=i,attr="arch")
+ self.assertEqual(k, v)
+
+ def testReleases(self):
+ for i in self.known:
+ k,v = self.generate_results(example_tuple=i,attr="release")
+ self.assertEqual(k, v)
+
+ def testLocations(self):
+ for i in self.known:
+ k,v = self.generate_results(example_tuple=i,attr="location")
+ self.assertEqual(k, v)
+
+class pkginfo_from_file_BadInput(unittest.TestCase):
+ bad=("community-testing/os/i686/community-testing.db",
+ "community-testing/os/i686/community-testing.db.tar.gz",
+ "community-testing/os/i686/community-testing.db.tar.gz.old",
+ "community-testing/os/i686/community-testing.files",
+ "community-testing/os/i686/community-testing.files.tar.gz",
+ "community-testing/os/x86_64")
+
+ def testBadInput(self):
+ for i in self.bad:
+ self.assertRaises(NonValidFile,pkginfo_from_filename,i)
+
+class pkginfoFromRsyncOutput(unittest.TestCase):
+ example_package_list=(Package(),Package(),Package())
+ example_package_list[0].package_info={ "name" : "alex",
+ "version" : "2.3.4",
+ "release" : "1",
+ "arch" : "i686",
+ "license" : False,
+ "location": "community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz",
+ "depends" : False,}
+ example_package_list[1].package_info={ "name" : "any2dvd",
+ "version" : "0.34",
+ "release" : "4",
+ "arch" : "any",
+ "license" : False,
+ "location": "community/os/any/any2dvd-0.34-4-any.pkg.tar.xz",
+ "depends" : False,}
+ example_package_list[2].package_info={ "name" : "gmime22",
+ "version" : "2.2.26",
+ "release" : "1",
+ "arch" : "x86_64",
+ "license" : False,
+ "location": "community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz",
+ "depends" : False,}
+
+ try:
+ output_file = open("rsync_output_sample")
+ rsync_out= output_file.read()
+ output_file.close()
+ except IOError: print("There is no rsync_output_sample file")
+
+ pkglist = pkginfo_from_rsync_output(rsync_out)
+
+ def testOutputArePackages(self):
+ if not self.pkglist:
+ self.fail("not pkglist:" + str(self.pkglist))
+ for pkg in self.pkglist:
+ self.assertIsInstance(pkg,Package)
+
+ def testPackageInfo(self):
+ if not self.pkglist:
+ self.fail("Pkglist doesn't exist: " + str(self.pkglist))
+ self.assertEqual(self.pkglist,self.example_package_list)
+
+class generateRsyncBlacklist(unittest.TestCase):
+ example_package_list=(Package(),Package(),Package())
+ example_package_list[0].package_info={ "name" : "alex",
+ "version" : "2.3.4",
+ "release" : "1",
+ "arch" : "i686",
+ "license" : False,
+ "location": "community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz",
+ "depends" : False,}
+ example_package_list[1].package_info={ "name" : "any2dvd",
+ "version" : "0.34",
+ "release" : "4",
+ "arch" : "any",
+ "license" : False,
+ "location": "community/os/any/any2dvd-0.34-4-any.pkg.tar.xz",
+ "depends" : False,}
+ example_package_list[2].package_info={ "name" : "gmime22",
+ "version" : "2.2.26",
+ "release" : "1",
+ "arch" : "x86_64",
+ "license" : False,
+ "location": "community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz",
+ "depends" : False,}
+
+ def testListado(self):
+ self.assertEqual(listado("blacklist_sample"),["alex","gmime22"])
+
+ def testExcludeFiles(self):
+ a=generate_exclude_list_from_blacklist(self.example_package_list,listado("blacklist_sample"),debug=True)
+ b=[self.example_package_list[0]["location"],self.example_package_list[2]["location"]]
+ self.assertEqual(a,b)
+
+class pkginfo_from_descKnownValues(unittest.TestCase):
+ pkgsample=Package()
+ pkgsample.package_info={"name" : "binutils",
+ "version" : "2.21",
+ "release" : "4",
+ "arch" : "x86_64",
+ "license" : "GPL",
+ "location": "binutils-2.21-4-x86_64.pkg.tar.xz",
+ "depends" : False,}
+ pkggen=pkginfo_from_desc("desc")
+ def testPkginfoFromDesc(self):
+ if self.pkggen is None:
+ self.fail("return value is None")
+ self.assertEqual(self.pkgsample,self.pkggen)
+
+class pkginfo_from_db(unittest.TestCase):
+ archdb = os.path.join("./workdir")
+ example_package_list=(Package(),Package(),Package())
+ example_package_list[0].package_info={ "name" : "acl",
+ "version" : "2.2.49",
+ "release" : "2",
+ "arch" : "x86_64",
+ "license" : ("LGPL",),
+ "location": "acl-2.2.49-2-x86_64.pkg.tar.xz"
+ "depends" : ("attr>=2.4.41"),}
+ example_package_list[1].package_info={ "name" : "glibc",
+ "version" : "2.13",
+ "release" : "4",
+ "arch" : "x86_64",
+ "license" : ("GPL","LGPL"),
+ "location": "glibc-2.13-4-x86_64.pkg.tar.xz"
+ "depends" : ("linux-api-headers>=2.6.37","tzdata",),}
+ example_package_list[2].package_info={ "name" : "",
+ "version" : "2.2.26",
+ "release" : "1",
+ "arch" : "x86_64",
+ "license" : False,
+ "location": ""
+ "depends" : False,}
+
+
+if __name__ == "__main__":
+ unittest.main()
+