Skip to content

Commit

Permalink
Index driver abstraction (#1020)
Browse files Browse the repository at this point in the history
* Add index driver abstraction layer.

* Add index driver abstraction layer.

* Add license headers to new files.

* Move SQL files into a driver-specific directory.

* Cleanup commits after debugging.

* Cleanup from Ariana's review comments.

* Fix test mocks.

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
SpacemanPaul and pre-commit-ci[bot] authored May 23, 2024
1 parent 85d5266 commit f827516
Show file tree
Hide file tree
Showing 70 changed files with 765 additions and 208 deletions.
10 changes: 10 additions & 0 deletions datacube_ows/index/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# This file is part of datacube-ows, part of the Open Data Cube project.
# See https://opendatacube.org for more information.
#
# Copyright (c) 2017-2024 OWS Contributors
# SPDX-License-Identifier: Apache-2.0

from .api import ows_index, AbortRun, CoordRange, LayerSignature, LayerExtent


__all__ = ["ows_index", "AbortRun", "CoordRange", "LayerSignature", "LayerExtent"]
179 changes: 179 additions & 0 deletions datacube_ows/index/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# This file is part of datacube-ows, part of the Open Data Cube project.
# See https://opendatacube.org for more information.
#
# Copyright (c) 2017-2024 OWS Contributors
# SPDX-License-Identifier: Apache-2.0

import dataclasses
from abc import ABC, abstractmethod
from datetime import datetime, date
from typing import NamedTuple, Iterable, Type
from uuid import UUID

from datacube import Datacube
from datacube.index.abstract import AbstractIndex
from datacube.model import Product, Dataset
from odc.geo import Geometry, CRS

from datacube_ows.config_utils import CFG_DICT, ConfigException

# Hand-rolled TYPE_CHECKING flag: the import below is visible to type
# checkers only — presumably to avoid a runtime circular import with
# ows_configuration (NOTE(review): confirm the cycle actually exists).
TYPE_CHECKING = False
if TYPE_CHECKING:
    from datacube_ows.ows_configuration import OWSNamedLayer

class AbortRun(Exception):
    """Raised to abort the current run cleanly."""


@dataclasses.dataclass(frozen=True)
class LayerSignature:
time_res: str
products: tuple[str, ...]
env: str
datasets: int

def as_json(self) -> dict[str, list[str] | str | int]:
return {
"time_res": self.time_res,
"products": list(self.products),
"env": self.env,
"datasets": self.datasets,
}


# A single date or timestamp value.
DateOrDateTime = datetime | date
# A temporal search term: an explicit (start, end) range, or a single value.
TimeSearchTerm = tuple[datetime, datetime] | tuple[date, date] | DateOrDateTime


class CoordRange(NamedTuple):
    """Min/max range of a single coordinate axis (e.g. latitude or longitude)."""
    min: float
    max: float


class LayerExtent:
    """Spatio-temporal extent of a layer: lat/lon coordinate ranges, the
    list of time values it covers, and per-CRS bounding boxes."""

    def __init__(self, lat: CoordRange, lon: CoordRange, times: list[DateOrDateTime], bboxes: CFG_DICT):
        self.lat = lat
        self.lon = lon
        self.times = times
        # First/last entries taken as the temporal bounds — assumes ``times``
        # is sorted ascending (TODO confirm with callers).
        self.start_time, self.end_time = times[0], times[-1]
        self.time_set = set(times)
        self.bboxes = bboxes


class OWSAbstractIndex(ABC):
    """Abstract interface to an ODC index backend, as used by OWS.

    Concrete subclasses supply schema management, range-table maintenance
    and spatio-temporal dataset search for one particular index driver.
    """
    # Driver name (e.g. "postgres") — set by concrete subclasses.
    name: str = ""

    # method to delete obsolete schemas etc.
    @abstractmethod
    def cleanup_schema(self, dc: Datacube):
        """Remove obsolete OWS schema elements (old tables, views, etc.)."""
        ...

    # Schema creation method
    @abstractmethod
    def create_schema(self, dc: Datacube):
        """Create or update the OWS schema objects."""
        ...

    # Permission management method
    @abstractmethod
    def grant_perms(self, dc: Datacube, role: str, read_only: bool = False):
        """Grant database permissions on the OWS schema to ``role``."""
        ...

    # Spatiotemporal index update method (e.g. refresh materialised views)
    @abstractmethod
    def update_geotemporal_index(self, dc: Datacube):
        """Refresh the spatio-temporal index (e.g. materialised views)."""
        ...

    # Range table update method
    @abstractmethod
    def create_range_entry(self, layer: "OWSNamedLayer", cache: dict[LayerSignature, list[str]]) -> None:
        """Create or update the stored range entry for ``layer``."""
        ...

    # Range table read method
    @abstractmethod
    def get_ranges(self, layer: "OWSNamedLayer") -> LayerExtent | None:
        """Return the stored extent for ``layer``, or None if none recorded."""
        ...

    # Spatiotemporal search methods
    @abstractmethod
    def ds_search(self,
                  layer: "OWSNamedLayer",
                  times: Iterable[TimeSearchTerm] | None = None,
                  geom: Geometry | None = None,
                  products: Iterable[Product] | None = None
                  ) -> Iterable[Dataset]:
        """Return datasets in ``layer`` matching the optional time/space/product filters."""
        ...

    def dsid_search(self,
                    layer: "OWSNamedLayer",
                    times: Iterable[TimeSearchTerm] | None = None,
                    geom: Geometry | None = None,
                    products: Iterable[Product] | None = None
                    ) -> Iterable[UUID]:
        """Yield the ids of datasets matching the filters (see ``ds_search``)."""
        for ds in self.ds_search(layer, times, geom, products):
            yield ds.id

    def count(self,
              layer: "OWSNamedLayer",
              times: Iterable[TimeSearchTerm] | None = None,
              geom: Geometry | None = None,
              products: Iterable[Product] | None = None
              ) -> int:
        """Count datasets matching the filters.

        Streams over the id generator instead of materialising a throwaway
        list (the previous ``len([...])`` held every id in memory at once).
        """
        return sum(1 for _ in self.dsid_search(layer, times, geom, products))

    def extent(self,
               layer: "OWSNamedLayer",
               times: Iterable[TimeSearchTerm] | None = None,
               geom: Geometry | None = None,
               products: Iterable[Product] | None = None,
               crs: CRS | None = None
               ) -> Geometry | None:
        """Return the union of the extents of matching datasets.

        Result is in ``crs`` (default EPSG:4326), or None if no matching
        dataset has an extent.
        """
        if crs is None:
            crs = CRS("epsg:4326")
        # Layer "native" CRS is loop-invariant — compute it once, not per dataset.
        native_crs = CRS(layer.native_CRS)
        ext: Geometry | None = None
        # Accumulate extent in native CRS if possible.
        for ds in self.ds_search(layer, times, geom, products):
            if ds.extent:
                if ds.extent.crs != native_crs:
                    # Reproject to layer "native" CRS if needed.
                    ds_extent: Geometry = ds.extent.to_crs(native_crs)
                else:
                    ds_extent = ds.extent
                ext = ds_extent if ext is None else ext.union(ds_extent)
        if ext is not None and crs != native_crs:
            # Reproject to requested CRS if necessary
            return ext.to_crs(crs)
        return ext


class OWSAbstractIndexDriver(ABC):
    """Abstract factory for OWS index implementations (loaded as plugins)."""
    @classmethod
    @abstractmethod
    def ows_index_class(cls) -> Type[OWSAbstractIndex]:
        """Return the OWSAbstractIndex subclass provided by this driver."""
        ...

    @classmethod
    @abstractmethod
    def ows_index(cls) -> OWSAbstractIndex:
        """Return an OWSAbstractIndex instance for this driver."""
        ...


def ows_index(odc: Datacube | AbstractIndex) -> OWSAbstractIndex:
    """Return the OWS index wrapper for a Datacube (or raw ODC index).

    Raises ConfigException if the environment's ODC index driver has no
    corresponding OWS index driver plugin.
    """
    from datacube_ows.index.driver import ows_index_driver_by_name
    index = odc if isinstance(odc, AbstractIndex) else odc.index
    env = index.environment
    # The "default"/"legacy" ODC drivers both map onto the OWS postgres driver.
    idx_drv_name = "postgres" if env.index_driver in ('default', 'legacy') else env.index_driver
    ows_index_driver = ows_index_driver_by_name(idx_drv_name)
    if ows_index_driver is None:
        raise ConfigException(f"ODC Environment {env._name} uses ODC index driver {env.index_driver} which is "
                              f"not (yet) supported by OWS.")
    return ows_index_driver.ows_index()
55 changes: 55 additions & 0 deletions datacube_ows/index/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# This file is part of datacube-ows, part of the Open Data Cube project.
# See https://opendatacube.org for more information.
#
# Copyright (c) 2017-2024 OWS Contributors
# SPDX-License-Identifier: Apache-2.0

from threading import Lock
from typing import Optional

from datacube.drivers.driver_cache import load_drivers


# Hand-rolled TYPE_CHECKING flag: import below is for type checkers only
# (presumably avoids a runtime circular import — TODO confirm).
TYPE_CHECKING = False
if TYPE_CHECKING:
    from datacube_ows.index.api import OWSAbstractIndexDriver

# Guards one-time creation/initialisation of the driver-cache singleton.
cache_lock = Lock()


class OWSIndexDriverCache:
    """Thread-safe singleton cache of OWS index driver plugins."""
    _instance = None
    _initialised = False

    def __new__(cls, *args, **kwargs):
        # Double-checked creation under the module lock: only one
        # instance is ever constructed, however many threads race here.
        if cls._instance is None:
            with cache_lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, group: str) -> None:
        # Load the plugin drivers exactly once, no matter how many times
        # the singleton is "constructed".
        with cache_lock:
            if not self._initialised:
                self._initialised = True
                self._drivers = load_drivers(group)

    def __call__(self, name: str) -> Optional["OWSAbstractIndexDriver"]:
        """Return the driver registered under ``name``, or None if unknown."""
        return self._drivers.get(name)

    def drivers(self) -> list[str]:
        """Return the names of all loaded drivers."""
        return list(self._drivers)


