565 lines
19 KiB
Python
565 lines
19 KiB
Python
import argparse
|
|
import csv
|
|
import _csv
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
import enum
|
|
import os
|
|
from random import Random
|
|
import shutil
|
|
import sys
|
|
import textwrap
|
|
from typing import cast
|
|
from zipfile import ZipFile
|
|
|
|
import fs.path
|
|
from fs.base import FS
|
|
from fs.copy import copy_dir
|
|
from fs.tempfs import TempFS
|
|
from fs.zipfs import ZipFS
|
|
from pathvalidate import sanitize_filename
|
|
import simfile
|
|
from simfile.dir import SimfilePack, SimfileDirectory
|
|
from simfile.sm import SMChart, SMSimfile
|
|
from simfile.ssc import SSCChart, SSCSimfile
|
|
from simfile.types import Simfile
|
|
|
|
|
|
####################
|
|
# Script arguments #
|
|
####################
|
|
|
|
|
|
class AnonymizeEntriesRawArgs:
|
|
data_dir: str | None
|
|
csv: str | None
|
|
file_uploads: str | None
|
|
deanonymized: bool
|
|
dry_run: bool
|
|
emails: str
|
|
output: str
|
|
regenerate: bool
|
|
seed: str
|
|
|
|
|
|
class AnonymizeEntriesArgs(AnonymizeEntriesRawArgs):
    """Command-line arguments once the input paths have been resolved.

    Identical to the raw arguments except that ``csv`` and ``file_uploads``
    are narrowed from optional to plain strings.
    """

    csv: str
    file_uploads: str
|
|
|
|
|
|
def argparser():
    """Build the ArgumentParser for this command-line script.

    The parser takes an optional positional ``data_dir`` plus options that
    override the CSV path and the uploads directory, toggle the dry-run,
    deanonymized and regenerate modes, restrict processing to given emails,
    choose the output directory and set the alias seed. A usage example is
    attached as the epilog.
    """
    # NOTE(review): the indentation of the uploads nested under
    # "file_responses/" is reconstructed - confirm it matches the intended
    # help text.
    usage_example = textwrap.dedent(
        """\
        example:

        path/to/folder:
        ├ form_responses.csv
        └ file_responses/
          ├ Upload A - User 1.zip
          ├ Upload B - User 2.zip
          └ etc.

        python ./anonymize_entries.py path/to/folder

        OR

        python ./anonymize_entries.py -c path/to/folder/form_responses.csv -f path/to/folder/file_responses
        """
    )
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_example,
    )
    add = parser.add_argument
    toggle = argparse.BooleanOptionalAction

    # Input locations
    add(
        "data_dir",
        nargs="?",
        type=str,
        help="directory containing both the CSV form data and the file responses (uploads)",
    )
    add("-c", "--csv", type=str, help="override path to the CSV file of form responses")
    add(
        "-f",
        "--file-uploads",
        type=str,
        help="override path to the directory of file responses (uploads)",
    )

    # Behaviour switches and filters (declaration order fixes the --help order)
    add("-d", "--dry-run", action=toggle, help="do not create or modify any files")
    add(
        "-D",
        "--deanonymized",
        action=toggle,
        help="skip anonymization of files, simply package them as-is",
    )
    add(
        "-e",
        "--emails",
        type=str,
        help="limit output to files from the specified emails (comma-separated)",
    )
    add("-o", "--output", type=str, default="output/", help="output directory")
    add("-r", "--regenerate", action=toggle, help="force-update generated CSV columns")
    add(
        "-s",
        "--seed",
        type=str,
        help="specify random seed for alias generation (treat this like a password & change it for each round)",
    )

    return parser
