Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-43722 Implement Transformed EFD service #72

Draft
wants to merge 113 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
39a44ee
Implement initial EFD transformations
glaubervila May 27, 2024
56032c4
Update missing config values
rcboufleur May 27, 2024
85c99c7
Fix exception for unimplemented dialect
rcboufleur May 27, 2024
445a646
Refactor Summary for column operations
rcboufleur May 27, 2024
239552b
Refactor Aggregate class to Summary for better clarity and consistency
rcboufleur May 27, 2024
e013d78
Refactor ATAOS_correctionOffsets_w function to use mean
rcboufleur May 27, 2024
8d3aac4
Update ExposureEfd schema for config_LATISS
glaubervila May 27, 2024
62cce83
Add VisitEFD table and fix empty values
glaubervila May 27, 2024
b0a241c
Batch upsert transactions; add DAO docstrings
glaubervila Jun 3, 2024
5e70961
Refactor configuration file loading/validation
glaubervila Jun 3, 2024
c9600d6
Update InfluxDB API for topic queries
glaubervila Jun 24, 2024
677b569
Fix lint
glaubervila Jun 24, 2024
91eeabb
Insert additional config parameters
rcboufleur Jun 24, 2024
e3fe1e1
Add Dockerfile for Efd Transform
glaubervila Jun 27, 2024
db3553d
Add main command to Dockerfile
glaubervila Jun 27, 2024
4b45179
Implement InfluxDB packed time series retrieval using API query
rcboufleur Aug 12, 2024
dea0588
Fix lint
rcboufleur Aug 12, 2024
c50dc15
Implement new config formats and validation
rcboufleur Aug 21, 2024
f40b4fc
Implement unique topic queries
rcboufleur Aug 21, 2024
203a81f
Change base image to w_2024_33
glaubervila Aug 21, 2024
a474636
Use env variables for usdf_efd API
glaubervila Aug 21, 2024
46b6df5
Add copy of SQLite test database
glaubervila Aug 21, 2024
3f8bf52
Refactor config transformations
rcboufleur Aug 26, 2024
8982aa6
Remove template envs
glaubervila Aug 26, 2024
68f1612
Refactor Dockerfile
glaubervila Aug 26, 2024
91fbaf1
Reorganize transform_efd and test files
glaubervila Aug 26, 2024
06dc944
Update Dockerfile to the new structure
glaubervila Aug 26, 2024
6b25a3a
Account for computation warnings
rcboufleur Aug 26, 2024
4667712
Fix lint
rcboufleur Aug 26, 2024
9f58225
Add ESS accelerometer fields
rcboufleur Aug 26, 2024
43c6606
Implement query segmentation to account for large number of topic fields
rcboufleur Aug 26, 2024
1d9005d
Fix typo in configuration file
rcboufleur Aug 26, 2024
e39dda8
Make start/end dates optional
glaubervila Sep 30, 2024
8434016
Update summary functions
rcboufleur Oct 14, 2024
445a52e
Fix time format definitions
rcboufleur Oct 14, 2024
4b0552c
Change config file path
glaubervila Oct 14, 2024
69bd2c4
Test PostgreSQL consdb connection
glaubervila Oct 15, 2024
8c948eb
Fix missing schema name in transformed_efd inserts
glaubervila Oct 16, 2024
b0b3909
Add logs and test database tables access
glaubervila Oct 16, 2024
f85f6e5
Fix SQLAlchemy table retrieval
glaubervila Oct 16, 2024
3d2e100
Fix time-aware index comparisons
rcboufleur Oct 16, 2024
0a1b265
Fix SQLAlchemy table for SQLite
glaubervila Oct 16, 2024
f5cb8e8
Refactor column and topic mapping
rcboufleur Oct 17, 2024
055e502
Fix lint and resolve conflicts
glaubervila Oct 18, 2024
4e57f01
Create schema generator from config
rcboufleur Oct 24, 2024
7ca7d07
Update temporary files
rcboufleur Oct 24, 2024
8702d1b
Add column to generate_schema.py
rcboufleur Oct 24, 2024
b998958
Add column to generate_schema
rcboufleur Oct 24, 2024
9b7afef
Add queue manager for periodic execution control
glaubervila Oct 24, 2024
52743f9
Implement minor code changes
glaubervila Oct 24, 2024
613f538
Include last-minute transformation function
rcboufleur Oct 24, 2024
08a33b0
Generate new config files
rcboufleur Oct 24, 2024
2fdaf62
Update instruments in testing files
rcboufleur Oct 25, 2024
a91b219
Update schema structures
rcboufleur Oct 25, 2024
7d480a2
Add a new execution task when all existing tasks were succesfully run
glaubervila Oct 25, 2024
de88cef
Fix bug in creating new tasks
glaubervila Oct 25, 2024
8c19b23
Update bash run script
glaubervila Nov 6, 2024
5075d2e
Add permissions to file
glaubervila Nov 6, 2024
3445821
Fix Dockerfile
glaubervila Nov 6, 2024
5028cc4
Implement diferent schemas per instrument
glaubervila Nov 7, 2024
e4b4327
Update schema handling by instrument
rcboufleur Nov 7, 2024
1136432
Fix YAML extensions
rcboufleur Nov 7, 2024
f2ae460
Fix hardcoded Queue manager schema
glaubervila Nov 7, 2024
204d14d
Add timewindow to queue manager
glaubervila Nov 8, 2024
972bdb8
Add Timewindow column YAML files
rcboufleur Nov 8, 2024
8fde69a
Update schema YAML file generator
rcboufleur Nov 8, 2024
21030f7
Update test files
rcboufleur Nov 8, 2024
1b48399
Update Dockerfile
rcboufleur Nov 22, 2024
5f343f7
Update configuration files
rcboufleur Dec 8, 2024
2ccb099
Update schema YAML files
rcboufleur Dec 8, 2024
6cdabbc
Implement unpivoted tables
rcboufleur Dec 8, 2024
c4c80c1
Update column name in unpivoted tables
rcboufleur Dec 8, 2024
05db0ee
Fix lint
rcboufleur Dec 8, 2024
950746b
Update transformations file
rcboufleur Dec 8, 2024
76facd1
Fix column name bug in unpivoted data
rcboufleur Dec 8, 2024
3ec3bff
Realocate local test files
rcboufleur Dec 8, 2024
041e37a
Fix pytest for config_model
rcboufleur Dec 8, 2024
a97b44e
Fix bug in processing unpivoted results
rcboufleur Dec 8, 2024
647d2a0
Update local test files
rcboufleur Dec 9, 2024
b958aa5
Fix trailing spaces
rcboufleur Dec 9, 2024
10d5c2a
Fix configuration files and schemas
rcboufleur Dec 9, 2024
23de7fd
Update Dockerfile
rcboufleur Dec 9, 2024
c47e9f0
Update import path for future compatibility
rcboufleur Jan 16, 2025
ea5ea41
Fix typo in column name
rcboufleur Jan 16, 2025
4a7c2a7
Refactor pytest for config_model
rcboufleur Jan 16, 2025
a8ae1fe
Implement pytest for generate_schema
rcboufleur Jan 16, 2025
96ac4be
Implement pytest for queue_manager
rcboufleur Jan 16, 2025
5cd4684
Implement pytest for summary methods
rcboufleur Jan 16, 2025
eb73c39
Update temporary files used in local testing
rcboufleur Jan 16, 2025
e730892
Update Dockerfile configurations
rcboufleur Jan 16, 2025
d8cbbe3
Update PYTHONPATH in Dockerfile
rcboufleur Jan 16, 2025
2349cae
Fix LATISS schema tables
rcboufleur Jan 16, 2025
e8cdbc1
Fix lint errors
rcboufleur Jan 16, 2025
649a1d5
Update temporary files (local run)
rcboufleur Jan 16, 2025
b6bc6d0
Fix linting issues across the codebase
rcboufleur Jan 17, 2025
90dd9d8
Fix pre-commit issues across the transform_efd codebase
rcboufleur Jan 17, 2025
02d67ca
Add bulter_repo column to scheduler table
rcboufleur Jan 27, 2025
968cd21
Extend queue_manager and TransformDB methods
rcboufleur Jan 27, 2025
e7abad1
Refactor and update jobs and cronjobs workflow
rcboufleur Jan 27, 2025
7a78c91
Update schema yaml files
rcboufleur Jan 27, 2025
a4c906d
Fix efdtransform entry at .github/workflows/build.yaml
rcboufleur Jan 27, 2025
363946e
Pause cronjob from picking up failed jobs
rcboufleur Jan 30, 2025
511b670
Implement mutable tables, task duplication handling, and a new query …
rcboufleur Feb 3, 2025
93729c1
Ignore local testing files in git
rcboufleur Mar 17, 2025
22c966f
Refactor file names and locations
rcboufleur Mar 17, 2025
6be23c8
Add Alembic initial migration
rcboufleur Mar 17, 2025
36e31cb
Stop tracking .vscode directory
rcboufleur Mar 24, 2025
b4e930b
Update python version in pre commit config
rcboufleur Mar 24, 2025
5b21612
Conform EOF standards
rcboufleur Mar 24, 2025
011339c
Refactor environment variables in Dockerfile to use one per line with…
rcboufleur Mar 24, 2025
f60a3f1
Add license preamble to all Python files
rcboufleur Mar 24, 2025
aa6390b
Add @classmethod to validate_tables_when_unpivoted
rcboufleur Mar 24, 2025
bad8fbd
Refactor 'con' as 'connexion' for clarity and consistency. Fix flake8…
rcboufleur Mar 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor file names and locations
rcboufleur committed Mar 24, 2025
commit 22c966f147939400d6fed697b47d07aed380d9d4
4 changes: 4 additions & 0 deletions alembic.ini
Original file line number Diff line number Diff line change
@@ -136,3 +136,7 @@ consdb.schema_name = cdb_startrackernarrow
[startrackerfast]
script_location = alembic/startrackerfast
consdb.schema_name = cdb_startrackerfast

