nicer cli args

This commit is contained in:
Ash Garcia 2024-08-11 22:59:51 -07:00
parent 2b1bf885a1
commit d029642e08

View file

@ -8,6 +8,8 @@ import os
from random import Random
import shutil
import sys
import textwrap
from typing import cast
from zipfile import ZipFile
import fs.path
@ -28,27 +30,104 @@ from simfile.types import Simfile
####################
class AnonymizeEntriesArgs:
class AnonymizeEntriesRawArgs:
data_dir: str | None
csv: str | None
file_uploads: str | None
deanonymized: bool
dry_run: bool
emails: str
output: str
regenerate: bool
seed: str
class AnonymizeEntriesArgs(AnonymizeEntriesRawArgs):
"""Stores the command-line arguments for this script."""
csv: str
files: str
dry_run: bool
file_uploads: str
def argparser() -> argparse.ArgumentParser:
    """Get an ArgumentParser instance for this command-line script.

    The parser accepts an optional positional ``data_dir`` plus the
    ``--csv`` / ``--file-uploads`` overrides. The boolean flags
    (``--dry-run``, ``--deanonymized``, ``--regenerate``) default to
    ``False`` so callers always see a real ``bool``; each also gets an
    automatic ``--no-...`` form from ``BooleanOptionalAction``.
    """
    parser = argparse.ArgumentParser(
        # Raw formatter keeps the line breaks of the example layout below.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent(
            """\
            example:
              path/to/folder:
                form_responses.csv
                file_responses/
                  Upload A - User 1.zip
                  Upload B - User 2.zip
                  etc.
              python ./anonymize_entries.py path/to/folder
                OR
              python ./anonymize_entries.py -c path/to/folder/form_responses.csv -f path/to/folder/file_responses
            """
        ),
    )
    parser.add_argument(
        "data_dir",
        nargs="?",
        type=str,
        help="directory containing both the CSV form data and the file responses (uploads)",
    )
    parser.add_argument(
        "-c",
        "--csv",
        type=str,
        help="override path to the CSV file of form responses",
    )
    parser.add_argument(
        "-f",
        "--file-uploads",
        type=str,
        help="override path to the directory of file responses (uploads)",
    )
    parser.add_argument(
        "-d",
        "--dry-run",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="do not create or modify any files",
    )
    parser.add_argument(
        "-D",
        "--deanonymized",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="skip anonymization of files, simply package them as-is",
    )
    parser.add_argument(
        "-e",
        "--emails",
        type=str,
        help="limit output to files from the specified emails (comma-separated)",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="output/",
        help="output directory",
    )
    parser.add_argument(
        "-r",
        "--regenerate",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="force-update generated CSV columns",
    )
    parser.add_argument(
        "-s",
        "--seed",
        type=str,
        help="specify random seed for alias generation (treat this like a password & change it for each round)",
    )
    return parser
@ -63,7 +142,7 @@ class KnownColumns(enum.StrEnum):
Timestamp = "Timestamp"
EmailAddress = "Email Address"
GeneratedAlias = "Generated Alias"
IgnoreResubmittedFile = "Ignore Resubmitted File"
IgnoreFile = "Ignore File"
# Not persisted:
ExtractedTo = "Extracted To"
@ -89,9 +168,34 @@ def canonical_simfile_filename(sm: Simfile) -> str:
################
def process_args(args: AnonymizeEntriesRawArgs) -> AnonymizeEntriesArgs:
    """Resolve the CSV and uploads paths that were not given explicitly.

    When ``--csv`` and ``--file-uploads`` are both set, ``args`` is returned
    untouched. Otherwise ``args.data_dir`` is scanned (one level deep): the
    first ``*.csv`` file fills in a missing ``csv`` and the first
    subdirectory that directly contains a ``*.zip`` file fills in a missing
    ``file_uploads``. "First" means first by entry name, so the same folder
    always resolves the same way.

    Args:
        args: parsed command-line arguments; updated in place.

    Returns:
        The same object, typed as fully-resolved arguments.

    Raises:
        AssertionError: if ``data_dir`` is needed but missing, or no
            matching CSV file / uploads directory could be found.
    """
    if args.csv and args.file_uploads:
        return cast("AnonymizeEntriesArgs", args)

    # Raised explicitly (rather than via ``assert``) so the check still runs
    # under ``python -O``; the exception type is the same either way.
    if not args.data_dir:
        raise AssertionError(
            "Positional data_dir argument must be provided if --csv and --file-uploads are not both set"
        )

    # os.scandir yields entries in arbitrary order and holds an OS handle
    # until closed: close it promptly and sort for a deterministic pick.
    with os.scandir(args.data_dir) as entries:
        candidates = sorted(entries, key=lambda entry: entry.name)

    for dir_entry in candidates:
        if not args.csv and dir_entry.is_file() and dir_entry.name.endswith(".csv"):
            args.csv = dir_entry.path
        if not args.file_uploads and dir_entry.is_dir():
            # any() stops at the first match, so the context manager is what
            # guarantees the inner iterator gets closed.
            with os.scandir(dir_entry.path) as uploads:
                has_zip = any(upload.name.endswith(".zip") for upload in uploads)
            if has_zip:
                args.file_uploads = dir_entry.path
        if args.csv and args.file_uploads:
            break  # both resolved; no need to open further subdirectories

    if not args.csv:
        raise AssertionError("Unable to find a CSV file in the provided directory")
    if not args.file_uploads:
        raise AssertionError(
            "Unable to find a subdirectory containing ZIP files in the provided directory"
        )
    return cast("AnonymizeEntriesArgs", args)
