Restored unified diff for anonymize_entries.py (text before the first
"diff --git" line is ignored by `git apply` and `patch`).

What the patch does
  * Replaces the positional `csv` / `files` arguments with an optional
    positional `data_dir` plus `-c/--csv` and `-f/--file-uploads` overrides;
    `process_args` fills in whichever of the two is missing by scanning
    `data_dir` for a *.csv file and for a subdirectory containing *.zip files.
  * Adds `-D/--deanonymized`, `-e/--emails`, `-o/--output`, `-r/--regenerate`
    and `-s/--seed`.
  * Renames the CSV column "Ignore Resubmitted File" to "Ignore File"; its
    value is now a reason string ("resubmitted" / "unspecified"), not "true".
  * Alias generation is seeded from `--seed` (falling back to the CSV path).

Review fixes applied to added lines (one-for-one, hunk sizes unchanged, so
the post-image no longer matches blob ecab1aa on the index line)
  * `emails` / `seed` are annotated `str | None`: argparse leaves them None
    when the option is omitted.
  * `--emails` entries are stripped, so "a@x.org, b@x.org" matches both.
  * The output path is built with os.path.join, so the default "output/"
    no longer yields "output//...".

Reconstruction notes (newlines and indentation were lost in the source text)
  * Blank lines were placed so that every hunk matches its @@ line counts.
  * Context-line indentation and the indentation of the epilog tree are
    inferred from Python syntax -- TODO confirm against the target file, or
    apply with `git apply --ignore-whitespace`.
  * Hunk -244,12: one blank context line of the obvious layout must be absent
    to match the header; assumed to be the one after `temp_fs = TempFS(...)`.
  * In process_args the two trailing asserts are assumed to sit inside the
    `if`; placing them after it would behave the same.
  * NOTE(review): the context line in maybe_generate_aliases draws both alias
    words from alias_parts[0] -- confirm whether the second word should come
    from alias_parts[1].

diff --git a/anonymize_entries.py b/anonymize_entries.py
index 64c11e3..ecab1aa 100644
--- a/anonymize_entries.py
+++ b/anonymize_entries.py
@@ -8,6 +8,8 @@ import os
 from random import Random
 import shutil
 import sys
+import textwrap
+from typing import cast
 from zipfile import ZipFile
 
 import fs.path
@@ -28,27 +30,104 @@ from simfile.types import Simfile
 ####################
 
 
-class AnonymizeEntriesArgs:
+class AnonymizeEntriesRawArgs:
+    data_dir: str | None
+    csv: str | None
+    file_uploads: str | None
+    deanonymized: bool
+    dry_run: bool
+    emails: str | None
+    output: str
+    regenerate: bool
+    seed: str | None
+
+
+class AnonymizeEntriesArgs(AnonymizeEntriesRawArgs):
     """Stores the command-line arguments for this script."""
 
     csv: str
-    files: str
-    dry_run: bool
+    file_uploads: str
 
 
 def argparser():
     """Get an ArgumentParser instance for this command-line script."""
-    parser = argparse.ArgumentParser()
-    parser.add_argument("csv", type=str, help="path to the CSV file of form responses")
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=textwrap.dedent(
+            """\
+            example:
+
+            path/to/folder:
+            ├ form_responses.csv
+            └ file_responses/
+                ├ Upload A - User 1.zip
+                ├ Upload B - User 2.zip
+                └ etc.
+
+            python ./anonymize_entries.py path/to/folder
+
+            OR
+
+            python ./anonymize_entries.py -c path/to/folder/form_responses.csv -f path/to/folder/file_responses
+            """
+        ),
+    )
     parser.add_argument(
-        "files", type=str, help="path to the directory of file responses"
+        "data_dir",
+        nargs="?",
+        type=str,
+        help="directory containing both the CSV form data and the file responses (uploads)",
+    )
+    parser.add_argument(
+        "-c",
+        "--csv",
+        type=str,
+        help="override path to the CSV file of form responses",
+    )
+    parser.add_argument(
+        "-f",
+        "--file-uploads",
+        type=str,
+        help="override path to the directory of file responses (uploads)",
     )
     parser.add_argument(
         "-d",
         "--dry-run",
         action=argparse.BooleanOptionalAction,
-        help="preview changes without writing the file",
+        help="do not create or modify any files",
     )
+    parser.add_argument(
+        "-D",
+        "--deanonymized",
+        action=argparse.BooleanOptionalAction,
+        help="skip anonymization of files, simply package them as-is",
+    )
+    parser.add_argument(
+        "-e",
+        "--emails",
+        type=str,
+        help="limit output to files from the specified emails (comma-separated)",
+    )
+    parser.add_argument(
+        "-o",
+        "--output",
+        type=str,
+        default="output/",
+        help="output directory",
+    )
+    parser.add_argument(
+        "-r",
+        "--regenerate",
+        action=argparse.BooleanOptionalAction,
+        help="force-update generated CSV columns",
+    )
+    parser.add_argument(
+        "-s",
+        "--seed",
+        type=str,
+        help="specify random seed for alias generation (treat this like a password & change it for each round)",
+    )
+
     return parser
 
 
