Source code for neo_jax.control

"""Control file parsing for NEO_JAX."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional


[docs] @dataclass(frozen=True) class ControlParams: in_file: str out_file: str fluxs_arr: Optional[List[int]] theta_n: int phi_n: int max_m_mode: int max_n_mode: int npart: int multra: int acc_req: float no_bins: int nstep_per: int nstep_min: int nstep_max: int calc_nstep_max: int eout_swi: int lab_swi: int inp_swi: int ref_swi: int write_progress: int write_output_files: int spline_test: int write_integrate: int write_diagnostic: int calc_cur: int cur_file: str npart_cur: int alpha_cur: float write_cur_inte: int
def _read_lines(path: Path) -> List[str]: lines: List[str] = [] for raw in path.read_text().splitlines(): line = raw.strip() if line: lines.append(line) return lines def _parse_lines(lines: List[str]) -> ControlParams: idx = 0 in_file = lines[idx] idx += 1 out_file = lines[idx] idx += 1 no_fluxs = int(lines[idx]) idx += 1 fluxs_arr: Optional[List[int]] if no_fluxs <= 0: if idx < len(lines): idx += 1 fluxs_arr = None else: fluxs_arr = [int(v) for v in lines[idx].split()] idx += 1 theta_n = int(lines[idx]); idx += 1 phi_n = int(lines[idx]); idx += 1 max_m_mode = int(lines[idx]); idx += 1 max_n_mode = int(lines[idx]); idx += 1 npart = int(lines[idx]); idx += 1 multra = int(lines[idx]); idx += 1 acc_req = float(lines[idx]); idx += 1 no_bins = int(lines[idx]); idx += 1 nstep_per = int(lines[idx]); idx += 1 nstep_min = int(lines[idx]); idx += 1 nstep_max = int(lines[idx]); idx += 1 calc_nstep_max = int(lines[idx]); idx += 1 eout_swi = int(lines[idx]); idx += 1 lab_swi = int(lines[idx]); idx += 1 inp_swi = int(lines[idx]); idx += 1 ref_swi = int(lines[idx]); idx += 1 write_progress = int(lines[idx]); idx += 1 write_output_files = int(lines[idx]); idx += 1 spline_test = int(lines[idx]); idx += 1 write_integrate = int(lines[idx]); idx += 1 write_diagnostic = int(lines[idx]); idx += 1 idx += 3 calc_cur = int(lines[idx]); idx += 1 cur_file = lines[idx]; idx += 1 npart_cur = int(lines[idx]); idx += 1 alpha_cur = float(lines[idx]); idx += 1 write_cur_inte = int(lines[idx]); idx += 1 return ControlParams( in_file=in_file, out_file=out_file, fluxs_arr=fluxs_arr, theta_n=theta_n, phi_n=phi_n, max_m_mode=max_m_mode, max_n_mode=max_n_mode, npart=npart, multra=multra, acc_req=acc_req, no_bins=no_bins, nstep_per=nstep_per, nstep_min=nstep_min, nstep_max=nstep_max, calc_nstep_max=calc_nstep_max, eout_swi=eout_swi, lab_swi=lab_swi, inp_swi=inp_swi, ref_swi=ref_swi, write_progress=write_progress, write_output_files=write_output_files, spline_test=spline_test, write_integrate=write_integrate, write_diagnostic=write_diagnostic, calc_cur=calc_cur, cur_file=cur_file, npart_cur=npart_cur, alpha_cur=alpha_cur, write_cur_inte=write_cur_inte, ) def read_control(path: str | Path) -> ControlParams: lines = _read_lines(Path(path)) last_err: Exception | None = None for offset in range(0, 6): try: return _parse_lines(lines[offset:]) except (ValueError, IndexError) as exc: last_err = exc continue raise ValueError(f"Failed to parse control file {path}") from last_err