Delete Diagnostics Rows
A quick, hacky script that cleans up a database (and the dump files its rows point to) once several applications have finished processing the data.
The script was refactored because it was originally running inside a terminated user’s screen session and had only minimal error handling. The new version was written so that it can run comfortably as a daemon.
Original Version
#!/usr/bin/python
"""Script for cleaning up diagnostics_historydump rows.

Deletes each dump file and its row once the network_diagnostics,
refactoring and sentry_reports processing flags are all 'p'.

NOTE: You need to set your mysql user and password as environment
variables (MYSQL_USER and MYSQL_PASSWORD) before using this script.
"""
import logging
import MySQLdb
import os
import sys
import time
# MySQL credentials (required: a KeyError is raised at import if unset)
MYSQL_USER = os.environ["MYSQL_USER"]
MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"]
# MySQL database & table
MYSQL_DATABASE = "diagnostics"
MYSQL_TABLE = "diagnostics_historydump"
# Maximum number of rows fetched per batch
LIMIT = 10000
# Seconds to sleep between batches
SLEEP_TIME = 10
# Location of diagnostics dump files; substituted for "dumps/" in the
# dump paths stored in the table
DUMPS_LOCATION = os.path.expanduser("~st/infrastructure/uploaded_files/dumps/")
def InitializeLogging(console_level=logging.ERROR, disk_level=logging.WARNING,
                      log_filename=None):
    """Configure root logging to the console and to a log file.

    The root logger itself is set to DEBUG so that each handler's own level
    decides what gets emitted. The handler levels are parameters so more
    verbose logging can be enabled without editing the script; the defaults
    keep the previous behavior (errors to console, warnings to disk).

    Args:
      console_level: Minimum level written to the console (stderr).
      disk_level: Minimum level written to the log file.
      log_filename: Log file path; defaults to ConstructLogFilename().
    """
    logger = logging.getLogger("")
    logger.setLevel(logging.DEBUG)
    # All scripts log to console
    console = logging.StreamHandler()
    console.setLevel(console_level)
    console.setFormatter(logging.Formatter("%(levelname)-8s: %(message)s"))
    logger.addHandler(console)
    if log_filename is None:
        log_filename = ConstructLogFilename()
    # mode="a": append if the file already exists.
    disk = logging.FileHandler(filename=log_filename, mode="a")
    disk.setLevel(disk_level)
    disk.setFormatter(logging.Formatter(
        "%(asctime)s.%(msecs).03d %(levelname)-8s %(message)s",
        "%Y-%m-%d %H:%M:%S"))
    logger.addHandler(disk)
def ConstructLogFilename(prefix="infra_dump_cleanup"):
    """Return a timestamped log filename.

    Args:
      prefix: Leading part of the filename.
    Returns:
      Filename of the form <prefix>.Y-m-d-H-M-S.log (local time).
    """
    time_str = time.strftime("%Y-%m-%d-%H-%M-%S")
    return "%s.%s.log" % (prefix, time_str)
def GetHistoryDumpRows(cursor):
    """Fetch the next batch of fully processed dump rows.

    Selects (id, dump) from MYSQL_DATABASE.MYSQL_TABLE where the
    network_diagnostics, refactoring and sentry_reports flags are all 'p',
    lowest ids first, at most LIMIT rows.

    Args:
      cursor: MySQLdb cursor object
    Returns:
      The result of cursor.fetchall() for that query.
    """
    filters = ("where network_diagnostics_processed='p' "
               "and refactoring_processed='p' "
               "and sentry_reports_processed='p' ")
    query = ("select id, dump from %s.%s " % (MYSQL_DATABASE, MYSQL_TABLE)
             + filters
             + "order by id asc limit %s;" % LIMIT)
    cursor.execute(query)
    batch = cursor.fetchall()
    return batch
def DeleteDumpRow(db, cursor, row_id):
    """Remove one row of MYSQL_TABLE by id and commit right away.

    Args:
      db: MySQLdb connection to the diagnostics database (committed here)
      cursor: MySQLdb cursor on that connection
      row_id: numeric id of the row to remove
    """
    qualified_table = "%s.%s" % (MYSQL_DATABASE, MYSQL_TABLE)
    cursor.execute("delete from %s where id='%s';" % (qualified_table, row_id))
    db.commit()
    logging.debug("Row %s has been deleted." % row_id)