@@ -63,7 +142,7 @@ class KnownColumns(enum.StrEnum):
     Timestamp = "Timestamp"
     EmailAddress = "Email Address"
     GeneratedAlias = "Generated Alias"
-    IgnoreResubmittedFile = "Ignore Resubmitted File"
+    IgnoreFile = "Ignore File"
     # Not persisted:
     ExtractedTo = "Extracted To"
 
@@ -89,9 +168,34 @@ def canonical_simfile_filename(sm: Simfile) -> str:
 ################
 
 
+def process_args(args: AnonymizeEntriesRawArgs) -> AnonymizeEntriesArgs:
+    if not args.csv or not args.file_uploads:
+        assert (
+            args.data_dir
+        ), "Positional data_dir argument must be provided if --csv and --file-uploads are not both set"
+        for dir_entry in os.scandir(args.data_dir):
+            if not args.csv and dir_entry.is_file() and dir_entry.name.endswith(".csv"):
+                args.csv = dir_entry.path
+            if not args.file_uploads and dir_entry.is_dir():
+                if any(
+                    subdir_entry.name.endswith(".zip")
+                    for subdir_entry in os.scandir(dir_entry.path)
+                ):
+                    args.file_uploads = dir_entry.path
+
+        assert args.csv, "Unable to find a CSV file in the provided directory"
+        assert (
+            args.file_uploads
+        ), "Unable to find a subdirectory containing ZIP files in the provided directory"
+
+    return cast(AnonymizeEntriesArgs, args)
+
+
 def assert_valid_file_paths(args: AnonymizeEntriesArgs):
     assert os.path.isfile(args.csv), f"{repr(args.csv)} is not a file"
-    assert os.path.isdir(args.files), f"{repr(args.files)} is not a directory"
+    assert os.path.isdir(
+        args.file_uploads
+    ), f"{repr(args.file_uploads)} is not a directory"
 
 
 def load_csv_contents(args: AnonymizeEntriesArgs):
@@ -140,7 +244,9 @@ def maybe_generate_aliases(
     alias_parts: tuple[list[str], list[str]],
     csv_contents: CsvContents,
 ) -> ChangedCsvContents:
-    reuse_aliases = KnownColumns.GeneratedAlias in csv_contents[0]
+    reuse_aliases = (
+        not args.regenerate and KnownColumns.GeneratedAlias in csv_contents[0]
+    )
 
     if reuse_aliases:
         print("Reusing generated aliases")
