Skip to content

Commit 21de7b8

Browse files
Online & Utils: Use _get_conn_async
1 parent 9ecb755 commit 21de7b8

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

sdk/python/feast/infra/online_stores/contrib/postgres.py

+16-8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from feast.infra.online_stores.online_store import OnlineStore
2929
from feast.infra.utils.postgres.connection_utils import (
3030
_get_conn,
31+
_get_conn_async,
3132
_get_connection_pool,
3233
_get_connection_pool_async,
3334
)
@@ -57,6 +58,8 @@ class PostgreSQLOnlineStoreConfig(PostgreSQLConfig):
5758
class PostgreSQLOnlineStore(OnlineStore):
5859
_conn: Optional[Connection] = None
5960
_conn_pool: Optional[ConnectionPool] = None
61+
62+
_conn_async: Optional[AsyncConnection] = None
6063
_conn_pool_async: Optional[AsyncConnectionPool] = None
6164

6265
@contextlib.contextmanager
@@ -79,14 +82,19 @@ def _get_conn(self, config: RepoConfig) -> Generator[Connection, Any, Any]:
7982
async def _get_conn_async(
8083
self, config: RepoConfig
8184
) -> AsyncGenerator[AsyncConnection, Any]:
82-
if not self._conn_pool_async:
83-
self._conn_pool_async = await _get_connection_pool_async(
84-
config.online_store
85-
)
86-
await self._conn_pool_async.open()
87-
connection = await self._conn_pool_async.getconn()
88-
yield connection
89-
await self._conn_pool_async.putconn(connection)
85+
if config.online_store.conn_type == ConnectionType.pool:
86+
if not self._conn_pool_async:
87+
self._conn_pool_async = await _get_connection_pool_async(
88+
config.online_store
89+
)
90+
await self._conn_pool_async.open()
91+
connection = await self._conn_pool_async.getconn()
92+
yield connection
93+
await self._conn_pool_async.putconn(connection)
94+
else:
95+
if not self._conn_async:
96+
self._conn_async = await _get_conn_async(config.online_store)
97+
yield self._conn_async
9098

9199
def online_write_batch(
92100
self,

sdk/python/feast/infra/utils/postgres/connection_utils.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pandas as pd
55
import psycopg
66
import pyarrow as pa
7-
from psycopg.connection import Connection
7+
from psycopg import AsyncConnection, Connection
88
from psycopg_pool import AsyncConnectionPool, ConnectionPool
99

1010
from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
@@ -21,6 +21,16 @@ def _get_conn(config: PostgreSQLConfig) -> Connection:
2121
return conn
2222

2323

24+
async def _get_conn_async(config: PostgreSQLConfig) -> AsyncConnection:
25+
"""Get a psycopg `AsyncConnection`."""
26+
conn = await psycopg.AsyncConnection.connect(
27+
conninfo=_get_conninfo(config),
28+
keepalives_idle=config.keepalives_idle,
29+
**_get_conn_kwargs(config),
30+
)
31+
return conn
32+
33+
2434
def _get_connection_pool(config: PostgreSQLConfig) -> ConnectionPool:
2535
"""Get a psycopg `ConnectionPool`."""
2636
return ConnectionPool(

0 commit comments

Comments
 (0)