def assert_valid_file_paths(args: AnonymizeEntriesArgs):
    """Check that the resolved CSV and uploads paths exist on disk.

    Args:
        args: arguments whose ``csv`` and ``file_uploads`` paths are set.

    Raises:
        AssertionError: if ``args.csv`` is not an existing file or
            ``args.file_uploads`` is not an existing directory.
    """
    # Raised explicitly (rather than via ``assert``) so the checks still run
    # under ``python -O``; the exception type and messages are unchanged.
    if not os.path.isfile(args.csv):
        raise AssertionError(f"{args.csv!r} is not a file")
    if not os.path.isdir(args.file_uploads):
        raise AssertionError(f"{args.file_uploads!r} is not a directory")
def load_csv_contents(args: AnonymizeEntriesArgs):
@ -140,7 +244,9 @@ def maybe_generate_aliases(
alias_parts: tuple[list[str], list[str]],
csv_contents: CsvContents,
) -> ChangedCsvContents:
reuse_aliases = KnownColumns.GeneratedAlias in csv_contents[0]
reuse_aliases = (
not args.regenerate and KnownColumns.GeneratedAlias in csv_contents[0]
)
if reuse_aliases:
print("Reusing generated aliases")
@ -148,8 +254,10 @@ def maybe_generate_aliases(
alias_to_email_address = {}
seed = args.seed or args.csv
for row in csv_contents:
rnd = Random("; ".join([row[KnownColumns.EmailAddress], args.csv, args.files]))
rnd = Random(",".join([row[KnownColumns.EmailAddress], seed]))
random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[0])}"
while (
random_alias in alias_to_email_address
@ -165,8 +273,12 @@ def maybe_generate_aliases(
return True
def maybe_mark_resubmitted_entries(csv_contents: CsvContents) -> ChangedCsvContents:
reuse_resubmitted = KnownColumns.IgnoreResubmittedFile in csv_contents[0]
def maybe_mark_resubmitted_entries(
args: AnonymizeEntriesArgs, csv_contents: CsvContents
) -> ChangedCsvContents:
reuse_resubmitted = (
not args.regenerate and KnownColumns.IgnoreFile in csv_contents[0]
)
if reuse_resubmitted:
print("Reusing resubmitted files column")
return False
@ -185,9 +297,7 @@ def maybe_mark_resubmitted_entries(csv_contents: CsvContents) -> ChangedCsvConte
most_recent_entry_per_user[user] = timestamp
elif loop_pass == "mark":
resubmitted = timestamp < most_recent_entry_per_user[user]
row[KnownColumns.IgnoreResubmittedFile] = (
"true" if resubmitted else ""
)
row[KnownColumns.IgnoreFile] = "resubmitted" if resubmitted else ""
if resubmitted:
resubmitted_total += 1
print(f"Marked {resubmitted_total} resubmitted files to be ignored")
@ -206,6 +316,32 @@ def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvCo
print("Wrote generated columns back to CSV")
def maybe_mark_unspecified_emails(
    args: AnonymizeEntriesArgs, csv_contents: CsvContents
):
    """Mark rows whose email was not requested via ``--emails`` as ignored.

    Does nothing when ``args.emails`` is empty. Otherwise every row that is
    not already ignored and whose email address is not in the
    comma-separated ``args.emails`` list gets its "Ignore File" value set to
    ``"unspecified"``. Addresses are compared after trimming surrounding
    whitespace and ignoring case, so ``"a@x.com, B@x.com"`` works as expected.

    Args:
        args: parsed command-line arguments; only ``emails`` is read.
        csv_contents: form-response rows; updated in place.

    Raises:
        AssertionError: if none of the remaining rows match the given emails.
    """
    if not args.emails:
        return

    wanted = {
        email.strip().casefold()
        for email in args.emails.split(",")
        if email.strip()  # tolerate stray commas such as "a@x.com,"
    }
    unspecified_total = 0
    specified_total = 0
    for row in csv_contents:
        if row[KnownColumns.IgnoreFile]:
            continue  # already ignored for another reason; leave it alone
        if row[KnownColumns.EmailAddress].strip().casefold() in wanted:
            specified_total += 1
        else:
            row[KnownColumns.IgnoreFile] = "unspecified"
            unspecified_total += 1

    # Raised explicitly (rather than via ``assert``) so the check still runs
    # under ``python -O``; the exception type and message are unchanged.
    if specified_total == 0:
        raise AssertionError("No responses were found from the specified emails")

    s = "s" if specified_total != 1 else ""
    print(
        f"Processing {specified_total} file{s} for specified emails & ignoring {unspecified_total} others"
    )
def extract_entries_to_temporary_folder(
args: AnonymizeEntriesArgs,
csv_contents: CsvContents,
@ -244,12 +380,14 @@ def extract_entries_to_temporary_folder(
copy_dir(zip_fs, zip_path, temp_fs, canonical_filename)
return canonical_filename
temp_fs = TempFS(identifier="dimocracy-voucher_anonymized")
temp_fs = TempFS(identifier="dimocracy-voucher")
for row in csv_contents:
if row[KnownColumns.IgnoreResubmittedFile]:
if row[KnownColumns.IgnoreFile]:
continue
zip_absolute_path = os.path.join(args.files, row[dynamic_columns.filename])
zip_absolute_path = os.path.join(
args.file_uploads, row[dynamic_columns.filename]
)
if os.path.isfile(zip_absolute_path):
with open(zip_absolute_path, "rb") as zip_file:
zip_fs = ZipFS(zip_file)
@ -261,11 +399,15 @@ def extract_entries_to_temporary_folder(
return temp_fs
def anonymize_entries(
def maybe_anonymize_entries(
args: AnonymizeEntriesArgs,
csv_contents: CsvContents,
temp_fs: TempFS,
):
if args.deanonymized:
print("Deanonymized - skipping anonymization step")
return
def maybe_rename_file(absolute_path: str | None, canonical_filename: str):
if absolute_path and os.path.basename(absolute_path) != canonical_filename:
absolute_canonical_path = os.path.join(
@ -282,7 +424,7 @@ def anonymize_entries(
print(f"Deleted {os.path.relpath(absolute_path, temp_fs.root_path)}")
for row in csv_contents:
if row[KnownColumns.IgnoreResubmittedFile]:
if row[KnownColumns.IgnoreFile]:
continue
absolute_simfile_dir_path = os.path.join(
@ -376,16 +518,17 @@ def anonymize_entries(
)
def maybe_save_anonymized_files(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    temp_fs: TempFS,
):
    """Copy the processed temporary folder into the output directory.

    Skipped entirely on a dry run. Otherwise the contents of
    ``temp_fs.root_path`` are copied to a new timestamped folder inside
    ``args.output``, named ``anonymized-<timestamp>`` or, when
    ``args.deanonymized`` is set, ``deanonymized-<timestamp>``.

    Args:
        args: parsed command-line arguments (``dry_run``, ``deanonymized``
            and ``output`` are read).
        csv_contents: unused here; kept so the pipeline steps share one
            signature.
        temp_fs: temporary filesystem holding the files to save.
    """
    if args.dry_run:
        print("Dry run - not saving files")
        return

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    de = "de" if args.deanonymized else ""
    # os.path.join avoids the doubled separator that plain string formatting
    # produces when --output already ends in one (as the default "output/").
    output_path = os.path.join(args.output, f"{de}anonymized-{timestamp}")
    shutil.copytree(temp_fs.root_path, output_path)
    print(f"Saved to {os.path.abspath(output_path)}")
@ -396,21 +539,26 @@ def save_anonymized_files(
def main(argv: list[str]):
    """Run the whole pipeline for one invocation of the script.

    Steps, in order: parse and resolve the command-line arguments, validate
    the input paths, load the alias parts and the CSV, generate (and, if
    anything changed, save) the persisted CSV columns, mark rows excluded by
    ``--emails``, extract the entries to a temporary folder, then anonymize
    and save them. The individual ``maybe_*`` steps decide for themselves
    whether they have anything to do.

    Args:
        argv: full argument vector; ``argv[0]`` (the program name) is skipped.
    """
    raw_args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesRawArgs())
    args = process_args(raw_args)
    assert_valid_file_paths(args)
    alias_parts = load_alias_parts("aliasparts.csv")
    csv_contents = load_csv_contents(args)
    assert_known_google_forms_columns_present(csv_contents)
    dynamic_columns = detect_dynamic_columns(csv_contents)

    # Generate & save CSV columns
    csv_contents_changed = maybe_generate_aliases(args, alias_parts, csv_contents)
    csv_contents_changed |= maybe_mark_resubmitted_entries(args, csv_contents)
    if csv_contents_changed:
        maybe_save_generated_columns(args, csv_contents)

    # Generate temporary CSV columns
    # (done after the save above, so these marks are not written back)
    maybe_mark_unspecified_emails(args, csv_contents)

    temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns)
    maybe_anonymize_entries(args, csv_contents, temp_fs)
    maybe_save_anonymized_files(args, csv_contents, temp_fs)
if __name__ == "__main__":