feat(EdrCache): add SQL implementation of EDR cache store #405

Merged
3 changes: 3 additions & 0 deletions core/edr-cache-core/build.gradle.kts
@@ -22,5 +22,8 @@ dependencies {
implementation(libs.edc.util)

implementation(project(":spi:edr-cache-spi"))

testImplementation(testFixtures(project(":spi:edr-cache-spi")))

}

EdrCacheEntryPredicateConverter.java (new file)
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.query.BaseCriterionToPredicateConverter;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;

public class EdrCacheEntryPredicateConverter extends BaseCriterionToPredicateConverter<EndpointDataReferenceEntry> {

@Override
protected Object property(String key, Object object) {
if (object instanceof EndpointDataReferenceEntry) {
var entry = (EndpointDataReferenceEntry) object;
switch (key) {
case "assetId":
return entry.getAssetId();
case "agreementId":
return entry.getAgreementId();
default:
return null;
}
}
throw new IllegalArgumentException("Can only handle objects of type " + EndpointDataReferenceEntry.class.getSimpleName() + " but received an " + object.getClass().getSimpleName());
}
}
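For orientation, a minimal usage sketch of the converter (illustrative values; the `Criterion` builder calls mirror the ones used elsewhere in this PR):

    var converter = new EdrCacheEntryPredicateConverter();
    var criterion = Criterion.Builder.newInstance()
            .operandLeft("assetId")
            .operator("=")
            .operandRight("asset1")
            .build();
    // convert() yields a Predicate<EndpointDataReferenceEntry> that matches only entries whose assetId is "asset1"
    var predicate = converter.convert(criterion);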
InMemoryEndpointDataReferenceCache.java
@@ -14,6 +14,8 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.util.concurrency.LockManager;
@@ -28,6 +30,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
@@ -40,17 +44,16 @@
public class InMemoryEndpointDataReferenceCache implements EndpointDataReferenceCache {
private final LockManager lockManager;

private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter();

private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, List<EndpointDataReferenceEntry>> entriesByAgreementId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;
private final Map<String, EndpointDataReference> edrsByTransferProcessId;

public InMemoryEndpointDataReferenceCache() {
lockManager = new LockManager(new ReentrantReadWriteLock());
entriesByAssetId = new HashMap<>();
entriesByAgreementId = new HashMap<>();
entriesByEdrId = new HashMap<>();
edrsByTransferProcessId = new HashMap<>();
}
@@ -71,14 +74,8 @@ public List<EndpointDataReference> referencesForAsset(String assetId) {
}

@Override
@NotNull
public List<EndpointDataReferenceEntry> entriesForAsset(String assetId) {
return lockManager.readLock(() -> entriesByAssetId.getOrDefault(assetId, emptyList()));
}

@Override
public @NotNull List<EndpointDataReferenceEntry> entriesForAgreement(String agreementId) {
return lockManager.readLock(() -> entriesByAgreementId.getOrDefault(agreementId, emptyList()));
public Stream<EndpointDataReferenceEntry> queryForEntries(QuerySpec spec) {
return filterBy(spec.getFilterExpression());
}

@Override
@@ -88,9 +85,6 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
var list = entriesByAssetId.computeIfAbsent(entry.getAssetId(), k -> new ArrayList<>());
list.add(entry);

var agreementList = entriesByAgreementId.computeIfAbsent(entry.getAgreementId(), k -> new ArrayList<>());
agreementList.add(entry);

edrsByTransferProcessId.put(entry.getTransferProcessId(), edr);
return null;
});
@@ -103,13 +97,23 @@ public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String
if (edr == null) {
return notFound("EDR entry not found for id: " + id);
}
var entry = entriesByEdrId.get(edr.getId());
var entry = entriesByEdrId.remove(edr.getId());
var entries = entriesByAssetId.get(entry.getAssetId());
entries.remove(entry);
if (entries.isEmpty()) {
entriesByAssetId.remove(entry.getAssetId());
}

return success(entry);
});
}

private Stream<EndpointDataReferenceEntry> filterBy(List<Criterion> criteria) {
var predicate = criteria.stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and);

return entriesByEdrId.values().stream()
.filter(predicate);
}
}
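The new queryForEntries method replaces the removed entriesForAsset/entriesForAgreement lookups. A minimal caller sketch (illustrative values; same QuerySpec/Criterion builder API the adapter service below uses):

    var spec = QuerySpec.Builder.newInstance()
            .filter(Criterion.Builder.newInstance()
                    .operandLeft("agreementId")
                    .operator("=")
                    .operandRight("agreement1")
                    .build())
            .build();
    // returns a Stream of all cached entries whose agreementId equals "agreement1"
    var entries = cache.queryForEntries(spec).collect(toList());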
InMemoryEndpointDataReferenceCacheTest.java
@@ -14,55 +14,15 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class InMemoryEndpointDataReferenceCacheTest {
private static final String TRANSFER_PROCESS_ID = "tp1";
private static final String ASSET_ID = "asset1";
private static final String AGREEMENT_ID = "agreement1";

private static final String EDR_ID = "edr1";
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest {
private final InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache();

@Test
@SuppressWarnings("DataFlowIssue")
void verify_operations() {
var edr = EndpointDataReference.Builder.newInstance()
.endpoint("http://test.com")
.id(EDR_ID)
.authCode("11111")
.authKey("authentication").build();

var entry = EndpointDataReferenceEntry.Builder.newInstance()
.assetId(ASSET_ID)
.agreementId(AGREEMENT_ID)
.transferProcessId(TRANSFER_PROCESS_ID)
.build();

cache.save(entry, edr);

assertThat(cache.resolveReference(TRANSFER_PROCESS_ID).getId()).isEqualTo(EDR_ID);

var edrs = cache.referencesForAsset(ASSET_ID);
assertThat(edrs.size()).isEqualTo(1);
assertThat(edrs.get((0)).getId()).isEqualTo(EDR_ID);

var entries = cache.entriesForAsset(ASSET_ID);
assertThat(entries.size()).isEqualTo(1);
assertThat(entries.get((0)).getAssetId()).isEqualTo(ASSET_ID);

entries = cache.entriesForAgreement(AGREEMENT_ID);
assertThat(entries.size()).isEqualTo(1);
assertThat(entries.get((0)).getAgreementId()).isEqualTo(AGREEMENT_ID);

assertThat(cache.deleteByTransferProcessId(TRANSFER_PROCESS_ID).succeeded()).isTrue();

assertThat(cache.entriesForAsset(ASSET_ID)).isEmpty();
assertThat(cache.resolveReference(TRANSFER_PROCESS_ID)).isNull();
@Override
protected EndpointDataReferenceCache getStore() {
return cache;
}

}
edc-controlplane distribution build.gradle.kts (Azure Vault variant)
@@ -29,6 +29,7 @@ plugins {
dependencies {
runtimeOnly(project(":edc-controlplane:edc-controlplane-base"))
runtimeOnly(project(":edc-extensions:postgresql-migration"))
runtimeOnly(project(":edc-extensions:edr-cache-sql"))
runtimeOnly(libs.edc.azure.vault)
runtimeOnly(libs.bundles.edc.sqlstores)
runtimeOnly(libs.edc.transaction.local)
edc-controlplane distribution build.gradle.kts (HashiCorp Vault variant)
@@ -30,6 +30,7 @@ dependencies {
runtimeOnly(project(":edc-controlplane:edc-controlplane-base"))
runtimeOnly(project(":edc-extensions:postgresql-migration"))
runtimeOnly(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(project(":edc-extensions:edr-cache-sql"))
runtimeOnly(libs.bundles.edc.sqlstores)
runtimeOnly(libs.edc.transaction.local)
runtimeOnly(libs.edc.sql.pool)
AdapterTransferProcessService implementation
@@ -19,19 +19,18 @@
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.spi.cp.adapter.model.NegotiateEdrRequest;
import org.eclipse.tractusx.edc.spi.cp.adapter.service.AdapterTransferProcessService;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -88,26 +87,27 @@ public ServiceResult<EndpointDataReference> findByTransferProcessId(String trans

@Override
public ServiceResult<List<EndpointDataReferenceEntry>> findByAssetAndAgreement(String assetId, String agreementId) {
var results = queryEdrs(assetId, agreementId)
.stream()
.filter(fieldFilter(assetId, EndpointDataReferenceEntry::getAssetId))
.filter(fieldFilter(agreementId, EndpointDataReferenceEntry::getAgreementId))
.collect(Collectors.toList());
var results = queryEdrs(assetId, agreementId).collect(Collectors.toList());
return success(results);
}

private Predicate<EndpointDataReferenceEntry> fieldFilter(String value, Function<EndpointDataReferenceEntry, String> function) {
return entry -> Optional.ofNullable(value)
.map(val -> val.equals(function.apply(entry)))
.orElse(true);
private Stream<EndpointDataReferenceEntry> queryEdrs(String assetId, String agreementId) {
var queryBuilder = QuerySpec.Builder.newInstance();
if (assetId != null) {
queryBuilder.filter(fieldFilter("assetId", assetId));
}
if (agreementId != null) {
queryBuilder.filter(fieldFilter("agreementId", agreementId));
}
return endpointDataReferenceCache.queryForEntries(queryBuilder.build());
}

private List<EndpointDataReferenceEntry> queryEdrs(String assetId, String agreementId) {
// Try first for agreementId and then assetId
return Optional.ofNullable(agreementId)
.map(endpointDataReferenceCache::entriesForAgreement)
.or(() -> Optional.ofNullable(assetId).map(endpointDataReferenceCache::entriesForAsset))
.orElseGet(Collections::emptyList);
}

private Criterion fieldFilter(String field, String value) {
return Criterion.Builder.newInstance()
.operandLeft(field)
.operator("=")
.operandRight(value)
.build();
}
}
27 changes: 27 additions & 0 deletions edc-extensions/edr-cache-sql/README.md
@@ -0,0 +1,27 @@
# SQL-based `EndpointDataReferenceCache` extension

This extension provides a persistent implementation of `EndpointDataReferenceCache`.

It stores the following fields in the database:

- transferProcessId
- agreementId
- assetId
- edrId

Each row represents a single EDR negotiation performed via the new Control Plane Adapter APIs.

The EDR itself is stored in the participant's vault under the prefixed key `edr__<edrId>`.

**_Note that the SQL statements (DDL) are specific to and only tested with PostgreSQL. Using them with another RDBMS may
work but could have unexpected side effects!_**

## 1. Table schema

see [schema.sql](docs/schema.sql).

## 2. Configuration

| Key | Description | Mandatory | Default |
|:---------------------------------------|:----------------------------------|-----------|---------|
| edc.datasource.edr.name | Datasource used by this extension | | edr |
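
An illustrative configuration snippet (the url/user/password keys below follow the general EDC datasource naming convention and are assumptions, not settings introduced by this extension):

    edc.datasource.edr.name=edr
    edc.datasource.edr.url=jdbc:postgresql://localhost:5432/edc
    edc.datasource.edr.user=edc
    edc.datasource.edr.password=<password>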
35 changes: 35 additions & 0 deletions edc-extensions/edr-cache-sql/build.gradle.kts
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
`maven-publish`
}

dependencies {
implementation(project(":spi:edr-cache-spi"))

implementation(libs.edc.spi.core)
implementation(libs.edc.core.sql)
implementation(libs.edc.spi.transactionspi)
implementation(libs.edc.spi.transaction.datasource)

testImplementation(libs.edc.transaction.local)

testImplementation(testFixtures(project(":spi:edr-cache-spi")))
testImplementation(testFixtures(libs.edc.core.sql))

testImplementation(testFixtures(libs.edc.junit))

}
22 changes: 22 additions & 0 deletions edc-extensions/edr-cache-sql/docs/schema.sql
@@ -0,0 +1,22 @@
--
-- Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
--
-- This program and the accompanying materials are made available under the
-- terms of the Apache License, Version 2.0 which is available at
-- https://www.apache.org/licenses/LICENSE-2.0
--
-- SPDX-License-Identifier: Apache-2.0
--
-- Contributors:
-- Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
--

CREATE TABLE IF NOT EXISTS edc_edr_cache
(
transfer_process_id VARCHAR NOT NULL PRIMARY KEY,
agreement_id VARCHAR NOT NULL,
asset_id VARCHAR NOT NULL,
edr_id VARCHAR NOT NULL,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL
);
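
As a quick illustration of how the table is used (example query only, not part of the DDL above):

    -- resolve the cached entry for a given transfer process id
    SELECT transfer_process_id, agreement_id, asset_id, edr_id
    FROM edc_edr_cache
    WHERE transfer_process_id = 'tp1';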