#!/usr/bin/env python
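"""Clean up old rows from the buildbot status and scheduler databases.

Deletes builds older than a cutoff date (along with their steps, build
properties, and schedulerdb_requests rows) from statusdb, optionally
removes orphaned steps and build properties, and prunes schedulers whose
last processed change predates the cutoff from schedulerdb.
"""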
import sqlalchemy as sa
import time
import json
import logging
log = logging.getLogger(__name__)


# From
# http://stackoverflow.com/questions/5631078/sqlalchemy-print-the-actual-query
def query_to_str(statement, bind=None):
"""
print a query, with values filled in
for debugging purposes *only*
for security, you should always separate queries from their values
please also note that this function is quite slow
"""
import sqlalchemy.orm
if isinstance(statement, sqlalchemy.orm.Query):
if bind is None:
bind = statement.session.get_bind(
statement._mapper_zero_or_none())
statement = statement.statement
elif bind is None:
bind = statement.bind
dialect = bind.dialect
compiler = statement._compiler(dialect)
class LiteralCompiler(compiler.__class__):
def visit_bindparam(
self, bindparam, within_columns_clause=False,
literal_binds=False, **kwargs
):
return super(LiteralCompiler, self).render_literal_bindparam(
bindparam, within_columns_clause=within_columns_clause,
literal_binds=literal_binds, **kwargs)
compiler = LiteralCompiler(dialect, statement)
return compiler.process(statement)
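

# Example use of query_to_str (hypothetical values): for an engine-bound
# t_builds table,
#   query_to_str(sa.select([t_builds.c.id]).where(t_builds.c.id < 5))
# would return something like
#   "SELECT builds.id FROM builds WHERE builds.id < 5"
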
def cleaner_upper(select_query, delete_queries):
"""
Cleans stuff up.
Runs select query, which should return a list of ids to delete.
Chunks of ids are passed to each query of delete_queries.
delete queries should be functions that accept a list of ids to delete.
Returns once select_query no longer returns any results
Sleeps between rounds
"""
while True:
t = time.time()
log.debug("finding rows: %s", query_to_str(select_query))
rows = select_query.execute()
log.info("found %i rows in %.2fs", rows.rowcount, time.time() - t)
if rows.rowcount == 0:
break
# Delete 100 at a time
ids = [row[0] for row in rows]
chunk_size = 100
for i in range(0, len(ids), chunk_size):
for q in delete_queries:
t = time.time()
q(ids[i:i + chunk_size])
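                # Back off in proportion to how long the delete took
                # (4x the delete time, at least 0.1s) so slow deletes
                # don't monopolize the database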
sleep_time = max(0.1, (time.time() - t) * 4.0)
log.debug("sleeping for %.2fs", sleep_time)
time.sleep(sleep_time)
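

# Example use of cleaner_upper with a hypothetical t_sessions table:
#   q = sa.select([t_sessions.c.id]).where(t_sessions.c.expired).limit(10000)
#   cleaner_upper(q, [deleter(t_sessions.c.id)])
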
def deleter(column):
"""
Returns a function that accepts a list of ids, and deletes
rows that match.
"""
def delete_func(ids):
t = time.time()
q = column.table.delete().where(column.in_(ids))
log.debug(query_to_str(q))
n = q.execute().rowcount
log.info("deleted %i rows from %s in %.2fs", n, column.table.name,
time.time() - t)
return delete_func
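

# Example: deleter(t_steps.c.build_id) returns a function that, given
# [1, 2, 3], would execute "DELETE FROM steps WHERE build_id IN (1, 2, 3)"
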
def cleanup_statusdb_builds(meta, cutoff):
log.info("Cleaning up builds before %s", cutoff)
t_builds = sa.Table('builds', meta, autoload=True)
t_steps = sa.Table('steps', meta, autoload=True)
t_properties = sa.Table('build_properties', meta, autoload=True)
t_schedulerdb_requests = sa.Table('schedulerdb_requests', meta,
autoload=True)
builds_q = sa.select([t_builds.c.id]).\
where(t_builds.c.starttime < cutoff).\
order_by(t_builds.c.starttime.asc()).\
limit(10000)
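    # Delete rows from the child tables first; the builds rows
    # themselves are deleted last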
cleaner_upper(builds_q, [
deleter(t_steps.c.build_id),
deleter(t_properties.c.build_id),
deleter(t_schedulerdb_requests.c.status_build_id),
deleter(t_builds.c.id),
])
log.info("Finished cleaning up builds")
def cleanup_statusdb_orphaned_steps(meta):
log.info("Cleaning up orphaned steps")
t_builds = sa.Table('builds', meta, autoload=True)
t_steps = sa.Table('steps', meta, autoload=True)
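    # Outer-join steps to builds: rows where builds.id IS NULL are steps
    # whose parent build no longer exists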
q = sa.select(
[t_steps.c.build_id],
# nopep8 - we really do need to use == None
t_builds.c.id == None,
from_obj=[t_steps.outerjoin(t_builds, t_steps.c.build_id ==
t_builds.c.id)],
distinct=True,
).limit(10000)
cleaner_upper(q, [
deleter(t_steps.c.build_id),
])
log.info("Finished cleaning up orphaned steps")
def cleanup_statusdb_orphaned_properties(meta):
log.info("Cleaning up orphaned build properties")
t_builds = sa.Table('builds', meta, autoload=True)
t_properties = sa.Table('build_properties', meta, autoload=True)
q = sa.select(
[t_properties.c.build_id],
# nopep8 - we really do need to use == None
t_builds.c.id == None,
from_obj=[t_properties.outerjoin(t_builds, t_properties.c.build_id ==
t_builds.c.id)],
distinct=True,
).limit(10000)
cleaner_upper(q, [
deleter(t_properties.c.build_id),
])
log.info("Finished cleaning up orphaned build properties")
IGNORABLE_CLASSES = (
"buildbotcustom.l10n.TriggerableL10n",
"buildbotcustom.scheduler.PersistentScheduler",
"buildbot.schedulers.basic.Dependent",
"buildbot.schedulers.triggerable.Triggerable",
"buildbotcustom.scheduler.TriggerBouncerCheck",
"buildbotcustom.scheduler.Dependent-props",
"buildbotcustom.buildbotcustom.l10n.TriggerableL10n",
"buildbot.schedulers.timed.Nightly",
"buildbotcustom.scheduler.SpecificNightly-props",
"buildbotcustom.scheduler.Nightly-props",
"buildbotcustom.scheduler.AggregatingScheduler",
)

PERBUILD_CLASSES = (
"buildbot.schedulers.basic.Scheduler",
"buildbotcustom.scheduler.MultiScheduler",
"buildbotcustom.scheduler.BuilderChooserScheduler",
"buildbotcustom.scheduler.Scheduler-props",
"buildbotcustom.scheduler.BuilderChooserScheduler-props",
"buildbotcustom.scheduler.EveryNthScheduler")
_change_cache = {}
def get_change_date(db, changeid):
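    # Return (and memoize) the when_timestamp of the given change, or
    # None if the change row no longer exists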
if changeid in _change_cache:
return _change_cache[changeid]
q = sa.text("SELECT when_timestamp FROM changes WHERE changeid=:changeid")
results = db.execute(q, changeid=changeid)
row = results.fetchone()
if row:
_change_cache[changeid] = row.when_timestamp
return row.when_timestamp
else:
_change_cache[changeid] = None
        return None


def should_delete(db, s, cutoff):
state = json.loads(s.state)
if s.class_name in IGNORABLE_CLASSES:
# Not sure if we have enough data here to make a decision...
return False
elif s.class_name in PERBUILD_CLASSES:
assert "last_processed" in state
change_time = get_change_date(db, state['last_processed'])
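        # NB: if the change row is gone, change_time is None, which on
        # Python 2 compares as less than any number, so the scheduler
        # is deleted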
if change_time < cutoff:
return True
else:
log.warning("unhandled scheduler class for scheduler %s: %s %s",
s.schedulerid, s.class_name, s.name)
    return False


def cleanup_schedulerdb_schedulers(db):
now = time.time()
regular_cutoff = now - 7 * 86400 # 1 week
release_cutoff = now - 60 * 86400 # 2 months
# Get all the schedulers from the db
q = sa.text("SELECT * FROM schedulers")
schedulers = db.execute(q)
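    # Collect the schedulers to delete first, and delete them only after
    # the result set has been fully consumed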
to_delete = []
for s in schedulers:
if s.name.startswith("release-"):
cutoff = release_cutoff
else:
cutoff = regular_cutoff
try:
if should_delete(db, s, cutoff):
to_delete.append(s)
except Exception:
log.exception("couldn't process scheduler %s: %s", s.schedulerid,
s.name)
for s in to_delete:
log.info("deleting scheduler %s: %s %s %s", s.schedulerid,
s.class_name, s.name, s.state)
q = sa.text("DELETE FROM schedulers WHERE schedulerid=:schedulerid")
        db.execute(q, schedulerid=s.schedulerid)


if __name__ == '__main__':
from optparse import OptionParser
from ConfigParser import RawConfigParser
parser = OptionParser()
parser.set_defaults(
filename=None,
loglevel=logging.INFO,
logfile=None,
skip_orphans=False,
)
parser.add_option("-l", "--logfile", dest="logfile")
parser.add_option("--status-db", dest="status_db")
parser.add_option("--scheduler-db", dest="scheduler_db")
parser.add_option("--config", dest="config_file", help="config file")
parser.add_option("--cutoff", dest="cutoff",
help="cutoff date, prior to which we'll delete data. "
"format is YYYY-MM-DD")
parser.add_option("--skip-orphans", dest="skip_orphans",
action="store_true")
parser.add_option("-v", "--verbose", dest="loglevel", action="store_const",
const=logging.DEBUG, help="run verbosely")
parser.add_option("-q", "--quiet", dest="loglevel", action="store_const",
const=logging.WARNING, help="run quietly")
options, args = parser.parse_args()
# Configure logging
root_log = logging.getLogger()
log_formatter = logging.Formatter("%(asctime)s - %(message)s")
if options.logfile:
import logging.handlers
        # Rotate weekly (W6 = Sunday), keeping 4 weeks of log files
handler = logging.handlers.TimedRotatingFileHandler(options.logfile,
when='W6',
backupCount=4)
else:
handler = logging.StreamHandler()
handler.setFormatter(log_formatter)
root_log.setLevel(options.loglevel)
root_log.addHandler(handler)
if not options.cutoff:
parser.error("cutoff date is required")
# Load options from config if it's set
if options.config_file:
config_parser = RawConfigParser()
config_parser.read([options.config_file])
if config_parser.has_option("db", "status_db"):
options.status_db = config_parser.get("db", "status_db")
if config_parser.has_option("db", "scheduler_db"):
options.scheduler_db = config_parser.get("db", "scheduler_db")
# Clean up statusdb
if options.status_db:
status_db = sa.create_engine(options.status_db, pool_recycle=300)
meta = sa.MetaData(bind=status_db)
cleanup_statusdb_builds(meta, options.cutoff)
if not options.skip_orphans:
cleanup_statusdb_orphaned_steps(meta)
cleanup_statusdb_orphaned_properties(meta)
if options.scheduler_db:
scheduler_db = sa.create_engine(options.scheduler_db, pool_recycle=300)
cleanup_schedulerdb_schedulers(scheduler_db)