from contextlib import contextmanager import dataclasses from datetime import datetime import os import sys import traceback from typing import Iterator, Optional from . import __version__ import msdparser import simfile from simfile.dir import SimfilePack from simfile.notes import NoteData from simfile.notes.group import group_notes, SameBeatNotes from simfile.notes.timed import time_notes from simfile.ssc import SSCChart from simfile.timing.engine import TimingData from simfile.timing.displaybpm import displaybpm from simfile.types import Simfile, Chart from .args import SmoketestArgs from .storage import * @dataclasses.dataclass(frozen=True) class SimfileContext: run: Run path: str kind: Optional[str] = None simfile_title: Optional[str] = None chart_stepstype: Optional[str] = None chart_meter: Optional[str] = None chart_index: Optional[int] = None def report_error(self, action: str): print(f"{self.path} ({self.kind}): {traceback.format_exc()}") simfile_object, _ = SimfileObject.get_or_create( path=self.path, kind=self.kind, simfile_title=self.simfile_title, chart_stepstype=self.chart_stepstype, chart_meter=self.chart_meter, chart_index=self.chart_index, ) SimfileError.create( action=action, traceback=traceback.format_exc(), simfile_object=simfile_object, run=self.run, ) @contextmanager def _update(self, **kwargs) -> Iterator["SimfileContext"]: yield dataclasses.replace(self, **kwargs) @contextmanager def object(self, kind, **kwargs) -> Iterator["SimfileContext"]: with self._update(kind=kind, **kwargs) as context: SimfileObject.get_or_create( kind=kind, path=context.path, simfile_title=context.simfile_title, chart_stepstype=context.chart_stepstype, chart_meter=context.chart_meter, chart_index=context.chart_index, ) yield context @contextmanager def perform(self, action: str, **kwargs) -> Iterator["SimfileContext"]: with self._update(**kwargs) as context: try: yield context except Exception: context.report_error(action) class SmoketestRun: args: SmoketestArgs run: Run def __init__(self, args: SmoketestArgs): self.args = args self.run = Run.create( created=datetime.now(), simfile_version=simfile.__version__, msdparser_version=msdparser.__version__, smoketest_version=__version__, ) def process_songs_dir(self, songs_dir: str): for entry in os.scandir(songs_dir): if entry.is_dir(): self.process_pack(entry.path) def process_pack(self, pack_dir: str): root_context = SimfileContext(run=self.run, path=pack_dir) with root_context.object("SimfilePack", path=pack_dir) as context: print(f"Processing pack {pack_dir}") pack = SimfilePack(pack_dir) with context.perform("simfile_dirs") as context: for simfile_dir in pack.simfile_dirs(): if simfile_dir.sm_path: self._open_simfile(context, simfile_dir.sm_path) if simfile_dir.ssc_path: self._open_simfile(context, simfile_dir.ssc_path) def _open_simfile(self, context: SimfileContext, simfile_path: str): if self.args.new_only: if SimfileObject.get_or_none(SimfileObject.path == simfile_path): return elif self.args.error_only: if not ( SimfileError.select() .join(SimfileObject) .where(SimfileObject.path == simfile_path) # type: ignore .get_or_none() ): return with context.perform("simfile.open", path=simfile_path) as context: sim = simfile.open(simfile_path) with context.object( kind=sim.__class__.__name__, simfile_title=sim.title, ) as context: self._test_simfile(context, sim) def _test_simfile(self, context: SimfileContext, sim: Simfile): with context.perform("TimingData") as context: TimingData(sim) with context.perform("displaybpm") as context: displaybpm(sim) with context.perform("charts") as context: for c, chart_ in enumerate(sim.charts): chart: Chart = chart_ # typing workaround with context.object( chart.__class__.__name__, chart_stepstype=chart.stepstype, chart_meter=chart.meter, chart_index=c, ) as context: self._test_chart(context, sim, chart) def _test_chart(self, context: SimfileContext, sim: Simfile, chart: Chart): td = None if isinstance(chart, SSCChart): with context.perform("TimingData+ssc") as context: td = TimingData(sim, chart) with context.perform("displaybpm+ssc") as context: displaybpm(sim) else: # We've already performed this action. Don't double-report any error try: td = TimingData(sim) except Exception: pass with context.perform("NoteData") as context: nd = NoteData(chart) with context.perform("group_notes") as context: for _ in group_notes( iter(nd), join_heads_to_tails=True, same_beat_notes=SameBeatNotes.JOIN_ALL, ): pass if td: with context.perform("time_notes") as context: for _ in time_notes(nd, td): pass