[transformed_efd_latiss]
script_location = alembic/transformed_efd_latiss
consdb.schema_name = efd_latiss
212 changes: 212 additions & 0 deletions alembic/transformed_efd_latiss/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import logging
import os
import shutil
import sys
from logging.config import fileConfig
from pathlib import Path

import yaml
from dotenv import load_dotenv
from felis.datamodel import Schema
from felis.metadata import MetaDataBuilder
from lsst.consdb.transformed_efd.generate_schema_from_config import generate_schema
from sqlalchemy import engine_from_config, pool

from alembic import context
from alembic.script import ScriptDirectory

# Load environment variables
# TODO: Substitute by the default env values
load_dotenv()

# Global variables
new_revision_id = None
schema_name = None

# Configuration paths
CONFIG_LATISS_PATH = Path(os.getenv("CONFIG_LATISS_PATH"))
SNAPSHOT_DIR = os.path.join(os.path.dirname(__file__), "versions", "config_snapshots")

# Ensure snapshot directory exists
os.makedirs(SNAPSHOT_DIR, exist_ok=True)

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)

logger = logging.getLogger("alembic.env")

# Set the consdb connection string in the config, overriding the dummy value
consdb_url = os.getenv("CONSDB_URL")
if consdb_url is None:
raise ValueError("CONSDB_URL not found in environment")
config.set_main_option("sqlalchemy.url", consdb_url)
logger.info(f"Using connection string: {consdb_url}")

