177 lines
5.1 KiB
Python
177 lines
5.1 KiB
Python
import argparse
|
|
import csv
|
|
import _csv
|
|
import os
|
|
from random import Random
|
|
import sys
|
|
from tempfile import TemporaryDirectory
|
|
from typing import Optional
|
|
from zipfile import ZipFile
|
|
|
|
from fs.zipfs import ZipFS
|
|
import simfile
|
|
|
|
|
|
####################
|
|
# Script arguments #
|
|
####################
|
|
|
|
|
|
class AnonymizeEntriesArgs:
    """Typed namespace that receives this script's parsed command-line arguments.

    Only attribute annotations are declared here; the values are filled in
    when an instance is used as the ``namespace`` for argument parsing.
    """

    # Path to the CSV file of form responses.
    csv: str
    # Path to the directory of file responses.
    files: str
    # Preview changes without writing the file.
    dry_run: bool
|
|
|
|
|
|
def argparser():
    """Build the ArgumentParser used by this command-line script.

    Returns:
        A parser with two positional string arguments (``csv`` and ``files``)
        and an optional ``-d`` / ``--dry-run`` boolean flag.
    """
    cli = argparse.ArgumentParser()

    # Positional arguments, declared in the order they appear on the command line.
    positionals = (
        ("csv", "path to the CSV file of form responses"),
        ("files", "path to the directory of file responses"),
    )
    for dest, description in positionals:
        cli.add_argument(dest, type=str, help=description)

    cli.add_argument(
        "-d",
        "--dry-run",
        action=argparse.BooleanOptionalAction,
        help="preview changes without writing the file",
    )
    return cli
|
|
|
|
|
|
# Parsed CSV rows: one dict per data row, keyed by column header.
CsvContents = list[dict[str, str]]
|
|
|
|
|
|
################
|
|
# Script logic #
|
|
################
|
|
|
|
|
|
def assert_valid_file_paths(args: AnonymizeEntriesArgs):
    """Verify that both paths given on the command line exist.

    Args:
        args: Parsed arguments; ``args.csv`` must name an existing file and
            ``args.files`` an existing directory.

    Raises:
        AssertionError: If either path fails its check.
    """
    # Raised explicitly rather than through ``assert`` so the checks still run
    # when Python is started with -O, which strips assert statements.
    if not os.path.isfile(args.csv):
        raise AssertionError(f"{args.csv!r} is not a file")
    if not os.path.isdir(args.files):
        raise AssertionError(f"{args.files!r} is not a directory")
|
|
|
|
|
|
def load_csv_contents(args: AnonymizeEntriesArgs) -> list[dict[str, str]]:
    """Read the form-responses CSV into memory.

    Args:
        args: Parsed arguments; ``args.csv`` is the path of the file to read.

    Returns:
        One dict per data row, keyed by the column names from the header row.
    """
    # newline="" leaves line-ending handling to the csv module, so line breaks
    # embedded in quoted cells are not translated while reading.
    with open(args.csv, "r", newline="") as csvfile:
        return list(csv.DictReader(csvfile))
|
|
|
|
|
|
def load_alias_parts(csvpath: str) -> tuple[list[str], list[str]]:
    """Load the two columns of alias parts from a CSV file.

    The first row is treated as a header and skipped. From every other row the
    first cell is added to the first list and the second cell to the second
    list. Rows with fewer than two cells (such as blank lines) are ignored.

    Args:
        csvpath: Path of the alias-parts CSV file.

    Returns:
        A ``(first_parts, second_parts)`` pair of lists. Both lists are empty
        when the file contains no usable data rows.
    """
    first_parts: list[str] = []
    second_parts: list[str] = []

    with open(csvpath, "r", newline="") as csvfile:
        rows = csv.reader(csvfile)
        next(rows, None)  # skip the header row; tolerate a completely empty file
        for row in rows:
            if len(row) < 2:
                # csv.reader yields [] for blank lines; there is nothing to take.
                continue
            first_parts.append(row[0])
            second_parts.append(row[1])

    alias_parts = (first_parts, second_parts)
    print(f"Loaded {sum(len(part) for part in alias_parts)} alias parts")
    return alias_parts
|
|
|
|
|
|
def assert_known_google_forms_columns_present(csv_contents: CsvContents):
    """Check that the expected Google Forms columns exist in the CSV.

    Only the first data row is inspected; its keys stand in for the header.

    Args:
        csv_contents: Parsed CSV rows.

    Raises:
        AssertionError: If there are no data rows, or if the "Timestamp" or
            "Email Address" column is missing.
    """
    # Explicit raises (instead of ``assert``) keep the checks active under -O.
    if not csv_contents:
        raise AssertionError("Provided CSV file does not have any data rows")

    first_row = csv_contents[0]
    if "Timestamp" not in first_row:
        raise AssertionError('Provided CSV file does not have a "Timestamp" column')
    if "Email Address" not in first_row:
        raise AssertionError(
            'Provided CSV file does not have an "Email Address" column'
        )