|
|
|
|
|
|
# Rows of the form-responses CSV: one dict per data row, keyed by column header
CsvContents = list[dict[str, str]]
|
|
|
|
#####################
|
|
# Utility functions #
|
|
#####################
|
|
|
|
|
|
class KnownColumns(enum.StrEnum):
|
|
Timestamp = "Timestamp"
|
|
EmailAddress = "Email Address"
|
|
GeneratedAlias = "Generated Alias"
|
|
IgnoreFile = "Ignore File"
|
|
# Not persisted:
|
|
ExtractedTo = "Extracted To"
|
|
|
|
|
|
@dataclass
class DynamicColumns:
    """Column headers that are detected from the CSV data rather than fixed."""

    # Header of the column whose cells hold the uploaded ZIP's file name
    filename: str
|
|
|
|
|
|
# Result of the column-generating steps: True when they modified the CSV rows
ChangedCsvContents = bool
|
|
|
|
|
|
def parse_timestamp(timestamp: str) -> datetime:
    """Parse a ``month/day/year hour:minute:second`` timestamp string.

    Raises:
        ValueError: if ``timestamp`` does not match that format.
    """
    timestamp_format = "%m/%d/%Y %H:%M:%S"
    parsed = datetime.strptime(timestamp, timestamp_format)
    return parsed
|
|
|
|
|
|
def canonical_simfile_filename(sm: Simfile) -> str:
    """Derive a filesystem-safe name from a simfile's title and subtitle.

    The title and (if any) subtitle are joined with a space, trailing
    whitespace is removed, and the result is passed through
    ``sanitize_filename``.
    """
    subtitle = sm.subtitle if sm.subtitle else ""
    display_name = f"{sm.title} {subtitle}".rstrip()
    return sanitize_filename(display_name)
|
|
|
|
|
|
################
|
|
# Script logic #
|
|
################
|
|
|
|
|
|
def process_args(args: AnonymizeEntriesRawArgs) -> AnonymizeEntriesArgs:
    """Fill in the CSV and uploads paths from ``data_dir`` when not given.

    When ``--csv`` and ``--file-uploads`` are not both set, ``data_dir`` is
    scanned: the first ``.csv`` file found becomes ``args.csv`` and the first
    subdirectory containing a ``.zip`` entry becomes ``args.file_uploads``.
    Explicitly provided values are never overwritten.

    Args:
        args: the parsed arguments; modified in place.

    Returns:
        The same ``args`` object, now typed as having both paths set.

    Raises:
        AssertionError: if ``data_dir`` is needed but missing, or if either
            path could not be determined.
    """
    if not args.csv or not args.file_uploads:
        assert (
            args.data_dir
        ), "Positional data_dir argument must be provided if --csv and --file-uploads are not both set"
        # scandir iterators hold an OS handle; the context managers release it
        # even when any() stops at the first ZIP instead of exhausting them
        with os.scandir(args.data_dir) as dir_entries:
            for dir_entry in dir_entries:
                if (
                    not args.csv
                    and dir_entry.is_file()
                    and dir_entry.name.endswith(".csv")
                ):
                    args.csv = dir_entry.path
                if not args.file_uploads and dir_entry.is_dir():
                    with os.scandir(dir_entry.path) as subdir_entries:
                        contains_zip = any(
                            subdir_entry.name.endswith(".zip")
                            for subdir_entry in subdir_entries
                        )
                    if contains_zip:
                        args.file_uploads = dir_entry.path

    assert args.csv, "Unable to find a CSV file in the provided directory"
    assert (
        args.file_uploads
    ), "Unable to find a subdirectory containing ZIP files in the provided directory"

    # cast() is a runtime no-op; the quoted target is only read by type checkers
    return cast("AnonymizeEntriesArgs", args)
|
|
|
|
|
|
def assert_valid_file_paths(args: AnonymizeEntriesArgs):
    """Check that the resolved CSV path is a file and the uploads path a directory.

    Raises:
        AssertionError: naming the offending path if either check fails.
    """
    csv_is_file = os.path.isfile(args.csv)
    assert csv_is_file, f"{args.csv!r} is not a file"

    uploads_is_dir = os.path.isdir(args.file_uploads)
    assert uploads_is_dir, f"{args.file_uploads!r} is not a directory"
