Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K-Modulation Importer: fixed main function call #482

Merged
merged 5 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# OMC3 Changelog

#### 2025-03-03 - v0.21.1 - _fscarlier_

- Fixed:
- Correct function call in `omc3.kmod_importer` when running as script.
- Fixed k-mod import for single-IP imports.

#### 2025-01-23 - v0.21.0 - _jdilly_, _fscarlier_, _fesoubel_

- Fixed:
Expand Down
2 changes: 1 addition & 1 deletion omc3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
__title__ = "omc3"
__description__ = "An accelerator physics tools package for the OMC team at CERN."
__url__ = "https://github.com/pylhc/omc3"
__version__ = "0.21.0"
__version__ = "0.21.1"
__author__ = "pylhc"
__author_email__ = "pylhc@github.com"
__license__ = "MIT"
Expand Down
111 changes: 59 additions & 52 deletions omc3/kmod_importer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
"""
Full Import of K-Modulation Results
-----------------------------------

Expand All @@ -9,11 +9,11 @@
The results are first sorted by IP and averaged. The averaged results are
written into a sub-folder of the given `output_dir`.

If data for both beams is present, these averages are then used to calculate the
luminosity imbalance between each combination of IPs.
If data for both beams is present, these averages are then used to calculate the
luminosity imbalance between each combination of IPs.
These results are again written out into the same sub-folder of the given `output_dir`.

Finally, the averaged results for the given `beam` are then written out into
Finally, the averaged results for the given `beam` are then written out into
the `beta_kmod` and `betastar` tfs-files in the `output_dir`.


Expand All @@ -23,7 +23,7 @@

- **meas_paths** *(PathOrStr)*:

Directories of K-modulation results to import.
Directories of K-modulation results to import.
These need to be the paths to the root-folders containing B1 and B2 sub-dirs.


Expand All @@ -35,7 +35,7 @@
- **beam** *(int)*:

Beam for which to import.


- **output_dir** *(PathOrStr)*:

Expand Down Expand Up @@ -91,34 +91,34 @@
def _get_params():
"""
Creates and returns the parameters for the kmod_output function.

"""
params = EntryPointParameters()
params.add_parameter(
name="meas_paths",
required=True,
nargs='+',
nargs="+",
type=PathOrStr,
help="Directories of K-modulation results to import. "
"These need to be the paths to the root-folders containing B1 and B2 sub-dirs."
"These need to be the paths to the root-folders containing B1 and B2 sub-dirs.",
)
params.add_parameter(
name="model",
required=True,
type=PathOrStr,
help="Path to the model."
help="Path to the model.",
)
params.add_parameter(
name="beam",
required=True,
type=int,
help="Beam for which to import."
help="Beam for which to import.",
)
params.add_parameter(
name="output_dir",
type=PathOrStr,
required=True,
help="Path to the directory where to write the output files."
help="Path to the directory where to write the output files.",
)
params.add_parameter(
name="show_plots",
Expand All @@ -132,32 +132,32 @@ def _get_params():
def import_kmod_results(opt: DotDict) -> None:
"""
Performs the full import procedure of the "raw" K-Modulation results.

Args:
meas_paths (Sequence[Path|str]):
Directories of K-modulation results to import.
These need to be the paths to the root-folders containing B1 and B2 sub-dirs.

model (Path|str):
Path to the model Twiss file.

beam (int):
Beam for which to import.

output_dir (Path|str):
Path to the output directory, i.e. the optics-measurement directory
Path to the output directory, i.e. the optics-measurement directory
into which to import these K-Modulation results.

show_plots (bool):
If True, show the plots. Default: False.


