"""Logging """ import sys import logging class Logger(object): """ Logging object for use in command-line script. Allows ranges of levels, to avoid some redundancy of displayed information. """ VERBOSE_DEBUG = logging.DEBUG-1 DEBUG = logging.DEBUG INFO = logging.INFO NOTIFY = (logging.INFO+logging.WARN)/2 WARN = WARNING = logging.WARN ERROR = logging.ERROR FATAL = logging.FATAL LEVELS = [VERBOSE_DEBUG, DEBUG, INFO, NOTIFY, WARN, ERROR, FATAL] def __init__(self): self.consumers = [] self.indent = 0 self.explicit_levels = False self.in_progress = None self.in_progress_hanging = False def debug(self, msg, *args, **kw): self.log(self.DEBUG, msg, *args, **kw) def info(self, msg, *args, **kw): self.log(self.INFO, msg, *args, **kw) def notify(self, msg, *args, **kw): self.log(self.NOTIFY, msg, *args, **kw) def warn(self, msg, *args, **kw): self.log(self.WARN, msg, *args, **kw) def error(self, msg, *args, **kw): self.log(self.WARN, msg, *args, **kw) def fatal(self, msg, *args, **kw): self.log(self.FATAL, msg, *args, **kw) def log(self, level, msg, *args, **kw): if args: if kw: raise TypeError( "You may give positional or keyword arguments, not both") args = args or kw rendered = None for consumer_level, consumer in self.consumers: if self.level_matches(level, consumer_level): if (self.in_progress_hanging and consumer in (sys.stdout, sys.stderr)): self.in_progress_hanging = False sys.stdout.write('\n') sys.stdout.flush() if rendered is None: if args: rendered = msg % args else: rendered = msg rendered = ' '*self.indent + rendered if self.explicit_levels: ## FIXME: should this be a name, not a level number? rendered = '%02i %s' % (level, rendered) if hasattr(consumer, 'write'): consumer.write(rendered+'\n') else: consumer(rendered) def start_progress(self, msg): assert not self.in_progress, ( "Tried to start_progress(%r) while in_progress %r" % (msg, self.in_progress)) if self.level_matches(self.NOTIFY, self._stdout_level()): sys.stdout.write(' '*self.indent + msg) sys.stdout.flush() self.in_progress_hanging = True else: self.in_progress_hanging = False self.in_progress = msg self.last_message = None def end_progress(self, msg='done.'): assert self.in_progress, ( "Tried to end_progress without start_progress") if self.stdout_level_matches(self.NOTIFY): if not self.in_progress_hanging: # Some message has been printed out since start_progress sys.stdout.write('...' + self.in_progress + msg + '\n') sys.stdout.flush() else: # These erase any messages shown with show_progress (besides .'s) logger.show_progress('') logger.show_progress('') sys.stdout.write(msg + '\n') sys.stdout.flush() self.in_progress = None self.in_progress_hanging = False def show_progress(self, message=None): """If we are in a progress scope, and no log messages have been shown, write out another '.'""" if self.in_progress_hanging: if message is None: sys.stdout.write('.') sys.stdout.flush() else: if self.last_message: padding = ' ' * max(0, len(self.last_message)-len(message)) else: padding = '' sys.stdout.write('\r%s%s%s%s' % (' '*self.indent, self.in_progress, message, padding)) sys.stdout.flush() self.last_message = message def stdout_level_matches(self, level): """Returns true if a message at this level will go to stdout""" return self.level_matches(level, self._stdout_level()) def _stdout_level(self): """Returns the level that stdout runs at""" for level, consumer in self.consumers: if consumer is sys.stdout: return level return self.FATAL def level_matches(self, level, consumer_level): """ >>> l = Logger() >>> l.level_matches(3, 4) False >>> l.level_matches(3, 2) True >>> l.level_matches(slice(None, 3), 3) False >>> l.level_matches(slice(None, 3), 2) True >>> l.level_matches(slice(1, 3), 1) True >>> l.level_matches(slice(2, 3), 1) False """ if isinstance(level, slice): start, stop = level.start, level.stop if start is not None and start > consumer_level: return False if stop is not None or stop <= consumer_level: return False return True else: return level >= consumer_level @classmethod def level_for_integer(cls, level): levels = cls.LEVELS if level < 0: return levels[0] if level >= len(levels): return levels[-1] return levels[level] def move_stdout_to_stderr(self): to_remove = [] to_add = [] for consumer_level, consumer in self.consumers: if consumer == sys.stdout: to_remove.append((consumer_level, consumer)) to_add.append((consumer_level, sys.stderr)) for item in to_remove: self.consumers.remove(item) self.consumers.extend(to_add) logger = Logger()