|
|
|
|
|
|
def load_csv_contents(args: AnonymizeEntriesArgs):
    """Read the form-responses CSV at ``args.csv`` into a list of row dicts.

    Each row maps column header to cell text, as produced by
    ``csv.DictReader``.
    """
    # newline="" hands line endings to the csv module untouched, so line
    # breaks inside quoted cells survive a later write-back unchanged
    with open(args.csv, "r", newline="") as csvfile:
        return list(csv.DictReader(csvfile))
|
|
|
|
|
|
def load_alias_parts(csvpath: str) -> tuple[list[str], list[str]]:
    """Load the two word lists used to build aliases from a CSV file.

    The first row is treated as a header and skipped. For every following
    row, the first cell goes into the first list and the second cell into the
    second list. Rows with fewer than two cells (e.g. blank lines) are
    skipped.

    Args:
        csvpath: path of the alias-parts CSV file.

    Returns:
        ``(first_parts, second_parts)``; both lists are empty when the file
        has no data rows.
    """
    first_parts: list[str] = []
    second_parts: list[str] = []

    with open(csvpath, "r", newline="") as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # skip the header row, tolerating an empty file
        for line in reader:
            if len(line) < 2:
                continue
            first_parts.append(line[0])
            second_parts.append(line[1])

    alias_parts = (first_parts, second_parts)
    print(f"Loaded {sum(len(part) for part in alias_parts)} alias parts")

    return alias_parts
|
|
|
|
|
|
def assert_known_google_forms_columns_present(csv_contents: CsvContents):
    """Check that the CSV has responses with timestamp and email columns.

    Only the first row is inspected for the column names.

    Raises:
        AssertionError: if there are no rows at all, or if the "Timestamp" or
            "Email Address" column is missing.
    """
    # Without this guard a CSV with no data rows fails below with a bare IndexError
    assert csv_contents, "Provided CSV file does not contain any responses"

    first_row = csv_contents[0]
    assert (
        KnownColumns.Timestamp in first_row
    ), f"Provided CSV file does not have a {repr(KnownColumns.Timestamp)} column"
    assert (
        KnownColumns.EmailAddress in first_row
    ), f"Provided CSV file does not have an {repr(KnownColumns.EmailAddress)} column"
|
|
|
|
|
|
def detect_dynamic_columns(csv_contents: CsvContents) -> DynamicColumns:
    """Work out which column holds the uploaded ZIP file names.

    The first data row is searched for cells ending in ".zip"; exactly one
    such cell must exist, and its column header becomes the filename column.

    Raises:
        AssertionError: if no cell or more than one cell of the first row
            ends in ".zip".
    """
    first_row = csv_contents[0]

    zip_columns = []
    for header, cell in first_row.items():
        if cell.endswith(".zip"):
            zip_columns.append(header)

    match_count = len(zip_columns)
    assert (
        match_count != 0
    ), 'First data row of provided CSV file has no cell ending in ".zip"'
    assert (
        match_count == 1
    ), 'First data row of provided CSV file has multiple cells ending in ".zip"'

    (filename_column,) = zip_columns
    print(f"Detected filename column: {repr(filename_column)}")
    return DynamicColumns(filename=filename_column)
