|
1 | 1 | import contextlib
|
| 2 | +import logging |
2 | 3 | from dataclasses import asdict
|
3 | 4 | from datetime import datetime
|
4 | 5 | from typing import (
|
|
23 | 24 | from pytz import utc
|
24 | 25 |
|
25 | 26 | from feast.data_source import DataSource
|
26 |
| -from feast.errors import InvalidEntityType |
| 27 | +from feast.errors import InvalidEntityType, ZeroColumnQueryResult, ZeroRowsQueryResult |
27 | 28 | from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
|
28 | 29 | from feast.infra.offline_stores import offline_utils
|
29 | 30 | from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
|
@@ -276,6 +277,8 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
|
276 | 277 | with _get_conn(self.config.offline_store) as conn, conn.cursor() as cur:
|
277 | 278 | conn.read_only = True
|
278 | 279 | cur.execute(query)
|
| 280 | + if not cur.description: |
| 281 | + raise ZeroColumnQueryResult(query) |
279 | 282 | fields = [
|
280 | 283 | (c.name, pg_type_code_to_arrow(c.type_code))
|
281 | 284 | for c in cur.description
|
@@ -331,16 +334,19 @@ def _get_entity_df_event_timestamp_range(
|
331 | 334 | entity_df_event_timestamp.max().to_pydatetime(),
|
332 | 335 | )
|
333 | 336 | elif isinstance(entity_df, str):
|
334 |
| - # If the entity_df is a string (SQL query), determine range |
335 |
| - # from table |
| 337 | + # If the entity_df is a string (SQL query), determine range from table |
336 | 338 | with _get_conn(config.offline_store) as conn, conn.cursor() as cur:
|
337 |
| - ( |
338 |
| - cur.execute( |
339 |
| - f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max FROM ({entity_df}) as tmp_alias" |
340 |
| - ), |
341 |
| - ) |
| 339 | + query = f""" |
| 340 | + SELECT |
| 341 | + MIN({entity_df_event_timestamp_col}) AS min, |
| 342 | + MAX({entity_df_event_timestamp_col}) AS max |
| 343 | + FROM ({entity_df}) AS tmp_alias |
| 344 | + """ |
| 345 | + cur.execute(query) |
342 | 346 | res = cur.fetchone()
|
343 |
| - entity_df_event_timestamp_range = (res[0], res[1]) |
| 347 | + if not res: |
| 348 | + raise ZeroRowsQueryResult(query) |
| 349 | + entity_df_event_timestamp_range = (res[0], res[1]) |
344 | 350 | else:
|
345 | 351 | raise InvalidEntityType(type(entity_df))
|
346 | 352 |
|
|
0 commit comments