import argparse import csv import _csv from dataclasses import dataclass from datetime import datetime import enum import os from random import Random import shutil import sys from zipfile import ZipFile import fs.path from fs.base import FS from fs.copy import copy_dir from fs.tempfs import TempFS from fs.zipfs import ZipFS from pathvalidate import sanitize_filename import simfile from simfile.dir import SimfilePack, SimfileDirectory from simfile.sm import SMChart, SMSimfile from simfile.ssc import SSCChart, SSCSimfile from simfile.types import Simfile #################### # Script arguments # #################### class AnonymizeEntriesArgs: """Stores the command-line arguments for this script.""" csv: str files: str dry_run: bool def argparser(): """Get an ArgumentParser instance for this command-line script.""" parser = argparse.ArgumentParser() parser.add_argument("csv", type=str, help="path to the CSV file of form responses") parser.add_argument( "files", type=str, help="path to the directory of file responses" ) parser.add_argument( "-d", "--dry-run", action=argparse.BooleanOptionalAction, help="preview changes without writing the file", ) return parser CsvContents = list[dict[str, str]] ##################### # Utility functions # ##################### class KnownColumns(enum.StrEnum): Timestamp = "Timestamp" EmailAddress = "Email Address" GeneratedAlias = "Generated Alias" IgnoreResubmittedFile = "Ignore Resubmitted File" # Not persisted: ExtractedTo = "Extracted To" @dataclass class DynamicColumns: filename: str ChangedCsvContents = bool def parse_timestamp(timestamp: str): return datetime.strptime(timestamp, "%m/%d/%Y %H:%M:%S") def canonical_simfile_filename(sm: Simfile) -> str: return sanitize_filename(f"{sm.title} {sm.subtitle or ''}".rstrip()) ################ # Script logic # ################ def assert_valid_file_paths(args: AnonymizeEntriesArgs): assert os.path.isfile(args.csv), f"{repr(args.csv)} is not a file" assert os.path.isdir(args.files), f"{repr(args.files)} is not a directory" def load_csv_contents(args: AnonymizeEntriesArgs): with open(args.csv, "r") as csvfile: return list(csv.DictReader(csvfile)) def load_alias_parts(csvpath: str) -> tuple[list[str], list[str]]: def extract_alias_parts(csv: "_csv._reader"): return tuple(zip(*((line[0], line[1]) for n, line in enumerate(csv) if n > 0))) with open(csvpath, "r") as csvfile: alias_parts = extract_alias_parts(csv.reader(csvfile)) print(f"Loaded {sum(len(part) for part in alias_parts)} alias parts") return alias_parts def assert_known_google_forms_columns_present(csv_contents: CsvContents): assert ( KnownColumns.Timestamp in csv_contents[0] ), f"Provided CSV file does not have a {repr(KnownColumns.Timestamp)} column" assert ( KnownColumns.EmailAddress in csv_contents[0] ), f"Provided CSV file does not have an {repr(KnownColumns.EmailAddress)} column" def detect_dynamic_columns(csv_contents: CsvContents) -> DynamicColumns: maybe_filename_columns = [ column for (column, value) in csv_contents[0].items() if value.endswith(".zip") ] assert ( len(maybe_filename_columns) != 0 ), 'First data row of provided CSV file has no cell ending in ".zip"' assert ( len(maybe_filename_columns) == 1 ), 'First data row of provided CSV file has multiple cells ending in ".zip"' filename_column = maybe_filename_columns[0] print(f"Detected filename column: {repr(filename_column)}") return DynamicColumns(filename=filename_column) def maybe_generate_aliases( args: AnonymizeEntriesArgs, alias_parts: tuple[list[str], list[str]], csv_contents: CsvContents, ) -> ChangedCsvContents: reuse_aliases = KnownColumns.GeneratedAlias in csv_contents[0] if reuse_aliases: print("Reusing generated aliases") return False else: for row in csv_contents: random = Random( "; ".join([row[KnownColumns.EmailAddress], args.csv, args.files]) ) row[KnownColumns.GeneratedAlias] = ( f"{random.choice(alias_parts[0])} {random.choice(alias_parts[0])}" ) print("Generated an alias for each entry") return True def maybe_mark_resubmitted_entries(csv_contents: CsvContents) -> ChangedCsvContents: reuse_resubmitted = KnownColumns.IgnoreResubmittedFile in csv_contents[0] if reuse_resubmitted: print("Reusing resubmitted files column") return False else: most_recent_entry_per_user = {} resubmitted_total = 0 for loop_pass in ("find", "mark"): for row in csv_contents: user = row[KnownColumns.EmailAddress] timestamp = parse_timestamp(row[KnownColumns.Timestamp]) if loop_pass == "find": if user in most_recent_entry_per_user: if timestamp > most_recent_entry_per_user[user]: most_recent_entry_per_user[user] = timestamp else: most_recent_entry_per_user[user] = timestamp elif loop_pass == "mark": resubmitted = timestamp < most_recent_entry_per_user[user] row[KnownColumns.IgnoreResubmittedFile] = ( "true" if resubmitted else "" ) if resubmitted: resubmitted_total += 1 print(f"Marked {resubmitted_total} resubmitted files to be ignored") return True def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvContents): if args.dry_run: print("Dry run - not writing generated columns back to CSV") else: with open(args.csv, "w", newline="") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys()) writer.writeheader() for row in csv_contents: writer.writerow(row) print("Wrote generated columns back to CSV") def extract_entries_to_temporary_folder( args: AnonymizeEntriesArgs, csv_contents: CsvContents, dynamic_columns: DynamicColumns, ) -> TempFS: def find_simfile_dir_zip_path( zip_fs: FS, ) -> tuple[str, SimfileDirectory]: # Check all immediate subdirectories, followed by the root itself root = "/" contents = zip_fs.listdir(root) subdirs = [item for item in contents if zip_fs.isdir(item)] for subdir in subdirs: possible_path = fs.path.join(root, subdir) possible_simfile_dir = SimfileDirectory( possible_path, filesystem=zip_fs, ) if possible_simfile_dir.sm_path or possible_simfile_dir.ssc_path: return (possible_path, possible_simfile_dir) raise RuntimeError( "Unable to find a suitable simfile directory in the ZIP. " "Make sure the simfile is no more than one directory deep, " 'e.g. contains "Simfile/simfile.ssc".' ) def extract_simfile_dir(zip_fs: FS, temp_fs: FS) -> str: zip_path, simfile_dir = find_simfile_dir_zip_path(zip_fs) canonical_filename = canonical_simfile_filename(simfile_dir.open()) assert not temp_fs.exists( canonical_filename ), "ERROR: trying to extract {canonical_filename} but it's already present in the temp folder" copy_dir(zip_fs, zip_path, temp_fs, canonical_filename) return canonical_filename temp_fs = TempFS(identifier="dimocracy-voucher_anonymized", auto_clean=False) for row in csv_contents: if row[KnownColumns.IgnoreResubmittedFile]: continue zip_absolute_path = os.path.join(args.files, row[dynamic_columns.filename]) if os.path.isfile(zip_absolute_path): with open(zip_absolute_path, "rb") as zip_file: zip_fs = ZipFS(zip_file) row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs) else: print("WARNING: {zip_absolute_path} not found - skipping") print(f"Extracted latest submissions to temporary directory {temp_fs.root_path}") return temp_fs def anonymize_entries( args: AnonymizeEntriesArgs, csv_contents: CsvContents, temp_fs: TempFS, ): def maybe_rename_file(absolute_path: str | None, canonical_filename: str): if absolute_path and os.path.basename(absolute_path) != canonical_filename: absolute_canonical_path = os.path.join( os.path.dirname(absolute_path), canonical_filename ) os.rename(absolute_path, absolute_canonical_path) print( f"Renamed {os.path.relpath(absolute_path, temp_fs.root_path)} to {os.path.relpath(absolute_canonical_path, temp_fs.root_path)}" ) def maybe_delete_file(absolute_path: str | None): if absolute_path and os.path.isfile(absolute_path): os.remove(absolute_path) print(f"Deleted {os.path.relpath(absolute_path, temp_fs.root_path)}") for row in csv_contents: if row[KnownColumns.IgnoreResubmittedFile]: continue absolute_simfile_dir_path = os.path.join( temp_fs.root_path, row[KnownColumns.ExtractedTo] ) simfile_dir = SimfileDirectory(absolute_simfile_dir_path) canonical_filename = canonical_simfile_filename(simfile_dir.open()) assets = simfile_dir.assets() maybe_rename_file(assets.music, f"{canonical_filename}.ogg") maybe_delete_file(assets.background) maybe_delete_file(assets.banner) maybe_delete_file(assets.cdimage) maybe_delete_file(assets.cdtitle) maybe_delete_file(assets.disc) maybe_delete_file(assets.jacket) if simfile_dir.sm_path: with simfile.mutate(simfile_dir.sm_path) as sm: assert isinstance(sm, SMSimfile) sm.credit = row[KnownColumns.GeneratedAlias] sm.background = "" sm.banner = "" sm.cdtitle = "" sm.genre = "" sm.music = f"{canonical_filename}.ogg" for _chart in sm.charts: sm_chart: SMChart = _chart # typing workaround sm_chart.description = row[KnownColumns.GeneratedAlias] maybe_rename_file(simfile_dir.sm_path, f"{canonical_filename}.sm") print( f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.sm" ) if simfile_dir.ssc_path: with simfile.mutate(simfile_dir.ssc_path) as ssc: assert isinstance(ssc, SSCSimfile) ssc.credit = row[KnownColumns.GeneratedAlias] ssc.music = f"{canonical_filename}.ogg" ssc.background = "" ssc.banner = "" ssc.cdtitle = "" ssc.genre = "" ssc.jacket = "" ssc.cdimage = "" ssc.discimage = "" ssc.labels = "" for _chart in ssc.charts: ssc_chart: SSCChart = _chart # typing workaround ssc_chart.description = "" ssc_chart.chartname = "" ssc_chart.chartstyle = "" ssc_chart.credit = row[KnownColumns.GeneratedAlias] maybe_rename_file(simfile_dir.ssc_path, f"{canonical_filename}.ssc") print( f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.ssc" ) for dir_entry in os.scandir(absolute_simfile_dir_path): if dir_entry.is_file(): if ( dir_entry.name.endswith(".old") or dir_entry.name.endswith(".txt") or dir_entry.name.endswith(".zip") ): # These are definitely safe to delete for distribution os.remove(dir_entry.path) elif ( dir_entry.name.endswith(".ssc") or dir_entry.name.endswith(".sm") or dir_entry.name.endswith(".ogg") ): # These are expected pass else: # Some other extension not listed above print( f"WARNING: leaving unexpected file {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone" ) elif dir_entry.is_dir(): if dir_entry.name == "__bias-check": # nine-or-null directories can be removed shutil.rmtree(dir_entry.path) print( f"Deleted directory {os.path.relpath(dir_entry.path, temp_fs.root_path)}" ) else: # Some other subdirectory - maybe mods? print( f"WARNING: leaving unexpected subdirectory {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone" ) def zip_anonymized_entries( args: AnonymizeEntriesArgs, csv_contents: CsvContents, temp_fs: TempFS, ): print("STUB - zip_anonymized_entries") ############### # Main method # ############### def main(argv: list[str]): args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesArgs()) assert_valid_file_paths(args) alias_parts = load_alias_parts("aliasparts.csv") csv_contents = load_csv_contents(args) assert_known_google_forms_columns_present(csv_contents) dynamic_columns = detect_dynamic_columns(csv_contents) csv_contents_changed = maybe_generate_aliases(args, alias_parts, csv_contents) csv_contents_changed |= maybe_mark_resubmitted_entries(csv_contents) if csv_contents_changed: maybe_save_generated_columns(args, csv_contents) temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns) anonymize_entries(args, csv_contents, temp_fs) zip_anonymized_entries(args, csv_contents, temp_fs) if __name__ == "__main__": main(sys.argv)