summaryrefslogtreecommitdiff
path: root/devel/management/commands
diff options
context:
space:
mode:
authorDan McGee <dan@archlinux.org>2011-11-17 12:34:12 -0600
committerDan McGee <dan@archlinux.org>2011-11-17 12:49:10 -0600
commita9819e3d715ce3e5c20c9665db9a6100f06ab562 (patch)
tree0d5b457e7de06401095a17993735b1530f20f454 /devel/management/commands
parent9d2fdbe5bc6a0d9ab2907b377056851fc5eb56c3 (diff)
Ensure reporead is protected against simultaneous runs
This adds a bunch of transaction magic and SELECT FOR UPDATE stuff to reporead to cope with the now-concurrent runs of reporead we get when invoked from our inotify-based updater. The collision occurs with 'any' architecture packages as both repo databases contain the new version, and the updates occur at exactly the same time. Signed-off-by: Dan McGee <dan@archlinux.org>
Diffstat (limited to 'devel/management/commands')
-rw-r--r--devel/management/commands/reporead.py206
1 files changed, 106 insertions, 100 deletions
diff --git a/devel/management/commands/reporead.py b/devel/management/commands/reporead.py
index ad76db4d..b6bd8457 100644
--- a/devel/management/commands/reporead.py
+++ b/devel/management/commands/reporead.py
@@ -13,10 +13,6 @@ Example:
./manage.py reporead i686 /tmp/core.db.tar.gz
"""
-from django.core.management.base import BaseCommand, CommandError
-from django.contrib.auth.models import User
-from django.db import transaction
-
from collections import defaultdict
import io
import os
@@ -27,6 +23,11 @@ import logging
from datetime import datetime
from optparse import make_option
+from django.core.management.base import BaseCommand, CommandError
+from django.contrib.auth.models import User
+from django.db import connections, router, transaction
+from django.db.utils import IntegrityError
+
from devel.utils import UserFinder
from main.models import Arch, Package, PackageDepend, PackageFile, Repo
from packages.models import Conflict, Provision, Replacement
@@ -189,8 +190,6 @@ def create_multivalued(dbpkg, repopkg, db_attr, repo_attr):
finder = UserFinder()
def populate_pkg(dbpkg, repopkg, force=False, timestamp=None):
- db_score = 1
-
if repopkg.base:
dbpkg.pkgbase = repopkg.base
else:
@@ -214,7 +213,7 @@ def populate_pkg(dbpkg, repopkg, force=False, timestamp=None):
dbpkg.last_update = timestamp
dbpkg.save()
- db_score += populate_files(dbpkg, repopkg, force=force)
+ populate_files(dbpkg, repopkg, force=force)
dbpkg.packagedepend_set.all().delete()
for y in repopkg.depends:
@@ -235,28 +234,23 @@ def populate_pkg(dbpkg, repopkg, force=False, timestamp=None):
create_multivalued(dbpkg, repopkg, 'groups', 'groups')
create_multivalued(dbpkg, repopkg, 'licenses', 'license')
- related_score = (len(repopkg.depends) + len(repopkg.optdepends)
- + len(repopkg.conflicts) + len(repopkg.provides)
- + len(repopkg.replaces) + len(repopkg.groups)
- + len(repopkg.license))
- if related_score:
- db_score += (related_score / 20) + 1
- return db_score
+pkg_same_version = lambda pkg, dbpkg: pkg.ver == dbpkg.pkgver \
+ and pkg.rel == dbpkg.pkgrel and pkg.epoch == dbpkg.epoch
def populate_files(dbpkg, repopkg, force=False):
if not force:
- if dbpkg.pkgver != repopkg.ver or dbpkg.pkgrel != repopkg.rel \
- or dbpkg.epoch != repopkg.epoch:
+ if not pkg_same_version(repopkg, dbpkg):
logger.info("DB version (%s) didn't match repo version "
"(%s) for package %s, skipping file list addition",
dbpkg.full_version, repopkg.full_version, dbpkg.pkgname)
- return 0
+ return
if not dbpkg.files_last_update or not dbpkg.last_update:
pass
elif dbpkg.files_last_update > dbpkg.last_update:
- return 0
+ return
+
# only delete files if we are reading a DB that contains them
if repopkg.has_files:
dbpkg.packagefile_set.all().delete()
@@ -275,30 +269,19 @@ def populate_files(dbpkg, repopkg, force=False):
pkgfile.save(force_insert=True)
dbpkg.files_last_update = datetime.utcnow()
dbpkg.save()
- return (len(repopkg.files) / 50) + 1
- return 0
-
-class Batcher(object):
- def __init__(self, threshold, start=0):
- self.threshold = threshold
- self.meter = start
- def batch_commit(self, score):
- """
- Track updates to the database and perform a commit if the batch
- becomes sufficiently large. "Large" is defined by waiting for the
- sum of scores to exceed the arbitrary threshold value; once it is
- hit a commit is issued.
- """
- self.meter += score
- if self.meter > self.threshold:
- logger.debug("Committing transaction, batch threshold hit")
- transaction.commit()
- self.meter = 0
+def select_pkg_for_update(dbpkg):
+ database = router.db_for_write(Package, instance=dbpkg)
+ connection = connections[database]
+ if 'sqlite' in connection.settings_dict['ENGINE'].lower():
+ return dbpkg
+ new_pkg = Package.objects.raw(
+ 'SELECT * FROM packages WHERE id = %s FOR UPDATE',
+ [dbpkg.id])
+ return list(new_pkg)[0]
-@transaction.commit_on_success
def db_update(archname, reponame, pkgs, options):
"""
Parses a list and updates the Arch dev database accordingly.
@@ -310,88 +293,111 @@ def db_update(archname, reponame, pkgs, options):
logger.info('Updating Arch: %s', archname)
force = options.get('force', False)
filesonly = options.get('filesonly', False)
- repository = Repo.objects.get(name__iexact=reponame)
- architecture = Arch.objects.get(name__iexact=archname)
- # no-arg order_by() removes even the default ordering; we don't need it
- dbpkgs = Package.objects.filter(
- arch=architecture, repo=repository).order_by()
- # This makes our inner loop where we find packages by name *way* more
- # efficient by not having to go to the database for each package to
- # SELECT them by name.
- dbdict = dict([(pkg.pkgname, pkg) for pkg in dbpkgs])
-
- logger.debug("Creating sets")
- dbset = set(dbdict.keys())
- syncset = set([pkg.name for pkg in pkgs])
- logger.info("%d packages in current web DB", len(dbset))
- logger.info("%d packages in new updating db", len(syncset))
- in_sync_not_db = syncset - dbset
- logger.info("%d packages in sync not db", len(in_sync_not_db))
-
- # Try to catch those random package deletions that make Eric so unhappy.
- if len(dbset):
- dbpercent = 100.0 * len(syncset) / len(dbset)
- else:
- dbpercent = 0.0
- logger.info("DB package ratio: %.1f%%", dbpercent)
-
- # Fewer than 20 packages makes the percentage check unreliable, but it also
- # means we expect the repo to fluctuate a lot.
- msg = "Package database has %.1f%% the number of packages in the " \
- "web database" % dbpercent
- if len(dbset) == 0 and len(syncset) == 0:
- pass
- elif not filesonly and \
- len(dbset) > 20 and dbpercent < 50.0 and \
- not repository.testing and not repository.staging:
- logger.error(msg)
- raise Exception(msg)
- elif dbpercent < 75.0:
- logger.warning(msg)
-
- batcher = Batcher(100)
+
+ with transaction.commit_manually():
+ repository = Repo.objects.get(name__iexact=reponame)
+ architecture = Arch.objects.get(name__iexact=archname)
+ # no-arg order_by() removes even the default ordering; we don't need it
+ dbpkgs = Package.objects.filter(
+ arch=architecture, repo=repository).order_by()
+ # This makes our inner loop where we find packages by name *way* more
+ # efficient by not having to go to the database for each package to
+ # SELECT them by name.
+ dbdict = dict((dbpkg.pkgname, dbpkg) for dbpkg in dbpkgs)
+
+ logger.debug("Creating sets")
+ dbset = set(dbdict.keys())
+ syncset = set([pkg.name for pkg in pkgs])
+ logger.info("%d packages in current web DB", len(dbset))
+ logger.info("%d packages in new updating db", len(syncset))
+ in_sync_not_db = syncset - dbset
+ logger.info("%d packages in sync not db", len(in_sync_not_db))
+
+ # Try to catch those random package deletions that make Eric so unhappy.
+ if len(dbset):
+ dbpercent = 100.0 * len(syncset) / len(dbset)
+ else:
+ dbpercent = 0.0
+ logger.info("DB package ratio: %.1f%%", dbpercent)
+
+ # Fewer than 20 packages makes the percentage check unreliable, but it also
+ # means we expect the repo to fluctuate a lot.
+ msg = "Package database has %.1f%% the number of packages in the " \
+ "web database" % dbpercent
+ if len(dbset) == 0 and len(syncset) == 0:
+ pass
+ elif not filesonly and \
+ len(dbset) > 20 and dbpercent < 50.0 and \
+ not repository.testing and not repository.staging:
+ logger.error(msg)
+ raise Exception(msg)
+ elif dbpercent < 75.0:
+ logger.warning(msg)
+
+ # If isolation level is repeatable-read, we need to ensure each package
+ # update starts a new transaction and re-queries the database as necessary
+ # to guard against simultaneous updates
+ transaction.commit()
if not filesonly:
# packages in syncdb and not in database (add to database)
- for p in [x for x in pkgs if x.name in in_sync_not_db]:
- logger.info("Adding package %s", p.name)
- pkg = Package(pkgname=p.name, arch=architecture, repo=repository)
- score = populate_pkg(pkg, p, timestamp=datetime.utcnow())
- batcher.batch_commit(score)
+ for pkg in (pkg for pkg in pkgs if pkg.name in in_sync_not_db):
+ logger.info("Adding package %s", pkg.name)
+ dbpkg = Package(pkgname=pkg.name, arch=architecture, repo=repository)
+ try:
+ with transaction.commit_on_success():
+ populate_pkg(dbpkg, pkg, timestamp=datetime.utcnow())
+ except IntegrityError:
+ logger.warning("Could not add package %s; "
+ "not fatal if another thread beat us to it.",
+ pkg.name, exc_info=True)
# packages in database and not in syncdb (remove from database)
- in_db_not_sync = dbset - syncset
- for p in in_db_not_sync:
- logger.info("Removing package %s", p)
- dbp = dbdict[p]
- dbp.delete()
- batcher.batch_commit(1)
+ for pkgname in (dbset - syncset):
+ logger.info("Removing package %s", pkgname)
+ dbpkg = dbdict[pkgname]
+ with transaction.commit_on_success():
+ # no race condition here as long as simultaneous threads both
+ # issue deletes; second delete will be a no-op
+ dbpkg.delete()
# packages in both database and in syncdb (update in database)
pkg_in_both = syncset & dbset
- for p in [x for x in pkgs if x.name in pkg_in_both]:
- logger.debug("Checking package %s", p.name)
- dbp = dbdict[p.name]
+ for pkg in (x for x in pkgs if x.name in pkg_in_both):
+ logger.debug("Checking package %s", pkg.name)
+ dbpkg = dbdict[pkg.name]
timestamp = None
# for a force, we don't want to update the timestamp.
# for a non-force, we don't want to do anything at all.
if filesonly:
pass
- elif p.ver == dbp.pkgver and p.rel == dbp.pkgrel \
- and p.epoch == dbp.epoch:
+ elif pkg_same_version(pkg, dbpkg):
if not force:
continue
else:
timestamp = datetime.utcnow()
+ # The odd select_for_update song and dance here are to ensure
+ # simultaneous updates don't happen on a package, causing
+ # files/depends/all related items to be double-imported.
if filesonly:
- logger.debug("Checking files for package %s", p.name)
- score = populate_files(dbp, p, force=force)
+ with transaction.commit_on_success():
+ # TODO Django 1.4 select_for_update() will work once released
+ dbpkg = select_pkg_for_update(dbpkg)
+ if pkg_same_version(pkg, dbpkg):
+ logger.debug("Package %s was already updated", pkg.name)
+ continue
+ logger.debug("Checking files for package %s", pkg.name)
+ populate_files(dbpkg, pkg, force=force)
else:
- logger.info("Updating package %s", p.name)
- score = populate_pkg(dbp, p, force=force, timestamp=timestamp)
-
- batcher.batch_commit(score)
+ with transaction.commit_on_success():
+ # TODO Django 1.4 select_for_update() will work once released
+ dbpkg = select_pkg_for_update(dbpkg)
+ if pkg_same_version(pkg, dbpkg):
+ logger.debug("Package %s was already updated", pkg.name)
+ continue
+ logger.info("Updating package %s", pkg.name)
+ populate_pkg(dbpkg, pkg, force=force, timestamp=timestamp)
logger.info('Finished updating Arch: %s', archname)