@@ -148,8 +254,10 @@ def maybe_generate_aliases(
 
     alias_to_email_address = {}
 
+    seed = args.seed or args.csv
+
     for row in csv_contents:
-        rnd = Random("; ".join([row[KnownColumns.EmailAddress], args.csv, args.files]))
+        rnd = Random(",".join([row[KnownColumns.EmailAddress], seed]))
         random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[0])}"
         while (
             random_alias in alias_to_email_address
@@ -165,8 +273,12 @@ def maybe_generate_aliases(
     return True
 
 
-def maybe_mark_resubmitted_entries(csv_contents: CsvContents) -> ChangedCsvContents:
-    reuse_resubmitted = KnownColumns.IgnoreResubmittedFile in csv_contents[0]
+def maybe_mark_resubmitted_entries(
+    args: AnonymizeEntriesArgs, csv_contents: CsvContents
+) -> ChangedCsvContents:
+    reuse_resubmitted = (
+        not args.regenerate and KnownColumns.IgnoreFile in csv_contents[0]
+    )
     if reuse_resubmitted:
         print("Reusing resubmitted files column")
         return False
@@ -185,9 +297,7 @@ def maybe_mark_resubmitted_entries(csv_contents: CsvContents) -> ChangedCsvConte
                 most_recent_entry_per_user[user] = timestamp
             elif loop_pass == "mark":
                 resubmitted = timestamp < most_recent_entry_per_user[user]
-                row[KnownColumns.IgnoreResubmittedFile] = (
-                    "true" if resubmitted else ""
-                )
+                row[KnownColumns.IgnoreFile] = "resubmitted" if resubmitted else ""
                 if resubmitted:
                     resubmitted_total += 1
     print(f"Marked {resubmitted_total} resubmitted files to be ignored")
@@ -206,6 +316,32 @@ def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvCo
         print("Wrote generated columns back to CSV")
 
 
+def maybe_mark_unspecified_emails(
+    args: AnonymizeEntriesArgs, csv_contents: CsvContents
+):
+    if not args.emails:
+        return
+
+    unspecified_total = 0
+    specified_total = 0
+    emails = {email.strip() for email in args.emails.split(",")}
+
+    for row in csv_contents:
+        if not row[KnownColumns.IgnoreFile]:
+            if row[KnownColumns.EmailAddress] not in emails:
+                row[KnownColumns.IgnoreFile] = "unspecified"
+                unspecified_total += 1
+            else:
+                specified_total += 1
+
+    assert specified_total > 0, "No responses were found from the specified emails"
+
+    s = "s" if specified_total != 1 else ""
+    print(
+        f"Processing {specified_total} file{s} for specified emails & ignoring {unspecified_total} others"
+    )
+
+
 def extract_entries_to_temporary_folder(
     args: AnonymizeEntriesArgs,
     csv_contents: CsvContents,
@@ -244,12 +380,14 @@ def extract_entries_to_temporary_folder(
         copy_dir(zip_fs, zip_path, temp_fs, canonical_filename)
         return canonical_filename
 
-    temp_fs = TempFS(identifier="dimocracy-voucher_anonymized")
+    temp_fs = TempFS(identifier="dimocracy-voucher")
     for row in csv_contents:
-        if row[KnownColumns.IgnoreResubmittedFile]:
+        if row[KnownColumns.IgnoreFile]:
             continue
 
-        zip_absolute_path = os.path.join(args.files, row[dynamic_columns.filename])
+        zip_absolute_path = os.path.join(
+            args.file_uploads, row[dynamic_columns.filename]
+        )
         if os.path.isfile(zip_absolute_path):
             with open(zip_absolute_path, "rb") as zip_file:
                 zip_fs = ZipFS(zip_file)
@@ -261,11 +399,15 @@ def extract_entries_to_temporary_folder(
     return temp_fs
 
 
-def anonymize_entries(
+def maybe_anonymize_entries(
     args: AnonymizeEntriesArgs,
     csv_contents: CsvContents,
     temp_fs: TempFS,
 ):
+    if args.deanonymized:
+        print("Deanonymized - skipping anonymization step")
+        return
+
     def maybe_rename_file(absolute_path: str | None, canonical_filename: str):
         if absolute_path and os.path.basename(absolute_path) != canonical_filename:
             absolute_canonical_path = os.path.join(
@@ -282,7 +424,7 @@ def anonymize_entries(
             print(f"Deleted {os.path.relpath(absolute_path, temp_fs.root_path)}")
 
     for row in csv_contents:
-        if row[KnownColumns.IgnoreResubmittedFile]:
+        if row[KnownColumns.IgnoreFile]:
             continue
 
         absolute_simfile_dir_path = os.path.join(
@@ -376,16 +518,17 @@ def anonymize_entries(
             )
 
 
-def save_anonymized_files(
+def maybe_save_anonymized_files(
     args: AnonymizeEntriesArgs,
     csv_contents: CsvContents,
     temp_fs: TempFS,
 ):
     if args.dry_run:
-        print("Dry run - not saving anonymized files")
+        print("Dry run - not saving files")
         return
     timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
-    output_path = f"./output/anonymized-{timestamp}"
+    de = "de" if args.deanonymized else ""
+    output_path = os.path.join(args.output, f"{de}anonymized-{timestamp}")
     shutil.copytree(temp_fs.root_path, output_path)
     print(f"Saved to {os.path.abspath(output_path)}")
 
@@ -396,21 +539,26 @@ def save_anonymized_files(
 
 
 def main(argv: list[str]):
-    args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesArgs())
+    raw_args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesRawArgs())
+    args = process_args(raw_args)
     assert_valid_file_paths(args)
     alias_parts = load_alias_parts("aliasparts.csv")
     csv_contents = load_csv_contents(args)
     assert_known_google_forms_columns_present(csv_contents)
     dynamic_columns = detect_dynamic_columns(csv_contents)
 
+    # Generate & save CSV columns
     csv_contents_changed = maybe_generate_aliases(args, alias_parts, csv_contents)
-    csv_contents_changed |= maybe_mark_resubmitted_entries(csv_contents)
+    csv_contents_changed |= maybe_mark_resubmitted_entries(args, csv_contents)
     if csv_contents_changed:
         maybe_save_generated_columns(args, csv_contents)
 
+    # Generate temporary CSV columns
+    maybe_mark_unspecified_emails(args, csv_contents)
+
     temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns)
-    anonymize_entries(args, csv_contents, temp_fs)
-    save_anonymized_files(args, csv_contents, temp_fs)
+    maybe_anonymize_entries(args, csv_contents, temp_fs)
+    maybe_save_anonymized_files(args, csv_contents, temp_fs)
 
 
 if __name__ == "__main__":