summaryrefslogtreecommitdiff
path: root/devel/management/commands/reporead_inotify.py
blob: 8c1e47bf8682089ab53a643b97411abbdddd5861 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# -*- coding: utf-8 -*-
"""
reporead_inotify command

Watches repo.files.tar.gz files for updates and parses them after a short delay
in order to catch all updates in a single bulk update.

Usage: ./manage.py reporead_inotify [path_template]

Where 'path_template' is an optional path_template for finding the
repo.files.tar.gz files. The form is '/srv/ftp/%(repo)s/os/%(arch)s/', which is
also the default template if none is specified. While 'repo' is not required to
be present in the path_template, note that 'arch' is so reporead can function
correctly.
"""

import logging
import multiprocessing
import os
import pyinotify
import sys
import threading
import time

from django.core.management.base import BaseCommand, CommandError
from django.db import connection, transaction

from main.models import Arch, Repo
from .reporead import read_repo

logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s -> %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stderr)
logger = logging.getLogger()

class Command(BaseCommand):
    help = "Watch database files and run an update when necessary."
    args = "[path_template]"

    def handle(self, path_template=None, **options):
        v = int(options.get('verbosity', 0))
        if v == 0:
            logger.level = logging.ERROR
        elif v == 1:
            logger.level = logging.INFO
        elif v == 2:
            logger.level = logging.DEBUG

        if not path_template:
            path_template = '/srv/ftp/%(repo)s/os/%(arch)s/'
        self.path_template = path_template

        notifier = self.setup_notifier()
        # this thread is done using the database; all future access is done in
        # the spawned read_repo() processes, so close the otherwise completely
        # idle connection.
        connection.close()

        logger.info('Entering notifier loop')
        notifier.loop()

        logger.info('Cancelling remaining threads...')
        for thread in threading.enumerate():
            if hasattr(thread, 'cancel'):
                thread.cancel()

    @transaction.commit_on_success
    def setup_notifier(self):
        '''Set up and configure the inotify machinery and logic.
        This takes the provided or default path_template and builds a list of
        directories we need to watch for database updates. It then validates
        and passes these on to the various pyinotify pieces as necessary and
        finally builds and returns a notifier object.'''
        transaction.commit_manually()
        arches = Arch.objects.filter(agnostic=False)
        repos = Repo.objects.all()
        transaction.set_dirty()
        arch_path_map = {arch: None for arch in arches}
        all_paths = set()
        total_paths = 0
        for arch in arches:
            combos = ({ 'repo': repo.name.lower(), 'arch': arch.name }
                    for repo in repos)
            # take a python format string and generate all unique combinations
            # of directories from it; using set() ensures we filter it down
            paths = {self.path_template % values for values in combos}
            total_paths += len(paths)
            all_paths |= paths
            arch_path_map[arch] = paths

        logger.info('Watching %d total paths', total_paths)
        logger.debug(all_paths)

        # sanity check- basically ensure every path we created from the
        # template mapped to only one architecture
        if total_paths != len(all_paths):
            raise CommandError('path template did not uniquely '
                    'determine architecture for each file')

        # A proper atomic replacement of the database as done by rsync is type
        # IN_MOVED_TO. repo-add/remove will finish with a IN_CLOSE_WRITE.
        mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO

        manager = pyinotify.WatchManager()
        for name in all_paths:
            manager.add_watch(name, mask)

        handler = EventHandler(arch_paths=arch_path_map)
        return pyinotify.Notifier(manager, handler)


class Database(object):
    '''A object representing a pacman database on the filesystem. It stores
    various bits of metadata and state representing the file path, when we last
    updated, how long our delay is before performing the update, whether we are
    updating now, etc.'''
    def __init__(self, arch, path, delay=60.0, nice=3):
        self.arch = arch
        self.path = path
        self.delay = delay
        self.nice = nice
        self.mtime = None
        self.last_import = None
        self.update_thread = None
        self.updating = False
        self.run_again = False
        self.lock = threading.Lock()

    def _start_update_countdown(self):
        self.update_thread = threading.Timer(self.delay, self.update)
        logger.info('Starting %.1f second countdown to update %s',
                self.delay, self.path)
        self.update_thread.start()

    def queue_for_update(self, mtime):
        logger.debug('Queueing database %s...', self.path)
        with self.lock:
            self.mtime = mtime
            if self.updating:
                # store the fact that we will need to run it again
                self.run_again = True
                return
            if self.update_thread:
                self.update_thread.cancel()
                self.update_thread = None
            self._start_update_countdown()

    def update(self):
        logger.debug('Updating database %s...', self.path)
        with self.lock:
            self.last_import = time.time()
            self.updating = True

        try:
            # invoke reporead's primary method. we do this in a separate
            # process for memory conservation purposes; these processes grow
            # rather large so it is best to free up the memory ASAP.
            def run():
                if self.nice != 0:
                    os.nice(self.nice)
                read_repo(self.arch, self.path, {})

            process = multiprocessing.Process(target=run)
            process.start()
            process.join()
        finally:
            logger.debug('Done updating database %s.', self.path)
            with self.lock:
                self.update_thread = None
                self.updating = False
                if self.run_again:
                    self.run_again = False
                    self._start_update_countdown()


class EventHandler(pyinotify.ProcessEvent):
    '''Our main event handler which listens for database change events. Because
    we are watching the whole directory, we filter down and only look at those
    events dealing with files databases.'''

    def my_init(self, **kwargs):
        self.databases = {}
        self.arch_lookup = {}

        # we really want a single path to arch mapping, so massage the data
        arch_paths = kwargs['arch_paths']
        for arch, paths in arch_paths.items():
            self.arch_lookup.update((path.rstrip('/'), arch) for path in paths)

    def process_default(self, event):
        '''Primary event processing function which kicks off reporead timer
        threads if a files database was updated.'''
        if not event.name:
            return
        # screen to only the files we care about
        if event.name.endswith('.files.tar.gz'):
            path = event.pathname
            stat = os.stat(path)
            database = self.databases.get(path, None)
            if database is None:
                arch = self.arch_lookup.get(event.path, None)
                if arch is None:
                    logger.warning(
                            'Could not determine arch for %s, skipping update',
                            path)
                    return
                database = Database(arch, path)
                self.databases[path] = database
            database.queue_for_update(stat.st_mtime)


# vim: set ts=4 sw=4 et: