dimocracy-voucher/anonymize_entries.py

575 lines
20 KiB
Python

import argparse
import csv
import _csv
from dataclasses import dataclass
from datetime import datetime
import enum
import os
from random import Random
import shutil
import sys
import textwrap
from typing import cast
from zipfile import ZipFile
import fs.path
from fs.base import FS
from fs.copy import copy_dir
from fs.tempfs import TempFS
from fs.zipfs import ZipFS
from pathvalidate import sanitize_filename
import simfile
from simfile.dir import SimfilePack, SimfileDirectory
from simfile.sm import SMChart, SMSimfile
from simfile.ssc import SSCChart, SSCSimfile
from simfile.types import Simfile
####################
# Script arguments #
####################
class AnonymizeEntriesRawArgs:
data_dir: str
csv: str | None
file_uploads: str | None
deanonymized: bool
dry_run: bool
emails: str
output: str | None
regenerate: bool
seed: str
class AnonymizeEntriesArgs(AnonymizeEntriesRawArgs):
"""Stores the command-line arguments for this script."""
csv: str
file_uploads: str
output: str
def argparser():
"""Get an ArgumentParser instance for this command-line script."""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent(
"""\
example:
path/to/folder:
├ form_responses.csv
└ file_responses/
├ Upload A - User 1.zip
├ Upload B - User 2.zip
└ etc.
python ./anonymize_entries.py path/to/folder
OR
python ./anonymize_entries.py -c path/to/folder/form_responses.csv -f path/to/folder/file_responses
"""
),
)
parser.add_argument(
"data_dir",
type=str,
help="working directory - used to find the form responses CSV, file responses directory, and for output",
)
parser.add_argument(
"-c",
"--csv",
type=str,
help="override CSV form responses path (defaults to first file matching {data_dir}/*.csv)",
)
parser.add_argument(
"-f",
"--file-uploads",
type=str,
help="override file responses directory path (defaults to first subdirectory matching {data_dir}/*/*.zip)",
)
parser.add_argument(
"-o",
"--output",
type=str,
help="override output path (defaults to {data_dir}/output)",
)
parser.add_argument(
"-d",
"--dry-run",
action=argparse.BooleanOptionalAction,
help="do not create or modify any files",
)
parser.add_argument(
"-D",
"--deanonymized",
action=argparse.BooleanOptionalAction,
help="skip anonymization of files, simply package them as-is",
)
parser.add_argument(
"-e",
"--emails",
type=str,
help="limit output to files from the specified emails (comma-separated)",
)
parser.add_argument(
"-r",
"--regenerate",
action=argparse.BooleanOptionalAction,
help="force-update generated CSV columns",
)
parser.add_argument(
"-s",
"--seed",
type=str,
help="specify random seed for alias generation (treat this like a password & change it for each round)",
)
return parser
CsvContents = list[dict[str, str]]
#####################
# Utility functions #
#####################
class KnownColumns(enum.StrEnum):
Timestamp = "Timestamp"
EmailAddress = "Email Address"
GeneratedAlias = "Generated Alias"
IgnoreFile = "Ignore File"
# Not persisted:
ExtractedTo = "Extracted To"
@dataclass
class DynamicColumns:
filename: str
ChangedCsvContents = bool
def parse_timestamp(timestamp: str):
return datetime.strptime(timestamp, "%m/%d/%Y %H:%M:%S")
def canonical_simfile_filename(sm: Simfile) -> str:
return sanitize_filename(f"{sm.title} {sm.subtitle or ''}".rstrip())
################
# Script logic #
################
def process_args(args: AnonymizeEntriesRawArgs) -> AnonymizeEntriesArgs:
if not args.csv or not args.file_uploads:
assert (
args.data_dir
), "Positional data_dir argument must be provided if --csv and --file-uploads are not both set"
for dir_entry in os.scandir(args.data_dir):
if not args.csv and dir_entry.is_file() and dir_entry.name.endswith(".csv"):
args.csv = dir_entry.path
print(
f"Using {repr(dir_entry.name)} for form responses (override with --csv)"
)
if not args.file_uploads and dir_entry.is_dir():
if any(
subdir_entry.name.endswith(".zip")
for subdir_entry in os.scandir(dir_entry.path)
):
args.file_uploads = dir_entry.path
print(
f"Using {repr(dir_entry.name)} for file responses (override with --file-uploads)"
)
if not args.output:
args.output = os.path.join(args.data_dir, "output")
print(f"Using {args.output} for output (override with --output)")
assert args.csv, "Unable to find a CSV file in the provided directory"
assert (
args.file_uploads
), "Unable to find a subdirectory containing ZIP files in the provided directory"
return cast(AnonymizeEntriesArgs, args)
def assert_valid_file_paths(args: AnonymizeEntriesArgs):
assert os.path.isfile(args.csv), f"{repr(args.csv)} is not a file"
assert os.path.isdir(
args.file_uploads
), f"{repr(args.file_uploads)} is not a directory"
def load_csv_contents(args: AnonymizeEntriesArgs):
with open(args.csv, "r") as csvfile:
return list(csv.DictReader(csvfile))
def load_alias_parts(csvpath: str) -> tuple[list[str], list[str]]:
def extract_alias_parts(csv: "_csv._reader"):
return tuple(zip(*((line[0], line[1]) for n, line in enumerate(csv) if n > 0)))
with open(csvpath, "r") as csvfile:
alias_parts = extract_alias_parts(csv.reader(csvfile))
print(f"Loaded {sum(len(part) for part in alias_parts)} alias parts")
return alias_parts
def assert_known_google_forms_columns_present(csv_contents: CsvContents):
assert (
KnownColumns.Timestamp in csv_contents[0]
), f"Provided CSV file does not have a {repr(KnownColumns.Timestamp)} column"
assert (
KnownColumns.EmailAddress in csv_contents[0]
), f"Provided CSV file does not have an {repr(KnownColumns.EmailAddress)} column"
def detect_dynamic_columns(csv_contents: CsvContents) -> DynamicColumns:
maybe_filename_columns = [
column for (column, value) in csv_contents[0].items() if value.endswith(".zip")
]
assert (
len(maybe_filename_columns) != 0
), 'First data row of provided CSV file has no cell ending in ".zip"'
assert (
len(maybe_filename_columns) == 1
), 'First data row of provided CSV file has multiple cells ending in ".zip"'
filename_column = maybe_filename_columns[0]
print(f"Detected filename column: {repr(filename_column)}")
return DynamicColumns(filename=filename_column)
def maybe_generate_aliases(
args: AnonymizeEntriesArgs,
alias_parts: tuple[list[str], list[str]],
csv_contents: CsvContents,
) -> ChangedCsvContents:
reuse_aliases = (
not args.regenerate and KnownColumns.GeneratedAlias in csv_contents[0]
)
if reuse_aliases:
print("Reusing generated aliases")
return False
alias_to_email_address = {}
seed = args.seed or args.csv
for row in csv_contents:
rnd = Random(",".join([row[KnownColumns.EmailAddress], seed]))
random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[1])}"
while (
random_alias in alias_to_email_address
and alias_to_email_address[random_alias] != row[KnownColumns.EmailAddress]
):
print(
f"WARNING: rerolling alias for {row[KnownColumns.EmailAddress]} due to collision with {alias_to_email_address[random_alias]}"
)
random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[0])}"
row[KnownColumns.GeneratedAlias] = random_alias
print("Generated an alias for each entry")
return True
def maybe_mark_resubmitted_entries(
args: AnonymizeEntriesArgs, csv_contents: CsvContents
) -> ChangedCsvContents:
reuse_resubmitted = (
not args.regenerate and KnownColumns.IgnoreFile in csv_contents[0]
)
if reuse_resubmitted:
print("Reusing resubmitted files column")
return False
else:
most_recent_entry_per_user = {}
resubmitted_total = 0
for loop_pass in ("find", "mark"):
for row in csv_contents:
user = row[KnownColumns.EmailAddress]
timestamp = parse_timestamp(row[KnownColumns.Timestamp])
if loop_pass == "find":
if user in most_recent_entry_per_user:
if timestamp > most_recent_entry_per_user[user]:
most_recent_entry_per_user[user] = timestamp
else:
most_recent_entry_per_user[user] = timestamp
elif loop_pass == "mark":
resubmitted = timestamp < most_recent_entry_per_user[user]
row[KnownColumns.IgnoreFile] = "resubmitted" if resubmitted else ""
if resubmitted:
resubmitted_total += 1
s = "" if resubmitted_total == 1 else "s"
print(f"Marked {resubmitted_total} resubmitted file{s} to be ignored")
return True
def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvContents):
if args.dry_run:
print("Dry run - not writing generated columns back to CSV")
else:
with open(args.csv, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys())
writer.writeheader()
for row in csv_contents:
writer.writerow(row)
print("Wrote generated columns back to CSV")
def maybe_mark_unspecified_emails(
args: AnonymizeEntriesArgs, csv_contents: CsvContents
):
if not args.emails:
return
unspecified_total = 0
specified_total = 0
emails = set(args.emails.split(","))
for row in csv_contents:
if not row[KnownColumns.IgnoreFile]:
if row[KnownColumns.EmailAddress] not in emails:
row[KnownColumns.IgnoreFile] = "unspecified"
unspecified_total += 1
else:
specified_total += 1
assert specified_total > 0, "No responses were found from the specified emails"
s = "s" if specified_total != 1 else ""
print(
f"Processing {specified_total} file{s} for specified emails & ignoring {unspecified_total} others"
)
def extract_entries_to_temporary_folder(
args: AnonymizeEntriesArgs,
csv_contents: CsvContents,
dynamic_columns: DynamicColumns,
) -> TempFS:
def find_simfile_dir_zip_path(
zip_fs: FS,
) -> tuple[str, SimfileDirectory]:
# Check all immediate subdirectories, followed by the root itself
root = "/"
contents = zip_fs.listdir(root)
subdirs = [item for item in contents if zip_fs.isdir(item)]
for subdir in subdirs:
possible_path = fs.path.join(root, subdir)
possible_simfile_dir = SimfileDirectory(
possible_path,
filesystem=zip_fs,
)
if possible_simfile_dir.sm_path or possible_simfile_dir.ssc_path:
return (possible_path, possible_simfile_dir)
raise RuntimeError(
"Unable to find a suitable simfile directory in the ZIP. "
"Make sure the simfile is no more than one directory deep, "
'e.g. contains "Simfile/simfile.ssc".'
)
def extract_simfile_dir(zip_fs: FS, temp_fs: FS) -> str:
zip_path, simfile_dir = find_simfile_dir_zip_path(zip_fs)
canonical_filename = canonical_simfile_filename(simfile_dir.open())
assert not temp_fs.exists(
canonical_filename
), "ERROR: trying to extract {canonical_filename} but it's already present in the temp folder"
copy_dir(zip_fs, zip_path, temp_fs, canonical_filename)
return canonical_filename
temp_fs = TempFS(identifier="dimocracy-voucher")
for row in csv_contents:
if row[KnownColumns.IgnoreFile]:
continue
zip_absolute_path = os.path.join(
args.file_uploads, row[dynamic_columns.filename]
)
if os.path.isfile(zip_absolute_path):
with open(zip_absolute_path, "rb") as zip_file:
zip_fs = ZipFS(zip_file)
row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs)
else:
print("WARNING: {zip_absolute_path} not found - skipping")
print(f"Extracted latest submissions to temporary directory {temp_fs.root_path}")
return temp_fs
def maybe_anonymize_entries(
args: AnonymizeEntriesArgs,
csv_contents: CsvContents,
temp_fs: TempFS,
):
if args.deanonymized:
print("Deanonymized - skipping anonymization step")
return
def maybe_rename_file(absolute_path: str | None, canonical_filename: str):
if absolute_path and os.path.basename(absolute_path) != canonical_filename:
absolute_canonical_path = os.path.join(
os.path.dirname(absolute_path), canonical_filename
)
os.rename(absolute_path, absolute_canonical_path)
print(
f"Renamed {os.path.relpath(absolute_path, temp_fs.root_path)} to {os.path.relpath(absolute_canonical_path, temp_fs.root_path)}"
)
def maybe_delete_file(absolute_path: str | None):
if absolute_path and os.path.isfile(absolute_path):
os.remove(absolute_path)
print(f"Deleted {os.path.relpath(absolute_path, temp_fs.root_path)}")
for row in csv_contents:
if row[KnownColumns.IgnoreFile]:
continue
absolute_simfile_dir_path = os.path.join(
temp_fs.root_path, row[KnownColumns.ExtractedTo]
)
simfile_dir = SimfileDirectory(absolute_simfile_dir_path)
canonical_filename = canonical_simfile_filename(simfile_dir.open())
assets = simfile_dir.assets()
maybe_rename_file(assets.music, f"{canonical_filename}.ogg")
maybe_delete_file(assets.background)
maybe_delete_file(assets.banner)
maybe_delete_file(assets.cdimage)
maybe_delete_file(assets.cdtitle)
maybe_delete_file(assets.disc)
maybe_delete_file(assets.jacket)
if simfile_dir.sm_path:
with simfile.mutate(simfile_dir.sm_path) as sm:
assert isinstance(sm, SMSimfile)
sm.credit = row[KnownColumns.GeneratedAlias]
sm.background = ""
sm.banner = ""
sm.cdtitle = ""
sm.genre = ""
sm.music = f"{canonical_filename}.ogg"
for _chart in sm.charts:
sm_chart: SMChart = _chart # typing workaround
sm_chart.description = row[KnownColumns.GeneratedAlias]
maybe_rename_file(simfile_dir.sm_path, f"{canonical_filename}.sm")
print(
f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.sm"
)
if simfile_dir.ssc_path:
with simfile.mutate(simfile_dir.ssc_path) as ssc:
assert isinstance(ssc, SSCSimfile)
ssc.credit = row[KnownColumns.GeneratedAlias]
ssc.music = f"{canonical_filename}.ogg"
ssc.background = ""
ssc.banner = ""
ssc.cdtitle = ""
ssc.genre = ""
ssc.jacket = ""
ssc.cdimage = ""
ssc.discimage = ""
ssc.labels = ""
for _chart in ssc.charts:
ssc_chart: SSCChart = _chart # typing workaround
ssc_chart.description = ""
ssc_chart.chartname = ""
ssc_chart.chartstyle = ""
ssc_chart.credit = row[KnownColumns.GeneratedAlias]
maybe_rename_file(simfile_dir.ssc_path, f"{canonical_filename}.ssc")
print(
f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.ssc"
)
for dir_entry in os.scandir(absolute_simfile_dir_path):
if dir_entry.is_file():
if (
dir_entry.name.endswith(".old")
or dir_entry.name.endswith(".txt")
or dir_entry.name.endswith(".zip")
):
# These are definitely safe to delete for distribution
os.remove(dir_entry.path)
elif (
dir_entry.name.endswith(".ssc")
or dir_entry.name.endswith(".sm")
or dir_entry.name.endswith(".ogg")
):
# These are expected
pass
else:
# Some other extension not listed above
print(
f"WARNING: leaving unexpected file {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone"
)
elif dir_entry.is_dir():
if dir_entry.name == "__bias-check":
# nine-or-null directories can be removed
shutil.rmtree(dir_entry.path)
print(
f"Deleted directory {os.path.relpath(dir_entry.path, temp_fs.root_path)}"
)
else:
# Some other subdirectory - maybe mods?
print(
f"WARNING: leaving unexpected subdirectory {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone"
)
def maybe_save_anonymized_files(
args: AnonymizeEntriesArgs,
csv_contents: CsvContents,
temp_fs: TempFS,
):
if args.dry_run:
print("Dry run - not saving files")
return
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
de = "de" if args.deanonymized else ""
output_path = f"{args.output}/{de}anonymized-{timestamp}"
shutil.copytree(temp_fs.root_path, output_path)
print(f"Saved to {os.path.abspath(output_path)}")
###############
# Main method #
###############
def main(argv: list[str]):
raw_args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesRawArgs())
args = process_args(raw_args)
assert_valid_file_paths(args)
alias_parts = load_alias_parts("aliasparts.csv")
csv_contents = load_csv_contents(args)
assert_known_google_forms_columns_present(csv_contents)
dynamic_columns = detect_dynamic_columns(csv_contents)
# Generate & save CSV columns
csv_contents_changed = maybe_generate_aliases(args, alias_parts, csv_contents)
csv_contents_changed |= maybe_mark_resubmitted_entries(args, csv_contents)
if csv_contents_changed:
maybe_save_generated_columns(args, csv_contents)
# Generate temporary CSV columns
maybe_mark_unspecified_emails(args, csv_contents)
temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns)
maybe_anonymize_entries(args, csv_contents, temp_fs)
maybe_save_anonymized_files(args, csv_contents, temp_fs)
if __name__ == "__main__":
main(sys.argv)