diff --git a/anonymize_entries.py b/anonymize_entries.py index 65e942d..01587e7 100644 --- a/anonymize_entries.py +++ b/anonymize_entries.py @@ -21,10 +21,11 @@ from fs.zipfs import ZipFS from pathvalidate import sanitize_filename import simfile from simfile.dir import SimfilePack, SimfileDirectory +from simfile.notes import NoteData from simfile.sm import SMChart, SMSimfile from simfile.ssc import SSCChart, SSCSimfile from simfile.timing import BeatValues, BeatValue -from simfile.types import Simfile +from simfile.types import Simfile, Chart #################### @@ -38,7 +39,7 @@ class AnonymizeEntriesRawArgs: file_uploads: str | None deanonymized: bool dry_run: bool - emails: str + users: str output: str | None regenerate: bool seed: str @@ -112,10 +113,10 @@ def argparser(): help="skip anonymization of files, simply package them as-is", ) parser.add_argument( - "-e", - "--emails", + "-u", + "--users", type=str, - help="limit output to files from the specified emails (comma-separated)", + help="limit output to files from the specified users (comma-separated)", ) parser.add_argument( "-r", @@ -142,7 +143,7 @@ CsvContents = list[dict[str, str]] class KnownColumns(enum.StrEnum): Timestamp = "Timestamp" - EmailAddress = "Email Address" + UserId = "Your gamer tag/alias: (e.g. 
dimo)" GeneratedAlias = "Generated Alias" IgnoreFile = "Ignore File" # Not persisted: @@ -211,7 +212,7 @@ def assert_valid_file_paths(args: AnonymizeEntriesArgs): def load_csv_contents(args: AnonymizeEntriesArgs): - with open(args.csv, "r") as csvfile: + with open(args.csv, "r", encoding="utf-8") as csvfile: return list(csv.DictReader(csvfile)) @@ -232,8 +233,8 @@ def assert_known_google_forms_columns_present(csv_contents: CsvContents): KnownColumns.Timestamp in csv_contents[0] ), f"Provided CSV file does not have a {repr(KnownColumns.Timestamp)} column" assert ( - KnownColumns.EmailAddress in csv_contents[0] - ), f"Provided CSV file does not have an {repr(KnownColumns.EmailAddress)} column" + KnownColumns.UserId in csv_contents[0] + ), f"Provided CSV file does not have an {repr(KnownColumns.UserId)} column" def detect_dynamic_columns(csv_contents: CsvContents) -> DynamicColumns: @@ -272,33 +273,31 @@ def maybe_generate_aliases( with open("aliases/suswords.txt", "r", encoding="utf-8") as suswords_file: suswords = set(line.rstrip() for line in suswords_file) - alias_to_email_address = {} + alias_to_user_id = {} seed = args.seed or args.csv for row in csv_contents: - rnd = Random(",".join([row[KnownColumns.EmailAddress], seed])) + rnd = Random(",".join([row[KnownColumns.UserId], seed])) while True: random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[1])}" if ( - random_alias in alias_to_email_address - and alias_to_email_address[random_alias] - != row[KnownColumns.EmailAddress] + random_alias in alias_to_user_id + and alias_to_user_id[random_alias] != row[KnownColumns.UserId] ): print( - f"Rerolling alias for {row[KnownColumns.EmailAddress]} because {repr(random_alias)} is already being used by {alias_to_email_address[random_alias]}" + f"Rerolling alias for {row[KnownColumns.UserId]} because {repr(random_alias)} is already being used by {alias_to_user_id[random_alias]}" ) - elif random_alias in usedaliases: + elif random_alias.lower() in 
usedaliases: print( - f"Rerolling alias for {row[KnownColumns.EmailAddress]} because {repr(random_alias)} has already been used" + f"Rerolling alias for {row[KnownColumns.UserId]} because {repr(random_alias)} has already been used" ) elif any( random_part in suswords for random_part in random_alias.split(" ") ): print( - f"WARNING: alias for {row[KnownColumns.EmailAddress]} {repr(random_alias)} contains a sus word" + f"WARNING: alias for {row[KnownColumns.UserId]} {repr(random_alias)} contains a sus word" ) - break else: break row[KnownColumns.GeneratedAlias] = random_alias @@ -321,7 +320,7 @@ def maybe_mark_resubmitted_entries( resubmitted_total = 0 for loop_pass in ("find", "mark"): for row in csv_contents: - user = row[KnownColumns.EmailAddress] + user = row[KnownColumns.UserId] timestamp = parse_timestamp(row[KnownColumns.Timestamp]) if loop_pass == "find": if user in most_recent_entry_per_user: @@ -343,7 +342,7 @@ def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvCo if args.dry_run: print("Dry run - not writing generated columns back to CSV") else: - with open(args.csv, "w", newline="") as csvfile: + with open(args.csv, "w", newline="", encoding="utf-8") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys()) writer.writeheader() for row in csv_contents: @@ -351,29 +350,29 @@ def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvCo print("Wrote generated columns back to CSV") -def maybe_mark_unspecified_emails( +def maybe_mark_unspecified_user_ids( args: AnonymizeEntriesArgs, csv_contents: CsvContents ): - if not args.emails: + if not args.users: return unspecified_total = 0 specified_total = 0 - emails = set(args.emails.split(",")) + users = set(args.users.split(",")) for row in csv_contents: if not row[KnownColumns.IgnoreFile]: - if row[KnownColumns.EmailAddress] not in emails: + if row[KnownColumns.UserId] not in users: row[KnownColumns.IgnoreFile] = "unspecified" unspecified_total 
+= 1 else: specified_total += 1 - assert specified_total > 0, "No responses were found from the specified emails" + assert specified_total > 0, "No responses were found from the specified users" s = "s" if specified_total != 1 else "" print( - f"Processing {specified_total} file{s} for specified emails & ignoring {unspecified_total} others" + f"Processing {specified_total} file{s} for specified users & ignoring {unspecified_total} others" ) @@ -389,7 +388,7 @@ def extract_entries_to_temporary_folder( # Check all immediate subdirectories, followed by the root itself root = "/" contents = zip_fs.listdir(root) - subdirs = [item for item in contents if zip_fs.isdir(item)] + subdirs = [item for item in contents if zip_fs.isdir(item)] + [root] for subdir in subdirs: possible_path = fs.path.join(root, subdir) @@ -401,7 +400,7 @@ return (possible_path, possible_simfile_dir) raise RuntimeError( - "Unable to find a suitable simfile directory in the ZIP. " + "Unable to find a suitable simfile directory in the ZIP. " "Make sure the simfile is no more than one directory deep, " 'e.g. contains "Simfile/simfile.ssc".' 
) @@ -418,17 +417,23 @@ def extract_entries_to_temporary_folder( temp_fs = TempFS(identifier="dimocracy-voucher") for row in csv_contents: - if row[KnownColumns.IgnoreFile]: - continue - zip_absolute_path = os.path.join( - args.file_uploads, row[dynamic_columns.filename] - ) - if os.path.isfile(zip_absolute_path): - with open(zip_absolute_path, "rb") as zip_file: - zip_fs = ZipFS(zip_file) - row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs) - else: - print("WARNING: {zip_absolute_path} not found - skipping") + try: + if row[KnownColumns.IgnoreFile]: + continue + zip_absolute_path = os.path.join( + args.file_uploads, row[dynamic_columns.filename] + ) + if os.path.isfile(zip_absolute_path): + with open(zip_absolute_path, "rb") as zip_file: + zip_fs = ZipFS(zip_file) + row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs) + else: + print(f"WARNING: {zip_absolute_path} not found - skipping") + except Exception: + print( + f"Exception encountered while processing row {row[KnownColumns.UserId]}" + ) + raise print(f"Extracted latest submissions to temporary directory {temp_fs.root_path}") return temp_fs @@ -469,6 +474,39 @@ def maybe_anonymize_entries( print(f"Anonymized BPMs from {repr(bpm_str)} to {repr(str(bpm_values))}") return str(bpm_values) + def clean_up_difficulties(sf: Simfile): + charts_to_remove: list[Chart] = [] + chart_with_notes = None + + for _chart in sf.charts: + chart: Chart = _chart # typing workaround + + notedata = NoteData(_chart) + if next(iter(notedata), None) is None: + charts_to_remove.append(_chart) + continue + + if chart_with_notes is not None: + raise RuntimeError( + f"{canonical_filename} contains multiple charts with notes" + ) + chart_with_notes = chart + + if chart.difficulty != "Challenge": + print( + f"WARNING: forced difficulty of chart in {canonical_filename} to Challenge" + ) + chart.difficulty = "Challenge" + + if chart_with_notes is None: + raise RuntimeError(f"{canonical_filename} has no charts with notes") 
+ + for chart_to_remove in charts_to_remove: + print( + f"WARNING: removing {chart_to_remove.difficulty} chart with no notes from {canonical_filename}" + ) + sf.charts.remove(chart_to_remove) + for row in csv_contents: if row[KnownColumns.IgnoreFile]: continue @@ -498,6 +536,8 @@ def maybe_anonymize_entries( sm.genre = "" sm.music = f"{canonical_filename}.ogg" sm.bpms = anonymize_bpms(sm.bpms) + clean_up_difficulties(sm) + for _chart in sm.charts: sm_chart: SMChart = _chart # typing workaround sm_chart.description = row[KnownColumns.GeneratedAlias] @@ -520,6 +560,7 @@ def maybe_anonymize_entries( ssc.discimage = "" ssc.labels = "" ssc.bpms = anonymize_bpms(ssc.bpms) + clean_up_difficulties(ssc) for _chart in ssc.charts: ssc_chart: SSCChart = _chart # typing workaround ssc_chart.description = "" @@ -537,8 +578,11 @@ def maybe_anonymize_entries( if dir_entry.is_file(): if ( dir_entry.name.endswith(".old") + or dir_entry.name.endswith(".sm~") + or dir_entry.name.endswith(".ssc~") or dir_entry.name.endswith(".txt") or dir_entry.name.endswith(".zip") + or dir_entry.name == ".DS_Store" ): # These are definitely safe to delete for distribution os.remove(dir_entry.path) @@ -604,7 +648,7 @@ def main(argv: list[str]): maybe_save_generated_columns(args, csv_contents) # Generate temporary CSV columns - maybe_mark_unspecified_emails(args, csv_contents) + maybe_mark_unspecified_user_ids(args, csv_contents) temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns) maybe_anonymize_entries(args, csv_contents, temp_fs)