def ows_index_drivers() -> list[str]:
    """List the names of all registered OWS index driver plugins."""
    cache = OWSIndexDriverCache("datacube_ows.plugins.index")
    return cache.drivers()


def ows_index_driver_by_name(name: str) -> Optional["OWSAbstractIndexDriver"]:
    """Return the OWS index driver registered as ``name``, or None if absent."""
    cache = OWSIndexDriverCache("datacube_ows.plugins.index")
    return cache(name)
5 changes: 5 additions & 0 deletions datacube_ows/index/postgis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This file is part of datacube-ows, part of the Open Data Cube project.
# See https://opendatacube.org for more information.
#
# Copyright (c) 2017-2024 OWS Contributors
# SPDX-License-Identifier: Apache-2.0
5 changes: 5 additions & 0 deletions datacube_ows/index/postgres/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This file is part of datacube-ows, part of the Open Data Cube project.
# See https://opendatacube.org for more information.
#
# Copyright (c) 2017-2024 OWS Contributors
# SPDX-License-Identifier: Apache-2.0
119 changes: 119 additions & 0 deletions datacube_ows/index/postgres/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# This file is part of datacube-ows, part of the Open Data Cube project.
# See https://opendatacube.org for more information.
#
# Copyright (c) 2017-2024 OWS Contributors
# SPDX-License-Identifier: Apache-2.0

