d2024r1 changes

Ash Garcia 2024-09-23 21:10:31 -07:00
parent 275f27155d
commit 38a486131d

@@ -21,10 +21,11 @@ from fs.zipfs import ZipFS
 from pathvalidate import sanitize_filename
 import simfile
 from simfile.dir import SimfilePack, SimfileDirectory
+from simfile.notes import NoteData
 from simfile.sm import SMChart, SMSimfile
 from simfile.ssc import SSCChart, SSCSimfile
 from simfile.timing import BeatValues, BeatValue
-from simfile.types import Simfile
+from simfile.types import Simfile, Chart
 ####################
@@ -38,7 +39,7 @@ class AnonymizeEntriesRawArgs:
     file_uploads: str | None
     deanonymized: bool
     dry_run: bool
-    emails: str
+    users: str
     output: str | None
     regenerate: bool
     seed: str
@@ -112,10 +113,10 @@ def argparser():
         help="skip anonymization of files, simply package them as-is",
     )
     parser.add_argument(
-        "-e",
-        "--emails",
+        "-u",
+        "--users",
         type=str,
-        help="limit output to files from the specified emails (comma-separated)",
+        help="limit output to files from the specified users (comma-separated)",
     )
     parser.add_argument(
         "-r",
@@ -142,7 +143,7 @@ CsvContents = list[dict[str, str]]
 class KnownColumns(enum.StrEnum):
     Timestamp = "Timestamp"
-    EmailAddress = "Email Address"
+    UserId = "Your gamer tag/alias: (e.g. dimo)"
     GeneratedAlias = "Generated Alias"
     IgnoreFile = "Ignore File"
     # Not persisted:
@@ -211,7 +212,7 @@ def assert_valid_file_paths(args: AnonymizeEntriesArgs):
 
 
 def load_csv_contents(args: AnonymizeEntriesArgs):
-    with open(args.csv, "r") as csvfile:
+    with open(args.csv, "r", encoding="utf-8") as csvfile:
         return list(csv.DictReader(csvfile))
@@ -232,8 +233,8 @@ def assert_known_google_forms_columns_present(csv_contents: CsvContents):
         KnownColumns.Timestamp in csv_contents[0]
     ), f"Provided CSV file does not have a {repr(KnownColumns.Timestamp)} column"
     assert (
-        KnownColumns.EmailAddress in csv_contents[0]
-    ), f"Provided CSV file does not have an {repr(KnownColumns.EmailAddress)} column"
+        KnownColumns.UserId in csv_contents[0]
+    ), f"Provided CSV file does not have an {repr(KnownColumns.UserId)} column"
 
 
 def detect_dynamic_columns(csv_contents: CsvContents) -> DynamicColumns:
@@ -272,33 +273,31 @@ def maybe_generate_aliases(
     with open("aliases/suswords.txt", "r", encoding="utf-8") as suswords_file:
         suswords = set(line.rstrip() for line in suswords_file)
 
-    alias_to_email_address = {}
+    alias_to_user_id = {}
     seed = args.seed or args.csv
     for row in csv_contents:
-        rnd = Random(",".join([row[KnownColumns.EmailAddress], seed]))
+        rnd = Random(",".join([row[KnownColumns.UserId], seed]))
         while True:
             random_alias = f"{rnd.choice(alias_parts[0])} {rnd.choice(alias_parts[1])}"
             if (
-                random_alias in alias_to_email_address
-                and alias_to_email_address[random_alias]
-                != row[KnownColumns.EmailAddress]
+                random_alias in alias_to_user_id
+                and alias_to_user_id[random_alias] != row[KnownColumns.UserId]
             ):
                 print(
-                    f"Rerolling alias for {row[KnownColumns.EmailAddress]} because {repr(random_alias)} is already being used by {alias_to_email_address[random_alias]}"
+                    f"Rerolling alias for {row[KnownColumns.UserId]} because {repr(random_alias)} is already being used by {alias_to_user_id[random_alias]}"
                 )
-            elif random_alias in usedaliases:
+            elif random_alias.lower() in usedaliases:
                 print(
-                    f"Rerolling alias for {row[KnownColumns.EmailAddress]} because {repr(random_alias)} has already been used"
+                    f"Rerolling alias for {row[KnownColumns.UserId]} because {repr(random_alias)} has already been used"
                 )
             elif any(
                 random_part in suswords for random_part in random_alias.split(" ")
             ):
                 print(
-                    f"WARNING: alias for {row[KnownColumns.EmailAddress]} {repr(random_alias)} contains a sus word"
+                    f"WARNING: alias for {row[KnownColumns.UserId]} {repr(random_alias)} contains a sus word"
                 )
+                break
             else:
                 break
         row[KnownColumns.GeneratedAlias] = random_alias
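
Note: the alias generation above is deterministic. Each entrant's RNG is seeded with their user ID joined to a shared seed, and Python seeds Random from str values reproducibly across processes, so reruns yield the same alias for the same user unless a collision forces a reroll. A minimal sketch of the pattern, with hypothetical word lists standing in for alias_parts[0] and alias_parts[1]:

    from random import Random

    # Hypothetical word lists; the real alias_parts come from the script's data files
    adjectives = ["Brisk", "Mellow", "Quirky"]
    nouns = ["Falcon", "Walrus", "Comet"]

    def deterministic_alias(user_id: str, seed: str) -> str:
        # The same (user_id, seed) pair always yields the same alias
        rnd = Random(",".join([user_id, seed]))
        return f"{rnd.choice(adjectives)} {rnd.choice(nouns)}"

    assert deterministic_alias("dimo", "2024") == deterministic_alias("dimo", "2024")
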
@@ -321,7 +320,7 @@ def maybe_mark_resubmitted_entries(
     resubmitted_total = 0
     for loop_pass in ("find", "mark"):
         for row in csv_contents:
-            user = row[KnownColumns.EmailAddress]
+            user = row[KnownColumns.UserId]
             timestamp = parse_timestamp(row[KnownColumns.Timestamp])
             if loop_pass == "find":
                 if user in most_recent_entry_per_user:
@@ -343,7 +342,7 @@ def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvContents):
     if args.dry_run:
         print("Dry run - not writing generated columns back to CSV")
     else:
-        with open(args.csv, "w", newline="") as csvfile:
+        with open(args.csv, "w", newline="", encoding="utf-8") as csvfile:
             writer = csv.DictWriter(csvfile, fieldnames=csv_contents[0].keys())
             writer.writeheader()
             for row in csv_contents:
@@ -351,29 +350,29 @@ def maybe_save_generated_columns(args: AnonymizeEntriesArgs, csv_contents: CsvContents):
             print("Wrote generated columns back to CSV")
 
 
-def maybe_mark_unspecified_emails(
+def maybe_mark_unspecified_user_ids(
     args: AnonymizeEntriesArgs, csv_contents: CsvContents
 ):
-    if not args.emails:
+    if not args.users:
         return
 
     unspecified_total = 0
     specified_total = 0
-    emails = set(args.emails.split(","))
+    users = set(args.users.split(","))
     for row in csv_contents:
         if not row[KnownColumns.IgnoreFile]:
-            if row[KnownColumns.EmailAddress] not in emails:
+            if row[KnownColumns.UserId] not in users:
                 row[KnownColumns.IgnoreFile] = "unspecified"
                 unspecified_total += 1
             else:
                 specified_total += 1
-    assert specified_total > 0, "No responses were found from the specified emails"
+    assert specified_total > 0, "No responses were found from the specified users"
     s = "s" if specified_total != 1 else ""
     print(
-        f"Processing {specified_total} file{s} for specified emails & ignoring {unspecified_total} others"
+        f"Processing {specified_total} file{s} for specified users & ignoring {unspecified_total} others"
     )
@@ -389,7 +388,7 @@ def extract_entries_to_temporary_folder(
         # Check all immediate subdirectories, followed by the root itself
         root = "/"
         contents = zip_fs.listdir(root)
-        subdirs = [item for item in contents if zip_fs.isdir(item)]
+        subdirs = [item for item in contents if zip_fs.isdir(item)] + [root]
 
         for subdir in subdirs:
             possible_path = fs.path.join(root, subdir)
@@ -401,7 +400,7 @@ def extract_entries_to_temporary_folder(
                 return (possible_path, possible_simfile_dir)
 
         raise RuntimeError(
-            "Unable to find a suitable simfile directory in the ZIP. "
+            "Unable to find a suitable simfile directory in ZIP. "
            "Make sure the simfile is no more than one directory deep, "
            'e.g. contains "Simfile/simfile.ssc".'
        )
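
Note: with the `+ [root]` change above, the ZIP scan checks each immediate subdirectory and then falls back to the archive root, which matches the comment and the error hint. A rough standalone sketch of that search order, using only the PyFilesystem calls visible in the diff (the extension check here stands in for the script's actual simfile-directory detection):

    import fs.path
    from fs.zipfs import ZipFS

    def find_simfile_dir_path(zip_path: str) -> str:
        # Check all immediate subdirectories, followed by the root itself
        with ZipFS(zip_path) as zip_fs:
            root = "/"
            candidates = [p for p in zip_fs.listdir(root) if zip_fs.isdir(p)] + [root]
            for candidate in candidates:
                path = fs.path.join(root, candidate)
                if any(n.endswith((".sm", ".ssc")) for n in zip_fs.listdir(path)):
                    return path
        raise RuntimeError("Unable to find a suitable simfile directory in ZIP.")
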
@@ -418,17 +417,23 @@ def extract_entries_to_temporary_folder(
     temp_fs = TempFS(identifier="dimocracy-voucher")
     for row in csv_contents:
-        if row[KnownColumns.IgnoreFile]:
-            continue
-        zip_absolute_path = os.path.join(
-            args.file_uploads, row[dynamic_columns.filename]
-        )
-        if os.path.isfile(zip_absolute_path):
-            with open(zip_absolute_path, "rb") as zip_file:
-                zip_fs = ZipFS(zip_file)
-                row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs)
-        else:
-            print("WARNING: {zip_absolute_path} not found - skipping")
+        try:
+            if row[KnownColumns.IgnoreFile]:
+                continue
+            zip_absolute_path = os.path.join(
+                args.file_uploads, row[dynamic_columns.filename]
+            )
+            if os.path.isfile(zip_absolute_path):
+                with open(zip_absolute_path, "rb") as zip_file:
+                    zip_fs = ZipFS(zip_file)
+                    row[KnownColumns.ExtractedTo] = extract_simfile_dir(zip_fs, temp_fs)
+            else:
+                print(f"WARNING: {zip_absolute_path} not found - skipping")
+        except:
+            print(
+                f"Exception encountered while processing row {row[KnownColumns.UserId]}"
+            )
+            raise
 
     print(f"Extracted latest submissions to temporary directory {temp_fs.root_path}")
     return temp_fs
@@ -469,6 +474,39 @@ def maybe_anonymize_entries(
         print(f"Anonymized BPMs from {repr(bpm_str)} to {repr(str(bpm_values))}")
         return str(bpm_values)
 
+    def clean_up_difficulties(sf: Simfile):
+        charts_to_remove: list[Chart] = []
+        chart_with_notes = None
+        for _chart in sf.charts:
+            chart: Chart = _chart  # typing workaround
+            notedata = NoteData(_chart)
+            if next(iter(notedata), None) is None:
+                charts_to_remove.append(_chart)
+                continue
+            if chart_with_notes is not None:
+                raise RuntimeError(
+                    f"{canonical_filename} contains multiple charts with notes"
+                )
+            chart_with_notes = chart
+            if chart.difficulty != "Challenge":
+                print(
+                    f"WARNING: forced difficulty of chart in {canonical_filename} to Challenge"
+                )
+                chart.difficulty = "Challenge"
+        if chart_with_notes is None:
+            raise RuntimeError(f"{canonical_filename} has no charts with notes")
+        for chart_to_remove in charts_to_remove:
+            print(
+                f"WARNING: removing {chart_to_remove.difficulty} chart with no notes from {canonical_filename}"
+            )
+            sf.charts.remove(chart_to_remove)
+
     for row in csv_contents:
         if row[KnownColumns.IgnoreFile]:
             continue
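
Note: clean_up_difficulties decides whether a chart is empty by asking simfile's NoteData for its first note object; next(iter(...), None) is None exactly when the chart contains no notes. A small illustration of that check under the same assumption (the simfile path is hypothetical):

    import simfile
    from simfile.notes import NoteData

    sf = simfile.open("Simfile/simfile.ssc")  # hypothetical path
    for chart in sf.charts:
        has_notes = next(iter(NoteData(chart)), None) is not None
        print(chart.difficulty, "has notes" if has_notes else "is empty")
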
@@ -498,6 +536,8 @@ def maybe_anonymize_entries(
             sm.genre = ""
             sm.music = f"{canonical_filename}.ogg"
             sm.bpms = anonymize_bpms(sm.bpms)
+            clean_up_difficulties(sm)
+
             for _chart in sm.charts:
                 sm_chart: SMChart = _chart  # typing workaround
                 sm_chart.description = row[KnownColumns.GeneratedAlias]
@@ -520,6 +560,7 @@ def maybe_anonymize_entries(
             ssc.discimage = ""
             ssc.labels = ""
             ssc.bpms = anonymize_bpms(ssc.bpms)
+            clean_up_difficulties(ssc)
             for _chart in ssc.charts:
                 ssc_chart: SSCChart = _chart  # typing workaround
                 ssc_chart.description = ""
@@ -537,8 +578,11 @@ def maybe_anonymize_entries(
         if dir_entry.is_file():
             if (
                 dir_entry.name.endswith(".old")
+                or dir_entry.name.endswith(".sm~")
+                or dir_entry.name.endswith(".ssc~")
                 or dir_entry.name.endswith(".txt")
                 or dir_entry.name.endswith(".zip")
+                or dir_entry.name == ".DS_Store"
             ):
                 # These are definitely safe to delete for distribution
                 os.remove(dir_entry.path)
@@ -604,7 +648,7 @@ def main(argv: list[str]):
     maybe_save_generated_columns(args, csv_contents)
 
     # Generate temporary CSV columns
-    maybe_mark_unspecified_emails(args, csv_contents)
+    maybe_mark_unspecified_user_ids(args, csv_contents)
     temp_fs = extract_entries_to_temporary_folder(args, csv_contents, dynamic_columns)
     maybe_anonymize_entries(args, csv_contents, temp_fs)