"""Anonymize Google Forms simfile submissions for distribution.

Reads the form responses CSV and the uploaded ZIP files, generates a stable
alias for each entrant, scrubs identifying metadata from the simfiles, and
packages the results for output. Run with --help for usage details.
"""
import argparse
import csv
import _csv
import enum
import os
import shutil
import sys
import textwrap
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from random import Random
from typing import cast

import fs.path
from fs.base import FS
from fs.copy import copy_dir
from fs.tempfs import TempFS
from fs.zipfs import ZipFS
from pathvalidate import sanitize_filename

import simfile
from simfile.dir import SimfileDirectory
from simfile.sm import SMChart, SMSimfile
from simfile.ssc import SSCChart, SSCSimfile
from simfile.timing import BeatValue, BeatValues
from simfile.types import Simfile


####################
# Script arguments #
####################


class AnonymizeEntriesRawArgs:
    """Raw command-line arguments, before defaults are resolved."""

    data_dir: str
    csv: str | None
    file_uploads: str | None
    deanonymized: bool
    dry_run: bool
    emails: str
    output: str | None
    regenerate: bool
    seed: str


class AnonymizeEntriesArgs(AnonymizeEntriesRawArgs):
    """Stores the command-line arguments for this script."""

    csv: str
    file_uploads: str
    output: str


def argparser():
    """Get an ArgumentParser instance for this command-line script."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent(
            """\
            example:

              Export your Google Form's CSV & file uploads to the same folder:

              path/to/folder:
              ├ form_responses.csv
              └ file_responses/
                ├ Upload A - User 1.zip
                ├ Upload B - User 2.zip
                └ etc.

              Then run the script:

              poetry shell
              python ./anonymize_entries.py path/to/folder
            """
        ),
    )
    parser.add_argument(
        "data_dir",
        type=str,
        help="working directory - used to find the form responses CSV, file responses directory, and for output",
    )
    parser.add_argument(
        "-c",
        "--csv",
        type=str,
        help="override CSV form responses path (defaults to first file matching {data_dir}/*.csv)",
    )
    parser.add_argument(
        "-f",
        "--file-uploads",
        type=str,
        help="override file responses directory path (defaults to first subdirectory matching {data_dir}/*/*.zip)",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        help="override output path (defaults to {data_dir}/output)",
    )
    parser.add_argument(
        "-d",
        "--dry-run",
        action=argparse.BooleanOptionalAction,
        help="do not create or modify any files",
    )
    parser.add_argument(
        "-D",
        "--deanonymized",
        action=argparse.BooleanOptionalAction,
        help="skip anonymization of files, simply package them as-is",
    )
    parser.add_argument(
        "-e",
        "--emails",
        type=str,
        help="limit output to files from the specified emails (comma-separated)",
    )
    parser.add_argument(
        "-r",
        "--regenerate",
        action=argparse.BooleanOptionalAction,
        help="force-update generated CSV columns",
    )
    parser.add_argument(
        "-s",
        "--seed",
        type=str,
        help="specify random seed for alias generation (treat this like a password & change it for each round)",
    )
    return parser


CsvContents = list[dict[str, str]]


#####################
# Utility functions #
#####################


class KnownColumns(enum.StrEnum):
    Timestamp = "Timestamp"
    EmailAddress = "Email Address"
    GeneratedAlias = "Generated Alias"
    IgnoreFile = "Ignore File"
    # Not persisted:
    ExtractedTo = "Extracted To"


@dataclass
class DynamicColumns:
    filename: str


ChangedCsvContents = bool


def parse_timestamp(timestamp: str):
    """Parse a Google Forms timestamp, e.g. "4/1/2024 12:34:56"."""
    return datetime.strptime(timestamp, "%m/%d/%Y %H:%M:%S")


def canonical_simfile_filename(sm: Simfile) -> str:
    """Derive a filesystem-safe "Title Subtitle" name for a simfile."""
    return sanitize_filename(f"{sm.title} {sm.subtitle or ''}".rstrip())


################
# Script logic #
################


def process_args(args: AnonymizeEntriesRawArgs) -> AnonymizeEntriesArgs:
    """Resolve default paths for --csv, --file-uploads, and --output."""
    if not args.csv or not args.file_uploads:
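        # Fall back to auto-discovery: scan data_dir once, taking the first
        # *.csv file found as the form responses and the first subdirectory
        # containing *.zip files as the file uploads, matching the folder
        # layout shown in the --help epilog.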
        assert (
            args.data_dir
        ), "Positional data_dir argument must be provided if --csv and --file-uploads are not both set"
        for dir_entry in os.scandir(args.data_dir):
            if (
                not args.csv
                and dir_entry.is_file()
                and dir_entry.name.endswith(".csv")
            ):
                args.csv = dir_entry.path
                print(
                    f"Using {repr(dir_entry.name)} for form responses (override with --csv)"
                )
            if not args.file_uploads and dir_entry.is_dir():
                if any(
                    subdir_entry.name.endswith(".zip")
                    for subdir_entry in os.scandir(dir_entry.path)
                ):
                    args.file_uploads = dir_entry.path
                    print(
                        f"Using {repr(dir_entry.name)} for file responses (override with --file-uploads)"
                    )

    if not args.output:
        args.output = os.path.join(args.data_dir, "output")
        print(f"Using {args.output} for output (override with --output)")

    assert args.csv, "Unable to find a CSV file in the provided directory"
    assert (
        args.file_uploads
    ), "Unable to find a subdirectory containing ZIP files in the provided directory"
    return cast(AnonymizeEntriesArgs, args)


def assert_valid_file_paths(args: AnonymizeEntriesArgs):
    assert os.path.isfile(args.csv), f"{repr(args.csv)} is not a file"
    assert os.path.isdir(
        args.file_uploads
    ), f"{repr(args.file_uploads)} is not a directory"


def load_csv_contents(args: AnonymizeEntriesArgs):
    with open(args.csv, "r") as csvfile:
        return list(csv.DictReader(csvfile))


def load_alias_parts(csvpath: str) -> tuple[list[str], list[str]]:
    def extract_alias_parts(csv_reader: "_csv._reader"):
        # Skip the header row, then transpose the first two columns into
        # (first alias parts, second alias parts)
        return tuple(
            zip(*((line[0], line[1]) for n, line in enumerate(csv_reader) if n > 0))
        )

    with open(csvpath, "r", encoding="utf-8") as csvfile:
        alias_parts = extract_alias_parts(csv.reader(csvfile))
    print(f"Loaded {sum(len(part) for part in alias_parts)} alias parts")
    return alias_parts


def assert_known_google_forms_columns_present(csv_contents: CsvContents):
    assert (
        KnownColumns.Timestamp in csv_contents[0]
    ), f"Provided CSV file does not have a {repr(KnownColumns.Timestamp)} column"
    assert (
        KnownColumns.EmailAddress in csv_contents[0]
    ), f"Provided CSV file does not have an {repr(KnownColumns.EmailAddress)} column"


def detect_dynamic_columns(csv_contents: CsvContents) -> DynamicColumns:
    maybe_filename_columns = [
        column for (column, value) in csv_contents[0].items() if value.endswith(".zip")
    ]
    assert (
        len(maybe_filename_columns) != 0
    ), 'First data row of provided CSV file has no cell ending in ".zip"'
    assert (
        len(maybe_filename_columns) == 1
    ), 'First data row of provided CSV file has multiple cells ending in ".zip"'
    filename_column = maybe_filename_columns[0]
    print(f"Detected filename column: {repr(filename_column)}")
    return DynamicColumns(filename=filename_column)


def maybe_generate_aliases(
    args: AnonymizeEntriesArgs,
    alias_parts: tuple[list[str], list[str]],
    csv_contents: CsvContents,
) -> ChangedCsvContents:
    reuse_aliases = (
        not args.regenerate and KnownColumns.GeneratedAlias in csv_contents[0]
    )
    if reuse_aliases:
        print("Reusing generated aliases")
        return False

    with open("aliases/usedaliases.txt", "r", encoding="utf-8") as usedaliases_file:
        usedaliases = set(
            line.rstrip().lower() for line in usedaliases_file if line.count(" ") == 1
        )
    with open("aliases/suswords.txt", "r", encoding="utf-8") as suswords_file:
        suswords = set(line.rstrip() for line in suswords_file)

    alias_to_email_address = {}
    seed = args.seed or args.csv
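    # Aliases are deterministic per entrant: each row's RNG is seeded with the
    # email address plus the round seed, so reruns reproduce the same aliases
    # as long as --seed (or its fallback, the CSV path) is unchanged.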
    for row in csv_contents:
        rnd = Random(",".join([row[KnownColumns.EmailAddress], seed]))
        while True:
            random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[1])}"
            if (
                random_alias in alias_to_email_address
                and alias_to_email_address[random_alias]
                != row[KnownColumns.EmailAddress]
            ):
                print(
                    f"Rerolling alias for {row[KnownColumns.EmailAddress]} because "
                    f"{repr(random_alias)} is already being used by "
                    f"{alias_to_email_address[random_alias]}"
                )
            elif random_alias in usedaliases:
                print(
                    f"Rerolling alias for {row[KnownColumns.EmailAddress]} because "
                    f"{repr(random_alias)} has already been used"
                )
            elif any(
                random_part in suswords for random_part in random_alias.split(" ")
            ):
                print(
                    f"WARNING: alias for {row[KnownColumns.EmailAddress]} "
                    f"{repr(random_alias)} contains a sus word"
                )
                break
            else:
                break
        # Record the alias owner so a collision with a different entrant
        # triggers a reroll above
        alias_to_email_address[random_alias] = row[KnownColumns.EmailAddress]
        row[KnownColumns.GeneratedAlias] = random_alias

    print("Generated an alias for each entry")
    return True


def maybe_mark_resubmitted_entries(
    args: AnonymizeEntriesArgs, csv_contents: CsvContents
) -> ChangedCsvContents:
    reuse_resubmitted = (
        not args.regenerate and KnownColumns.IgnoreFile in csv_contents[0]
    )
    if reuse_resubmitted:
        print("Reusing resubmitted files column")
        return False

    # First pass finds each user's most recent entry; second pass marks every
    # earlier entry from the same user as resubmitted
    most_recent_entry_per_user = {}
    resubmitted_total = 0
    for loop_pass in ("find", "mark"):
        for row in csv_contents:
            user = row[KnownColumns.EmailAddress]
            timestamp = parse_timestamp(row[KnownColumns.Timestamp])
            if loop_pass == "find":
                if user in most_recent_entry_per_user:
                    if timestamp > most_recent_entry_per_user[user]:
                        most_recent_entry_per_user[user] = timestamp
                else:
                    most_recent_entry_per_user[user] = timestamp
            elif loop_pass == "mark":
                resubmitted = timestamp < most_recent_entry_per_user[user]
                row[KnownColumns.IgnoreFile] = "resubmitted" if resubmitted else ""
                if resubmitted:
                    resubmitted_total += 1

    s = "" if resubmitted_total == 1 else "s"
    print(f"Marked {resubmitted_total} resubmitted file{s} to be ignored")
    return True


def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvContents):
    if args.dry_run:
        print("Dry run - not writing generated columns back to CSV")
        return

    with open(args.csv, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys())
        writer.writeheader()
        for row in csv_contents:
            writer.writerow(row)
    print("Wrote generated columns back to CSV")


def maybe_mark_unspecified_emails(
    args: AnonymizeEntriesArgs, csv_contents: CsvContents
):
    if not args.emails:
        return

    unspecified_total = 0
    specified_total = 0
    emails = set(args.emails.split(","))
    for row in csv_contents:
        if not row[KnownColumns.IgnoreFile]:
            if row[KnownColumns.EmailAddress] not in emails:
                row[KnownColumns.IgnoreFile] = "unspecified"
                unspecified_total += 1
            else:
                specified_total += 1

    assert specified_total > 0, "No responses were found from the specified emails"
    s = "s" if specified_total != 1 else ""
    print(
        f"Processing {specified_total} file{s} for specified emails "
        f"& ignoring {unspecified_total} others"
    )


def extract_entries_to_temporary_folder(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    dynamic_columns: DynamicColumns,
) -> TempFS:
    def find_simfile_dir_zip_path(
        zip_fs: FS,
    ) -> tuple[str, SimfileDirectory]:
        # Check all immediate subdirectories, followed by the root itself
        root = "/"
        contents = zip_fs.listdir(root)
        subdirs = [item for item in contents if zip_fs.isdir(item)]
        candidates = [fs.path.join(root, subdir) for subdir in subdirs] + [root]
        for possible_path in candidates:
            possible_simfile_dir = SimfileDirectory(
                possible_path,
                filesystem=zip_fs,
            )
            if possible_simfile_dir.sm_path or possible_simfile_dir.ssc_path:
                return (possible_path, possible_simfile_dir)
        raise RuntimeError(
            "Unable to find a suitable simfile directory in the ZIP. "
            "Make sure the simfile is no more than one directory deep, "
            'e.g. contains "Simfile/simfile.ssc".'
        )
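    # Each accepted upload is copied into the temp filesystem under its
    # canonical "Title Subtitle" directory name; the assert below makes two
    # submissions that resolve to the same name fail loudly instead of
    # silently merging.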
    def extract_simfile_dir(zip_fs: FS, temp_fs: FS) -> str:
        zip_path, simfile_dir = find_simfile_dir_zip_path(zip_fs)
        canonical_filename = canonical_simfile_filename(simfile_dir.open())
        assert not temp_fs.exists(
            canonical_filename
        ), f"ERROR: trying to extract {canonical_filename} but it's already present in the temp folder"
        copy_dir(zip_fs, zip_path, temp_fs, canonical_filename)
        return canonical_filename

    temp_fs = TempFS(identifier="dimocracy-voucher")
    for row in csv_contents:
        if row[KnownColumns.IgnoreFile]:
            continue
        zip_absolute_path = os.path.join(
            args.file_uploads, row[dynamic_columns.filename]
        )
        if os.path.isfile(zip_absolute_path):
            with open(zip_absolute_path, "rb") as zip_file:
                zip_fs = ZipFS(zip_file)
                row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs)
        else:
            print(f"WARNING: {zip_absolute_path} not found - skipping")

    print(f"Extracted latest submissions to temporary directory {temp_fs.root_path}")
    return temp_fs


def maybe_anonymize_entries(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    temp_fs: TempFS,
):
    if args.deanonymized:
        print("Deanonymized - skipping anonymization step")
        return

    def maybe_rename_file(absolute_path: str | None, canonical_filename: str):
        if absolute_path and os.path.basename(absolute_path) != canonical_filename:
            absolute_canonical_path = os.path.join(
                os.path.dirname(absolute_path), canonical_filename
            )
            os.rename(absolute_path, absolute_canonical_path)
            print(
                f"Renamed {os.path.relpath(absolute_path, temp_fs.root_path)} "
                f"to {os.path.relpath(absolute_canonical_path, temp_fs.root_path)}"
            )

    def maybe_delete_file(absolute_path: str | None):
        if absolute_path and os.path.isfile(absolute_path):
            os.remove(absolute_path)
            print(f"Deleted {os.path.relpath(absolute_path, temp_fs.root_path)}")
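    # BPM fuzzing: the helper below appends a tiny extra BPM segment far past
    # the end of the chart (beat +10000, value +0.001). For a hypothetical
    # input of "0.000=150.000", the result plays identically but the BPMS
    # field no longer matches the original submission character-for-character.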
    def anonymize_bpms(bpm_str: str | None) -> str:
        bpm_values = BeatValues.from_str(bpm_str)
        bpm_values.append(
            BeatValue(
                beat=bpm_values[-1].beat + 10000,
                value=bpm_values[-1].value + Decimal("0.001"),
            )
        )
        print(f"Anonymized BPMs from {repr(bpm_str)} to {repr(str(bpm_values))}")
        return str(bpm_values)

    for row in csv_contents:
        if row[KnownColumns.IgnoreFile]:
            continue
        if KnownColumns.ExtractedTo not in row:
            # Nothing was extracted for this row (e.g. its ZIP was missing)
            continue
        absolute_simfile_dir_path = os.path.join(
            temp_fs.root_path, row[KnownColumns.ExtractedTo]
        )
        simfile_dir = SimfileDirectory(absolute_simfile_dir_path)
        canonical_filename = canonical_simfile_filename(simfile_dir.open())

        # Rename the audio to the canonical name and delete all graphical assets
        assets = simfile_dir.assets()
        maybe_rename_file(assets.music, f"{canonical_filename}.ogg")
        maybe_delete_file(assets.background)
        maybe_delete_file(assets.banner)
        maybe_delete_file(assets.cdimage)
        maybe_delete_file(assets.cdtitle)
        maybe_delete_file(assets.disc)
        maybe_delete_file(assets.jacket)

        if simfile_dir.sm_path:
            with simfile.mutate(simfile_dir.sm_path) as sm:
                assert isinstance(sm, SMSimfile)
                sm.credit = row[KnownColumns.GeneratedAlias]
                sm.background = ""
                sm.banner = ""
                sm.cdtitle = ""
                sm.genre = ""
                sm.music = f"{canonical_filename}.ogg"
                sm.bpms = anonymize_bpms(sm.bpms)
                for _chart in sm.charts:
                    sm_chart: SMChart = _chart  # typing workaround
                    sm_chart.description = row[KnownColumns.GeneratedAlias]
            maybe_rename_file(simfile_dir.sm_path, f"{canonical_filename}.sm")
            print(
                f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.sm"
            )

        if simfile_dir.ssc_path:
            with simfile.mutate(simfile_dir.ssc_path) as ssc:
                assert isinstance(ssc, SSCSimfile)
                ssc.credit = row[KnownColumns.GeneratedAlias]
                ssc.music = f"{canonical_filename}.ogg"
                ssc.background = ""
                ssc.banner = ""
                ssc.cdtitle = ""
                ssc.genre = ""
                ssc.jacket = ""
                ssc.cdimage = ""
                ssc.discimage = ""
                ssc.labels = ""
                ssc.bpms = anonymize_bpms(ssc.bpms)
                for _chart in ssc.charts:
                    ssc_chart: SSCChart = _chart  # typing workaround
                    ssc_chart.description = ""
                    ssc_chart.chartname = ""
                    ssc_chart.chartstyle = ""
                    ssc_chart.credit = row[KnownColumns.GeneratedAlias]
                    if ssc_chart.bpms:
                        ssc_chart.bpms = anonymize_bpms(ssc_chart.bpms)
            maybe_rename_file(simfile_dir.ssc_path, f"{canonical_filename}.ssc")
            print(
                f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.ssc"
            )

        for dir_entry in os.scandir(absolute_simfile_dir_path):
            if dir_entry.is_file():
                if (
                    dir_entry.name.endswith(".old")
                    or dir_entry.name.endswith(".txt")
                    or dir_entry.name.endswith(".zip")
                ):
                    # These are definitely safe to delete for distribution
                    os.remove(dir_entry.path)
                elif (
                    dir_entry.name.endswith(".ssc")
                    or dir_entry.name.endswith(".sm")
                    or dir_entry.name.endswith(".ogg")
                ):
                    # These are expected
                    pass
                else:
                    # Some other extension not listed above
                    print(
                        f"WARNING: leaving unexpected file {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone"
                    )
            elif dir_entry.is_dir():
                if dir_entry.name == "__bias-check":
                    # nine-or-null directories can be removed
                    shutil.rmtree(dir_entry.path)
                    print(
                        f"Deleted directory {os.path.relpath(dir_entry.path, temp_fs.root_path)}"
                    )
                else:
                    # Some other subdirectory - maybe mods?
                    print(
                        f"WARNING: leaving unexpected subdirectory {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone"
                    )


def maybe_save_anonymized_files(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    temp_fs: TempFS,
):
    if args.dry_run:
        print("Dry run - not saving files")
        return

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    de = "de" if args.deanonymized else ""
    output_path = f"{args.output}/{de}anonymized-{timestamp}"
    shutil.copytree(temp_fs.root_path, output_path)
    print(f"Saved to {os.path.abspath(output_path)}")


###############
# Main method #
###############


def main(argv: list[str]):
    raw_args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesRawArgs())
    args = process_args(raw_args)
    assert_valid_file_paths(args)
    alias_parts = load_alias_parts("aliases/aliasparts.csv")
    csv_contents = load_csv_contents(args)
    assert_known_google_forms_columns_present(csv_contents)
    dynamic_columns = detect_dynamic_columns(csv_contents)

    # Generate & save CSV columns
    csv_contents_changed = maybe_generate_aliases(args, alias_parts, csv_contents)
    csv_contents_changed |= maybe_mark_resubmitted_entries(args, csv_contents)
    if csv_contents_changed:
        maybe_save_generated_columns(args, csv_contents)

    # Generate temporary CSV columns
    maybe_mark_unspecified_emails(args, csv_contents)

    temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns)
    maybe_anonymize_entries(args, csv_contents, temp_fs)
    maybe_save_anonymized_files(args, csv_contents, temp_fs)


if __name__ == "__main__":
    main(sys.argv)