import click

from threading import Lock
from typing import cast, Iterable, Type
from uuid import UUID

from odc.geo import Geometry, CRS
from datacube import Datacube
from datacube.model import Product, Dataset

from datacube_ows.ows_configuration import OWSNamedLayer
from datacube_ows.index.api import OWSAbstractIndex, OWSAbstractIndexDriver, LayerSignature, LayerExtent, TimeSearchTerm
from .product_ranges import create_range_entry as create_range_entry_impl, get_ranges as get_ranges_impl
from .mv_index import MVSelectOpts, mv_search
from .sql import run_sql


class OWSPostgresIndex(OWSAbstractIndex):
    """OWS index implementation backed by the legacy "postgres" ODC index driver.

    Schema work is delegated to driver-specific SQL scripts (``run_sql``);
    searches go through the materialised-view index (``mv_search``).
    """
    name: str = "postgres"

    # method to delete obsolete schemas etc.
    def cleanup_schema(self, dc: Datacube):
        """Drop obsolete OWS schema objects."""
        run_sql(dc, "ows_schema/cleanup")

    # Schema creation method
    def create_schema(self, dc: Datacube):
        """Create/update the OWS schema, tables and materialised views."""
        click.echo("Creating/updating schema and tables...")
        run_sql(dc, "ows_schema/create")
        click.echo("Creating/updating materialised views...")
        run_sql(dc, "extent_views/create")
        click.echo("Setting ownership of materialised views...")
        run_sql(dc, "extent_views/grants/refresh_owner")

    # Permission management method
    def grant_perms(self, dc: Datacube, role: str, read_only: bool = False):
        """Grant read-only or read/write access on the OWS schema to ``role``."""
        if not read_only:
            run_sql(dc, "ows_schema/grants/read_write", role=role)
            run_sql(dc, "extent_views/grants/write_refresh", role=role)
        else:
            run_sql(dc, "ows_schema/grants/read_only", role=role)
            run_sql(dc, "extent_views/grants/read_only", role=role)

    # Spatiotemporal index update method (e.g. refresh materialised views)
    def update_geotemporal_index(self, dc: Datacube):
        """Refresh the materialised extent views."""
        run_sql(dc, "extent_views/refresh")

    def create_range_entry(self, layer: OWSNamedLayer, cache: dict[LayerSignature, list[str]]) -> None:
        """Delegate range-table update to the postgres implementation."""
        create_range_entry_impl(layer, cache)

    def get_ranges(self, layer: OWSNamedLayer) -> LayerExtent | None:
        """Delegate range-table read to the postgres implementation."""
        return get_ranges_impl(layer)

    def ds_search(self,
                  layer: OWSNamedLayer,
                  times: Iterable[TimeSearchTerm] | None = None,
                  geom: Geometry | None = None,
                  products: Iterable[Product] | None = None
                  ) -> Iterable[Dataset]:
        """Return matching datasets via the materialised-view search."""
        found = mv_search(layer.dc.index, MVSelectOpts.DATASETS,
                          times=times, geom=geom, products=products)
        return cast(Iterable[Dataset], found)

    def dsid_search(self,
                    layer: OWSNamedLayer,
                    times: Iterable[TimeSearchTerm] | None = None,
                    geom: Geometry | None = None,
                    products: Iterable[Product] | None = None
                    ) -> Iterable[UUID]:
        """Return matching dataset ids via the materialised-view search."""
        found = mv_search(layer.dc.index, MVSelectOpts.IDS,
                          times=times, geom=geom, products=products)
        return cast(Iterable[UUID], found)

    def count(self,
              layer: OWSNamedLayer,
              times: Iterable[TimeSearchTerm] | None = None,
              geom: Geometry | None = None,
              products: Iterable[Product] | None = None
              ) -> int:
        """Count matching datasets via the materialised-view search."""
        n = mv_search(layer.dc.index, MVSelectOpts.COUNT,
                      times=times, geom=geom, products=products)
        return cast(int, n)

    def extent(self,
               layer: OWSNamedLayer,
               times: Iterable[TimeSearchTerm] | None = None,
               geom: Geometry | None = None,
               products: Iterable[Product] | None = None,
               crs: CRS | None = None
               ) -> Geometry | None:
        """Union extent of matching datasets, reprojected to ``crs`` if needed."""
        ext = cast(Geometry | None, mv_search(layer.dc.index, MVSelectOpts.EXTENT,
                                              times=times, geom=geom, products=products))
        if ext is not None and crs is not None and crs != ext.crs:
            return ext.to_crs(crs)
        return ext


# Guards lazy creation of the shared OWSPostgresIndex instance below.
pgdriverlock = Lock()


class OWSPostgresIndexDriver(OWSAbstractIndexDriver):
    """OWS index driver plugin for the legacy "postgres" ODC index."""
    # Shared OWSPostgresIndex instance, created lazily under pgdriverlock.
    _driver = None

    @classmethod
    def ows_index_class(cls) -> Type[OWSAbstractIndex]:
        """The index implementation class this driver provides."""
        return OWSPostgresIndex

    @classmethod
    def ows_index(cls) -> OWSAbstractIndex:
        """Return the shared OWSPostgresIndex, creating it on first use."""
        with pgdriverlock:
            if cls._driver is None:
                cls._driver = OWSPostgresIndex()
            return cls._driver


def ows_index_driver_init():
    """Plugin entry point: construct the postgres OWS index driver."""
    driver = OWSPostgresIndexDriver()
    return driver
Loading

0 comments on commit f827516

Please sign in to comment.