def DeleteDumpFile(dump_file, dumps_location=None):
    """Delete a diagnostics dump file from local disk.

    The first "dumps/" in the stored path is replaced with the absolute dump
    directory. A missing file is logged as a warning and otherwise ignored;
    any other OS error (e.g. permission denied) propagates to the caller.

    Args:
      dump_file: Path to dump file as stored in the database.
      dumps_location: Directory substituted for "dumps/"; defaults to
        DUMPS_LOCATION.
    """
    import errno
    if dumps_location is None:
        dumps_location = DUMPS_LOCATION
    # Only rewrite the first occurrence so a deeper path component that is
    # also named "dumps" is left intact.
    dump_file = dump_file.replace("dumps/", dumps_location, 1)
    logging.debug("Removing %s", dump_file)
    try:
        os.remove(dump_file)
    except OSError as err:
        # Try-and-handle instead of exists()+remove(): no check-then-act race.
        if err.errno != errno.ENOENT:
            raise
        logging.warning("File %s not found", dump_file)
def StartDiagnosticsCleanup(db):
    """Delete processed dump files and their rows, batch by batch.

    Fetches batches of up to LIMIT rows until none are left, sleeping
    SLEEP_TIME seconds between batches. A row is deleted only after
    DeleteDumpFile() returned without raising for it. If a whole batch makes
    no progress (every row failed), the loop stops instead of re-fetching
    the same failing rows forever.

    Args:
      db: MySQLdb object to diagnostics database
    """
    cursor = db.cursor()
    try:
        while True:
            rows = GetHistoryDumpRows(cursor)
            if not rows:
                logging.info("Script is complete")
                break
            deleted = 0
            for row in rows:
                row_id, dump_file = int(row[0]), row[1]
                # Catch Exception rather than everything: a bare except would
                # also swallow KeyboardInterrupt/SystemExit and keep going.
                try:
                    DeleteDumpFile(dump_file)
                except Exception:
                    logging.exception("Unable to delete file %s.", dump_file)
                    continue
                try:
                    DeleteDumpRow(db, cursor, row_id)
                except Exception:
                    logging.exception("Unable to delete row %s.", row_id)
                    db.rollback()
                    continue
                deleted += 1
            if not deleted:
                # Failed rows stay in the table and the query orders by id,
                # so (absent concurrent changes) the next fetch would return
                # this exact batch again.
                logging.error("No rows in this batch could be deleted; stopping.")
                break
            # Pause between batches to give the disk and database a reprieve.
            logging.info("Script will now sleep for %s seconds.", SLEEP_TIME)
            time.sleep(SLEEP_TIME)
    finally:
        cursor.close()
def main():
    """Set up logging, connect to MySQL and run the cleanup loop."""
    InitializeLogging()
    logging.info("Starting diagnostics cleanup.")
    logging.info("Initializing DB connection...")
    db = MySQLdb.connect("localhost",
                         MYSQL_USER, MYSQL_PASSWORD,
                         MYSQL_DATABASE)
    logging.info("DB connection established")
    try:
        StartDiagnosticsCleanup(db)
    finally:
        # Close the connection even if the cleanup loop raises.
        db.close()
        logging.info("Database connection closed.")


if __name__ == "__main__":
    main()
