simfile-smoketest/smoketest/runner.py
2022-07-22 17:48:29 -07:00

177 lines
6 KiB
Python

from contextlib import contextmanager
import dataclasses
from datetime import datetime
import os
import sys
import traceback
from typing import Iterator, Optional
from . import __version__
import msdparser
import simfile
from simfile.dir import SimfilePack
from simfile.notes import NoteData
from simfile.notes.group import group_notes, SameBeatNotes
from simfile.notes.timed import time_notes
from simfile.ssc import SSCChart
from simfile.timing.engine import TimingData
from simfile.timing.displaybpm import displaybpm
from simfile.types import Simfile, Chart
from .args import SmoketestArgs
from .storage import *
@dataclasses.dataclass(frozen=True)
class SimfileContext:
run: Run
path: str
kind: Optional[str] = None
simfile_title: Optional[str] = None
chart_stepstype: Optional[str] = None
chart_meter: Optional[str] = None
chart_index: Optional[int] = None
def report_error(self, action: str):
print(f"{self.path} ({self.kind}): {traceback.format_exc()}")
simfile_object, _ = SimfileObject.get_or_create(
path=self.path,
kind=self.kind,
simfile_title=self.simfile_title,
chart_stepstype=self.chart_stepstype,
chart_meter=self.chart_meter,
chart_index=self.chart_index,
)
SimfileError.create(
action=action,
traceback=traceback.format_exc(),
simfile_object=simfile_object,
run=self.run,
)
@contextmanager
def _update(self, **kwargs) -> Iterator["SimfileContext"]:
yield dataclasses.replace(self, **kwargs)
@contextmanager
def object(self, kind, **kwargs) -> Iterator["SimfileContext"]:
with self._update(kind=kind, **kwargs) as context:
SimfileObject.get_or_create(
kind=kind,
path=context.path,
simfile_title=context.simfile_title,
chart_stepstype=context.chart_stepstype,
chart_meter=context.chart_meter,
chart_index=context.chart_index,
)
yield context
@contextmanager
def perform(self, action: str, **kwargs) -> Iterator["SimfileContext"]:
with self._update(**kwargs) as context:
try:
yield context
except Exception:
context.report_error(action)
class SmoketestRun:
args: SmoketestArgs
run: Run
def __init__(self, args: SmoketestArgs):
self.args = args
self.run = Run.create(
created=datetime.now(),
simfile_version=simfile.__version__,
msdparser_version=msdparser.__version__,
smoketest_version=__version__,
)
def process_songs_dir(self, songs_dir: str):
for entry in os.scandir(songs_dir):
if entry.is_dir():
self.process_pack(entry.path)
def process_pack(self, pack_dir: str):
root_context = SimfileContext(run=self.run, path=pack_dir)
with root_context.object("SimfilePack", path=pack_dir) as context:
print(f"Processing pack {pack_dir}")
pack = SimfilePack(pack_dir)
with context.perform("simfile_dirs") as context:
for simfile_dir in pack.simfile_dirs():
if simfile_dir.sm_path:
self._open_simfile(context, simfile_dir.sm_path)
if simfile_dir.ssc_path:
self._open_simfile(context, simfile_dir.ssc_path)
def _open_simfile(self, context: SimfileContext, simfile_path: str):
if self.args.new_only:
if SimfileObject.get_or_none(SimfileObject.path == simfile_path):
return
elif self.args.error_only:
if not (
SimfileError.select()
.join(SimfileObject)
.where(SimfileObject.path == simfile_path) # type: ignore
.get_or_none()
):
return
with context.perform("simfile.open", path=simfile_path) as context:
sim = simfile.open(simfile_path)
with context.object(
kind=sim.__class__.__name__,
simfile_title=sim.title,
) as context:
self._test_simfile(context, sim)
def _test_simfile(self, context: SimfileContext, sim: Simfile):
with context.perform("TimingData") as context:
TimingData(sim)
with context.perform("displaybpm") as context:
displaybpm(sim)
with context.perform("charts") as context:
for c, chart_ in enumerate(sim.charts):
chart: Chart = chart_ # typing workaround
with context.object(
chart.__class__.__name__,
chart_stepstype=chart.stepstype,
chart_meter=chart.meter,
chart_index=c,
) as context:
self._test_chart(context, sim, chart)
def _test_chart(self, context: SimfileContext, sim: Simfile, chart: Chart):
td = None
if isinstance(chart, SSCChart):
with context.perform("TimingData+ssc") as context:
td = TimingData(sim, chart)
with context.perform("displaybpm+ssc") as context:
displaybpm(sim)
else:
# We've already performed this action. Don't double-report any error
try:
td = TimingData(sim)
except Exception:
pass
with context.perform("NoteData") as context:
nd = NoteData(chart)
with context.perform("group_notes") as context:
for _ in group_notes(
iter(nd),
join_heads_to_tails=True,
same_beat_notes=SameBeatNotes.JOIN_ALL,
):
pass
if td:
with context.perform("time_notes") as context:
for _ in time_notes(nd, td):
pass