|
|
|
|
|
|
def maybe_generate_aliases(
    args: AnonymizeEntriesArgs,
    alias_parts: tuple[list[str], list[str]],
    csv_contents: CsvContents,
) -> ChangedCsvContents:
    """Assign a two-word alias to every row unless aliases already exist.

    Existing aliases are reused when the "Generated Alias" column is present
    and ``args.regenerate`` is not set. Otherwise each row gets an alias made
    of one word from each list in ``alias_parts``, chosen by a random
    generator seeded with the row's email address and ``args.seed`` (falling
    back to ``args.csv``). Rows sharing an email address therefore get the
    same alias; an alias already taken by a different email is rerolled.

    Returns:
        False if the existing aliases were reused, True if the rows were
        updated with new aliases.
    """
    reuse_aliases = (
        not args.regenerate and KnownColumns.GeneratedAlias in csv_contents[0]
    )

    if reuse_aliases:
        print("Reusing generated aliases")
        return False

    # Which email address owns each alias handed out so far
    alias_to_email_address: dict[str, str] = {}

    seed = args.seed or args.csv

    for row in csv_contents:
        email_address = row[KnownColumns.EmailAddress]
        rnd = Random(",".join([email_address, seed]))
        random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[1])}"
        while (
            random_alias in alias_to_email_address
            and alias_to_email_address[random_alias] != email_address
        ):
            print(
                f"WARNING: rerolling alias for {email_address} due to collision with {alias_to_email_address[random_alias]}"
            )
            # Draw again from the same generator, one word from each list
            random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[1])}"
        # Record ownership so later rows can detect a collision
        alias_to_email_address[random_alias] = email_address
        row[KnownColumns.GeneratedAlias] = random_alias

    print("Generated an alias for each entry")
    return True
|
|
|
|
|
|
def maybe_mark_resubmitted_entries(
    args: AnonymizeEntriesArgs, csv_contents: CsvContents
) -> ChangedCsvContents:
    """Flag every row that was superseded by a later row from the same email.

    The existing "Ignore File" column is reused when present and
    ``args.regenerate`` is not set. Otherwise the column is (re)written for
    every row: "resubmitted" when the same email address has a row with a
    later timestamp, an empty string otherwise.

    Returns:
        False if the existing column was reused, True if it was rewritten.
    """
    if not args.regenerate and KnownColumns.IgnoreFile in csv_contents[0]:
        print("Reusing resubmitted files column")
        return False

    # First pass: find the newest timestamp per email address
    latest_by_user = {}
    for row in csv_contents:
        user = row[KnownColumns.EmailAddress]
        timestamp = parse_timestamp(row[KnownColumns.Timestamp])
        if user not in latest_by_user or timestamp > latest_by_user[user]:
            latest_by_user[user] = timestamp

    # Second pass: mark every row older than its user's newest one
    resubmitted_total = 0
    for row in csv_contents:
        user = row[KnownColumns.EmailAddress]
        timestamp = parse_timestamp(row[KnownColumns.Timestamp])
        if timestamp < latest_by_user[user]:
            row[KnownColumns.IgnoreFile] = "resubmitted"
            resubmitted_total += 1
        else:
            row[KnownColumns.IgnoreFile] = ""

    print(f"Marked {resubmitted_total} resubmitted files to be ignored")
    return True
|
|
|
|
|
|
def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvContents):
    """Overwrite the CSV at ``args.csv`` with the rows, unless in dry-run mode.

    The header row is taken from the keys of the first row.
    """
    if args.dry_run:
        print("Dry run - not writing generated columns back to CSV")
        return

    with open(args.csv, "w", newline="") as csvfile:
        column_names = csv_contents[0].keys()
        writer = csv.DictWriter(csvfile, fieldnames=column_names)
        writer.writeheader()
        writer.writerows(csv_contents)
    print("Wrote generated columns back to CSV")