Biggest Issues
Had to be run interactively (kept alive in a screen session instead of running as a real daemon)
Required a restart to change any settings
Used logging, but enabling it required editing the script
Required constant restarts to keep operating
New Version
#!/usr/bin/env python2
'''
This script deletes rows from a mysql db/table (diagnostics.diagnostics_historydump
by default) that have been marked as being processed by other services, and
removes the dump files those rows point to.
SIGUSR1 toggles the log level between WARNING and DEBUG.
SIGUSR2 reloads configuration values and resets the log level from
DEBUG_ENABLED.
Configuration:
See the _defaults attribute of DiagConf() for available
configuration settings and their default values.
Files (JSON; values in later files override earlier ones):
/etc/.diag_cleanup.json
/root/.diag_cleanup.json
'''
import json
import logging
import MySQLdb
import os
import signal
import sys
import time
class DiagConf(object):
    '''Parse configuration settings from various sources.

    Values are merged in increasing order of precedence: the built-in
    _defaults, then /etc/.diag_cleanup.json, then /root/.diag_cleanup.json.
    All state is kept on the class and accessed through classmethods.
    '''
    _defaults = {
        # When True the daemon starts with DEBUG logging and every call to
        # .get() re-reads the config files, so edits apply immediately.
        # This re-checking stops once a re-read sets DEBUG_ENABLED to False;
        # turning it back on then requires a configuration reload
        # (triggered via SIGUSR2).
        'DEBUG_ENABLED': False,
        # Log file location
        'LOG_PATH': '/var/log/diag_cleanup.log',
        # Directory substituted for "dumps/" in the stored dump paths
        'DUMP_DIR': os.path.expanduser('~st/infrastructure/uploaded_files/dumps/'),
        # Database credentials
        'DB_USER': '',
        'DB_PASS': '',
        'DB_HOST': '127.0.0.1',
        'DB_NAME': 'diagnostics',
        'DB_TABLE': 'diagnostics_historydump',
        # Optimizations: rows per batch and seconds to sleep between batches
        'QUERY_LIMIT': 10000,
        'SLEEP_TIME': 10}
    # Merged settings currently in effect; empty until the first reload().
    _attr = {}

    @classmethod
    def get(cls, key, default=None):
        '''Return the config value for key, or default if key is not set.

        Loads the configuration on first use; while DEBUG_ENABLED is true the
        files are re-read on every call.
        '''
        if not cls._attr or cls._attr.get('DEBUG_ENABLED'):
            cls.reload()
        return cls._attr.get(key, default)

    @classmethod
    def reload(cls):
        '''Load settings from files and merge the returned dictionaries.'''
        conf = dict(cls._defaults)
        conf.update(cls._get_conf('/etc/.diag_cleanup.json'))
        conf.update(cls._get_conf('/root/.diag_cleanup.json'))
        cls._attr = conf

    @classmethod
    def _get_conf(cls, import_path):
        '''Return the settings dict parsed from the JSON file at import_path.

        Returns an empty dict (logging a warning for anything but a missing
        file) when the file is absent, unreadable, not valid JSON, or its
        top-level value is not a JSON object.
        '''
        if not os.path.isfile(import_path):
            return {}
        try:
            with open(import_path, 'r') as fh:
                parsed = json.load(fh)
        except (IOError, OSError, ValueError):
            # ValueError covers malformed JSON on both Python 2 and 3.
            logging.warning('Failed to load configuration from path: {}'.format(import_path))
            return {}
        if not isinstance(parsed, dict):
            # A top-level list/number would break dict.update() in reload(),
            # which may be running inside the SIGUSR2 handler.
            logging.warning('Failed to load configuration from path: {}'.format(import_path))
            return {}
        return parsed
class DiagCleanup(object):
    '''Object to handle Diagnostics Cleanup.

    Deletes processed dump files and their database rows, one batch per
    call to run().
    '''

    def __init__(self):
        self.db = self._get_db()

    def _get_db(self):
        '''Open a MySQLdb connection from the DB_HOST/DB_USER/DB_PASS/DB_NAME settings.'''
        logging.info('Establishing database connection.')
        return MySQLdb.connect(
            DiagConf.get('DB_HOST'),
            DiagConf.get('DB_USER'),
            DiagConf.get('DB_PASS'),
            DiagConf.get('DB_NAME'))

    def run(self):
        '''Process one batch: delete each dump file, then all of its rows.

        File removal is best-effort (see delete_file), so the rows of the
        whole batch are deleted whether or not each file removal succeeded.
        '''
        cursor = self.db.cursor()
        try:
            rows = self.get_rows(cursor)
            if not rows:
                logging.debug('Early return from processing: no diagnostic rows found.')
                return None
            rm_rows = []
            for row_id, dump in rows:
                rm_rows.append(row_id)
                self.delete_file(dump)
            self.delete_rows(cursor, rm_rows)
        finally:
            # Release the cursor on every path, including the early return
            # and database errors.
            cursor.close()

    def get_rows(self, cursor):
        '''Return up to QUERY_LIMIT processed dumps as a list of (id, dump) tuples.'''
        logging.debug('Collecting completed diagnostics rows.')
        # QUERY_LIMIT may come from a JSON config file; int() guarantees only
        # a number is interpolated into the LIMIT clause.
        limit = int(DiagConf.get('QUERY_LIMIT'))
        cursor.execute(
            'SELECT id, dump FROM {}.{} '
            'WHERE network_diagnostics_processed="p" '
            'AND refactoring_processed="p" '
            'AND sentry_reports_processed="p" '
            'ORDER BY id ASC LIMIT {:d}'.format(
                DiagConf.get('DB_NAME'),
                DiagConf.get('DB_TABLE'),
                limit))
        return list(cursor.fetchall() or ())

    def delete_rows(self, cursor, rows):
        '''Delete the rows whose ids are listed in rows, then commit.'''
        if not rows:
            # "IN ()" is a SQL syntax error, and there is nothing to delete.
            return
        # int() ensures only numeric literals are interpolated below.
        row_ids = ', '.join(str(int(row_id)) for row_id in rows)
        logging.debug('Removing diagnostics rows: %s', row_ids)
        cursor.execute(
            'DELETE FROM {}.{} '
            'WHERE id IN ({})'.format(
                DiagConf.get('DB_NAME'),
                DiagConf.get('DB_TABLE'),
                row_ids))
        self.db.commit()

    def delete_file(self, filename):
        '''Best-effort removal of a dump file; OS errors are only logged.'''
        # Rewrite only the first "dumps/" so a deeper path component that is
        # also named "dumps" is left intact.
        fpath = filename.replace('dumps/', DiagConf.get('DUMP_DIR'), 1)
        try:
            os.remove(fpath)
        except OSError:
            logging.warning('Unable to delete file: {}'.format(fpath))
