Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'function' object has no attribute 'openapi_types' #441

Closed
veonua opened this issue May 18, 2022 · 23 comments
Closed

'function' object has no attribute 'openapi_types' #441

veonua opened this issue May 18, 2022 · 23 comments
Labels
wontfix This will not be worked on
Milestone

Comments

@veonua
Copy link

veonua commented May 18, 2022

  4 point = writer._row_to_line_protocol(df.iloc[0])

----> 5 writer.write_api.write(bucket=bucket, record=point)

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/client/write_api.py in write(self, bucket, org, record, write_precision, **kwargs)
359 return self._post_write(_async_req, bucket, org, final_string, payload[0])
360
--> 361 results = list(map(write_payload, payloads.items()))
362 if not _async_req:
363 return None

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/client/write_api.py in write_payload(payload)
357 def write_payload(payload):
358 final_string = b'\n'.join(payload[1])
--> 359 return self._post_write(_async_req, bucket, org, final_string, payload[0])
360
361 results = list(map(write_payload, payloads.items()))

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/client/write_api.py in _post_write(self, _async_req, bucket, org, body, precision, **kwargs)
468 def _post_write(self, _async_req, bucket, org, body, precision, **kwargs):
469
--> 470 return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
471 async_req=_async_req, content_encoding="identity",
472 content_type="text/plain; charset=utf-8",

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/service/write_service.py in post_write(self, org, bucket, body, **kwargs)
60 return self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
61 else:
---> 62 (data) = self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
63 return data
64

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/service/write_service.py in post_write_with_http_info(self, org, bucket, body, **kwargs)
90 self._post_write_prepare(org, bucket, body, **kwargs)
91
---> 92 return self.api_client.call_api(
93 '/api/v2/write', 'POST',
94 path_params,

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/_sync/api_client.py in call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, async_req, _return_http_data_only, collection_formats, _preload_content, _request_timeout, urlopen_kw)
339 """
340 if not async_req:
--> 341 return self.__call_api(resource_path, method,
342 path_params, query_params, header_params,
343 body, post_params, files,

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/_sync/api_client.py in __call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout, urlopen_kw)
144 # query parameters
145 if query_params:
--> 146 query_params = self.sanitize_for_serialization(query_params)
147 query_params = self.parameters_to_tuples(query_params,
148 collection_formats)

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/_sync/api_client.py in sanitize_for_serialization(self, obj)
210 return obj
211 elif isinstance(obj, list):
--> 212 return [self.sanitize_for_serialization(sub_obj)
213 for sub_obj in obj]
214 elif isinstance(obj, tuple):

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/_sync/api_client.py in (.0)
210 return obj
211 elif isinstance(obj, list):
--> 212 return [self.sanitize_for_serialization(sub_obj)
213 for sub_obj in obj]
214 elif isinstance(obj, tuple):

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/_sync/api_client.py in sanitize_for_serialization(self, obj)
213 for sub_obj in obj]
214 elif isinstance(obj, tuple):
--> 215 return tuple(self.sanitize_for_serialization(sub_obj)
216 for sub_obj in obj)
217 elif isinstance(obj, (datetime.datetime, datetime.date)):

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/_sync/api_client.py in (.0)
213 for sub_obj in obj]
214 elif isinstance(obj, tuple):
--> 215 return tuple(self.sanitize_for_serialization(sub_obj)
216 for sub_obj in obj)
217 elif isinstance(obj, (datetime.datetime, datetime.date)):

/local_disk0/.ephemeral_nfs/envs/pythonEnv-c7e9b99c-6dca-42e7-8dc4-24c481895c90/lib/python3.8/site-packages/influxdb_client/_sync/api_client.py in sanitize_for_serialization(self, obj)
227 # model definition for request.
228 obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
--> 229 for attr, _ in six.iteritems(obj.openapi_types)
230 if getattr(obj, attr) is not None}
231

@bednar
Copy link
Contributor

bednar commented May 19, 2022

Hi @veonua,

thanks for using our client.

Can you share a little bit more about your code? How looks like an initialisation of the client?

Regards

@bednar bednar added the question Further information is requested label May 19, 2022
@veonua
Copy link
Author

veonua commented May 19, 2022

most likely it’s the way how influx db works with Pandas..Timestamp .
when I use to_datetime() the issue was solved.
however you never know in python without type validation

@bednar
Copy link
Contributor

bednar commented May 19, 2022

@veonua is there a way how to reproduce this error?

We are currently works to improve Pandas.Timestamp handling in #438, #440.

@veonua
Copy link
Author

veonua commented May 19, 2022

my pipeline is
Event Hub -> Databricks (pyspark) -> InfluxDB

I can share the notebook, but I don’t know how do you reproduce other parts

@veonua
Copy link
Author

veonua commented May 19, 2022

likely it was some changes in
1.27.0 [2022-03-18]

before this day the pipeline worked fine

@bednar
Copy link
Contributor

bednar commented May 21, 2022

I can share the notebook, but I don’t know how do you reproduce other parts

Thanks, maybe it can be useful to find the cause.

It looks like problem with initialisation of parameters for the client. How do you setup your org and bucket?

@veonua
Copy link
Author

veonua commented May 21, 2022 via email

@bednar
Copy link
Contributor

bednar commented May 23, 2022

How do you specify the org and bucket settings for the client?

@veonua
Copy link
Author

veonua commented May 23, 2022

import datetime
import pyspark
from pyspark.sql import SparkSession

from datetime import datetime
import os
from influxdb_client import *
from influxdb_client.client.write_api import SYNCHRONOUS
import urllib3

class InfluxDBWriter:
    def __init__(self):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)

    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True

    def process(self, row):
        number = 5
        while number > 0:
            try:
                number -= 1
                self.write_api.write(bucket=bucket, record=self._row_to_line_protocol(row))
                break
            except urllib3.exceptions.ReadTimeoutError:
                pass

    def close(self, error):
        self.write_api.__del__()
        self.client.__del__()
        print("Closed with error: %s" % str(error))
        
    def _row_to_line_protocol(self, df: pyspark.sql.Row):
        val = df['Value']
        try:
            val = float(val)
        except ValueError:
            pass
        
        date = df['Date']
        if hasattr(date, "to_pydatetime"):
            date = date.to_pydatetime()
        
        return Point( df['Attribute'] ) \
            .time(date, WritePrecision.MS ) \
            .tag("DeviceName", df['DeviceName'] ) \
            .tag("DeviceId",    df['DeviceId'] ) \
            .tag("GroupId",     df['GroupId'] ) \
            .tag("LocationId",  df['LocationId'] ) \
            .tag("LocationName",  df['LocationName'] ) \
            .tag("Mode",        df['Mode'] ) \
            .tag("Tz",          df['Tz'] ) \
            .tag("HubId",       df['HubId'] ) \
            .tag("Source",      df['Source'] ) \
            .tag("Unit",        df['Unit'] ) \
            .field("Value",     val ) \
            #.field("LocalTime",  df['LocalTime'] ) \
            
event_stream.writeStream.foreach(InfluxDBWriter()).start()

@bednar
Copy link
Contributor

bednar commented May 24, 2022

Can you check the type and value of the bucket in the process function?

@veonua
Copy link
Author

veonua commented May 24, 2022

bucket = "iot"

@bednar
Copy link
Contributor

bednar commented May 24, 2022

The 'function' object has no attribute 'openapi_types' is caused by transforming unsupported object to URL query parameter. I am able to simulate it if the org or bucket parameter is something different from str...

@veonua
Copy link
Author

veonua commented May 24, 2022

that is unfortunate that you have the same error message with another parameters , but maybe a python dev can take a look at it

@bednar
Copy link
Contributor

bednar commented May 24, 2022

import datetime
import pyspark
from pyspark.sql import SparkSession

from datetime import datetime
import os
from influxdb_client import *
from influxdb_client.client.write_api import SYNCHRONOUS
import urllib3

class InfluxDBWriter:
def init(self):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)

def open(self, partition_id, epoch_id):
    print("Opened %d, %d" % (partition_id, epoch_id))
    return True

def process(self, row):
    number = 5
    while number > 0:
        try:
            number -= 1
            self.write_api.write(bucket=bucket, record=self._row_to_line_protocol(row))
            break
        except urllib3.exceptions.ReadTimeoutError:
            pass

def close(self, error):
    self.write_api.__del__()
    self.client.__del__()
    print("Closed with error: %s" % str(error))
    
def _row_to_line_protocol(self, df: pyspark.sql.Row):
    val = df['Value']
    try:
        val = float(val)
    except ValueError:
        pass
    
    date = df['Date']
    if hasattr(date, "to_pydatetime"):
        date = date.to_pydatetime()
    
    return Point( df['Attribute'] ) \
        .time(date, WritePrecision.MS ) \
        .tag("DeviceName", df['DeviceName'] ) \
        .tag("DeviceId",    df['DeviceId'] ) \
        .tag("GroupId",     df['GroupId'] ) \
        .tag("LocationId",  df['LocationId'] ) \
        .tag("LocationName",  df['LocationName'] ) \
        .tag("Mode",        df['Mode'] ) \
        .tag("Tz",          df['Tz'] ) \
        .tag("HubId",       df['HubId'] ) \
        .tag("Source",      df['Source'] ) \
        .tag("Unit",        df['Unit'] ) \
        .field("Value",     val ) \
        #.field("LocalTime",  df['LocalTime'] ) \

event_stream.writeStream.foreach(InfluxDBWriter()).start()

Where is a writer._row_to_line_protocol(df.iloc[0]) from your first post? Is this the code which produce your error?

@veonua
Copy link
Author

veonua commented May 24, 2022

yes

        date = df['Date']
        if hasattr(date, "to_pydatetime"):
            date = date.to_pydatetime()

without this code the InfluxDB python library fails

@bednar
Copy link
Contributor

bednar commented May 25, 2022

yes

        date = df['Date']
        if hasattr(date, "to_pydatetime"):
            date = date.to_pydatetime()

How looks like the error without this code? Is it 'function' object has no attribute 'openapi_types'?

@veonua
Copy link
Author

veonua commented May 25, 2022

exactly

@bednar
Copy link
Contributor

bednar commented May 26, 2022

What is the type and value of date before using date = date.to_pydatetime()?

@veonua
Copy link
Author

veonua commented May 26, 2022

Uploading DCE9B28D-1926-448A-BD36-955F9734F6A6.jpeg…

@veonua
Copy link
Author

veonua commented May 26, 2022

we are going circles, please ask Python dev to look into this issue

@bednar
Copy link
Contributor

bednar commented May 26, 2022

we are going circles, please ask Python dev to look into this issue

I am trying to gets information how to simulate your error to be able to fix it or provide other suitable solution. Are you able to provide a test-case to simulate its or provide a data for your InfluxDBWriter to reproduce your problem?

@bednar
Copy link
Contributor

bednar commented Jun 15, 2022

This issue has been closed because it has not had recent activity. Please reopen if this issue is still important to you and you have additionally information.

@bednar bednar closed this as completed Jun 15, 2022
@bednar bednar added wontfix This will not be worked on and removed question Further information is requested labels Jun 15, 2022
@bednar bednar added this to the 1.30.0 milestone Jun 15, 2022
@veonua
Copy link
Author

veonua commented Jun 15, 2022

good luck

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
wontfix This will not be worked on
Projects
None yet
Development

No branches or pull requests

2 participants