# Schema setup
schema_name = config.get_main_option("consdb.schema_name")


# Function to generate schema from config
def prepare_schema(config_path):
"""Generate schema from config and return MetaData."""
schema_path = generate_schema(config_path=config_path, instrument="latiss")
return schema_path


# Function to save config snapshot for a migration
def save_config_snapshot(revision):
"""Save config_latiss.yaml snapshot for a migration."""
snapshot_path = os.path.join(SNAPSHOT_DIR, f"{revision}_config_latiss.yaml")
shutil.copy(CONFIG_LATISS_PATH, snapshot_path)
logger.info(f"Saved config snapshot: {snapshot_path}")


# Function to restore config snapshot and create a backup
def restore_config_snapshot(revision):
"""Restore config_latiss.yaml from a migration snapshot and create a backup."""
snapshot_path = os.path.join(SNAPSHOT_DIR, f"{revision}_config_latiss.yaml")

if os.path.exists(snapshot_path):
# Backup existing config file before restore
if os.path.exists(CONFIG_LATISS_PATH):
backup_filename = "previous_" + os.path.basename(CONFIG_LATISS_PATH)
backup_path = os.path.join(os.path.dirname(CONFIG_LATISS_PATH), backup_filename)
shutil.copy(CONFIG_LATISS_PATH, backup_path) # Backup the current config file
logger.info(f"Backup created: {backup_path}")