def main():
    '''Run the cleanup forever: process one batch, sleep SLEEP_TIME, repeat.

    Never returns normally; exceptions from the cleanup propagate.
    '''
    DiagConf.reload()
    cleaner = DiagCleanup()
    while 1:
        logging.debug('Running diag cleanup.')
        cleaner.run()
        logging.debug('Sleeping before next execution.')
        pause = DiagConf.get('SLEEP_TIME')
        time.sleep(pause)
def handle_signal(signum, *args):  # pylint: disable=unused-argument
    '''Signal handler for SIGUSR1 and SIGUSR2; other signals are ignored.

    SIGUSR1: switch the log level to DEBUG if it is currently WARNING,
    otherwise to WARNING.
    SIGUSR2: reload the configuration and set the log level from
    DEBUG_ENABLED.
    '''
    if signum == signal.SIGUSR2:
        logging.warning('SIGUSR2 received.')
        DiagConf.reload()
        set_loglevel('DEBUG' if DiagConf.get('DEBUG_ENABLED') else 'WARNING')
        return
    if signum != signal.SIGUSR1:
        return
    logging.warning('SIGUSR1 received.')
    at_warning = logging.getLogger().getEffectiveLevel() == logging.WARNING
    set_loglevel('DEBUG' if at_warning else 'WARNING')
def set_loglevel(level):
    '''Set the root logger's level from a level name such as 'DEBUG'.

    Names are matched case-insensitively against the logging module's
    level constants (DEBUG, INFO, WARNING, ERROR, CRITICAL, NOTSET, ...).

    Raises:
        ValueError: if level does not name a logging level.
    '''
    lvl = getattr(logging, str(level).upper(), None)
    # Only integer constants are levels (this rejects e.g. BASIC_FORMAT).
    # NOTSET is 0, so a plain truthiness check would wrongly reject it.
    if not isinstance(lvl, int):
        raise ValueError('Invalid log level: {}'.format(level))
    if logging.getLogger().getEffectiveLevel() != lvl:
        logging.warning('Setting log level to: {}'.format(level))
        logging.getLogger().setLevel(lvl)
if __name__ == '__main__':
    # LOG_PATH and DEBUG_ENABLED come from the configuration, so it must be
    # read before logging is configured. If reading it logs a warning (from
    # DiagConf._get_conf) while the root logger has no handlers, the logging
    # module installs a stderr handler by itself and the basicConfig() call
    # below silently becomes a no-op. A temporary NullHandler prevents that.
    _placeholder = logging.NullHandler()
    logging.getLogger().addHandler(_placeholder)
    level = logging.WARNING
    if DiagConf.get('DEBUG_ENABLED'):
        level = logging.DEBUG
    log_path = DiagConf.get('LOG_PATH')
    logging.getLogger().removeHandler(_placeholder)
    logging.basicConfig(level=level, filename=log_path)
    # Set up signal handling
    signal.signal(signal.SIGUSR1, handle_signal)
    signal.signal(signal.SIGUSR2, handle_signal)
    # Kick off main execution. main() reloads the configuration, so any
    # warning dropped above is logged again, this time to LOG_PATH.
    try:
        main()
    except (KeyboardInterrupt, SystemExit):
        sys.exit(0)
    except:
        logging.exception('Exception running main():')
        raise