-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: Implement transforms to/from Pub/Sub messages and Pub/Sub Lite messages. * feat: Implement Pub/Sub Publisher * fix: Re-delete b64_utils after merge. Co-authored-by: Daniel Collins <dpcollins@google.com>
- Loading branch information
1 parent
903070d
commit 58fda6f
Showing
6 changed files
with
151 additions
and
0 deletions.
There are no files selected for viewing
26 changes: 26 additions & 0 deletions
26
google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from typing import Mapping | ||
|
||
from google.pubsub_v1 import PubsubMessage | ||
|
||
from google.cloud.pubsublite.cloudpubsub.message_transforms import from_cps_publish_message | ||
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher | ||
from google.cloud.pubsublite.internal.wire.publisher import Publisher | ||
|
||
|
||
class AsyncPublisherImpl(AsyncPublisher): | ||
_publisher: Publisher | ||
|
||
def __init__(self, publisher: Publisher): | ||
self._publisher = publisher | ||
|
||
async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str: | ||
cps_message = PubsubMessage(data=data, ordering_key=ordering_key, attributes=attrs) | ||
psl_message = from_cps_publish_message(cps_message) | ||
return (await self._publisher.publish(psl_message)).encode() | ||
|
||
def __aenter__(self): | ||
self._publisher.__aenter__() | ||
return self | ||
|
||
def __aexit__(self, exc_type, exc_value, traceback): | ||
self._publisher.__aexit__(exc_type, exc_value, traceback) |
23 changes: 23 additions & 0 deletions
23
google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from asyncio import AbstractEventLoop, new_event_loop, run_coroutine_threadsafe | ||
from concurrent.futures import Future | ||
from threading import Thread | ||
from typing import ContextManager | ||
|
||
|
||
class ManagedEventLoop(ContextManager): | ||
_loop: AbstractEventLoop | ||
_thread: Thread | ||
|
||
def __init__(self): | ||
self._loop = new_event_loop() | ||
self._thread = Thread(target=lambda: self._loop.run_forever()) | ||
|
||
def __enter__(self): | ||
self._thread.start() | ||
|
||
def __exit__(self, __exc_type, __exc_value, __traceback): | ||
self._loop.call_soon_threadsafe(self._loop.stop) | ||
self._thread.join() | ||
|
||
def submit(self, coro) -> Future: | ||
return run_coroutine_threadsafe(coro, self._loop) |
26 changes: 26 additions & 0 deletions
26
google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from concurrent.futures import Future | ||
from typing import Mapping | ||
|
||
from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ManagedEventLoop | ||
from google.cloud.pubsublite.cloudpubsub.publisher import Publisher, AsyncPublisher | ||
|
||
|
||
class PublisherImpl(Publisher): | ||
_managed_loop: ManagedEventLoop | ||
_underlying: AsyncPublisher | ||
|
||
def __init__(self, underlying: AsyncPublisher): | ||
self._managed_loop = ManagedEventLoop() | ||
self._underlying = underlying | ||
|
||
def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> 'Future[str]': | ||
return self._managed_loop.submit(self._underlying.publish(data=data, ordering_key=ordering_key, **attrs)) | ||
|
||
def __enter__(self): | ||
self._managed_loop.__enter__() | ||
self._managed_loop.submit(self._underlying.__aenter__()).result() | ||
return self | ||
|
||
def __exit__(self, __exc_type, __exc_value, __traceback): | ||
self._managed_loop.submit(self._underlying.__aexit__(__exc_type, __exc_value, __traceback)).result() | ||
self._managed_loop.__exit__(__exc_type, __exc_value, __traceback) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from abc import abstractmethod | ||
from typing import AsyncContextManager, Mapping, ContextManager | ||
from concurrent import futures | ||
|
||
|
||
class AsyncPublisher(AsyncContextManager): | ||
""" | ||
An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an | ||
async context. Any publish failures are permanent. | ||
""" | ||
|
||
@abstractmethod | ||
async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str: | ||
""" | ||
Publish a message. | ||
Args: | ||
data: The bytestring payload of the message | ||
ordering_key: The key to enforce ordering on, or "" for no ordering. | ||
**attrs: Additional attributes to send. | ||
Returns: | ||
An ack id, which can be decoded using PublishMetadata.decode. | ||
Raises: | ||
GoogleApiCallError: On a permanent failure. | ||
""" | ||
|
||
|
||
class Publisher(ContextManager): | ||
""" | ||
A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent. | ||
""" | ||
|
||
@abstractmethod | ||
def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> 'futures.Future[str]': | ||
""" | ||
Publish a message. | ||
Args: | ||
data: The bytestring payload of the message | ||
ordering_key: The key to enforce ordering on, or "" for no ordering. | ||
**attrs: Additional attributes to send. | ||
Returns: | ||
A future completed with an ack id, which can be decoded using PublishMetadata.decode. | ||
Raises: | ||
GoogleApiCallError: On a permanent failure. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from asynctest.mock import MagicMock | ||
import pytest | ||
|
||
from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl | ||
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher | ||
|
||
|
||
@pytest.fixture() | ||
def async_publisher(): | ||
publisher = MagicMock(spec=AsyncPublisher) | ||
publisher.__aenter__.return_value = publisher | ||
return publisher | ||
|
||
|
||
@pytest.fixture() | ||
def publisher(async_publisher): | ||
return PublisherImpl(async_publisher) | ||
|
||
|
||
def test_proxies_to_async(async_publisher, publisher: Publisher): | ||
with publisher: | ||
async_publisher.__aenter__.assert_called_once() | ||
publisher.publish(data=b'abc', ordering_key='zyx', xyz='xyz').result() | ||
async_publisher.publish.assert_called_once_with(data=b'abc', ordering_key='zyx', xyz='xyz') | ||
async_publisher.__aexit__.assert_called_once() |