|
|
|
|
|
|
def detect_filename_column(csv_contents: CsvContents) -> str:
    """Find the column whose cells hold the uploaded ``.zip`` file names.

    Detection looks only at the first data row and requires exactly one cell
    whose text ends in ".zip".

    Args:
        csv_contents: Parsed CSV rows.

    Returns:
        The name of the detected column.

    Raises:
        AssertionError: If there are no data rows, or if the first data row
            has zero or more than one cell ending in ".zip".
    """
    # Explicit raises (instead of ``assert``) keep the checks active under -O.
    if not csv_contents:
        raise AssertionError("Provided CSV file does not have any data rows")

    # DictReader fills short rows with None and gathers surplus cells into a
    # list, so only genuine string cells are tested.
    candidates = [
        column
        for column, value in csv_contents[0].items()
        if isinstance(value, str) and value.endswith(".zip")
    ]
    if not candidates:
        raise AssertionError(
            'First data row of provided CSV file has no cell ending in ".zip"'
        )
    if len(candidates) > 1:
        raise AssertionError(
            'First data row of provided CSV file has multiple cells ending in ".zip"'
        )

    filename_column = candidates[0]
    print(f"Detected filename column: {repr(filename_column)}")
    return filename_column
|
|
|
|
|
|
def maybe_generate_and_persist_aliases(
    args: AnonymizeEntriesArgs,
    alias_parts: tuple[list[str], list[str]],
    csv_contents: CsvContents,
):
    """Ensure every row has a "Generated Alias", creating and saving them if absent.

    If the first row already has a "Generated Alias" column, the existing
    values are kept and nothing is written. Otherwise an alias is generated for
    every row (mutating ``csv_contents`` in place) and, unless ``args.dry_run``
    is set, all rows are written back to ``args.csv``.

    Args:
        args: Parsed arguments; ``csv`` and ``files`` feed the random seed,
            ``csv`` is also the output path, and ``dry_run`` suppresses writing.
        alias_parts: Pair of word lists; an alias is one word from the first
            list followed by one word from the second.
        csv_contents: Parsed CSV rows; each must have an "Email Address" key.

    Returns:
        Always 0.
    """
    if "Generated Alias" in csv_contents[0]:
        print("Reusing generated aliases")
        return 0

    for row in csv_contents:
        # Seeding from the email address and both paths makes each alias
        # reproducible for the same inputs.
        random = Random("; ".join([row["Email Address"], args.csv, args.files]))
        # First word comes from the first list, second word from the second list.
        row["Generated Alias"] = (
            f"{random.choice(alias_parts[0])} {random.choice(alias_parts[1])}"
        )
    print("Generated an alias for each entry")

    if args.dry_run:
        print("Dry run - not writing generated aliases back to CSV")
        return 0

    with open(args.csv, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys())
        writer.writeheader()
        writer.writerows(csv_contents)
    print("Wrote generated aliases back to CSV")

    return 0
|
|
|
|
|
|
def maybe_remove_and_persist_resubmitted_entries(csv_contents: CsvContents):
    """Placeholder: not implemented yet; only prints a stub notice."""
    print("STUB - maybe_remove_and_persist_resubmitted_entries")
|
|
|
|
|
|
def extract_entries_to_temporary_folder(
    args: AnonymizeEntriesArgs, csv_contents: CsvContents
):
    """Placeholder: not implemented yet; only prints a stub notice.

    There is no return statement, so callers currently receive None.
    """
    print("STUB - extract_entries_to_temporary_folder")
|
|
|
|
|
|
def anonymize_entries(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    temp_dir: Optional[TemporaryDirectory],
):
    """Placeholder: not implemented yet; only prints a stub notice."""
    print("STUB - anonymize_entries")
|
|
|
|
|
|
def zip_anonymized_entries(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    temp_dir: Optional[TemporaryDirectory],
):
    """Placeholder: not implemented yet; only prints a stub notice."""
    print("STUB - zip_anonymized_entries")
|
|
|
|
|
|
###############
|
|
# Main method #
|
|
###############
|
|
|
|
|
|
def main(argv: list[str]):
    """Run the anonymization pipeline for one command line.

    Args:
        argv: Full argument vector; the first element is skipped before
            parsing.
    """
    options = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesArgs())
    assert_valid_file_paths(options)

    # The alias-parts file is looked up relative to the current working directory.
    name_parts = load_alias_parts("aliasparts.csv")

    rows = load_csv_contents(options)
    assert_known_google_forms_columns_present(rows)
    # The result is not consumed yet; the call still validates and reports the column.
    filename_column = detect_filename_column(rows)
    maybe_generate_and_persist_aliases(options, name_parts, rows)

    # Everything above is implemented; everything below is a stub

    maybe_remove_and_persist_resubmitted_entries(rows)
    workspace = extract_entries_to_temporary_folder(options, rows)
    anonymize_entries(options, rows, workspace)
    zip_anonymized_entries(options, rows, workspace)
|
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main(sys.argv)
|