|
|
|
|
|
|
def maybe_mark_unspecified_emails(
    args: AnonymizeEntriesArgs, csv_contents: CsvContents
):
    """Ignore every row whose email is not listed in ``args.emails``.

    Does nothing when ``args.emails`` is empty. Otherwise each row that is
    not already ignored and whose email address is not in the comma-separated
    list gets its "Ignore File" cell set to "unspecified".

    Raises:
        AssertionError: if no remaining row matches any of the given emails.
    """
    if not args.emails:
        return

    # Strip whitespace so that "a@x.org, b@x.org" matches both addresses,
    # and drop the empty entries that stray commas leave behind
    emails = {email.strip() for email in args.emails.split(",")}
    emails.discard("")

    unspecified_total = 0
    specified_total = 0

    for row in csv_contents:
        if row[KnownColumns.IgnoreFile]:
            continue
        if row[KnownColumns.EmailAddress] in emails:
            specified_total += 1
        else:
            row[KnownColumns.IgnoreFile] = "unspecified"
            unspecified_total += 1

    assert specified_total > 0, "No responses were found from the specified emails"

    s = "s" if specified_total != 1 else ""
    print(
        f"Processing {specified_total} file{s} for specified emails & ignoring {unspecified_total} others"
    )
|
|
|
|
|
|
def extract_entries_to_temporary_folder(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    dynamic_columns: DynamicColumns,
) -> TempFS:
    """Extract the simfile directory of each non-ignored entry to a temp folder.

    For every row whose "Ignore File" cell is empty, the ZIP named in the
    filename column is looked up under ``args.file_uploads``. Its simfile
    directory is copied into a new temporary filesystem under a canonical
    name derived from the simfile's title, and that name is stored in the
    row's "Extracted To" cell. Rows whose ZIP does not exist are reported and
    left without an "Extracted To" value.

    Returns:
        The temporary filesystem holding one directory per extracted entry.
        It is returned open.

    Raises:
        RuntimeError: if a ZIP contains no simfile directory at its root or
            one level below.
        AssertionError: if two entries map to the same canonical name.
    """

    def find_simfile_dir_zip_path(
        zip_fs: FS,
    ) -> tuple[str, SimfileDirectory]:
        # Check all immediate subdirectories, followed by the root itself
        root = "/"
        contents = zip_fs.listdir(root)
        subdirs = [item for item in contents if zip_fs.isdir(item)]
        possible_paths = [fs.path.join(root, subdir) for subdir in subdirs]
        possible_paths.append(root)

        for possible_path in possible_paths:
            possible_simfile_dir = SimfileDirectory(
                possible_path,
                filesystem=zip_fs,
            )
            if possible_simfile_dir.sm_path or possible_simfile_dir.ssc_path:
                return (possible_path, possible_simfile_dir)

        raise RuntimeError(
            "Unable to find a suitable simfile directory in the ZIP. "
            "Make sure the simfile is no more than one directory deep, "
            'e.g. contains "Simfile/simfile.ssc".'
        )

    def extract_simfile_dir(zip_fs: FS, temp_fs: FS) -> str:
        zip_path, simfile_dir = find_simfile_dir_zip_path(zip_fs)
        canonical_filename = canonical_simfile_filename(simfile_dir.open())
        assert not temp_fs.exists(
            canonical_filename
        ), f"ERROR: trying to extract {canonical_filename} but it's already present in the temp folder"
        copy_dir(zip_fs, zip_path, temp_fs, canonical_filename)
        return canonical_filename

    temp_fs = TempFS(identifier="dimocracy-voucher")

    for row in csv_contents:
        if row[KnownColumns.IgnoreFile]:
            continue
        zip_absolute_path = os.path.join(
            args.file_uploads, row[dynamic_columns.filename]
        )
        if os.path.isfile(zip_absolute_path):
            # Close the ZIP filesystem as soon as its contents are copied out
            with open(zip_absolute_path, "rb") as zip_file, ZipFS(zip_file) as zip_fs:
                row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs)
        else:
            print(f"WARNING: {zip_absolute_path} not found - skipping")

    print(f"Extracted latest submissions to temporary directory {temp_fs.root_path}")
    return temp_fs
