dimocracy-voucher/anonymize_entries.py
2024-08-11 17:05:43 -07:00


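"""Anonymize simfile submissions collected from a Google Forms CSV export.

Assigns a deterministic alias to each entrant, strips identifying metadata
and extra assets from each submitted simfile, and copies the scrubbed packs
to a timestamped ./output/ directory.

Usage:
    python anonymize_entries.py <responses.csv> <files-directory> [--dry-run]
"""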
import argparse
import csv
import _csv
from dataclasses import dataclass
from datetime import datetime
import enum
import os
from random import Random
import shutil
import sys
import fs.path
from fs.base import FS
from fs.copy import copy_dir
from fs.tempfs import TempFS
from fs.zipfs import ZipFS
from pathvalidate import sanitize_filename
import simfile
from simfile.dir import SimfileDirectory
from simfile.sm import SMChart, SMSimfile
from simfile.ssc import SSCChart, SSCSimfile
from simfile.types import Simfile
####################
# Script arguments #
####################
class AnonymizeEntriesArgs:
"""Stores the command-line arguments for this script."""
csv: str
files: str
dry_run: bool
def argparser():
"""Get an ArgumentParser instance for this command-line script."""
parser = argparse.ArgumentParser()
parser.add_argument("csv", type=str, help="path to the CSV file of form responses")
parser.add_argument(
"files", type=str, help="path to the directory of file responses"
)
parser.add_argument(
"-d",
"--dry-run",
action=argparse.BooleanOptionalAction,
help="preview changes without writing the file",
)
return parser
CsvContents = list[dict[str, str]]
#####################
# Utility functions #
#####################
class KnownColumns(enum.StrEnum):
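    """Column names expected in, or added to, the form responses CSV."""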
Timestamp = "Timestamp"
EmailAddress = "Email Address"
GeneratedAlias = "Generated Alias"
IgnoreResubmittedFile = "Ignore Resubmitted File"
# Not persisted:
ExtractedTo = "Extracted To"
@dataclass
class DynamicColumns:
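    """Column names detected from the CSV data at runtime."""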
filename: str
ChangedCsvContents = bool  # return flag: True if the CSV contents were modified in memory
def parse_timestamp(timestamp: str):
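    """Parse a Google Forms timestamp string into a datetime."""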
return datetime.strptime(timestamp, "%m/%d/%Y %H:%M:%S")
def canonical_simfile_filename(sm: Simfile) -> str:
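    """Build a filesystem-safe filename from a simfile's title and subtitle."""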
return sanitize_filename(f"{sm.title} {sm.subtitle or ''}".rstrip())
################
# Script logic #
################
def assert_valid_file_paths(args: AnonymizeEntriesArgs):
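    """Check that the CSV path is a file and the files path is a directory."""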
assert os.path.isfile(args.csv), f"{repr(args.csv)} is not a file"
assert os.path.isdir(args.files), f"{repr(args.files)} is not a directory"
def load_csv_contents(args: AnonymizeEntriesArgs):
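    """Load the form responses CSV into a list of row dictionaries."""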
with open(args.csv, "r") as csvfile:
return list(csv.DictReader(csvfile))
def load_alias_parts(csvpath: str) -> tuple[list[str], list[str]]:
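    """Load the two columns of alias parts from the alias parts CSV, skipping the header row."""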
    def extract_alias_parts(reader: "_csv._reader"):
        return tuple(zip(*((line[0], line[1]) for n, line in enumerate(reader) if n > 0)))
with open(csvpath, "r") as csvfile:
alias_parts = extract_alias_parts(csv.reader(csvfile))
print(f"Loaded {sum(len(part) for part in alias_parts)} alias parts")
return alias_parts
def assert_known_google_forms_columns_present(csv_contents: CsvContents):
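    """Check that the columns Google Forms always provides are present in the CSV."""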
assert (
KnownColumns.Timestamp in csv_contents[0]
), f"Provided CSV file does not have a {repr(KnownColumns.Timestamp)} column"
assert (
KnownColumns.EmailAddress in csv_contents[0]
), f"Provided CSV file does not have an {repr(KnownColumns.EmailAddress)} column"
def detect_dynamic_columns(csv_contents: CsvContents) -> DynamicColumns:
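    """Detect the file upload column by finding the cell ending in ".zip" in the first data row."""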
maybe_filename_columns = [
column for (column, value) in csv_contents[0].items() if value.endswith(".zip")
]
assert (
len(maybe_filename_columns) != 0
), 'First data row of provided CSV file has no cell ending in ".zip"'
assert (
len(maybe_filename_columns) == 1
), 'First data row of provided CSV file has multiple cells ending in ".zip"'
filename_column = maybe_filename_columns[0]
print(f"Detected filename column: {repr(filename_column)}")
return DynamicColumns(filename=filename_column)
def maybe_generate_aliases(
args: AnonymizeEntriesArgs,
alias_parts: tuple[list[str], list[str]],
csv_contents: CsvContents,
) -> ChangedCsvContents:
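    """Generate a two-part alias for each entry unless the CSV already has them.

    Aliases are deterministically seeded from the entrant's email address and the
    script arguments; collisions between different entrants are rerolled.
    Returns True if the CSV contents were modified.
    """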
reuse_aliases = KnownColumns.GeneratedAlias in csv_contents[0]
if reuse_aliases:
print("Reusing generated aliases")
return False
    alias_to_email_address = {}
    for row in csv_contents:
        rnd = Random("; ".join([row[KnownColumns.EmailAddress], args.csv, args.files]))
        random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[1])}"
        while (
            random_alias in alias_to_email_address
            and alias_to_email_address[random_alias] != row[KnownColumns.EmailAddress]
        ):
            print(
                f"WARNING: rerolling alias for {row[KnownColumns.EmailAddress]} due to collision with {alias_to_email_address[random_alias]}"
            )
            random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[1])}"
        alias_to_email_address[random_alias] = row[KnownColumns.EmailAddress]
        row[KnownColumns.GeneratedAlias] = random_alias
    print("Generated an alias for each entry")
    return True
def maybe_mark_resubmitted_entries(csv_contents: CsvContents) -> ChangedCsvContents:
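    """Mark all but each entrant's most recent submission to be ignored.

    Returns True if the CSV contents were modified.
    """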
reuse_resubmitted = KnownColumns.IgnoreResubmittedFile in csv_contents[0]
if reuse_resubmitted:
print("Reusing resubmitted files column")
return False
else:
most_recent_entry_per_user = {}
resubmitted_total = 0
for loop_pass in ("find", "mark"):
for row in csv_contents:
user = row[KnownColumns.EmailAddress]
timestamp = parse_timestamp(row[KnownColumns.Timestamp])
if loop_pass == "find":
if user in most_recent_entry_per_user:
if timestamp > most_recent_entry_per_user[user]:
most_recent_entry_per_user[user] = timestamp
else:
most_recent_entry_per_user[user] = timestamp
elif loop_pass == "mark":
resubmitted = timestamp < most_recent_entry_per_user[user]
row[KnownColumns.IgnoreResubmittedFile] = (
"true" if resubmitted else ""
)
if resubmitted:
resubmitted_total += 1
print(f"Marked {resubmitted_total} resubmitted files to be ignored")
return True
def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvContents):
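    """Write the CSV, including any generated columns, back to disk unless this is a dry run."""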
if args.dry_run:
print("Dry run - not writing generated columns back to CSV")
else:
with open(args.csv, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys())
writer.writeheader()
for row in csv_contents:
writer.writerow(row)
print("Wrote generated columns back to CSV")
def extract_entries_to_temporary_folder(
args: AnonymizeEntriesArgs,
csv_contents: CsvContents,
dynamic_columns: DynamicColumns,
) -> TempFS:
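    """Extract each non-ignored entry's ZIP into a temporary filesystem.

    Records the destination directory name in each row's "Extracted To" column.
    """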
def find_simfile_dir_zip_path(
zip_fs: FS,
) -> tuple[str, SimfileDirectory]:
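        """Find the simfile directory inside the ZIP and return its path and SimfileDirectory."""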
        # Check all immediate subdirectories, followed by the root itself
        root = "/"
        contents = zip_fs.listdir(root)
        subdirs = [item for item in contents if zip_fs.isdir(item)]
        candidate_paths = [fs.path.join(root, subdir) for subdir in subdirs] + [root]
        for possible_path in candidate_paths:
            possible_simfile_dir = SimfileDirectory(
                possible_path,
                filesystem=zip_fs,
            )
            if possible_simfile_dir.sm_path or possible_simfile_dir.ssc_path:
                return (possible_path, possible_simfile_dir)
raise RuntimeError(
"Unable to find a suitable simfile directory in the ZIP. "
"Make sure the simfile is no more than one directory deep, "
'e.g. contains "Simfile/simfile.ssc".'
)
def extract_simfile_dir(zip_fs: FS, temp_fs: FS) -> str:
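        """Copy the simfile directory out of the ZIP into the temp filesystem under its canonical name."""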
zip_path, simfile_dir = find_simfile_dir_zip_path(zip_fs)
canonical_filename = canonical_simfile_filename(simfile_dir.open())
        assert not temp_fs.exists(
            canonical_filename
        ), f"ERROR: trying to extract {canonical_filename} but it's already present in the temp folder"
copy_dir(zip_fs, zip_path, temp_fs, canonical_filename)
return canonical_filename
temp_fs = TempFS(identifier="dimocracy-voucher_anonymized")
for row in csv_contents:
if row[KnownColumns.IgnoreResubmittedFile]:
continue
zip_absolute_path = os.path.join(args.files, row[dynamic_columns.filename])
if os.path.isfile(zip_absolute_path):
with open(zip_absolute_path, "rb") as zip_file:
zip_fs = ZipFS(zip_file)
row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs)
else:
print("WARNING: {zip_absolute_path} not found - skipping")
print(f"Extracted latest submissions to temporary directory {temp_fs.root_path}")
return temp_fs
def anonymize_entries(
args: AnonymizeEntriesArgs,
csv_contents: CsvContents,
temp_fs: TempFS,
):
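    """Scrub identifying metadata, extra assets, and leftover files from each extracted simfile."""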
def maybe_rename_file(absolute_path: str | None, canonical_filename: str):
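        """Rename the file at absolute_path to canonical_filename if it isn't already named that."""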
if absolute_path and os.path.basename(absolute_path) != canonical_filename:
absolute_canonical_path = os.path.join(
os.path.dirname(absolute_path), canonical_filename
)
os.rename(absolute_path, absolute_canonical_path)
print(
f"Renamed {os.path.relpath(absolute_path, temp_fs.root_path)} to {os.path.relpath(absolute_canonical_path, temp_fs.root_path)}"
)
def maybe_delete_file(absolute_path: str | None):
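        """Delete the file at absolute_path if it exists."""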
if absolute_path and os.path.isfile(absolute_path):
os.remove(absolute_path)
print(f"Deleted {os.path.relpath(absolute_path, temp_fs.root_path)}")
    for row in csv_contents:
        if row[KnownColumns.IgnoreResubmittedFile]:
            continue
        if KnownColumns.ExtractedTo not in row:
            # Nothing was extracted for this row (e.g. the ZIP file was missing)
            continue
        absolute_simfile_dir_path = os.path.join(
            temp_fs.root_path, row[KnownColumns.ExtractedTo]
        )
simfile_dir = SimfileDirectory(absolute_simfile_dir_path)
canonical_filename = canonical_simfile_filename(simfile_dir.open())
assets = simfile_dir.assets()
maybe_rename_file(assets.music, f"{canonical_filename}.ogg")
maybe_delete_file(assets.background)
maybe_delete_file(assets.banner)
maybe_delete_file(assets.cdimage)
maybe_delete_file(assets.cdtitle)
maybe_delete_file(assets.disc)
maybe_delete_file(assets.jacket)
if simfile_dir.sm_path:
with simfile.mutate(simfile_dir.sm_path) as sm:
assert isinstance(sm, SMSimfile)
sm.credit = row[KnownColumns.GeneratedAlias]
sm.background = ""
sm.banner = ""
sm.cdtitle = ""
sm.genre = ""
sm.music = f"{canonical_filename}.ogg"
for _chart in sm.charts:
sm_chart: SMChart = _chart # typing workaround
sm_chart.description = row[KnownColumns.GeneratedAlias]
maybe_rename_file(simfile_dir.sm_path, f"{canonical_filename}.sm")
print(
f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.sm"
)
if simfile_dir.ssc_path:
with simfile.mutate(simfile_dir.ssc_path) as ssc:
assert isinstance(ssc, SSCSimfile)
ssc.credit = row[KnownColumns.GeneratedAlias]
ssc.music = f"{canonical_filename}.ogg"
ssc.background = ""
ssc.banner = ""
ssc.cdtitle = ""
ssc.genre = ""
ssc.jacket = ""
ssc.cdimage = ""
ssc.discimage = ""
ssc.labels = ""
for _chart in ssc.charts:
ssc_chart: SSCChart = _chart # typing workaround
ssc_chart.description = ""
ssc_chart.chartname = ""
ssc_chart.chartstyle = ""
ssc_chart.credit = row[KnownColumns.GeneratedAlias]
maybe_rename_file(simfile_dir.ssc_path, f"{canonical_filename}.ssc")
print(
f"Scrubbed {os.path.relpath(absolute_simfile_dir_path, temp_fs.root_path)}/{canonical_filename}.ssc"
)
for dir_entry in os.scandir(absolute_simfile_dir_path):
if dir_entry.is_file():
if (
dir_entry.name.endswith(".old")
or dir_entry.name.endswith(".txt")
or dir_entry.name.endswith(".zip")
):
# These are definitely safe to delete for distribution
os.remove(dir_entry.path)
elif (
dir_entry.name.endswith(".ssc")
or dir_entry.name.endswith(".sm")
or dir_entry.name.endswith(".ogg")
):
# These are expected
pass
else:
# Some other extension not listed above
print(
f"WARNING: leaving unexpected file {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone"
)
elif dir_entry.is_dir():
if dir_entry.name == "__bias-check":
# nine-or-null directories can be removed
shutil.rmtree(dir_entry.path)
print(
f"Deleted directory {os.path.relpath(dir_entry.path, temp_fs.root_path)}"
)
else:
# Some other subdirectory - maybe mods?
print(
f"WARNING: leaving unexpected subdirectory {os.path.relpath(dir_entry.path, temp_fs.root_path)} alone"
)
def save_anonymized_files(
args: AnonymizeEntriesArgs,
csv_contents: CsvContents,
temp_fs: TempFS,
):
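    """Copy the anonymized files from the temporary filesystem to a timestamped output directory."""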
if args.dry_run:
print("Dry run - not saving anonymized files")
return
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
output_path = f"./output/anonymized-{timestamp}"
shutil.copytree(temp_fs.root_path, output_path)
print(f"Saved to {os.path.abspath(output_path)}")
###############
# Main method #
###############
def main(argv: list[str]):
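    """Parse arguments and run the full anonymization pipeline."""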
args = argparser().parse_args(argv[1:], namespace=AnonymizeEntriesArgs())
assert_valid_file_paths(args)
alias_parts = load_alias_parts("aliasparts.csv")
csv_contents = load_csv_contents(args)
assert_known_google_forms_columns_present(csv_contents)
dynamic_columns = detect_dynamic_columns(csv_contents)
csv_contents_changed = maybe_generate_aliases(args, alias_parts, csv_contents)
csv_contents_changed |= maybe_mark_resubmitted_entries(csv_contents)
if csv_contents_changed:
maybe_save_generated_columns(args, csv_contents)
temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns)
anonymize_entries(args, csv_contents, temp_fs)
save_anonymized_files(args, csv_contents, temp_fs)
if __name__ == "__main__":
main(sys.argv)