177 lines
6 KiB
Python
177 lines
6 KiB
Python
from contextlib import contextmanager
|
|
import dataclasses
|
|
from datetime import datetime
|
|
import os
|
|
import sys
|
|
import traceback
|
|
from typing import Iterator, Optional
|
|
|
|
from . import __version__
|
|
import msdparser
|
|
import simfile
|
|
from simfile.dir import SimfilePack
|
|
from simfile.notes import NoteData
|
|
from simfile.notes.group import group_notes, SameBeatNotes
|
|
from simfile.notes.timed import time_notes
|
|
from simfile.ssc import SSCChart
|
|
from simfile.timing.engine import TimingData
|
|
from simfile.timing.displaybpm import displaybpm
|
|
from simfile.types import Simfile, Chart
|
|
|
|
from .args import SmoketestArgs
|
|
from .storage import *
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class SimfileContext:
    """Immutable description of the object currently being smoke-tested.

    A context carries the current run plus the fields that identify a
    ``SimfileObject`` row (path, kind, simfile title and chart details).
    Instances are frozen; the ``object`` and ``perform`` context managers
    yield modified copies rather than mutating ``self``.
    """

    run: Run
    path: str
    kind: Optional[str] = None
    simfile_title: Optional[str] = None
    chart_stepstype: Optional[str] = None
    chart_meter: Optional[str] = None
    chart_index: Optional[int] = None

    def _identity(self) -> dict:
        """Return the keyword arguments identifying this context's ``SimfileObject``."""
        return {
            "path": self.path,
            "kind": self.kind,
            "simfile_title": self.simfile_title,
            "chart_stepstype": self.chart_stepstype,
            "chart_meter": self.chart_meter,
            "chart_index": self.chart_index,
        }

    def report_error(self, action: str):
        """Print and persist the exception currently being handled.

        Intended to be called from inside an ``except`` block: the traceback
        text comes from ``traceback.format_exc()``.

        Args:
            action: Label of the step that failed; stored on the
                ``SimfileError`` row.
        """
        # Format once and reuse the text for both the console and the record.
        formatted = traceback.format_exc()
        print(f"{self.path} ({self.kind}): {formatted}")
        record, _created = SimfileObject.get_or_create(**self._identity())
        SimfileError.create(
            action=action,
            traceback=formatted,
            simfile_object=record,
            run=self.run,
        )

    @contextmanager
    def _update(self, **kwargs) -> Iterator["SimfileContext"]:
        """Yield a copy of this context with ``kwargs`` replacing its fields."""
        derived = dataclasses.replace(self, **kwargs)
        yield derived

    @contextmanager
    def object(self, kind, **kwargs) -> Iterator["SimfileContext"]:
        """Yield a derived context for an object of ``kind``.

        ``SimfileObject.get_or_create`` is called with the derived context's
        identifying fields before the context is yielded.

        Args:
            kind: Value for the derived context's ``kind`` field.
            **kwargs: Any other fields to override on the derived context.
        """
        with self._update(kind=kind, **kwargs) as scoped:
            SimfileObject.get_or_create(**scoped._identity())
            yield scoped

    @contextmanager
    def perform(self, action: str, **kwargs) -> Iterator["SimfileContext"]:
        """Yield a derived context and record any ``Exception`` from the body.

        An exception raised inside the ``with`` body is passed to
        ``report_error`` under ``action`` and is not re-raised, so execution
        resumes after the ``with`` statement.

        Args:
            action: Label recorded with any error raised by the body.
            **kwargs: Fields to override on the derived context.
        """
        with self._update(**kwargs) as scoped:
            try:
                yield scoped
            except Exception:
                scoped.report_error(action)
