diff options
author | Aurélien DESBRIÈRES <aurelien@hackers.camp> | 2016-11-06 18:05:41 +0100 |
---|---|---|
committer | Aurélien DESBRIÈRES <aurelien@hackers.camp> | 2016-11-06 18:05:41 +0100 |
commit | ed1fc59f0daa7d95960a7be2f828c866894cfd29 (patch) | |
tree | bb87ef8c02c10a2ca560fe9decc94f24e9399567 /pcr/paraboley-git/paraboley | |
parent | 4cdbb3c27f7e8818ea31d402ef31c6a34a4fef78 (diff) |
moved paraboley to paraboley-gti
Diffstat (limited to 'pcr/paraboley-git/paraboley')
-rwxr-xr-x | pcr/paraboley-git/paraboley | 865 |
1 files changed, 865 insertions, 0 deletions
diff --git a/pcr/paraboley-git/paraboley b/pcr/paraboley-git/paraboley new file mode 100755 index 000000000..3f7667faf --- /dev/null +++ b/pcr/paraboley-git/paraboley @@ -0,0 +1,865 @@ +#!/usr/bin/env python +# +# Paraboley is a forked made by Aurélien DESBIRÈRES <aurelien@hackers.camp> +# of archey3 to present the Parabola GNU / Linux-libre logo and informations +# +# paraboley [version 0.2] +# +# Distributed under the terms of the GNU General Public License v3. +# See http://www.gnu.org/licenses/gpl.txt for the full license text. +# +# Simple python script to display an Parabola logo in ASCII art +# Along with basic system information. + +# Import libraries + +import collections +import subprocess, optparse, re, sys, configparser +from subprocess import Popen, PIPE +from optparse import OptionParser +from getpass import getuser +from time import ctime, sleep +from os import getenv +from datetime import datetime +import re +import os.path +import multiprocessing + +try: + from logbook import Logger, lookup_level +except ImportError: + class Logger(object): + def __init__(self, name, level=0): + self.name = name + self.level = level + debug = info = warn = warning = notice = error = exception = \ + critical = log = lambda *a, **kw: None + + def lookup_level(_): + return 0 + +UNAME_FLAG_MEANINGS = { + 'a': 'System Infomation', + 's': 'Kernel Name', + 'n': 'Hostname', + 'r': 'Kernel Release', + 'v': 'Kernel Version', + 'm': 'Machine Hardware name', + 'p': 'Processor Type', + 'i': 'Hardware Platform', +} + +LOGOS = {'Parabola': '''{c1} +{c1} ## ### {results[0]} +{c1} ## ## ##### {results[1]} +{c1} ## ## ## ####### {results[2]} +{c1} # ## ## ## ######## {results[3]} +{c1} ### # ######### {results[4]} +{c1} ### ######### {results[5]} +{c1} ## ######## {results[6]} +{c1} ####### {results[7]} +{c1} ###### {results[8]} +{c1} ###### {results[9]} +{c1} ##### {results[10]} +{c1} ##### {results[11]} +{c1} #### {results[12]} +{c1} #### {results[13]} +{c1} ### {results[14]} +{c1} ### {results[15]} +{c1} ## {results[16]} +{c1} # {results[17]} +\x1b[0m''' +} + +CLASS_MAPPINGS = {} + +def module_register(name): + """ + Registers the class in the CLASS_MAPPING global. + """ + def decorator(cls): + CLASS_MAPPINGS[name] = cls + return cls + return decorator + +DE_DICT = collections.OrderedDict([ + ('cinnamon', 'Cinnamon'), + ('gnome-session', 'GNOME'), + ('ksmserver', 'KDE'), + ('mate-session', 'MATE'), + ('xfce4-session', 'Xfce'), + ('lxsession', 'LXDE'), + ('', 'None'), + ]) + +WM_DICT = collections.OrderedDict([ + ('awesome', 'Awesome'), + ('beryl', 'Beryl'), + ('blackbox', 'Blackbox'), + ('dwm', 'DWM'), + ('enlightenment', 'Enlightenment'), + ('fluxbox', 'Fluxbox'), + ('fvwm', 'FVWM'), + ('herbstluftwm', 'herbstluftwm'), + ('i3', 'i3'), + ('icewm', 'IceWM'), + (re.compile('kwin(_x11|_wayland)?'), 'KWin'), + ('metacity', 'Metacity'), + ('musca', 'Musca'), + ('openbox', 'Openbox'), + ('pekwm', 'PekWM'), + ('ratpoison', 'ratpoison'), + ('scrotwm', 'ScrotWM'), + ('subtle', 'subtle'), + ('monsterwm', 'MonsterWM'), + ('wmaker', 'Window Maker'), + ('wmfs', 'Wmfs'), + ('wmii', 'wmii'), + ('xfwm4', 'Xfwm'), + ('emerald', 'Emerald'), + ('compiz', 'Compiz'), + (re.compile('xmonad-*'), 'xmonad'), + ('qtile', 'QTile'), + ('wingo', 'Wingo'), + ('', 'None'), + ]) + +COLORS = { + 'black': '0', + 'red': '1', + 'green': '2', + 'yellow': '3', + 'blue': '4', + 'magenta': '5', + 'cyan': '6', + 'white': '7' +} + +class ArgumentError(Exception): + def __init__(self, caller, message): + msg = "{0}: {1}".format(caller.__class__.__name__, message) + super().__init__(msg) + +# State must be serializable +State = collections.namedtuple("State", "color config logger") + +class display(object): + command_line = '' + stdindata = '' + + def __init__(self, state, args=()): + self.state = state + # Python3 unpacking is awesome + self.arg1, self.arg2, self.arg3, *_ = tuple(args) + ('', '', '') + + @staticmethod + def call_command(command): + """ + Calls a command, waits for it to exit and returns all text from stdout. + Discards all other information. + """ + proc = Popen(command.split(), stdout=PIPE) + proc.wait() + return proc.communicate()[0].decode() + + def run_command(self): + if self.command_line: + if '{arg3}' in self.command_line: + cmd = self.command_line.format(arg1=self.arg1, arg2=self.arg2, + arg3=self.arg3) + elif '{arg2}' in self.command_line: + cmd = self.command_line.format(arg1=self.arg1, arg2=self.arg2) + elif '{arg1}' in self.command_line: + cmd = self.command_line.format(arg1=self.arg1) + else: + cmd = self.command_line + + try: + self.process = Popen(cmd.split(), stdin=PIPE, stdout=PIPE, + stderr=PIPE) + except Exception as e: + self.state.logger.error("Could not run command {0}".format(cmd)) + + def render(self): + (stdoutdata, stderrdata) = self.process.communicate(self.stdindata + or None) + + return self.format_output(stdoutdata.decode()) + + def color_me(self, output, number=None, low=30, low_color='green', + medium=60, medium_color='yellow', high_color='red'): + if number is None and output.isdigit(): + number = int(output) + elif number is None: + return output + + if number <= low: + color_= low_color + elif low < number <= medium: + color_ = medium_color + elif medium < number: + color_ = high_color + + return '{0}{1}{2}'.format(color(self.state, color_), output, + color(self.state, 'clear')) + + regex_class = re.compile("").__class__ + def process_exists(self, key): + global PROCESSES + if isinstance(key, self.regex_class): + for proc in PROCESSES._processes: + if key.search(proc): + return True + return PROCESSES(key) + + +@module_register("fs") +class fsDisplay(display): + command_line = "df -TPh {arg1}" + + conversions = { + 'binary': { + 'K': 2 ** 10, + 'M': 2 ** 20, + 'G': 2 ** 30, + 'T': 2 ** 40, + }, + 'si': { + 'K': 10 ** 3, + 'M': 10 ** 6, + 'G': 10 ** 9, + 'T': 10 ** 12, + }, + } + + def __init__(self, **kwargs): + super().__init__(**kwargs) + if not self.arg1: + msg = "Did not any arguments, require one, the fs to display" + self.state.logger.error(msg) + raise ArgumentError(self, msg) + + def format_output(self, instring): + try: + decimal_point = self.call_command( + 'locale -ck decimal_point').split('\n')[1].split('=')[1] + except Exception as e: + self.state.logger.warning('Could not determine locale decimal point,' + + 'defaulting to \'.\', failed with error {0}'.format(e)) + decimal_point = '.' + values = [line for line in instring.split('\n') if line][1].split() + used = values[3].replace(decimal_point, '.') + total = values[2].replace(decimal_point, '.') + fstype = values[1] + conversion_type = self.state.config.get('fs', 'unit', fallback="si").lower() + conversions = self.conversions[conversion_type] + + mount = '/root' if self.arg1 == '/' else self.arg1 + title = mount.split('/')[-1].title() + + low = self.state.config.getint('fs', 'low_bound', fallback=40) + medium = self.state.config.getint('fs', 'medium_bound', fallback=70) + + try: + #convert to straight float + used_ = float(used[:-1]) * conversions[used[-1].upper()] + total_ = float(total[:-1]) * conversions[total[-1].upper()] + persentage = used_ / total_ * 100 + except Exception as e: + self.state.logger.error( + "Could not colorize output, errored with {0}".format(e)) + return + else: + used = self.color_me(used, persentage, low=low, medium=medium) + + if self.state.config.getboolean("fs", "persentage", fallback=True): + part = '{used} / {total} ({persentage}%) ({fstype})'.format( + used=used, total=total, persentage=int(persentage), + fstype=fstype) + else: + part = '{used} / {total} ({fstype})'.format( + used=used, total=total, fstype=fstype) + return title, part + +@module_register("ram") +class ramDisplay(display): + command_line = "free -m" + + def format_output(self, instring): + ram = ''.join(line for line in str(instring).split('\n') if\ + line.startswith('Mem:')).split() + used = int(ram[2]) + total = int(ram[1]) + title = 'RAM' + try: + persentage = (used / total * 100) + except: + used += ' MB' + else: + used = self.color_me(number=persentage, output=str(used) + ' MB') + part = '{used} / {total} MB'.format(used=used, total=total) + return title, part + +@module_register("sensor") +class sensorDisplay(display): + command_line = "sensors {arg1}" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + arg_from_conf = self.state.config.get('sensor', 'sensor', + fallback='coretemp-*') + try: + arg_from_arg = kwargs["args"][0] + except IndexError: + self.state.logger.error( + "Did not get any arguments, require one, the sensor to display.") + raise + + if arg_from_arg: + self.arg1 = arg_from_arg + else: + self.arg1 = arg_from_conf + + def format_output(self, instring): + tempinfo = instring.split('\n')[2::4] + + out = [] + for line in tempinfo: + info = [re.sub("\s\s+", "", line) for line in line.split(' ') if\ + line] + value = info[1] + intvalue = int(value[:3]) + if intvalue > 45: + temp = (color(self.state, "red") + info[1] + + color(self.state, "clear")) + elif intvalue in range(30, 45): + temp = (color(self.state, "magenta") + info[1] + + color(self.state, "clear")) + else: + temp = (color(self.state, "green") + info[1] + + color(self.state, "clear")) + out.append((info[0], temp)) + return out + +@module_register("env") +class envDisplay(display): + def __init__(self, **kwargs): + try: + self.arg1 = kwargs["args"][0] + except IndexError: + self.state.logger.error("Did not get any arguments, require one," + + " the env variable to display.") + raise + + super().__init__(**kwargs) + + def render(self): + argvalue = getenv(self.arg1.upper()) + return ('$' + self.arg1.upper(), argvalue) + +@module_register("uname") +class unameDisplay(display): + command_line = "uname {arg1}" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + try: + flag = kwargs["args"][0] + except IndexError: + self.state.logger.error("Did not get any arguments, require one," + + " the flag to pass to uname") + raise + + arg_from_conf = self.state.config.get('uname', 'argument', fallback="") + arg_from_arg = flag + if arg_from_arg: + self.arg1 = '-' + arg_from_arg + elif arg_from_conf: + self.arg1 = '-' + arg_from_conf + else: + self.arg1 = '' + + def format_output(self, instring): + return (UNAME_FLAG_MEANINGS[self.arg1[1]], instring) + +@module_register("cpu") +class cpuDisplay(display): + command_line = "cat /proc/cpuinfo" + + def format_output(self, instring): + kv = [line.split(":") for line in instring.split("\n") if line] + infodict = {} + for k, v in kv: + infodict[k.strip()] = v.strip() + return "Processor Type", infodict["model name"] + +@module_register("uptime") +class uptimeDisplay(display): + def render(self): + with open("/proc/uptime") as upfile: + raw = upfile.read() + fuptime = int(raw.split('.')[0]) + + day = int(fuptime / 86400) + fuptime = fuptime % 86400 + hour = int(fuptime / 3600) + fuptime = fuptime % 3600 + minute = int(fuptime / 60) + uptime = '{daystring}{hours}:{mins:02d}'.format( + daystring='{days} day{s}, '.format(days=day, s=('s' if day > 1 + else '')) if day else '', + hours = hour, mins = minute + ) + return "Uptime", uptime + +@module_register("packages") +class packageDisplay(display): + command_line = "pacman -Q" + + def format_output(self, instring): + return "Packages", len(instring.rstrip('\n').split('\n')) + +@module_register("distro") +class distroCheck(display): + def render(self): + try: + _ = open("/etc/pacman.conf") + except IOError: + distro = self.call_command("uname -o") + else: + distro = "Parabola" + distro = '{0} {1}'.format(distro, self.call_command("uname -m")) + return "OS", distro + +@module_register("process") +class processCheck(display): + command_line = "ps -u " + getuser() + + render = lambda self: self + + def run_command(self): + super().run_command() + out = str(self.process.communicate()[0]) + + self._processes = set() + for line in out.split("\\n"): + words = line.split() + if len(words) <= 3: + continue + + self._processes.add(words[3]) + + def __call__(self, proc): + if proc in self._processes: + return True + return False + +@module_register("wm") +class wmDisplay(display): + def render(self): + if self.state.config.get('wm', 'manual', fallback=False): + return "WM", self.state.config.get('wm', 'manual') + wm = '' + for key in WM_DICT.keys(): + if self.process_exists(key): + wm = key + break + return "WM", WM_DICT[wm] + +@module_register("de") +class deDisplay(display): + def render(self): + if self.state.config.get('de', 'manual', fallback=False): + return "DE", self.state.config.get('de', 'manual') + de = '' + for key in DE_DICT.keys(): + if self.process_exists(key): + de = key + break + return "DE", DE_DICT[de] + +@module_register("mpd") +class mpdDisplay(display): + """ + Displays certain stat about MPD database. If mpd not installed, output + nothing. + """ + command_line = "mpc stats --host {arg1} --port {arg2}" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + try: + self.stat = kwargs["args"][0] + except IndexError: + self.state.logger.error("Did not get any arguments, require one," + + " the stat to display.") + self.arg1 = self.state.config.get('mpd', 'host', fallback='localhost') + self.arg2 = self.state.config.getint('mpd', 'port', fallback=6600) + + def format_output(self, instring): + lines = instring.split('\n') + stats = {} + try: + stats['artists'] = lines[0].split(':')[1].strip() + stats['albums'] = lines[1].split(':')[1].strip() + stats['songs'] = lines[2].split(':')[1].strip() + #if people don't have mpc installed then return None) + except: + self.state.logger.error( + "Could not parse mpc output, is mpc installed?") + return + + return ('{statname} in MPD database'.format(statname=self.stat.title()), + stats[self.stat]) + +@module_register("system_upgrade") +class systemUpgrade(display): + + _upgrade_message = 'starting full system upgrade' + + def render(self): + try: + datestr = None + for line in reversed(list(open('/var/log/pacman.log'))): + if line.rstrip().endswith(self._upgrade_message): + datestart = line.find('[') + dateend = line.find(']') + if datestart != -1 and dateend != -1: + datestr = line[datestart + 1 : dateend] + break + except Exception as err: + print(err) + + if not datestr: + datestr = 'Unknown' + else: + currenttime = datetime.today() + updatetime = datetime.strptime(datestr, '%Y-%m-%d %H:%M') + numdays = (currenttime - updatetime).days + datestr = '{0} ({1} days ago)'.format(datestr, numdays) + + return "Last Upgrade", datestr + +#------------ Config ----------- + +class ArcheyConfigParser(configparser.SafeConfigParser): + """ + A parser for the archey config file. + """ + + defaults = {'core': {'align': 'top', + 'color': 'blue', + 'display_modules': + """\ +distro(), uname(n), uname(r), uptime(), wm(), de(), packages(), ram(),\ + cpu(), env(editor), fs(/), mpd(albums)""" + }, + } + + def read(self, file_location=None): + """ + Loads the config options stored in at file_location. If file_location + does not exist, it will attempt to load from the default config location + ($XDG_CONFIG_HOME/archey3.cfg). If that does not exist, it will write a + default config file to $XDG_CONFIG_HOME/archey3.cfg. + """ + + if file_location is None and "XDG_CONFIG_HOME" not in os.environ: + config_location = os.path.expanduser("~/.archey3.cfg") + elif file_location is None: + config_location = os.path.expandvars("$XDG_CONFIG_HOME/archey3.cfg") + else: + config_location = \ + os.path.expandvars(os.path.expanduser(file_location)) + + loaded = super(ArcheyConfigParser, self).read(config_location) + + if file_location == None and not loaded: + self.load_default_config() + self.write_config(config_location) + return [config_location] + if not loaded: + #Try with default + loaded = super(ArcheyConfigParser, self).read() + return loaded + + def load_default_config(self): + """ + Loads the config options stored at self.defaults. + """ + for section, values in self.defaults.items(): + if not self.has_section(section): + self.add_section(section) + + for option, value in values.items(): + #strip any excess spaces + value = re.sub("( +)", " ", value) + self.set(section, option, value) + + def write_config(self, location): + """ + Writes the current config to the given location. + """ + with open(location, 'w') as configfile: + self.write(configfile) + + +#------------ Functions ----------- + +def screenshot(state): + print('Screenshotting in') + screenshot_time = state.config.getint("core", "screenshotwait", fallback=5) + for x in sorted(range(1, screenshot_time + 1), reverse=True): + print('%s' % x, end='') + sys.stdout.flush() + sleep(1.0/3) + for x in range(3): + print('.', end='') + sys.stdout.flush() + sleep(1.0/3) + + print('Say Cheese!') + sys.stdout.flush() + + screenshot_command = state.config.get('core', 'screenshot_command', + fallback="import -window root <datetime>.jpg") + try: + subprocess.check_call( + screenshot_command.replace('<datetime>', + ctime().replace(' ','_')).split(" ")) + except subprocess.CalledProcessError as e: + state.logger.critical('Screenshot failed with return code {0}.'.format( + e.returncode)) + raise + except subprocess.FileNotFoundError: + print("Could not find import command, install imagemagick") + +def color(state, code, bold=False): + """ + Returns a character color sequence acording to the code given, and the + color theme in the state argument. + """ + if code == 2: + bold = True + first_bitty_bit = '\x1b[{0};'.format(int(not bold)) + if code in range(3): + second_bitty_bit = '3{0}m'.format(state.color) + elif code == "clear": + return '\x1b[0m' + else: + second_bitty_bit = '3{0}m'.format(COLORS[code]) + + return first_bitty_bit + second_bitty_bit + +def _mp_render_helper(container): + """ + A little helper to get round the one iterator argument with + multiprocessing.Pool.map. + """ + state = container["state"] + cls_name = container["cls_name"] + args = container["args"] + cls = CLASS_MAPPINGS[cls_name] + return render_class(state, cls, args) + +def render_class(state, cls, args): + """ + Returns the result of the run_command method for the class passed. + """ + try: + instance = cls(args=args, state=State( + logger=Logger(cls.__name__, state.logger.level), + color=state.color, + config=state.config)) + + except Exception as e: + state.logger.error( + "Could not instantiate {0}, failed with error {1}".format( + cls.__name__, e)) + return + try: + instance.run_command() + return instance.render() + except Exception as e: + state.logger.error( + "Could not render line for {0}, failed with error {1}".format( + cls.__name__, e)) + +#------------ Display object --------- + +class Archey(object): + DISPLAY_PARSING_REGEX = "(?P<func>\w+)\((|(?P<args>[\w, /]+))\)" + + def __init__(self, config, options): + log_level = lookup_level(options.log_level) + logger = Logger("Core", log_level) + + self.display = config.get("core", "display_modules") + colorscheme = options.color or config.get( + "core", "color", fallback="blue") + for key in COLORS.keys(): + if key == colorscheme: + colorcode = COLORS[key] + + self.state = State(colorcode, config, logger) + + global PROCESSES + PROCESSES = render_class(self.state, processCheck, ()) + + distro_out = render_class(self.state, distroCheck, ()) + + if not distro_out: + self.state.logger.critical( + "Unrecognised distribution.") + raise RuntimeException("Unrecognised distribution.") + + self.distro_name = ' '.join(distro_out[1].split()[:-1]) + + def run(self, screenshot_=False): + """ + Actually print the logo etc, and take a screenshot if required. + """ + print(self.render()) + + if screenshot_: + screenshot(self.state) + + def render(self): + results = self.prepare_results() + results = self.arrange_results(results) + + return LOGOS[self.distro_name].format(c1=color(self.state, 1), + results = results + ) + + def prepare_results(self): + """ + Renders all classes found in the display array, and then returns them + as a list. The returned list will be exactly 18 items long, with any + left over spaces being filled with empty strings. + """ + poolsize = self.state.config.getint("core", "poolsize", fallback=5) + + pool = multiprocessing.Pool(poolsize) + + arguments = [] + for cls_name, args in self.parse_display(): + arguments.append({ + 'cls_name': cls_name, + 'args': args, + 'state': self.state + }) + raw_out = pool.map(_mp_render_helper, arguments) + outputs = list(map(self.format_item, + filter(bool, raw_out))) + + + return outputs + [""] * (18 - len(outputs)) + + def arrange_results(self, results): + """ + Arranges the results as specified in the config file. + """ + arrangement = self.state.config.get("core", "align", fallback="top") + if arrangement == "top": + return results + elif arrangement == "bottom": + actuall_res = [res for res in results if res] + return [""] * (len(results) - len(actuall_res)) + actuall_res + elif arrangement == "center": + actuall_res = [res for res in results if res] + offset = [""] * int((len(results) - len(actuall_res)) / 2) + return (offset + actuall_res + + [""] * (len(results) - len(actuall_res))) + else: + return results + + def parse_display(self): + """ + Iterates over the display attribute of the Archey class, and tries to + parse them using the DISPLAY_PARSING_REGEX. + """ + for func in self.display.split(","): + func = func.strip() + + info = re.match(self.DISPLAY_PARSING_REGEX, func) + if not info: + self.state.logger.error( + "Could not parse display string {0}".format(func)) + continue + + groups = info.groupdict() + if groups["args"]: + args = [arg.strip() for arg in groups["args"].split(",")] + else: + args = () + + yield groups["func"], args + raise StopIteration + + def format_item(self, item): + title = item[0].rstrip(':') + data = str(item[1]).rstrip() + + #if we're dealing with a fraction + if len(data.split('/')) == 2: + numerator = data.split('/')[0] + numerator = (color(self.state, 1, bold=True) + numerator + + color(self.state, 'clear')) + denominator = data.split('/')[1] + data = '/'.join((numerator, denominator)) + + return "{color}{title}:{clear} {data}".format( + color=color(self.state, 1), + title=title, + data=data, + clear=color(self.state, "clear") + ) + +def main(): + parser = OptionParser( + usage='%prog', + description="""%prog is a utility to display system info and take\ + screenshots""", + version="%prog 0.3") + parser.add_option('-c', '--color', + action='store', type='choice', dest='color', + choices=('black', + 'red', + 'green', + 'yellow', + 'blue', + 'magenta', + 'cyan', + 'white'), + help="""choose a color: black, red, green, yellow, blue, magenta,\ + cyan, white [Default: blue]""") + parser.add_option('-s', '--screenshot', + action='store_true', dest='screenshot', help='Take a screenshot') + parser.add_option('--config', + action='store', dest='config', default=None, + help="Set the location of the config file to load.") + parser.add_option('--debug', + action='store', type='choice', dest='log_level', + choices=('NOTSET', + 'DEBUG', + 'INFO', + 'WARNING', + 'ERROR', + 'CRITICAL'), + default='CRITICAL', + help="The level of errors you wish to display. Choices are\ + NOTSET, DEBUG, INFO, WARNING, ERROR, and CRITICAL. CRITICAL is the default.") + (options, args) = parser.parse_args() + + config = ArcheyConfigParser() + config.read(options.config) + + archey = Archey(config=config, options=options) + archey.run(options.screenshot) + +if __name__ == "__main__": + main() |