Add support for arbitrary json in conn uri format #15100

Merged
merged 9 commits into from
Apr 14, 2021
19 changes: 16 additions & 3 deletions airflow/models/connection.py
@@ -89,6 +89,8 @@ class Connection(Base, LoggingMixin): # pylint: disable=too-many-instance-attri
:type uri: str
"""

EXTRA_KEY = '__extra__'

__tablename__ = "connection"

id = Column(Integer(), primary_key=True)
@@ -161,7 +163,11 @@ def _parse_from_uri(self, uri: str):
self.password = unquote(uri_parts.password) if uri_parts.password else uri_parts.password
self.port = uri_parts.port
if uri_parts.query:
self.extra = json.dumps(dict(parse_qsl(uri_parts.query, keep_blank_values=True)))
query = dict(parse_qsl(uri_parts.query, keep_blank_values=True))
if self.EXTRA_KEY in query:
self.extra = query[self.EXTRA_KEY]
else:
self.extra = json.dumps(query)
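
The parsing branch above can be illustrated outside Airflow with a minimal standalone sketch. It uses only the standard library; `extra_from_uri` is a hypothetical helper name, and `urlsplit` is assumed here as the URI splitter (the hunk only shows `uri_parts.query`). It is not the `Connection` implementation itself.

```python
import json
from urllib.parse import parse_qsl, urlsplit

EXTRA_KEY = "__extra__"  # same literal as Connection.EXTRA_KEY in this PR


def extra_from_uri(uri: str):
    """Hypothetical helper: return the string that would be stored as ``extra``."""
    query_string = urlsplit(uri).query
    if not query_string:
        return None
    query = dict(parse_qsl(query_string, keep_blank_values=True))
    if EXTRA_KEY in query:
        # The whole extra was packed into a single param; use it verbatim.
        return query[EXTRA_KEY]
    # Plain key-value params: serialize them as a flat JSON object.
    return json.dumps(query)


# Plain params become a flat JSON object (blank values are kept).
plain = extra_from_uri("scheme://host/schema?param1=val1&param2=")
# A packed extra is returned exactly as it was stored.
packed = extra_from_uri("scheme://host/schema?__extra__=%5B%22list%22%2C+%22of%22%5D")
```

Because the packed value is taken verbatim, it does not have to be JSON at all, which is what the 'with extra non-json' test case relies on.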

def get_uri(self) -> str:
"""Return connection in URI format"""
@@ -194,8 +200,15 @@ def get_uri(self) -> str:

uri += host_block

if self.extra_dejson:
uri += f'?{urlencode(self.extra_dejson)}'
if self.extra:
try:
query = urlencode(self.extra_dejson)
except TypeError:
query = None
if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
uri += '?' + query
else:
uri += '?' + urlencode({self.EXTRA_KEY: self.extra})

return uri
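
The round-trip check in `get_uri` can be sketched on its own as follows. This is a simplified illustration under stated assumptions: `extra_to_query` is a hypothetical helper, and it parses the extra with `json.loads` directly rather than going through `extra_dejson`.

```python
import json
from urllib.parse import parse_qsl, urlencode

EXTRA_KEY = "__extra__"  # same literal as Connection.EXTRA_KEY in this PR


def extra_to_query(extra: str) -> str:
    """Hypothetical helper mirroring the fallback logic in get_uri."""
    try:
        extra_obj = json.loads(extra)
    except ValueError:
        extra_obj = None  # the extra is not JSON at all
    query = None
    if isinstance(extra_obj, dict) and extra_obj:
        try:
            query = urlencode(extra_obj)
        except TypeError:
            query = None
    # Use plain urlencoding only if decoding it gives back the same dict.
    if query and extra_obj == dict(parse_qsl(query, keep_blank_values=True)):
        return query
    # Otherwise pack the raw extra string under the reserved key.
    return urlencode({EXTRA_KEY: extra})


flat = extra_to_query(json.dumps({"param1": "val1", "param2": "val2"}))
nested = extra_to_query(json.dumps({"my_val": ["list", "of", "values"]}))
typed = extra_to_query(json.dumps({"port": 1}))
```

Note that a flat dict with a non-string value such as `{"port": 1}` also takes the fallback, since `parse_qsl` returns `"1"` as a string and the equality check fails.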

68 changes: 52 additions & 16 deletions docs/apache-airflow/howto/connection.rst
@@ -212,10 +212,6 @@ In general, Airflow's URI format is like so:

my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2

.. note::

The params ``param1`` and ``param2`` are just examples; you may supply arbitrary urlencoded json-serializable data there.

The above URI would produce a ``Connection`` object equivalent to the following:

.. code-block:: python
@@ -232,17 +228,6 @@ The above URI would produce a ``Connection`` object equivalent to the following:
extra=json.dumps(dict(param1='val1', param2='val2'))
)

You can verify a URI is parsed correctly like so:

.. code-block:: pycon

>>> from airflow.models.connection import Connection

>>> c = Connection(uri='my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2')
>>> print(c.login)
my-login
>>> print(c.password)
my-password

.. _generating_connection_uri:

@@ -289,12 +274,63 @@ Additionally, if you have created a connection, you can use ``airflow connection

.. _manage-connections-connection-types:

Encoding arbitrary json
^^^^^^^^^^^^^^^^^^^^^^^

Some JSON structures (for example nested objects, lists, or non-string values) cannot
be urlencoded without loss. For such JSON, ``get_uri`` stores the entire ``extra``
string under the URL query param ``__extra__``.

For example:

.. code-block:: pycon

>>> import json
>>> from airflow.models.connection import Connection
>>> extra_dict = {'my_val': ['list', 'of', 'values'], 'extra': {'nested': {'json': 'val'}}}
>>> c = Connection(
...     conn_type='scheme',
...     host='host/location',
...     schema='schema',
...     login='user',
...     password='password',
...     port=1234,
...     extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'


And we can verify that it returns the same dictionary:

.. code-block:: pycon

>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True


But for the most common case of storing only key-value pairs, plain urlencoding is used.

You can verify a URI is parsed correctly like so:

.. code-block:: pycon

>>> from airflow.models.connection import Connection

>>> c = Connection(uri='my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2')
>>> print(c.login)
my-login
>>> print(c.password)
my-password


Handling of special characters in connection params
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. note::

This process is automated as described in section :ref:`Generating a Connection URI <generating_connection_uri>`.
Use the convenience method ``Connection.get_uri`` when generating a connection
as described in section :ref:`Generating a Connection URI <generating_connection_uri>`.
This section is for informational purposes only.

Special handling is required for certain characters when building a URI manually.

61 changes: 57 additions & 4 deletions tests/models/test_connection.py
@@ -137,6 +137,61 @@ def test_connection_extra_with_encryption_rotate_fernet_key(self):
),
description='with extras',
),
UriTestCaseConfig(
test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?' '__extra__=single+value',
test_conn_attributes=dict(
conn_type='scheme',
host='host/location',
schema='schema',
login='user',
password='password',
port=1234,
extra='single value',
),
description='with extras single value',
),
UriTestCaseConfig(
test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
'__extra__=arbitrary+string+%2A%29%2A%24',
test_conn_attributes=dict(
conn_type='scheme',
host='host/location',
schema='schema',
login='user',
password='password',
port=1234,
extra='arbitrary string *)*$',
),
description='with extra non-json',
),
UriTestCaseConfig(
test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
'__extra__=%5B%22list%22%2C+%22of%22%2C+%22values%22%5D',
test_conn_attributes=dict(
conn_type='scheme',
host='host/location',
schema='schema',
login='user',
password='password',
port=1234,
extra_dejson=['list', 'of', 'values'],
),
description='with extras list',
),
UriTestCaseConfig(
test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?'
'__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D', # noqa: E501 # pylint: disable=C0301
test_conn_attributes=dict(
conn_type='scheme',
host='host/location',
schema='schema',
login='user',
password='password',
port=1234,
extra_dejson={'my_val': ['list', 'of', 'values'], 'extra': {'nested': {'json': 'val'}}},
),
description='with nested json',
),
UriTestCaseConfig(
test_conn_uri='scheme://user:password@host%2Flocation:1234/schema?extra1=a%20value&extra2=',
test_conn_attributes=dict(
@@ -351,11 +406,9 @@ def test_connection_get_uri_from_conn(self, test_config: UriTestCaseConfig):
for conn_attr, expected_val in test_config.test_conn_attributes.items():
actual_val = getattr(new_conn, conn_attr)
if expected_val is None:
assert expected_val is None
if isinstance(expected_val, dict):
assert expected_val == actual_val
assert actual_val is None
else:
assert expected_val == actual_val
assert actual_val == expected_val

@parameterized.expand(
[
6 changes: 3 additions & 3 deletions tests/secrets/test_local_filesystem.py
@@ -222,9 +222,9 @@ def test_missing_file(self, mock_exists):
{
"conn_a": "mysql://hosta",
"conn_b": ''.join(
"""scheme://Login:None@host:1234/lschema?
extra__google_cloud_platform__keyfile_dict=%7B%27a%27%3A+%27b%27%7D
&extra__google_cloud_platform__keyfile_path=asaa""".split()
"""scheme://Login:None@host:1234/lschema?__extra__=%7B
%22extra__google_cloud_platform__keyfile_dict%22%3A+%7B%22a%22%3A+%22b%22%7D%2C
+%22extra__google_cloud_platform__keyfile_path%22%3A+%22asaa%22%7D""".split()
),
},
),
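
To see what the updated `conn_b` URI carries, it can be decoded with the standard library alone. This is a small sketch for illustration, not part of the test suite; the URI pieces are copied from the updated test and the variable names are assumptions.

```python
import json
from urllib.parse import parse_qsl, urlsplit

# URI pieces copied from the updated test; split()/join strips the whitespace.
conn_b = "".join(
    """scheme://Login:None@host:1234/lschema?__extra__=%7B
    %22extra__google_cloud_platform__keyfile_dict%22%3A+%7B%22a%22%3A+%22b%22%7D%2C
    +%22extra__google_cloud_platform__keyfile_path%22%3A+%22asaa%22%7D""".split()
)

# The query holds a single ``__extra__`` param whose value is a JSON document.
query = dict(parse_qsl(urlsplit(conn_b).query, keep_blank_values=True))
extra = json.loads(query["__extra__"])
```

The decoded `extra` keeps `keyfile_dict` as a real nested object, whereas the previous URI form carried it as the urlencoded string `{'a': 'b'}`.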