|
|
|
|
|
|
class SmoketestRun:
    """Drive one smoke-test run over packs of simfiles.

    Each step that exercises the ``simfile`` library is wrapped in
    ``SimfileContext.perform``, which records an exception against the
    current object instead of letting it propagate.
    """

    args: SmoketestArgs
    run: Run

    def __init__(self, args: SmoketestArgs):
        """Store ``args`` and create the ``Run`` row for this invocation.

        The row is stamped with the current time and the versions of
        ``simfile``, ``msdparser`` and this package.
        """
        self.args = args
        self.run = Run.create(
            created=datetime.now(),
            simfile_version=simfile.__version__,
            msdparser_version=msdparser.__version__,
            smoketest_version=__version__,
        )

    def process_songs_dir(self, songs_dir: str):
        """Process every immediate subdirectory of ``songs_dir`` as a pack."""
        # Use scandir as a context manager so the directory handle is released
        # even if processing a pack raises.
        with os.scandir(songs_dir) as entries:
            for entry in entries:
                if entry.is_dir():
                    self.process_pack(entry.path)

    def process_pack(self, pack_dir: str):
        """Open and test every ``.sm`` / ``.ssc`` file found in ``pack_dir``."""
        root_context = SimfileContext(run=self.run, path=pack_dir)
        with root_context.object("SimfilePack", path=pack_dir) as pack_context:
            print(f"Processing pack {pack_dir}")
            pack = SimfilePack(pack_dir)

            # Errors raised while enumerating the pack's simfile directories
            # are recorded against the pack itself.
            with pack_context.perform("simfile_dirs") as dirs_context:
                for simfile_dir in pack.simfile_dirs():
                    if simfile_dir.sm_path:
                        self._open_simfile(dirs_context, simfile_dir.sm_path)
                    if simfile_dir.ssc_path:
                        self._open_simfile(dirs_context, simfile_dir.ssc_path)

    def _open_simfile(self, context: SimfileContext, simfile_path: str):
        """Open ``simfile_path`` and, if it loads, run the simfile-level tests.

        Honors the ``new_only`` and ``error_only`` filters from ``self.args``:
        with ``new_only`` a path that already has a ``SimfileObject`` row is
        skipped; otherwise with ``error_only`` a path with no recorded
        ``SimfileError`` is skipped.
        """
        if self.args.new_only:
            if SimfileObject.get_or_none(SimfileObject.path == simfile_path):
                return
        elif self.args.error_only:
            previous_error = (
                SimfileError.select()
                .join(SimfileObject)
                .where(SimfileObject.path == simfile_path)  # type: ignore
                .get_or_none()
            )
            if not previous_error:
                return

        # perform() swallows the exception after recording it, so `sim` must
        # have a defined value for the failure case.
        sim = None
        with context.perform("simfile.open", path=simfile_path) as file_context:
            sim = simfile.open(simfile_path)

        if sim is None:
            # The open failure has already been reported. Returning here keeps
            # an unbound-variable error from escaping and aborting the rest of
            # the pack.
            return

        with file_context.object(
            kind=sim.__class__.__name__,
            simfile_title=sim.title,
        ) as sim_context:
            self._test_simfile(sim_context, sim)

    def _test_simfile(self, context: SimfileContext, sim: Simfile):
        """Exercise simfile-level APIs, then test each chart in ``sim``."""
        with context.perform("TimingData"):
            TimingData(sim)

        with context.perform("displaybpm"):
            displaybpm(sim)

        with context.perform("charts") as charts_context:
            for c, chart_ in enumerate(sim.charts):
                chart: Chart = chart_  # typing workaround
                # Derive each chart's context from the simfile-level context,
                # never from the previous chart's.
                with charts_context.object(
                    chart.__class__.__name__,
                    chart_stepstype=chart.stepstype,
                    chart_meter=chart.meter,
                    chart_index=c,
                ) as chart_context:
                    self._test_chart(chart_context, sim, chart)

    def _test_chart(self, context: SimfileContext, sim: Simfile, chart: Chart):
        """Exercise timing, note parsing, grouping and note timing for ``chart``.

        Steps that depend on the output of a failed step are skipped, so one
        failure yields one recorded error rather than a cascade.
        """
        td = None
        if isinstance(chart, SSCChart):
            with context.perform("TimingData+ssc"):
                # NOTE(review): this passes only the chart; the action label
                # suggests the intent may be TimingData(sim, chart). Confirm
                # against the installed simfile version before changing.
                td = TimingData(chart)
            with context.perform("displaybpm+ssc"):
                # NOTE(review): identical to the simfile-level call; possibly
                # meant to pass the chart as well. Confirm against the library.
                displaybpm(sim)
        else:
            # We've already performed this action. Don't double-report any error
            try:
                td = TimingData(sim)
            except Exception:
                pass

        nd = None
        with context.perform("NoteData"):
            nd = NoteData(chart)

        if nd is None:
            # NoteData failed and was reported; the remaining steps need it.
            return

        with context.perform("group_notes"):
            # Drain the iterator purely to surface any exception it raises.
            for _ in group_notes(
                iter(nd),
                join_heads_to_tails=True,
                same_beat_notes=SameBeatNotes.JOIN_ALL,
            ):
                pass

        if td is not None:
            with context.perform("time_notes"):
                for _ in time_notes(nd, td):
                    pass
|