# Restore from snapshot
shutil.copy(snapshot_path, CONFIG_LATISS_PATH)
logger.info(f"Restored config from snapshot: {snapshot_path}")
else:
logger.warning(f"Snapshot not found for revision: {revision}")


# Function to get config path (current or snapshot)
def get_config_path(revision, is_downgrade):
"""Return path to config_latiss.yaml (current or snapshot)."""
if is_downgrade:
return os.path.join(SNAPSHOT_DIR, f"{revision}_config_latiss.yaml")
return CONFIG_LATISS_PATH


# Function to include schema based on name and type
def include_name(name, type_, parent_names):
"""Filter schemas by name."""
if type_ == "schema":
return name == schema_name
return True


# Function to include objects based on table name
def include_object(object, name, type_, reflected, compare_to):
"""Exclude specific tables (ccdvisit1, visit1)."""
if type_ == "table" and name in ["ccdvisit1", "visit1"]:
logger.info(f"Excluding table {object.schema}.{name}")
return False
return True


# Function to handle revision directives
def process_revision_directives(context, revision, directives):
"""Store generated revision ID."""
global new_revision_id
script = directives[0]
new_revision_id = script.rev_id # Store the generated revision ID


# Function to resolve revision from label (head or absolute ID)
def resolve_revision(rev_label):
"""Convert Alembic revision label (absolute revision ID or 'head') into a revision ID."""
script = ScriptDirectory.from_config(config)

if rev_label == "head":
try:
return script.get_revisions("head")[0].revision
except IndexError:
raise ValueError("No revisions found in the migration history.")

try:
return script.get_revision(rev_label).revision
except Exception:
raise ValueError(f"Invalid revision: {rev_label}")


# Prepare the schema
schema_path = prepare_schema(CONFIG_LATISS_PATH)
logger.info(f"Using schema path: {schema_path}")

# Load schema data from the generated YAML
yaml_data = yaml.safe_load(open(schema_path, "r"))

# Create schema and metadata objects
schema = Schema.model_validate(yaml_data)
schema_metadata = MetaDataBuilder(schema).build()

# Log schema metadata
logger.info(f"Schema {schema_metadata.schema} loaded successfully")


# Function to run migrations in online mode
def run_migrations_online() -> None:
"""Run migrations in 'online' mode with database connection."""
global new_revision_id

# Handle revision operations (upgrade/downgrade)
if "downgrade" in sys.argv:
target_rev_index = sys.argv.index("downgrade") + 1
if target_rev_index < len(sys.argv):
target_revision_id = resolve_revision(sys.argv[target_rev_index])
restore_config_snapshot(target_revision_id)
else:
raise ValueError("Target revision not specified for downgrade")

if "upgrade" in sys.argv:
target_rev_index = sys.argv.index("upgrade") + 1
if target_rev_index < len(sys.argv):
target_revision_id = resolve_revision(sys.argv[target_rev_index])
restore_config_snapshot(target_revision_id)
else:
raise ValueError("Target revision not specified for upgrade")

# Set up database engine and connection
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)

with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=schema_metadata,
include_schemas=True,
include_name=include_name,
include_object=include_object,
version_table=f"{schema_name}_version",
version_table_schema=schema_name,
process_revision_directives=process_revision_directives,
)

# Run migrations within transaction
with context.begin_transaction():
context.run_migrations()

# Log new revision and snapshot if applicable
if new_revision_id:
print(f"New revision ID: {new_revision_id}")
if "--autogenerate" in sys.argv:
logger.info(f"Autogenerating revision: {new_revision_id}")
save_config_snapshot(new_revision_id)


# Main script execution
if context.is_offline_mode():
logger.warning("Offline mode is not supported for this migration.")
else:
run_migrations_online()
26 changes: 26 additions & 0 deletions alembic/transformed_efd_latiss/script.py.mako
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
${upgrades if upgrades else "pass"}


def downgrade() -> None:
${downgrades if downgrades else "pass"}
Loading