Skip to content

Commit 4fc41d2

Browse files
authored
feat: add InvocableScripts API (#404)
1 parent d9c31ea commit 4fc41d2

22 files changed

+449
-157
lines changed

CHANGELOG.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
### Features
44
1. [#412](https://github.com/influxdata/influxdb-client-python/pull/412): `DeleteApi` uses default value from `InfluxDBClient.org` if an `org` parameter is not specified
5-
2. [#405](https://github.com/influxdata/influxdb-client-python/pull/405): Add `InfluxLoggingHandler`. A handler to use the client in native python logging.
5+
1. [#405](https://github.com/influxdata/influxdb-client-python/pull/405): Add `InfluxLoggingHandler`. A handler to use the client in native python logging.
6+
1. [#404](https://github.com/influxdata/influxdb-client-python/pull/404): Add `InvocableScriptsApi` to create, update, list, delete and invoke scripts by seamless way
67

78
### CI
89
1. [#411](https://github.com/influxdata/influxdb-client-python/pull/411): Use new Codecov uploader for reporting code coverage
@@ -46,11 +47,10 @@ This release introduces a support for new version of InfluxDB OSS API definition
4647
1. [#408](https://github.com/influxdata/influxdb-client-python/pull/408): Improve error message when the client cannot find organization by name
4748
1. [#407](https://github.com/influxdata/influxdb-client-python/pull/407): Use `pandas.concat()` instead of deprecated `DataFrame.append()` [DataFrame]
4849

49-
5050
## 1.25.0 [2022-01-20]
5151

5252
### Features
53-
1. [#393](https://github.com/influxdata/influxdb-client-python/pull/393): Added callback function for getting profilers output with example and test
53+
1. [#393](https://github.com/influxdata/influxdb-client-python/pull/393): Add callback function for getting profilers output with example and test
5454

5555
### Bug Fixes
5656
1. [#375](https://github.com/influxdata/influxdb-client-python/pull/375): Construct `InfluxDBError` without HTTP response

docs/api.rst

+11
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,17 @@ TasksApi
6262
.. autoclass:: influxdb_client.domain.Task
6363
:members:
6464

65+
InvocableScriptsApi
66+
"""""""""""""""""""
67+
.. autoclass:: influxdb_client.InvocableScriptsApi
68+
:members:
69+
70+
.. autoclass:: influxdb_client.domain.Script
71+
:members:
72+
73+
.. autoclass:: influxdb_client.domain.ScriptCreateRequest
74+
:members:
75+
6576
DeleteApi
6677
"""""""""
6778
.. autoclass:: influxdb_client.DeleteApi

examples/invocable_scripts.py

+46-14
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
"""
44
import datetime
55

6-
from influxdb_client import InfluxDBClient, InvocableScriptsService, ScriptCreateRequest, ScriptInvocationParams, \
7-
ScriptLanguage
6+
from influxdb_client import InfluxDBClient, ScriptCreateRequest, ScriptLanguage, \
7+
ScriptUpdateRequest, Point
8+
from influxdb_client.client.write_api import SYNCHRONOUS
89

910
"""
1011
Define credentials
@@ -14,48 +15,79 @@
1415
bucket_name = '...'
1516
org_name = '...'
1617

17-
with InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token, org=org_name, debug=False, timeout=20_000) as client:
18+
with InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token, org=org_name, debug=False,
19+
timeout=20_000) as client:
1820
uniqueId = str(datetime.datetime.now())
21+
22+
"""
23+
Prepare data
24+
"""
25+
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
26+
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
27+
client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket_name, record=[_point1, _point2])
28+
1929
"""
2030
Find Organization ID by Organization API.
2131
"""
2232
org = client.organizations_api().find_organizations(org=org_name)[0]
2333

24-
scripts_service = InvocableScriptsService(api_client=client.api_client)
34+
scripts_api = client.invocable_scripts_api()
2535

2636
"""
2737
Create Invocable Script
2838
"""
2939
print(f"------- Create -------\n")
30-
create_request = ScriptCreateRequest(name=f"my_scrupt_{uniqueId}",
40+
create_request = ScriptCreateRequest(name=f"my_script_{uniqueId}",
3141
description="my first try",
3242
language=ScriptLanguage.FLUX,
33-
org_id=org.id,
3443
script=f"from(bucket: params.bucket_name) |> range(start: -30d) |> limit(n:2)")
3544

36-
created_script = scripts_service.post_scripts(script_create_request=create_request)
45+
created_script = scripts_api.create_script(create_request=create_request)
46+
print(created_script)
47+
48+
"""
49+
Update Invocable Script
50+
"""
51+
print(f"------- Update -------\n")
52+
update_request = ScriptUpdateRequest(description="my updated description")
53+
created_script = scripts_api.update_script(script_id=created_script.id, update_request=update_request)
3754
print(created_script)
3855

3956
"""
4057
Invoke a script
4158
"""
42-
print(f"\n------- Invoke -------\n")
43-
response = scripts_service.post_scripts_id_invoke(script_id=created_script.id,
44-
script_invocation_params=ScriptInvocationParams(
45-
params={"bucket_name": bucket_name}))
46-
print(response)
59+
# FluxRecords
60+
print(f"\n------- Invoke to FluxRecords -------\n")
61+
tables = scripts_api.invoke_scripts(script_id=created_script.id, params={"bucket_name": bucket_name})
62+
for table in tables:
63+
for record in table.records:
64+
print(f'FluxRecord {record}')
65+
# Pandas DataFrame
66+
print(f"\n------- Invoke to PandasData Frame -------\n")
67+
data_frame = scripts_api.invoke_scripts_data_frame(script_id=created_script.id, params={"bucket_name": bucket_name})
68+
print(data_frame.to_string())
69+
# CSV
70+
print(f"\n------- Invoke to CSV-------\n")
71+
csv_lines = scripts_api.invoke_scripts_csv(script_id=created_script.id, params={"bucket_name": bucket_name})
72+
for csv_line in csv_lines:
73+
if not len(csv_line) == 0:
74+
print(f'CSV row: {csv_line}')
75+
# RAW
76+
print(f"\n------- Invoke to Raw-------\n")
77+
raw = scripts_api.invoke_scripts_raw(script_id=created_script.id, params={"bucket_name": bucket_name})
78+
print(f'RAW output:\n {raw}')
4779

4880
"""
4981
List scripts
5082
"""
5183
print(f"\n------- List -------\n")
52-
scripts = scripts_service.get_scripts().scripts
84+
scripts = scripts_api.find_scripts()
5385
print("\n".join([f" ---\n ID: {it.id}\n Name: {it.name}\n Description: {it.description}" for it in scripts]))
5486
print("---")
5587

5688
"""
5789
Delete previously created Script
5890
"""
5991
print(f"------- Delete -------\n")
60-
scripts_service.delete_scripts_id(script_id=created_script.id)
92+
scripts_api.delete_script(script_id=created_script.id)
6193
print(f" Successfully deleted script: '{created_script.name}'")

examples/query.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from influxdb_client import InfluxDBClient, Point, Dialect
44
from influxdb_client.client.write_api import SYNCHRONOUS
55

6-
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org",debug=True) as client:
6+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
77

88
write_api = client.write_api(write_options=SYNCHRONOUS)
99

influxdb_client/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@
376376
from influxdb_client.client.authorizations_api import AuthorizationsApi
377377
from influxdb_client.client.bucket_api import BucketsApi
378378
from influxdb_client.client.delete_api import DeleteApi
379+
from influxdb_client.client.invocable_scripts_api import InvocableScriptsApi
379380
from influxdb_client.client.labels_api import LabelsApi
380381
from influxdb_client.client.organizations_api import OrganizationsApi
381382
from influxdb_client.client.query_api import QueryApi

influxdb_client/client/flux_csv_parser.py

+19-4
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,32 @@ class FluxCsvParserException(Exception):
3434

3535

3636
class FluxSerializationMode(Enum):
37-
"""The type how we wan't to serialize data."""
37+
"""The type how we want to serialize data."""
3838

3939
tables = 1
4040
stream = 2
4141
dataFrame = 3
4242

4343

44+
class FluxResponseMetadataMode(Enum):
45+
"""The configuration for expected amount of metadata respons from InfluxDB."""
46+
47+
full = 1
48+
# useful for Invocable scripts
49+
only_names = 2
50+
51+
4452
class FluxCsvParser(object):
4553
"""Parse to processing response from InfluxDB to FluxStructures or DataFrame."""
4654

4755
def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
48-
data_frame_index: List[str] = None, query_options=None) -> None:
56+
data_frame_index: List[str] = None, query_options=None,
57+
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> None:
4958
"""Initialize defaults."""
5059
self._response = response
5160
self.tables = []
5261
self._serialization_mode = serialization_mode
62+
self._response_metadata_mode = response_metadata_mode
5363
self._data_frame_index = data_frame_index
5464
self._data_frame_values = []
5565
self._profilers = query_options.profilers if query_options is not None else None
@@ -97,8 +107,9 @@ def _parse_flux_response(self):
97107
raise FluxQueryException(error, reference_value)
98108

99109
token = csv[0]
100-
# start new table
101-
if token in ANNOTATIONS and not start_new_table:
110+
# start new table
111+
if (token in ANNOTATIONS and not start_new_table) or \
112+
(self._response_metadata_mode is FluxResponseMetadataMode.only_names and not table):
102113

103114
# Return already parsed DataFrame
104115
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
@@ -127,6 +138,10 @@ def _parse_flux_response(self):
127138
else:
128139
# parse column names
129140
if start_new_table:
141+
# Invocable scripts doesn't supports dialect => all columns are string
142+
if not table.columns and self._response_metadata_mode is FluxResponseMetadataMode.only_names:
143+
self.add_data_types(table, list(map(lambda column: 'string', csv)))
144+
groups = list(map(lambda column: 'false', csv))
130145
self.add_groups(table, groups)
131146
self.add_column_names_and_tags(table, csv)
132147
start_new_table = False

influxdb_client/client/influxdb_client.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
import base64
99
import warnings
1010

11-
from influxdb_client import Configuration, ApiClient, HealthCheck, HealthService, Ready, ReadyService, PingService
11+
from influxdb_client import Configuration, ApiClient, HealthCheck, HealthService, Ready, ReadyService, PingService, \
12+
InvocableScriptsApi
1213
from influxdb_client.client.authorizations_api import AuthorizationsApi
1314
from influxdb_client.client.bucket_api import BucketsApi
1415
from influxdb_client.client.delete_api import DeleteApi
@@ -337,13 +338,21 @@ def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
337338

338339
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
339340
"""
340-
Create a Query API instance.
341+
Create an Query API instance.
341342
342343
:param query_options: optional query api configuration
343344
:return: Query api instance
344345
"""
345346
return QueryApi(self, query_options)
346347

348+
def invocable_scripts_api(self) -> InvocableScriptsApi:
349+
"""
350+
Create an InvocableScripts API instance.
351+
352+
:return: InvocableScripts API instance
353+
"""
354+
return InvocableScriptsApi(self)
355+
347356
def close(self):
348357
"""Shutdown the client."""
349358
self.__del__()

0 commit comments

Comments
 (0)