|
|
|
|
|
|
def maybe_anonymize_entries(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    temp_fs: TempFS,
):
    """Scrub identifying data from every extracted entry in the temp folder.

    Does nothing when ``args.deanonymized`` is set. Otherwise, for each row
    that is not ignored and has been extracted:

    * the music file and the simfiles are renamed after the canonical
      filename,
    * graphic assets (background, banner, CD image, CD title, disc, jacket)
      are deleted and the matching simfile fields are blanked,
    * credit fields are replaced with the row's generated alias,
    * leftover ``.old``/``.txt``/``.zip`` files and ``__bias-check``
      directories are removed; any other unexpected file or subdirectory is
      only reported.

    Rows without an "Extracted To" value are skipped rather than raising
    ``KeyError``.
    """
    if args.deanonymized:
        print("Deanonymized - skipping anonymization step")
        return

    def relative_to_temp(absolute_path: str) -> str:
        # Paths are reported relative to the temp folder root
        return os.path.relpath(absolute_path, temp_fs.root_path)

    def maybe_rename_file(absolute_path: str | None, canonical_filename: str):
        if absolute_path and os.path.basename(absolute_path) != canonical_filename:
            absolute_canonical_path = os.path.join(
                os.path.dirname(absolute_path), canonical_filename
            )
            os.rename(absolute_path, absolute_canonical_path)
            print(
                f"Renamed {relative_to_temp(absolute_path)} to {relative_to_temp(absolute_canonical_path)}"
            )

    def maybe_delete_file(absolute_path: str | None):
        if absolute_path and os.path.isfile(absolute_path):
            os.remove(absolute_path)
            print(f"Deleted {relative_to_temp(absolute_path)}")

    # Extensions handled by the directory sweep at the end of each entry
    disposable_extensions = (".old", ".txt", ".zip")
    expected_extensions = (".ssc", ".sm", ".ogg")

    for row in csv_contents:
        if row[KnownColumns.IgnoreFile]:
            continue

        extracted_to = row.get(KnownColumns.ExtractedTo)
        if not extracted_to:
            # Nothing was extracted for this row, so there is nothing to scrub
            continue

        absolute_simfile_dir_path = os.path.join(temp_fs.root_path, extracted_to)
        simfile_dir = SimfileDirectory(absolute_simfile_dir_path)
        canonical_filename = canonical_simfile_filename(simfile_dir.open())

        assets = simfile_dir.assets()
        maybe_rename_file(assets.music, f"{canonical_filename}.ogg")
        maybe_delete_file(assets.background)
        maybe_delete_file(assets.banner)
        maybe_delete_file(assets.cdimage)
        maybe_delete_file(assets.cdtitle)
        maybe_delete_file(assets.disc)
        maybe_delete_file(assets.jacket)

        if simfile_dir.sm_path:
            with simfile.mutate(simfile_dir.sm_path) as sm:
                assert isinstance(sm, SMSimfile)
                sm.credit = row[KnownColumns.GeneratedAlias]
                sm.background = ""
                sm.banner = ""
                sm.cdtitle = ""
                sm.genre = ""
                sm.music = f"{canonical_filename}.ogg"
                for _chart in sm.charts:
                    sm_chart: SMChart = _chart  # typing workaround
                    sm_chart.description = row[KnownColumns.GeneratedAlias]
            # Rename only after the mutate block has finished with the original path
            maybe_rename_file(simfile_dir.sm_path, f"{canonical_filename}.sm")
            print(
                f"Scrubbed {relative_to_temp(absolute_simfile_dir_path)}/{canonical_filename}.sm"
            )

        if simfile_dir.ssc_path:
            with simfile.mutate(simfile_dir.ssc_path) as ssc:
                assert isinstance(ssc, SSCSimfile)
                ssc.credit = row[KnownColumns.GeneratedAlias]
                ssc.music = f"{canonical_filename}.ogg"
                ssc.background = ""
                ssc.banner = ""
                ssc.cdtitle = ""
                ssc.genre = ""
                ssc.jacket = ""
                ssc.cdimage = ""
                ssc.discimage = ""
                ssc.labels = ""
                for _chart in ssc.charts:
                    ssc_chart: SSCChart = _chart  # typing workaround
                    ssc_chart.description = ""
                    ssc_chart.chartname = ""
                    ssc_chart.chartstyle = ""
                    ssc_chart.credit = row[KnownColumns.GeneratedAlias]
            # Rename only after the mutate block has finished with the original path
            maybe_rename_file(simfile_dir.ssc_path, f"{canonical_filename}.ssc")
            print(
                f"Scrubbed {relative_to_temp(absolute_simfile_dir_path)}/{canonical_filename}.ssc"
            )

        for dir_entry in os.scandir(absolute_simfile_dir_path):
            if dir_entry.is_file():
                if dir_entry.name.endswith(disposable_extensions):
                    # These are definitely safe to delete for distribution
                    os.remove(dir_entry.path)
                elif not dir_entry.name.endswith(expected_extensions):
                    # Some other extension not listed above
                    print(
                        f"WARNING: leaving unexpected file {relative_to_temp(dir_entry.path)} alone"
                    )
            elif dir_entry.is_dir():
                if dir_entry.name == "__bias-check":
                    # nine-or-null directories can be removed
                    shutil.rmtree(dir_entry.path)
                    print(f"Deleted directory {relative_to_temp(dir_entry.path)}")
                else:
                    # Some other subdirectory - maybe mods?
                    print(
                        f"WARNING: leaving unexpected subdirectory {relative_to_temp(dir_entry.path)} alone"
                    )