Returns:
Dictionary of kmod-DataFrames by planes.
"""
LOG.info("Starting full K-modulation import.")

# Prepare IO ---
# Prepare IO ---
opt.output_dir = Path(opt.output_dir)
opt.output_dir.mkdir(exist_ok=True)
save_config(opt.output_dir, opt, __file__)
Expand All @@ -166,27 +166,25 @@ def import_kmod_results(opt: DotDict) -> None:
average_output_dir.mkdir(exist_ok=True)

df_model = read_model_df(opt.model)

# Perform averaging and import ---
averaged_results = average_all_results(
meas_paths=opt.meas_paths,
df_model=df_model,
beam=opt.beam,
output_dir=average_output_dir
meas_paths=opt.meas_paths,
df_model=df_model,
beam=opt.beam,
output_dir=average_output_dir,
)

calculate_all_lumi_imbalances(
averaged_results,
df_model=df_model,
output_dir=average_output_dir
averaged_results, df_model=df_model, output_dir=average_output_dir
)

results_list = [
df
for ip in averaged_results.keys()
df
for ip in averaged_results.keys()
for df in (
averaged_results[ip][0], # beta-star results
averaged_results[ip][opt.beam] # bpm results of the specific beam
averaged_results[ip][0], # beta-star results
averaged_results[ip][opt.beam], # bpm results of the specific beam
)
]
import_kmod_data(
Expand All @@ -199,24 +197,25 @@ def import_kmod_results(opt: DotDict) -> None:

# Averaging ---


def average_all_results(
meas_paths: Sequence[Path | str],
df_model: tfs.TfsDataFrame,
beam: int,
output_dir: Path | str,
show_plots: bool = False,
) -> dict[str, dict[int, tfs.TfsDataFrame]]:
""" Averages all kmod results.
"""Averages all kmod results.

Args:
meas_paths (Sequence[Path | str]): Paths to the K-modulation results.
df_model (tfs.TfsDataFrame): DataFrame with the model.
beam (int): Beam for which to average.
meas_paths (Sequence[Path | str]): Paths to the K-modulation results.
df_model (tfs.TfsDataFrame): DataFrame with the model.
beam (int): Beam for which to average.
output_dir (Path | str, optional): Path to the output directory. Defaults to None.
show_plots (bool, optional): If True, show the plots. Defaults to False.

Returns:
dict[int, tfs.TfsDataFrame]: Averaged kmod results, sorted by IP.
dict[int, tfs.TfsDataFrame]: Averaged kmod results, sorted by IP.
"""
sorted_paths = _sort_paths_by_ip(meas_paths, beam)

Expand All @@ -230,16 +229,18 @@ def average_all_results(
meas_paths=paths,
output_dir=output_dir,
plot=True,
show_plots=show_plots
show_plots=show_plots,
)
averaged_results[ip] = average

return averaged_results


def _sort_paths_by_ip(paths: Sequence[str | Path], beam: int) -> dict[str, list[str | Path]]:
""" Sorts the kmod results files by IP.

def _sort_paths_by_ip(
paths: Sequence[str | Path], beam: int
) -> dict[str, list[str | Path]]:
"""Sorts the kmod results files by IP.

Identification of the IP is done by reading the `lsa_results.tfs` files.
"""
sorted_paths = defaultdict(list)
Expand All @@ -255,33 +256,39 @@ def _sort_paths_by_ip(paths: Sequence[str | Path], beam: int) -> dict[str, list[

# Lumi Imbalance ---


def calculate_all_lumi_imbalances(
averaged_results: dict[str, dict[int, tfs.TfsDataFrame]],
averaged_results: dict[str, dict[int, tfs.TfsDataFrame]],
df_model: tfs.TfsDataFrame,
output_dir: Path | str = None
output_dir: Path | str = None,
) -> None:
""" Calculates the luminosity imbalance between two IPs.
"""Calculates the luminosity imbalance between two IPs.

Args:
averaged_results (dict[str, dict[int, tfs.TfsDataFrame]]): Averaged kmod results, sorted by IP.
df_model (tfs.TfsDataFrame): DataFrame with the model.
averaged_results (dict[str, dict[int, tfs.TfsDataFrame]]): Averaged kmod results, sorted by IP.
df_model (tfs.TfsDataFrame): DataFrame with the model.
output_dir (Path | str, optional): Path to the output directory. Defaults to None.

Returns:
tfs.TfsDataFrame: DataFrame with the luminosity imbalance.
"""
sets_of_ips = list(combinations(averaged_results.keys(), 2))
for (ipA, ipB) in sets_of_ips:
for ipA, ipB in sets_of_ips:
LOG.debug(f"Calculating lumi imbalance between {ipA} and {ipB}")
betastar = _get_betastar(df_model, ipA) # does not really matter which IP, for output name only
betastar = _get_betastar(
df_model, ipA
) # does not really matter which IP, for output name only

# Calculate luminosity imbalance
data = {ip.lower(): averaged_results[ip][0] for ip in (ipA, ipB)}
try:
df = calculate_lumi_imbalance(**data, output_dir=output_dir, betastar=betastar)
except KeyError as e:
# Most likely because not all data available (e.g. only one beam).
LOG.debug(f"Could not calculate lumi imbalance between {ipA} and {ipB}. Skipping.", exc_info=e)
LOG.debug(
f"Could not calculate lumi imbalance between {ipA} and {ipB}. Skipping.",
exc_info=e,
)
continue

