From d91e90b145744da64c142a468abefb05f9d1b2dc Mon Sep 17 00:00:00 2001 From: Ash Garcia Date: Sun, 11 Aug 2024 16:49:09 -0700 Subject: [PATCH] more implementation work --- anonymize-entries.py | 177 ------------------- anonymize_entries.py | 403 +++++++++++++++++++++++++++++++++++++++++++ poetry.lock | 17 +- pyproject.toml | 1 + 4 files changed, 420 insertions(+), 178 deletions(-) delete mode 100644 anonymize-entries.py create mode 100644 anonymize_entries.py diff --git a/anonymize-entries.py b/anonymize-entries.py deleted file mode 100644 index cbcd662..0000000 --- a/anonymize-entries.py +++ /dev/null @@ -1,177 +0,0 @@ -import argparse -import csv -import _csv -import os -from random import Random -import sys -from tempfile import TemporaryDirectory -from typing import Optional -from zipfile import ZipFile - -from fs.zipfs import ZipFS -import simfile - - -#################### -# Script arguments # -#################### - - -class AnonymizeEntriesArgs: - """Stores the command-line arguments for this script.""" - - csv: str - files: str - dry_run: bool - - -def argparser(): - """Get an ArgumentParser instance for this command-line script.""" - parser = argparse.ArgumentParser() - parser.add_argument("csv", type=str, help="path to the CSV file of form responses") - parser.add_argument( - "files", type=str, help="path to the directory of file responses" - ) - parser.add_argument( - "-d", - "--dry-run", - action=argparse.BooleanOptionalAction, - help="preview changes without writing the file", - ) - return parser - - -CsvContents = list[dict[str, str]] - - -################ -# Script logic # -################ - - -def assert_valid_file_paths(args: AnonymizeEntriesArgs): - assert os.path.isfile(args.csv), f"{repr(args.csv)} is not a file" - assert os.path.isdir(args.files), f"{repr(args.files)} is not a directory" - - -def load_csv_contents(args: AnonymizeEntriesArgs): - with open(args.csv, "r") as csvfile: - return list(csv.DictReader(csvfile)) - - -def 
load_alias_parts(csvpath: str) -> tuple[list[str], list[str]]: - def extract_alias_parts(csv: "_csv._reader"): - return tuple(zip(*((line[0], line[1]) for n, line in enumerate(csv) if n > 0))) - - with open(csvpath, "r") as csvfile: - alias_parts = extract_alias_parts(csv.reader(csvfile)) - - print(f"Loaded {sum(len(part) for part in alias_parts)} alias parts") - - return alias_parts - - -def assert_known_google_forms_columns_present(csv_contents: CsvContents): - assert ( - "Timestamp" in csv_contents[0] - ), 'Provided CSV file does not have a "Timestamp" column' - assert ( - "Email Address" in csv_contents[0] - ), 'Provided CSV file does not have an "Email Address" column' - - -def detect_filename_column(csv_contents: CsvContents) -> str: - maybe_filename_columns = [ - column for (column, value) in csv_contents[0].items() if value.endswith(".zip") - ] - assert ( - len(maybe_filename_columns) != 0 - ), 'First data row of provided CSV file has no cell ending in ".zip"' - assert ( - len(maybe_filename_columns) == 1 - ), 'First data row of provided CSV file has multiple cells ending in ".zip"' - filename_column = maybe_filename_columns[0] - print(f"Detected filename column: {repr(filename_column)}") - return filename_column - - -def maybe_generate_and_persist_aliases( - args: AnonymizeEntriesArgs, - alias_parts: tuple[list[str], list[str]], - csv_contents: CsvContents, -): - reuse_aliases = "Generated Alias" in csv_contents[0] - - if reuse_aliases: - print("Reusing generated aliases") - else: - for row in csv_contents: - random = Random("; ".join([row["Email Address"], args.csv, args.files])) - row["Generated Alias"] = ( - f"{random.choice(alias_parts[0])} {random.choice(alias_parts[0])}" - ) - print("Generated an alias for each entry") - - if args.dry_run: - print("Dry run - not writing generated aliases back to CSV") - else: - with open(args.csv, "w", newline="") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys()) - writer.writeheader() 
- for row in csv_contents: - writer.writerow(row) - print("Wrote generated aliases back to CSV") - - return 0 - - -def maybe_remove_and_persist_resubmitted_entries(csv_contents: CsvContents): - print("STUB - maybe_remove_and_persist_resubmitted_entries") - - -def extract_entries_to_temporary_folder( - args: AnonymizeEntriesArgs, csv_contents: CsvContents -): - print("STUB - extract_entries_to_temporary_folder") - - -def anonymize_entries( - args: AnonymizeEntriesArgs, - csv_contents: CsvContents, - temp_dir: Optional[TemporaryDirectory], -): - print("STUB - anonymize_entries") - - -def zip_anonymized_entries( - args: AnonymizeEntriesArgs, - csv_contents: CsvContents, - temp_dir: Optional[TemporaryDirectory], -): - print("STUB - zip_anonymized_entries") - - -############### -# Main method # -############### - - -def main(argv: list[str]): - args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesArgs()) - assert_valid_file_paths(args) - alias_parts = load_alias_parts("aliasparts.csv") - csv_contents = load_csv_contents(args) - assert_known_google_forms_columns_present(csv_contents) - filename_column = detect_filename_column(csv_contents) - maybe_generate_and_persist_aliases(args, alias_parts, csv_contents) - - # Everything above is implemented; everything below is a stub - - maybe_remove_and_persist_resubmitted_entries(csv_contents) - temp_folder = extract_entries_to_temporary_folder(args, csv_contents) - anonymize_entries(args, csv_contents, temp_folder) - zip_anonymized_entries(args, csv_contents, temp_folder) - - -if __name__ == "__main__": - main(sys.argv) diff --git a/anonymize_entries.py b/anonymize_entries.py new file mode 100644 index 0000000..79f846a --- /dev/null +++ b/anonymize_entries.py @@ -0,0 +1,403 @@ +import argparse +import csv +import _csv +from dataclasses import dataclass +from datetime import datetime +import enum +import os +from random import Random +import shutil +import sys +from zipfile import ZipFile + +import fs.path +from 
fs.base import FS +from fs.copy import copy_dir +from fs.tempfs import TempFS +from fs.zipfs import ZipFS +from pathvalidate import sanitize_filename +import simfile +from simfile.dir import SimfilePack, SimfileDirectory +from simfile.sm import SMChart, SMSimfile +from simfile.ssc import SSCChart, SSCSimfile +from simfile.types import Simfile + + +#################### +# Script arguments # +#################### + + +class AnonymizeEntriesArgs: + """Stores the command-line arguments for this script.""" + + csv: str + files: str + dry_run: bool + + +def argparser(): + """Get an ArgumentParser instance for this command-line script.""" + parser = argparse.ArgumentParser() + parser.add_argument("csv", type=str, help="path to the CSV file of form responses") + parser.add_argument( + "files", type=str, help="path to the directory of file responses" + ) + parser.add_argument( + "-d", + "--dry-run", + action=argparse.BooleanOptionalAction, + help="preview changes without writing the file", + ) + return parser + + +CsvContents = list[dict[str, str]] + +##################### +# Utility functions # +##################### + + +class KnownColumns(enum.StrEnum): + Timestamp = "Timestamp" + EmailAddress = "Email Address" + GeneratedAlias = "Generated Alias" + IgnoreResubmittedFile = "Ignore Resubmitted File" + # Not persisted: + ExtractedTo = "Extracted To" + + +@dataclass +class DynamicColumns: + filename: str + + +ChangedCsvContents = bool + + +def parse_timestamp(timestamp: str): + return datetime.strptime(timestamp, "%m/%d/%Y %H:%M:%S") + + +def canonical_simfile_filename(sm: Simfile) -> str: + return sanitize_filename(f"{sm.title} {sm.subtitle or ''}".rstrip()) + + +################ +# Script logic # +################ + + +def assert_valid_file_paths(args: AnonymizeEntriesArgs): + assert os.path.isfile(args.csv), f"{repr(args.csv)} is not a file" + assert os.path.isdir(args.files), f"{repr(args.files)} is not a directory" + + +def load_csv_contents(args: 
AnonymizeEntriesArgs): + with open(args.csv, "r") as csvfile: + return list(csv.DictReader(csvfile)) + + + def load_alias_parts(csvpath: str) -> tuple[list[str], list[str]]: + def extract_alias_parts(csv: "_csv._reader"): + return tuple(zip(*((line[0], line[1]) for n, line in enumerate(csv) if n > 0))) + + with open(csvpath, "r") as csvfile: + alias_parts = extract_alias_parts(csv.reader(csvfile)) + + print(f"Loaded {sum(len(part) for part in alias_parts)} alias parts") + + return alias_parts + + + def assert_known_google_forms_columns_present(csv_contents: CsvContents): + assert ( + KnownColumns.Timestamp in csv_contents[0] + ), f"Provided CSV file does not have a {repr(KnownColumns.Timestamp)} column" + assert ( + KnownColumns.EmailAddress in csv_contents[0] + ), f"Provided CSV file does not have an {repr(KnownColumns.EmailAddress)} column" + + + def detect_dynamic_columns(csv_contents: CsvContents) -> DynamicColumns: + maybe_filename_columns = [ + column for (column, value) in csv_contents[0].items() if value.endswith(".zip") + ] + assert ( + len(maybe_filename_columns) != 0 + ), 'First data row of provided CSV file has no cell ending in ".zip"' + assert ( + len(maybe_filename_columns) == 1 + ), 'First data row of provided CSV file has multiple cells ending in ".zip"' + filename_column = maybe_filename_columns[0] + print(f"Detected filename column: {repr(filename_column)}") + return DynamicColumns(filename=filename_column) + + + def maybe_generate_aliases( + args: AnonymizeEntriesArgs, + alias_parts: tuple[list[str], list[str]], + csv_contents: CsvContents, + ) -> ChangedCsvContents: + reuse_aliases = KnownColumns.GeneratedAlias in csv_contents[0] + + if reuse_aliases: + print("Reusing generated aliases") + return False + else: + for row in csv_contents: + random = Random( + "; ".join([row[KnownColumns.EmailAddress], args.csv, args.files]) + ) + row[KnownColumns.GeneratedAlias] = ( + f"{random.choice(alias_parts[0])} {random.choice(alias_parts[1])}" + ) + 
print("Generated an alias for each entry") + return True + + +def maybe_mark_resubmitted_entries(csv_contents: CsvContents) -> ChangedCsvContents: + reuse_resubmitted = KnownColumns.IgnoreResubmittedFile in csv_contents[0] + if reuse_resubmitted: + print("Reusing resubmitted files column") + return False + else: + most_recent_entry_per_user = {} + resubmitted_total = 0 + for loop_pass in ("find", "mark"): + for row in csv_contents: + user = row[KnownColumns.EmailAddress] + timestamp = parse_timestamp(row[KnownColumns.Timestamp]) + if loop_pass == "find": + if user in most_recent_entry_per_user: + if timestamp > most_recent_entry_per_user[user]: + most_recent_entry_per_user[user] = timestamp + else: + most_recent_entry_per_user[user] = timestamp + elif loop_pass == "mark": + resubmitted = timestamp < most_recent_entry_per_user[user] + row[KnownColumns.IgnoreResubmittedFile] = ( + "true" if resubmitted else "" + ) + if resubmitted: + resubmitted_total += 1 + print(f"Marked {resubmitted_total} resubmitted files to be ignored") + return True + + +def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvContents): + if args.dry_run: + print("Dry run - not writing generated columns back to CSV") + else: + with open(args.csv, "w", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys()) + writer.writeheader() + for row in csv_contents: + writer.writerow(row) + print("Wrote generated columns back to CSV") + + +def extract_entries_to_temporary_folder( + args: AnonymizeEntriesArgs, + csv_contents: CsvContents, + dynamic_columns: DynamicColumns, +) -> TempFS: + + def find_simfile_dir_zip_path( + zip_fs: FS, + ) -> tuple[str, SimfileDirectory]: + # Check all immediate subdirectories, followed by the root itself + root = "/" + contents = zip_fs.listdir(root) + subdirs = [item for item in contents if zip_fs.isdir(item)] + + for subdir in subdirs: + possible_path = fs.path.join(root, subdir) + possible_simfile_dir = 
SimfileDirectory( + possible_path, + filesystem=zip_fs, + ) + if possible_simfile_dir.sm_path or possible_simfile_dir.ssc_path: + return (possible_path, possible_simfile_dir) + + raise RuntimeError( + "Unable to find a suitable simfile directory in the ZIP. " + "Make sure the simfile is no more than one directory deep, " + 'e.g. contains "Simfile/simfile.ssc".' + ) + + def extract_simfile_dir(zip_fs: FS, temp_fs: FS) -> str: + zip_path, simfile_dir = find_simfile_dir_zip_path(zip_fs) + canonical_filename = canonical_simfile_filename(simfile_dir.open()) + assert not temp_fs.exists( + canonical_filename + ), f"ERROR: trying to extract {canonical_filename} but it's already present in the temp folder" + copy_dir(zip_fs, zip_path, temp_fs, canonical_filename) + return canonical_filename + + temp_fs = TempFS(identifier="dimocracy-voucher_anonymized", auto_clean=False) + + for row in csv_contents: + if row[KnownColumns.IgnoreResubmittedFile]: + continue + zip_absolute_path = os.path.join(args.files, row[dynamic_columns.filename]) + if os.path.isfile(zip_absolute_path): + with open(zip_absolute_path, "rb") as zip_file: + zip_fs = ZipFS(zip_file) + row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs) + else: + print(f"WARNING: {zip_absolute_path} not found - skipping") + + print(f"Extracted latest submissions to temporary directory {temp_fs.root_path}") + return temp_fs + + + def anonymize_entries( + args: AnonymizeEntriesArgs, + csv_contents: CsvContents, + temp_fs: TempFS, + ): + def maybe_rename_file(absolute_path: str | None, canonical_filename: str): + if absolute_path and os.path.basename(absolute_path) != canonical_filename: + absolute_canonical_path = os.path.join( + os.path.dirname(absolute_path), canonical_filename + ) + os.rename(absolute_path, absolute_canonical_path) + print( + f"Renamed {os.path.relpath(absolute_path, temp_fs.root_path)} to {os.path.relpath(absolute_canonical_path, temp_fs.root_path)}" + ) + + def maybe_delete_file(absolute_path: 
str | None): + if absolute_path and os.path.isfile(absolute_path): + os.remove(absolute_path) + print(f"Deleted {os.path.relpath(absolute_path, temp_fs.root_path)}") + + for row in csv_contents: + if row[KnownColumns.IgnoreResubmittedFile]: + continue + + absolute_simfile_dir_path = os.path.join( + temp_fs.root_path, row[KnownColumns.ExtractedTo] + ) + simfile_dir = SimfileDirectory(absolute_simfile_dir_path) + canonical_filename = canonical_simfile_filename(simfile_dir.open()) + + assets = simfile_dir.assets() + maybe_rename_file(assets.music, f"{canonical_filename}.ogg") + maybe_delete_file(assets.background) + maybe_delete_file(assets.banner) + maybe_delete_file(assets.cdimage) + maybe_delete_file(assets.cdtitle) + maybe_delete_file(assets.disc) + maybe_delete_file(assets.jacket) + + if simfile_dir.sm_path: + with simfile.mutate(simfile_dir.sm_path) as sm: + assert isinstance(sm, SMSimfile) + sm.credit = row[KnownColumns.GeneratedAlias] + sm.background = "" + sm.banner = "" + sm.cdtitle = "" + sm.genre = "" + sm.music = f"{canonical_filename}.ogg" + for _chart in sm.charts: + sm_chart: SMChart = _chart # typing workaround + sm_chart.description = row[KnownColumns.GeneratedAlias] + maybe_rename_file(simfile_dir.sm_path, f"{canonical_filename}.sm") + print( + f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.sm" + ) + + if simfile_dir.ssc_path: + with simfile.mutate(simfile_dir.ssc_path) as ssc: + assert isinstance(ssc, SSCSimfile) + ssc.credit = row[KnownColumns.GeneratedAlias] + ssc.music = f"{canonical_filename}.ogg" + ssc.background = "" + ssc.banner = "" + ssc.cdtitle = "" + ssc.genre = "" + ssc.jacket = "" + ssc.cdimage = "" + ssc.discimage = "" + ssc.labels = "" + for _chart in ssc.charts: + ssc_chart: SSCChart = _chart # typing workaround + ssc_chart.description = "" + ssc_chart.chartname = "" + ssc_chart.chartstyle = "" + ssc_chart.credit = row[KnownColumns.GeneratedAlias] + 
maybe_rename_file(simfile_dir.ssc_path, f"{canonical_filename}.ssc") + print( + f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.ssc" + ) + + for dir_entry in os.scandir(absolute_simfile_dir_path): + if dir_entry.is_file(): + if ( + dir_entry.name.endswith(".old") + or dir_entry.name.endswith(".txt") + or dir_entry.name.endswith(".zip") + ): + # These are definitely safe to delete for distribution + os.remove(dir_entry.path) + elif ( + dir_entry.name.endswith(".ssc") + or dir_entry.name.endswith(".sm") + or dir_entry.name.endswith(".ogg") + ): + # These are expected + pass + else: + # Some other extension not listed above + print( + f"WARNING: leaving unexpected file {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone" + ) + elif dir_entry.is_dir(): + if dir_entry.name == "__bias-check": + # nine-or-null directories can be removed + shutil.rmtree(dir_entry.path) + print( + f"Deleted directory {os.path.relpath(dir_entry.path, temp_fs.root_path)}" + ) + else: + # Some other subdirectory - maybe mods? 
+ print( + f"WARNING: leaving unexpected subdirectory {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone" + ) + + +def zip_anonymized_entries( + args: AnonymizeEntriesArgs, + csv_contents: CsvContents, + temp_fs: TempFS, +): + print("STUB - zip_anonymized_entries") + + +############### +# Main method # +############### + + +def main(argv: list[str]): + args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesArgs()) + assert_valid_file_paths(args) + alias_parts = load_alias_parts("aliasparts.csv") + csv_contents = load_csv_contents(args) + assert_known_google_forms_columns_present(csv_contents) + dynamic_columns = detect_dynamic_columns(csv_contents) + + csv_contents_changed = maybe_generate_aliases(args, alias_parts, csv_contents) + csv_contents_changed |= maybe_mark_resubmitted_entries(csv_contents) + if csv_contents_changed: + maybe_save_generated_columns(args, csv_contents) + + temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns) + anonymize_entries(args, csv_contents, temp_fs) + zip_anonymized_entries(args, csv_contents, temp_fs) + + +if __name__ == "__main__": + main(sys.argv) diff --git a/poetry.lock b/poetry.lock index e3fa936..1888a9b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -143,6 +143,21 @@ files = [ {file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"}, ] +[[package]] +name = "pathvalidate" +version = "3.2.0" +description = "pathvalidate is a Python library to sanitize/validate a string such as filenames/file-paths/etc." 
+optional = false +python-versions = ">=3.7" +files = [ + {file = "pathvalidate-3.2.0-py3-none-any.whl", hash = "sha256:cc593caa6299b22b37f228148257997e2fa850eea2daf7e4cc9205cef6908dee"}, + {file = "pathvalidate-3.2.0.tar.gz", hash = "sha256:5e8378cf6712bff67fbe7a8307d99fa8c1a0cb28aa477056f8fc374f0dff24ad"}, +] + +[package.extras] +docs = ["Sphinx (>=2.4)", "sphinx-rtd-theme (>=1.2.2)", "urllib3 (<2)"] +test = ["Faker (>=1.0.8)", "allpairspy (>=2)", "click (>=6.2)", "pytest (>=6.0.1)", "pytest-discord (>=0.1.4)", "pytest-md-report (>=0.4.1)"] + [[package]] name = "platformdirs" version = "4.2.2" @@ -204,4 +219,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "130aa8886dcc190a88f8094e7e74aefbddad10ce60ab3d63d74c126bf6b5cc04" +content-hash = "cb0ec3bd6d2ec3fb7296e6dd4801035c044c88cc2e7da894954d5805603b9fee" diff --git a/pyproject.toml b/pyproject.toml index c23d92e..510eaa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.11" simfile = "^2.1.1" +pathvalidate = "^3.2.0" [tool.poetry.group.dev.dependencies] black = "^24.8.0"