From 2a2df0074e39a797a0a4b5f7db7cfc9097301328 Mon Sep 17 00:00:00 2001 From: Dan McGee Date: Tue, 15 Nov 2011 21:28:42 -0600 Subject: Add new reporead_inotify management command This is the new on-the-fly updates hotness. Rather than continue to schedule reporead to run once an hour in cron or however else you ran it, this command can be run once and left running, and will automagically pick up on any database file changes and run an import. It operates on the files databases only; this will keep both the packages and files always in sync and remove the delay in updating, especially helpful for new testing packages. Signed-off-by: Dan McGee --- devel/management/commands/reporead_inotify.py | 188 ++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100755 devel/management/commands/reporead_inotify.py (limited to 'devel/management/commands/reporead_inotify.py') diff --git a/devel/management/commands/reporead_inotify.py b/devel/management/commands/reporead_inotify.py new file mode 100755 index 00000000..135c0367 --- /dev/null +++ b/devel/management/commands/reporead_inotify.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +""" +reporead_inotify command + +Watches repo.files.tar.gz files for updates and parses them after a short delay +in order to catch all updates in a single bulk update. + +Usage: ./manage.py reporead_inotify [path_template] + +Where 'path_template' is an optional path_template for finding the +repo.files.tar.gz files. The form is '/srv/ftp/%(repo)s/os/%(arch)s/', which is +also the default template if none is specified. While 'repo' is not required to +be present in the path_template, note that 'arch' is so reporead can function +correctly. +""" + +import logging +import os.path +import pyinotify +import sys +import threading +import time + +from django.core.management.base import BaseCommand, CommandError + +from main.models import Arch, Repo +from .reporead import read_repo + +logging.basicConfig( + level=logging.WARNING, + format='%(asctime)s -> %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + stream=sys.stderr) +logger = logging.getLogger() + +class Command(BaseCommand): + help = "Watch database files and run an update when necessary." + args = "[path_template]" + + def handle(self, path_template=None, **options): + v = int(options.get('verbosity', 0)) + if v == 0: + logger.level = logging.ERROR + elif v == 1: + logger.level = logging.INFO + elif v == 2: + logger.level = logging.DEBUG + + if not path_template: + path_template = '/srv/ftp/%(repo)s/os/%(arch)s/' + self.path_template = path_template + + notifier = self.setup_notifier() + logger.info('Entering notifier loop') + notifier.loop() + + def setup_notifier(self): + '''Set up and configure the inotify machinery and logic. + This takes the provided or default path_template and builds a list of + directories we need to watch for database updates. It then validates + and passes these on to the various pyinotify pieces as necessary and + finally builds and returns a notifier object.''' + arches = Arch.objects.filter(agnostic=False) + repos = Repo.objects.all() + arch_path_map = dict((arch, None) for arch in arches) + all_paths = set() + total_paths = 0 + for arch in arches: + combos = ({ 'repo': repo.name.lower(), 'arch': arch.name } + for repo in repos) + # take a python format string and generate all unique combinations + # of directories from it; using set() ensures we filter it down + paths = set(self.path_template % values for values in combos) + total_paths += len(paths) + all_paths |= paths + arch_path_map[arch] = paths + + logger.info('Watching %d total paths', total_paths) + logger.debug(all_paths) + + # sanity check- basically ensure every path we created from the + # template mapped to only one architecture + if total_paths != len(all_paths): + raise CommandError('path template did not uniquely ' + 'determine architecture for each file') + + # A proper atomic replacement of the database as done by rsync is type + # IN_MOVED_TO. repo-add/remove will finish with a IN_CLOSE_WRITE. + mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO + + manager = pyinotify.WatchManager() + for name in all_paths: + manager.add_watch(name, mask) + + handler = EventHandler(arch_paths=arch_path_map) + return pyinotify.Notifier(manager, handler) + + +class Database(object): + '''A object representing a pacman database on the filesystem. It stores + various bits of metadata and state representing the file path, when we last + updated, how long our delay is before performing the update, whether we are + updating now, etc.''' + def __init__(self, arch, path, delay=60.0): + self.arch = arch + self.path = path + self.delay = delay + self.mtime = None + self.last_import = None + self.update_thread = None + self.updating = False + self.run_again = False + self.lock = threading.Lock() + + def _start_update_countdown(self): + self.update_thread = threading.Timer(self.delay, self.update) + logger.info('Starting %.1f second countdown to update %s', + self.delay, self.path) + self.update_thread.start() + + def queue_for_update(self, mtime): + logger.debug('Queueing database %s...', self.path) + with self.lock: + self.mtime = mtime + if self.updating: + # store the fact that we will need to run it again + self.run_again = True + return + if self.update_thread: + self.update_thread.cancel() + self._start_update_countdown() + + def update(self): + logger.debug('Updating database %s...', self.path) + with self.lock: + self.last_import = time.time() + self.updating = True + + try: + # invoke reporead's primary method + read_repo(self.arch, self.path, {}) + finally: + logger.debug('Done updating database %s.', self.path) + with self.lock: + self.update_thread = None + self.updating = False + if self.run_again: + self.run_again = False + self._start_update_countdown() + + +class EventHandler(pyinotify.ProcessEvent): + '''Our main event handler which listens for database change events. Because + we are watching the whole directory, we filter down and only look at those + events dealing with files databases.''' + + def my_init(self, **kwargs): + self.databases = {} + self.arch_lookup = {} + + # we really want a single path to arch mapping, so massage the data + arch_paths = kwargs['arch_paths'] + for arch, paths in arch_paths.items(): + self.arch_lookup.update((path.rstrip('/'), arch) for path in paths) + + def process_default(self, event): + '''Primary event processing function which kicks off reporead timer + threads if a files database was updated.''' + if not event.name: + return + # screen to only the files we care about + if event.name.endswith('.files.tar.gz'): + path = event.pathname + stat = os.stat(path) + database = self.databases.get(path, None) + if database is None: + arch = self.arch_lookup.get(event.path, None) + if arch is None: + logger.warning( + 'Could not determine arch for %s, skipping update', + path) + return + database = Database(arch, path) + self.databases[path] = database + database.queue_for_update(stat.st_mtime) + + +# vim: set ts=4 sw=4 et: -- cgit v1.2.3-2-g168b From d3b36e1ce992a8b70f4fe8fe9e8df74e835fb865 Mon Sep 17 00:00:00 2001 From: Dan McGee Date: Tue, 15 Nov 2011 22:03:01 -0600 Subject: reporead_inotify: cancel threads that haven't started yet on shutdown Signed-off-by: Dan McGee --- devel/management/commands/reporead_inotify.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'devel/management/commands/reporead_inotify.py') diff --git a/devel/management/commands/reporead_inotify.py b/devel/management/commands/reporead_inotify.py index 135c0367..4c865ce1 100755 --- a/devel/management/commands/reporead_inotify.py +++ b/devel/management/commands/reporead_inotify.py @@ -54,6 +54,11 @@ class Command(BaseCommand): logger.info('Entering notifier loop') notifier.loop() + logger.info('Cancelling remaining threads...') + for thread in threading.enumerate(): + if hasattr(thread, 'cancel'): + thread.cancel() + def setup_notifier(self): '''Set up and configure the inotify machinery and logic. This takes the provided or default path_template and builds a list of -- cgit v1.2.3-2-g168b From b1e406d73844d5a30344ca8ac855fe850c52bc2f Mon Sep 17 00:00:00 2001 From: Dan McGee Date: Wed, 16 Nov 2011 12:49:17 -0600 Subject: reporead_inotify: spin up read_repo() in separate thread This prevents memory usage from ballooning to absolutely huge values, such as when multiple threads kick off at the same time. The bulk of our memory allocation obviously comes in these threads and not the main threads, so being able to isolate them in processes helps a lot. Signed-off-by: Dan McGee --- devel/management/commands/reporead_inotify.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'devel/management/commands/reporead_inotify.py') diff --git a/devel/management/commands/reporead_inotify.py b/devel/management/commands/reporead_inotify.py index 4c865ce1..ffd49b8f 100755 --- a/devel/management/commands/reporead_inotify.py +++ b/devel/management/commands/reporead_inotify.py @@ -15,6 +15,7 @@ correctly. """ import logging +import multiprocessing import os.path import pyinotify import sys @@ -133,6 +134,7 @@ class Database(object): return if self.update_thread: self.update_thread.cancel() + self.update_thread = None self._start_update_countdown() def update(self): @@ -142,8 +144,13 @@ class Database(object): self.updating = True try: - # invoke reporead's primary method - read_repo(self.arch, self.path, {}) + # invoke reporead's primary method. we do this in a separate + # process for memory conservation purposes; these processes grow + # rather large so it is best to free up the memory ASAP. + process = multiprocessing.Process(target=read_repo, + args=[self.arch, self.path, {}]) + process.start() + process.join() finally: logger.debug('Done updating database %s.', self.path) with self.lock: -- cgit v1.2.3-2-g168b From aa20c798ca8af365b2549591700e932a74d068b8 Mon Sep 17 00:00:00 2001 From: Dan McGee Date: Wed, 16 Nov 2011 13:02:35 -0600 Subject: reporead_inotify: close connection once we are done with it This prevents an otherwise idle connection from sitting around and being totally useless. Signed-off-by: Dan McGee --- devel/management/commands/reporead_inotify.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'devel/management/commands/reporead_inotify.py') diff --git a/devel/management/commands/reporead_inotify.py b/devel/management/commands/reporead_inotify.py index ffd49b8f..acb53a54 100755 --- a/devel/management/commands/reporead_inotify.py +++ b/devel/management/commands/reporead_inotify.py @@ -23,6 +23,7 @@ import threading import time from django.core.management.base import BaseCommand, CommandError +from django.db import connection from main.models import Arch, Repo from .reporead import read_repo @@ -90,6 +91,11 @@ class Command(BaseCommand): raise CommandError('path template did not uniquely ' 'determine architecture for each file') + # this thread is done using the database; all future access is done in + # the spawned read_repo() processes, so close the otherwise completely + # idle connection. + connection.close() + # A proper atomic replacement of the database as done by rsync is type # IN_MOVED_TO. repo-add/remove will finish with a IN_CLOSE_WRITE. mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO -- cgit v1.2.3-2-g168b From 9d2fdbe5bc6a0d9ab2907b377056851fc5eb56c3 Mon Sep 17 00:00:00 2001 From: Dan McGee Date: Thu, 17 Nov 2011 10:17:55 -0600 Subject: reporead_inotify: nice the spawned subprocesses This prevents the reporead job from taking over time from more important processes; this is not a rush task. Signed-off-by: Dan McGee --- devel/management/commands/reporead_inotify.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'devel/management/commands/reporead_inotify.py') diff --git a/devel/management/commands/reporead_inotify.py b/devel/management/commands/reporead_inotify.py index acb53a54..c74762eb 100755 --- a/devel/management/commands/reporead_inotify.py +++ b/devel/management/commands/reporead_inotify.py @@ -16,7 +16,7 @@ correctly. import logging import multiprocessing -import os.path +import os import pyinotify import sys import threading @@ -113,10 +113,11 @@ class Database(object): various bits of metadata and state representing the file path, when we last updated, how long our delay is before performing the update, whether we are updating now, etc.''' - def __init__(self, arch, path, delay=60.0): + def __init__(self, arch, path, delay=60.0, nice=3): self.arch = arch self.path = path self.delay = delay + self.nice = nice self.mtime = None self.last_import = None self.update_thread = None @@ -153,8 +154,12 @@ class Database(object): # invoke reporead's primary method. we do this in a separate # process for memory conservation purposes; these processes grow # rather large so it is best to free up the memory ASAP. - process = multiprocessing.Process(target=read_repo, - args=[self.arch, self.path, {}]) + def run(): + if self.nice != 0: + os.nice(self.nice) + read_repo(self.arch, self.path, {}) + + process = multiprocessing.Process(target=run) process.start() process.join() finally: -- cgit v1.2.3-2-g168b