# Print luminosity imbalance
Expand All @@ -297,4 +304,4 @@ def _get_betastar(df_model: tfs.TfsDataFrame, ip: str) -> list[float, float]:
# Script Mode ------------------------------------------------------------------

if __name__ == "__main__":
import_kmod_data()
import_kmod_results()
2 changes: 1 addition & 1 deletion omc3/scripts/kmod_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def convert_betastar_results(
kmod_results = kmod_results.drop(columns=[BEAM])
except KeyError:
# already as index
kmod_results = kmod_results.loc[beam, :]
kmod_results = kmod_results.loc[[beam], :]

kmod_results = kmod_results.set_index(NAME, drop=True)

Expand Down
42 changes: 23 additions & 19 deletions tests/unit/test_kmod_importer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from pathlib import Path
import pytest
import tfs

from omc3.kmod_importer import AVERAGE_DIR, import_kmod_results
from omc3.optics_measurements.constants import EXT
from tests.conftest import assert_tfsdataframe_equal
from tests.conftest import assert_tfsdataframe_equal, ids_str
from tests.unit.test_kmod_averaging import (
get_all_tfs_filenames as _get_averaged_filenames,
)
Expand All @@ -26,13 +27,15 @@
# Tests ---

@pytest.mark.basic
@pytest.mark.parametrize('beam', [1, 2])
def test_full_kmod_import_beam(tmp_path, beam):
beta=get_betastar_model(beam=beam, ip=1)[0]
@pytest.mark.parametrize('beam', [1, 2], ids=ids_str("b{}"))
@pytest.mark.parametrize('ips', ["1", "15"], ids=ids_str("ip{}"))
def test_full_kmod_import(tmp_path: Path, beam: int, ips: str):
ips = [int(ip) for ip in ips]
beta=get_betastar_model(beam=beam, ip=ips[0])[0]

# Run the import ---
import_kmod_results(
meas_paths=[get_measurement_dir(ip=ip, i_meas=i) for ip in (1, 5) for i in range(1, 3)],
meas_paths=[get_measurement_dir(ip=ip, i_meas=i) for ip in ips for i in range(1, 3)],
beam=beam,
model=get_model_path(beam),
output_dir=tmp_path,
Expand All @@ -44,26 +47,27 @@ def test_full_kmod_import_beam(tmp_path, beam):
average_dir = tmp_path / AVERAGE_DIR

assert average_dir.is_dir()
assert len(list(average_dir.glob("*.pdf"))) == 6 # beta, beat and waist per IP
assert len(list(average_dir.glob(f"*{EXT}"))) == 7 # AV_BPM: 2*BEAM + 2*IP, AV_BETASTAR: 2*IP, Effective: 1
assert len(list(average_dir.glob("*.pdf"))) == 3 * len(ips) # beta, beat and waist per IP
assert len(list(average_dir.glob(f"*{EXT}"))) == 3 * len(ips) + (len(ips) == 2) # AV_BPM: N_BEAM*N_IP, AV_BETASTAR: N_IPs, Effective: 1 (only when both)

# Check the content ---
# averages --
for ip in (1, 5):
for ip in ips:
for out_name in _get_averaged_filenames(ip, beta=beta):
out_file = tfs.read(average_dir / out_name)
ref_file = tfs.read(get_reference_dir(ip, n_files=2) / out_name)
assert_tfsdataframe_equal(out_file, ref_file, check_like=True)

# lumi --
eff_betas = tfs.read(average_dir / _get_lumi_filename(beta))
eff_betas_ref = tfs.read(REFERENCE_DIR / _get_lumi_filename(beta))
assert_tfsdataframe_equal(eff_betas_ref, eff_betas, check_like=True)

# import --
for plane in "xy":
for ref_path in (_get_bpm_reference_path(beam, plane), _get_betastar_reference_path(beam, plane)):
ref_file = tfs.read(ref_path)
out_file = tfs.read(tmp_path / ref_path.name)
assert_tfsdataframe_equal(ref_file, out_file, check_like=True)

if len(ips) > 1:
# lumi --
eff_betas = tfs.read(average_dir / _get_lumi_filename(beta))
eff_betas_ref = tfs.read(REFERENCE_DIR / _get_lumi_filename(beta))
assert_tfsdataframe_equal(eff_betas_ref, eff_betas, check_like=True)

# import (reference created with IP1 and IP5) --
for plane in "xy":
for ref_path in (_get_bpm_reference_path(beam, plane), _get_betastar_reference_path(beam, plane)):
ref_file = tfs.read(ref_path)
out_file = tfs.read(tmp_path / ref_path.name)
assert_tfsdataframe_equal(ref_file, out_file, check_like=True)