Data is replicated when overwriting data concurrently #1254

Closed
sfilimonov-exos opened this issue Mar 29, 2023 · 5 comments · Fixed by #1275

@sfilimonov-exos

Environment

Delta-rs version: 0.8.1

Environment:

  • Cloud provider: S3 or local filesystem
  • OS: Ubuntu 20.04

Bug

What happened: Rows are duplicated in the Delta table when several writers concurrently write to it with write_deltalake(..., mode="overwrite")

What you expected to happen: Data is not duplicated; the table contains no extra copies of the rows.

How to reproduce it:

import threading
from shutil import rmtree

import pandas as pd
from deltalake import DeltaTable, write_deltalake

path = "demo"

rmtree("demo", ignore_errors=True)

df = pd.DataFrame(data=[('one', 1), ('two', 2)], columns=["name", "val"])

write_deltalake(path, data=df, mode="append")

assert len(DeltaTable(path).to_pandas()) == 2, len(DeltaTable(path).to_pandas())


def overwrite(delta_table: DeltaTable):
    table = delta_table.to_pyarrow_table()  # a seemingly redundant read of the table happens here
    write_deltalake(delta_table, table, mode="overwrite")


def comp():
    for _ in range(5):
        overwrite(DeltaTable(path))


for i in range(5):
    overwrite(DeltaTable(path))

assert len(DeltaTable(path).to_pandas()) == 2, len(DeltaTable(path).to_pandas())  # Sequential - ok

t1 = threading.Thread(target=comp)
t2 = threading.Thread(target=comp)
t1.start()
t2.start()
t1.join()
t2.join()

res = DeltaTable(path).to_pandas()

assert len(res) == 4, len(res)  # Concurrent - reproduces the bug: 4 rows instead of the expected 2

More details:

The same happens when writing to S3 with locking enabled through DynamoDB (AWS_S3_LOCKING_PROVIDER = "dynamodb" in the environment).
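
For reference, here is a minimal sketch of that S3 setup. Only AWS_S3_LOCKING_PROVIDER comes from the report above; the bucket path and the lock-table variable name are illustrative assumptions, so check the delta-rs docs for your version:

import os

import pandas as pd
from deltalake import write_deltalake

# Confirmed above: this selects the DynamoDB lock client.
os.environ["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"

# Assumption: the env var naming the DynamoDB table that backs the lock;
# the exact variable differs between delta-rs versions, treat as illustrative.
os.environ["DYNAMO_LOCK_TABLE_NAME"] = "delta_lock"

# Hypothetical bucket; same frame as the reproduction script above.
df = pd.DataFrame(data=[("one", 1), ("two", 2)], columns=["name", "val"])
write_deltalake("s3://my-bucket/demo", data=df, mode="overwrite")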

sfilimonov-exos added the bug label Mar 29, 2023
@wjones127
Collaborator

Thanks for reporting this. We should indeed test this better. It's possible that #632 will fix this. I will try that out soon.

@roeap
Collaborator

roeap commented Mar 29, 2023

At first glance I would think @wjones127 is correct. Since we are not doing conflict resolution, we can remove the same file twice. In the above scenario I would assume that the table is at version 3 and contains a duplicate remove action for the initial file, as well as both newly added files. @sfilimonov-exos, can you confirm that this is what we are seeing, or share the delta log?
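
A quick way to check is to scan the commit JSON files for duplicate remove actions. A minimal sketch, assuming the local demo table from the reproduction script:

import json
from collections import Counter
from pathlib import Path

# Count how often each data file appears in a remove action across all commits.
removes = Counter()
for commit in sorted(Path("demo/_delta_log").glob("*.json")):
    for line in commit.read_text().splitlines():
        action = json.loads(line)
        if "remove" in action:
            removes[action["remove"]["path"]] += 1

# A count above 1 means two concurrent commits removed the same file.
print({path: n for path, n in removes.items() if n > 1})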

@sfilimonov-exos
Author

sfilimonov-exos commented Mar 29, 2023

I removed the sequential overwrites from the reproduction code to shrink the log, and got:

00000000000000000000.json

{"commitInfo":{"delta-rs":"0.8.0","timestamp":1680119358218}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":1}}
{"metaData":{"id":"29a1faaf-b1b1-4cfc-a7cd-0b49dd7dcf4b","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1680119358218,"configuration":{}}}
{"add":{"path":"0-f92037bf-f84e-4c60-a4bb-c76bd0806bab-0.parquet","size":2202,"partitionValues":{},"modificationTime":1680119358217,"dataChange":true,"stats":"{\"numRecords\": 2, \"minValues\": {\"name\": \"one\", \"val\": 1}, \"maxValues\": {\"name\": \"two\", \"val\": 2}, \"nullCount\": {\"name\": 0, \"val\": 0}}","tags":null}}

00000000000000000001.json

{"add":{"path":"1-152c41b0-4b69-47ef-8c70-1c717c857014-0.parquet","size":868,"partitionValues":{},"modificationTime":1680119358238,"dataChange":true,"stats":"{\"numRecords\": 2, \"minValues\": {\"name\": \"one\", \"val\": 1}, \"maxValues\": {\"name\": \"two\", \"val\": 2}, \"nullCount\": {\"name\": 0, \"val\": 0}}","tags":null}}
{"remove":{"path":"0-f92037bf-f84e-4c60-a4bb-c76bd0806bab-0.parquet","deletionTimestamp":1680119358238,"dataChange":true,"extendedFileMetadata":false,"partitionValues":{},"size":2202,"tags":null}}
{"commitInfo":{"timestamp":1680119358239,"operation":"WRITE","operationParameters":{"partitionBy":"[]","mode":"Overwrite"},"clientVersion":"delta-rs.0.8.0"}}

00000000000000000002.json

{"add":{"path":"1-0478742a-deda-4d12-837a-75431596fd56-0.parquet","size":868,"partitionValues":{},"modificationTime":1680119358240,"dataChange":true,"stats":"{\"numRecords\": 2, \"minValues\": {\"name\": \"one\", \"val\": 1}, \"maxValues\": {\"name\": \"two\", \"val\": 2}, \"nullCount\": {\"name\": 0, \"val\": 0}}","tags":null}}
{"remove":{"path":"0-f92037bf-f84e-4c60-a4bb-c76bd0806bab-0.parquet","deletionTimestamp":1680119358241,"dataChange":true,"extendedFileMetadata":false,"partitionValues":{},"size":2202,"tags":null}}
{"commitInfo":{"timestamp":1680119358241,"operation":"WRITE","operationParameters":{"partitionBy":"[]","mode":"Overwrite"},"clientVersion":"delta-rs.0.8.0"}}

00000000000000000003.json

{"add":{"path":"2-cf317a1d-0465-484b-ba4b-8abcc4745764-0.parquet","size":868,"partitionValues":{},"modificationTime":1680119358248,"dataChange":true,"stats":"{\"numRecords\": 2, \"minValues\": {\"name\": \"one\", \"val\": 1}, \"maxValues\": {\"name\": \"two\", \"val\": 2}, \"nullCount\": {\"name\": 0, \"val\": 0}}","tags":null}}
{"remove":{"path":"1-152c41b0-4b69-47ef-8c70-1c717c857014-0.parquet","deletionTimestamp":1680119358249,"dataChange":true,"extendedFileMetadata":false,"partitionValues":{},"size":868,"tags":null}}
{"commitInfo":{"timestamp":1680119358249,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"clientVersion":"delta-rs.0.8.0"}}

At this point the table already contains duplicated data, and DeltaTable(path).files() returns ['1-0478742a-deda-4d12-837a-75431596fd56-0.parquet', '2-cf317a1d-0465-484b-ba4b-8abcc4745764-0.parquet']
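
Replaying the log the way a reader would makes it clear why those two files stay active. A small sketch, again assuming the local demo table:

import json
from pathlib import Path

# Reconstruct the set of active files by applying add/remove actions in commit order.
active = set()
for commit in sorted(Path("demo/_delta_log").glob("*.json")):
    for line in commit.read_text().splitlines():
        action = json.loads(line)
        if "add" in action:
            active.add(action["add"]["path"])
        elif "remove" in action:
            active.discard(action["remove"]["path"])

# With the log above this prints the files added by commits 2 and 3: commit 2's
# remove targets the version-0 file that commit 1 had already removed, and
# commit 3 read version 1, so it never saw (and never removed) commit 2's file.
print(sorted(active))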

@roeap
Collaborator

roeap commented Mar 29, 2023

Yeah, I think Will was correct; this is a commit issue. Ops 1 and 2 both read version 0 of the table, and you can see that both contain identical remove actions. Op 3 must have read version 1 of the table, where one commit had already happened, so it only removes the file added in commit 1, but not the file added in commit 2.

So we are left with two active files, from ops 2 and 3. As Will pointed out, we are about to merge a PR that will enable us to support the optimistic commit protocol and resolve situations like this.
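
For anyone following along, the protocol roughly looks like the sketch below. This is an illustrative toy with hypothetical names, not the actual delta-rs API: the log is modeled as an in-memory list where the index is the version, and the final append stands in for an atomic put-if-absent of the next commit file.

class CommitConflict(Exception):
    pass

def removed_paths(actions):
    return {a["remove"]["path"] for a in actions if "remove" in a}

def commit(log, read_version, actions):
    """log: list of commits (each a list of action dicts); index == version."""
    # Check every commit that landed after our snapshot for conflicts,
    # e.g. another writer removing a file that we also remove.
    for version in range(read_version + 1, len(log)):
        if removed_paths(log[version]) & removed_paths(actions):
            raise CommitConflict(f"version {version} removed the same file(s)")
    # In a real store this is an atomic put-if-absent; a writer that loses
    # the race re-reads the log and retries the conflict check.
    log.append(actions)
    return len(log) - 1

log = [[{"add": {"path": "a.parquet"}}]]
ours = [{"add": {"path": "b.parquet"}}, {"remove": {"path": "a.parquet"}}]
commit(log, read_version=0, actions=ours)  # commits version 1

theirs = [{"add": {"path": "c.parquet"}}, {"remove": {"path": "a.parquet"}}]
# A second writer that also read version 0 and removes a.parquet now raises
# CommitConflict instead of silently committing a duplicate remove:
# commit(log, read_version=0, actions=theirs)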

@wjones127
Collaborator

#632 will not fix this for Python, but in a follow-up PR we will get the Python bindings using the new conflict checker.

wjones127 self-assigned this Apr 11, 2023
wjones127 added a commit that referenced this issue Apr 13, 2023
# Description

Switches the Python implementation over to the new transaction API.

# Related Issue(s)

* Closes #1254
