
Parquet reader: option to pass INT96 as bytes instead of as Timestamp #7220

Open
mbutrovich opened this issue Feb 28, 2025 · 6 comments · May be fixed by #7250
Labels
enhancement: Any new improvement worthy of an entry in the changelog

Comments

mbutrovich commented Feb 28, 2025

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

We are adapting DataFusion Comet (a Spark accelerator) to use DataFusion's native Parquet scan backed by arrow-rs. Spark defaults to writing timestamps in Parquet as INT96 (a la Hive, Impala, and other systems), which most systems infer as a timestamp despite the Parquet spec having a separate timestamp type. In arrow-rs's case, INT96 is converted to a Timestamp(TimeUnit::Nanosecond, None). The nanosecond precision means the resulting data type cannot represent the same range of dates that Spark originally wrote to the file.
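For illustration (a rough sketch, not arrow-rs code): an INT96 value as written by Impala/Hive/Spark is 12 bytes, 8 bytes of nanoseconds within the day followed by a 4-byte Julian day. Collapsing that into a single i64 of nanoseconds since the Unix epoch only covers roughly the years 1677 to 2262, which is where the lost range comes from:

```rust
// Sketch only: decode one INT96 value into i64 nanoseconds since the epoch.
// The checked arithmetic shows where the nanosecond representation runs out.
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; // 1970-01-01 as a Julian day
const NANOS_PER_DAY: i64 = 86_400 * 1_000_000_000;

fn int96_to_nanos(bytes: [u8; 12]) -> Option<i64> {
    let nanos_of_day = i64::from_le_bytes(bytes[0..8].try_into().ok()?);
    let julian_day = i64::from(u32::from_le_bytes(bytes[8..12].try_into().ok()?));
    (julian_day - JULIAN_DAY_OF_EPOCH)
        .checked_mul(NANOS_PER_DAY)?
        .checked_add(nanos_of_day)
}

fn main() {
    // A date roughly 547 years after the epoch: representable as INT96 (and as
    // microseconds), but not as i64 nanoseconds.
    let julian_day = (JULIAN_DAY_OF_EPOCH + 200_000) as u32;
    let mut bytes = [0u8; 12];
    bytes[8..12].copy_from_slice(&julian_day.to_le_bytes());
    assert!(int96_to_nanos(bytes).is_none());
}
```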

Describe the solution you'd like

An opt-in feature that allows INT96 to pass unmodified bytes for each value, perhaps as FixedSizeBinary(12).

Describe alternatives you've considered

  • An option to choose the precision for inferring INT96 as Timestamps. For example, Spark uses microsecond precision, so going to Timestamp(TimeUnit::Microsecond, None) would support a larger range of dates. However, I do not think it's reasonable to push Spark-specific options into arrow-rs.
  • An option to pass INT96 as a struct of Time64 and Date32 Arrow types, which is essentially what an INT96 timestamp represents; however, I take the same issue with this as with the previous point.
  • Bring existing code from Comet's Parquet reader, which handles some of these quirks and all of their respective Spark-specific configs, over to arrow-rs. Same issue as above.

Additional context

  • Please see "Inconsistent Signedness Of Legacy Parquet Timestamps Written By Spark" (datafusion#7958) for relevant discussion from 2023.
  • Interpreting INT96 as a timestamp can be tough: it depends on the Spark config, the Spark version, and there still seems to be debate on whether arithmetic during conversion should wrap on overflow or not.
  • DataFusion's SchemaAdapter gives us a lot of control over how to adjust data coming out of its Parquet scan. However, because this "lossy" conversion from INT96 to an Arrow type happens in arrow-rs, it's too late for us to fix it in a custom SchemaAdapter. If we implement this feature, we will be able to handle all of the Spark-specific quirks in a SchemaAdapter.
  • Comet already has a native Parquet scan operator that handles these types of timestamp quirks, but it does not support complex types. In order to support complex types and share code with arrow-rs we want to use DataFusion's Parquet scan instead.
mbutrovich added the enhancement label on Feb 28, 2025
mbutrovich changed the title from "Parquet reader: option to pass INT96 as bytes instead of a Timestamp" to "Parquet reader: option to pass INT96 as bytes instead of as Timestamp" on Feb 28, 2025
alamb (Contributor) commented Mar 5, 2025

An opt-in feature that allows INT96 to pass unmodified bytes for each value, perhaps as FixedSizeBinary(12).

This makes sense to me -- if the reader wants to read an INT96 as FixedSizeBinary(12), that seems quite reasonable to me.

alamb (Contributor) commented Mar 5, 2025

tustvold (Contributor) commented Mar 8, 2025

It was mentioned that there is a similar thing in arrow-cpp: https://github.com/apache/arrow/blob/784aa6faf69f5cf135e09976a281dea9ebf58166/cpp/src/parquet/arrow/schema_internal.cc#L205-L206

This looks to just influence what TimeUnit it coerces to, e.g. milliseconds, nanoseconds, etc...

An opt-in feature that allows INT96 to pass unmodified bytes for each value, perhaps as FixedSizeBinary(12).

My 2 cents is that, whilst possible, this results in an unfortunate UX. IMO we should support INT96 to the best of our ability, rather than forcing every downstream to reproduce this logic. Whilst it may be somewhat depressing that Spark is STILL writing a type that has been deprecated for almost a decade, it is where we are at and we should support it.

That being said I would suggest we split this issue into two parts:

  • Support influencing the precision used, similar to arrow-cpp
  • Support legacy rebase modes for timestamps before 1900 written by Spark versions before 3.x - see here

I suspect most users only actually care about the first of these: the number of people writing dates pre-1900 is likely small, and the number doing so with a half-decade-old version of Spark or Hive is likely even smaller, so we can leave the second as an issue for someone to pick up if they have a use case for it.
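To make the first part concrete, here is a rough sketch (the `Unit` enum and function are illustrative, not the arrow-cpp or arrow-rs coercion API): the chosen TimeUnit only changes the arithmetic applied to the same two INT96 fields, and coarser units trade sub-microsecond precision for a wider representable range of days:

```rust
// Sketch only: inputs are the two fields of an INT96 value
// (Julian day and nanoseconds within the day).
enum Unit {
    Millis,
    Micros,
    Nanos,
}

const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;

fn int96_to_timestamp(julian_day: i64, nanos_of_day: i64, unit: Unit) -> Option<i64> {
    let days = julian_day - JULIAN_DAY_OF_EPOCH;
    match unit {
        // Coarser units overflow i64 for far fewer dates than nanoseconds do.
        Unit::Millis => days
            .checked_mul(86_400_000)?
            .checked_add(nanos_of_day / 1_000_000),
        Unit::Micros => days
            .checked_mul(86_400_000_000)?
            .checked_add(nanos_of_day / 1_000),
        Unit::Nanos => days
            .checked_mul(86_400_000_000_000)?
            .checked_add(nanos_of_day),
    }
}
```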

alamb (Contributor) commented Mar 8, 2025

This topic came up on the DataFusion call this week. I think @mbutrovich has the use case for handling this in Spark.

While I am not opposed to adding a Spark-specific rebase mode (or whatever we will call it 🤮), I also think adding the general ability for the Parquet reader to pass out uninterpreted bytes (in this case FixedSizeBinary(12)) is a good addition, as it allows downstream crates an "escape hatch" until / if we implement a more holistic solution in this crate.
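To sketch what that escape hatch could look like downstream (assuming FixedSizeBinary(12) carries the raw bytes in the usual Impala/Hive layout; the function and the microsecond target are illustrative, not an existing API), a SchemaAdapter-style conversion could be roughly:

```rust
use arrow_array::{Array, FixedSizeBinaryArray, TimestampMicrosecondArray};

/// Illustrative only: convert raw INT96 bytes to Spark-style microsecond
/// timestamps. A real adapter would also handle overflow and rebase quirks.
fn int96_binary_to_micros(col: &FixedSizeBinaryArray) -> TimestampMicrosecondArray {
    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const MICROS_PER_DAY: i64 = 86_400_000_000;

    (0..col.len())
        .map(|i| {
            if col.is_null(i) {
                return None;
            }
            // 12 bytes: 8-byte nanoseconds-of-day, then 4-byte Julian day.
            let v = col.value(i);
            let nanos_of_day = i64::from_le_bytes(v[0..8].try_into().unwrap());
            let julian_day = i64::from(u32::from_le_bytes(v[8..12].try_into().unwrap()));
            Some((julian_day - JULIAN_DAY_OF_EPOCH) * MICROS_PER_DAY + nanos_of_day / 1_000)
        })
        .collect()
}
```

This keeps all of the Spark-specific interpretation (precision, rebase modes, overflow behavior) on the Comet/DataFusion side rather than in arrow-rs.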

mbutrovich (Author) commented Mar 10, 2025

That being said I would suggest we split this issue into two parts:

  • Support influencing the precision used, similar to arrow-cpp
  • Support legacy rebase modes for timestamps before 1900 written by Spark versions before 3.x - see here

I'm good with this approach. As I mentioned, I wasn't sure how much Spark-specific logic we wanted to bring down to the Parquet reader level, but I can work with this design. I might ask some follow-up questions about how to expose options that far into the Parquet reader, since most of the API seems to be driven by the Schema. My guess is something in ArrowReaderOptions, but I'll need to see how far through the call stack that actually makes it.
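Purely as a strawman for that design question (none of these names exist in arrow-rs; the real hook would presumably live on or near ArrowReaderOptions and be consulted during schema inference):

```rust
// Hypothetical sketch only -- not the actual arrow-rs API.
#[derive(Debug, Clone, Copy, Default)]
pub enum Int96Coercion {
    /// Current behavior: coerce INT96 to Timestamp(Nanosecond, None).
    #[default]
    TimestampNanos,
    /// Proposed escape hatch: expose the raw 12 bytes as FixedSizeBinary(12).
    FixedSizeBinary,
}

#[derive(Debug, Clone, Default)]
pub struct HypotheticalReaderOptions {
    int96_coercion: Int96Coercion,
}

impl HypotheticalReaderOptions {
    /// Builder-style setter, mirroring how reader options are usually exposed.
    pub fn with_int96_coercion(mut self, coercion: Int96Coercion) -> Self {
        self.int96_coercion = coercion;
        self
    }
}
```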

mbutrovich (Author) commented Mar 10, 2025

One challenge I had with an implementation that doesn't copy the data twice is that IntoBuffer is responsible for converting Vec<Int96> and doesn't take any arguments. I didn't see how to indicate that we want different logic applied in that method, in order to avoid an intermediate representation (FixedSizeBinary(12)) that could still be converted to the other possible target ArrowTypes. I could make Vec<Int96> not use the IntoBuffer method for getting a Buffer from the read, but that adds a new conditional for every type in consume_batch.
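A hypothetical illustration of that tension (not the parquet crate's actual internals; Int96, Int96Target, and int96_into_buffer are made up here): a no-argument conversion has to commit to one output layout, whereas threading the desired target through lets the reader build the final buffer in one pass, without an intermediate representation:

```rust
/// Made-up stand-in for the crate's INT96 value: three little-endian u32 words.
#[derive(Clone, Copy)]
struct Int96([u32; 3]);

enum Int96Target {
    /// Flatten to raw bytes, suitable for a FixedSizeBinary(12) array.
    RawBytes,
    /// Convert to i64 nanoseconds since the epoch (the current behavior).
    TimestampNanos,
}

const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const NANOS_PER_DAY: i64 = 86_400_000_000_000;

/// Unlike a no-argument trait method, this takes the desired target, so each
/// value is written to the output buffer exactly once.
fn int96_into_buffer(values: Vec<Int96>, target: Int96Target) -> Vec<u8> {
    match target {
        Int96Target::RawBytes => values
            .iter()
            .flat_map(|v| v.0.iter().copied().flat_map(u32::to_le_bytes))
            .collect(),
        Int96Target::TimestampNanos => values
            .iter()
            .map(|v| {
                // Reassemble the i64 nanoseconds-of-day from the two low words,
                // then add the day offset (overflow handling omitted for brevity).
                let nanos_of_day = i64::from(v.0[0]) | (i64::from(v.0[1]) << 32);
                let julian_day = i64::from(v.0[2]);
                (julian_day - JULIAN_DAY_OF_EPOCH) * NANOS_PER_DAY + nanos_of_day
            })
            .flat_map(i64::to_le_bytes)
            .collect(),
    }
}
```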