|
|
|
|
|
|
def maybe_save_anonymized_files(
    args: AnonymizeEntriesArgs,
    csv_contents: CsvContents,
    temp_fs: TempFS,
):
    """Copy the temp folder into a timestamped directory under ``args.output``.

    Skipped in dry-run mode. The directory is named "anonymized-<timestamp>",
    or "deanonymized-<timestamp>" when ``args.deanonymized`` is set.
    """
    if args.dry_run:
        print("Dry run - not saving files")
        return

    if args.deanonymized:
        de = "de"
    else:
        de = ""
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    output_path = f"{args.output}/{de}anonymized-{timestamp}"

    shutil.copytree(temp_fs.root_path, output_path)
    saved_to = os.path.abspath(output_path)
    print(f"Saved to {saved_to}")
|
|
|
|
|
|
###############
|
|
# Main method #
|
|
###############
|
|
|
|
|
|
def main(argv: list[str]):
    """Run the full pipeline for the command line given in ``argv``.

    Steps: parse and resolve the arguments, load the alias parts and the
    responses CSV, generate the alias and resubmission columns (saving them
    back to the CSV when they changed), filter by email, then extract,
    anonymize and save the entries.

    Args:
        argv: the full argument vector; its first element is skipped.
    """
    parser = argparser()
    raw_args = parser.parse_args(argv[1:], namespace=AnonymizeEntriesRawArgs())
    args = process_args(raw_args)
    assert_valid_file_paths(args)

    alias_parts = load_alias_parts("aliasparts.csv")
    csv_contents = load_csv_contents(args)
    assert_known_google_forms_columns_present(csv_contents)
    dynamic_columns = detect_dynamic_columns(csv_contents)

    # Generate & save CSV columns
    aliases_changed = maybe_generate_aliases(args, alias_parts, csv_contents)
    resubmissions_changed = maybe_mark_resubmitted_entries(args, csv_contents)
    if aliases_changed or resubmissions_changed:
        maybe_save_generated_columns(args, csv_contents)

    # Generate temporary CSV columns
    maybe_mark_unspecified_emails(args, csv_contents)

    temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns)
    maybe_anonymize_entries(args, csv_contents, temp_fs)
    maybe_save_anonymized_files(args, csv_contents, temp_fs)
|
|
|
|
|
|
# Run the pipeline only when executed as a script, not when imported
if __name__ == "__main__":
    main(sys.argv)
|