Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(EDR): adds EDR state machine for handling EDR renewal #620

Merged
merged 4 commits into from
Jul 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
@@ -9,8 +9,8 @@ maven/mavencentral/com.azure/azure-core-http-netty/1.13.5, MIT AND Apache-2.0, a
maven/mavencentral/com.azure/azure-core/1.39.0, MIT, approved, clearlydefined
maven/mavencentral/com.azure/azure-core/1.40.0, MIT, approved, clearlydefined
maven/mavencentral/com.azure/azure-core/1.41.0, MIT AND Apache-2.0, approved, #9648
maven/mavencentral/com.azure/azure-identity/1.9.0, MIT, approved, clearlydefined
maven/mavencentral/com.azure/azure-identity/1.9.2, , restricted, clearlydefined
maven/mavencentral/com.azure/azure-identity/1.9.0, MIT AND Apache-2.0, approved, #9686
maven/mavencentral/com.azure/azure-identity/1.9.2, MIT AND Apache-2.0, approved, #9686
maven/mavencentral/com.azure/azure-json/1.0.1, MIT AND Apache-2.0, approved, #7933
maven/mavencentral/com.azure/azure-security-keyvault-secrets/4.6.2, MIT, approved, #7940
maven/mavencentral/com.azure/azure-security-keyvault-secrets/4.6.3, MIT, approved, #7940
@@ -162,8 +162,8 @@ maven/mavencentral/io.netty/netty-transport/4.1.94.Final, Apache-2.0 AND BSD-3-C
maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentation-annotations/1.27.0, Apache-2.0, approved, #9270
maven/mavencentral/io.opentelemetry/opentelemetry-api/1.27.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.opentelemetry/opentelemetry-context/1.27.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.28, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.33, , restricted, clearlydefined
maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.28, Apache-2.0, approved, #9687
maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.33, Apache-2.0, approved, #9687
maven/mavencentral/io.projectreactor.netty/reactor-netty-http/1.0.28, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.projectreactor.netty/reactor-netty-http/1.0.33, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.projectreactor/reactor-core/3.4.27, Apache-2.0, approved, #7517
@@ -444,7 +444,10 @@ maven/mavencentral/org.slf4j/slf4j-api/1.7.36, MIT, approved, CQ13368
maven/mavencentral/org.slf4j/slf4j-api/1.7.7, MIT, approved, CQ9827
maven/mavencentral/org.slf4j/slf4j-api/2.0.5, MIT, approved, #5915
maven/mavencentral/org.slf4j/slf4j-api/2.0.7, MIT, approved, #5915
maven/mavencentral/org.testcontainers/database-commons/1.18.3, MIT, approved, clearlydefined
maven/mavencentral/org.testcontainers/jdbc/1.18.3, MIT, approved, clearlydefined
maven/mavencentral/org.testcontainers/junit-jupiter/1.18.3, MIT, approved, #7941
maven/mavencentral/org.testcontainers/postgresql/1.18.3, MIT, approved, #9332
maven/mavencentral/org.testcontainers/testcontainers/1.18.3, MIT, approved, #7938
maven/mavencentral/org.testcontainers/vault/1.18.3, MIT, approved, #7927
maven/mavencentral/org.yaml/snakeyaml/1.33, Apache-2.0, approved, clearlydefined
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ protected Object property(String key, Object object) {
case "assetId" -> entry.getAssetId();
case "agreementId" -> entry.getAgreementId();
case "providerId" -> entry.getProviderId();
case "state" -> entry.getState();
default -> null;
};
}
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.entity.StatefulEntity;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
@@ -24,18 +26,22 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static java.util.Comparator.comparingLong;
import static java.util.stream.Collectors.toList;
import static org.eclipse.edc.spi.result.StoreResult.notFound;
import static org.eclipse.edc.spi.result.StoreResult.success;
@@ -44,45 +50,71 @@
* An in-memory, threadsafe implementation of the cache.
*/
public class InMemoryEndpointDataReferenceCache implements EndpointDataReferenceCache {
private static final long DEFAULT_LEASE_TIME_MILLIS = 60_000;
private final LockManager lockManager;

private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter();


private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;

private final Map<String, EndpointDataReference> edrsByTransferProcessId;
private final String lockId;

private final Map<String, Lease> leases;

private final Clock clock;

public InMemoryEndpointDataReferenceCache() {
this(UUID.randomUUID().toString(), Clock.systemUTC(), new ConcurrentHashMap<>());
}

public InMemoryEndpointDataReferenceCache(String lockId, Clock clock, Map<String, Lease> leases) {
this.lockId = lockId;
lockManager = new LockManager(new ReentrantReadWriteLock());
entriesByAssetId = new HashMap<>();
entriesByEdrId = new ConcurrentHashMap<>();
edrsByTransferProcessId = new HashMap<>();
this.leases = leases;
this.clock = clock;
}

@Override
public @Nullable EndpointDataReference resolveReference(String transferProcessId) {
return lockManager.readLock(() -> edrsByTransferProcessId.get(transferProcessId));
}

@Override
public @Nullable EndpointDataReferenceEntry findByTransferProcessId(String transferProcessId) {
return lockManager.readLock(() -> {
var edr = edrsByTransferProcessId.get(transferProcessId);
return entriesByEdrId.get(edr.getId());
});
}

@Override
@NotNull
public List<EndpointDataReference> referencesForAsset(String assetId, String providerId) {
var entries = entriesByAssetId.get(assetId);
return lockManager.readLock(() -> {
var entries = entriesByAssetId.get(assetId);

Predicate<EndpointDataReferenceEntry> providerIdFilter = (cached) ->
Optional.ofNullable(providerId)
.map(id -> id.equals(cached.getProviderId()))
.orElse(true);
Predicate<EndpointDataReferenceEntry> providerIdFilter = (cached) ->
Optional.ofNullable(providerId)
.map(id -> id.equals(cached.getProviderId()))
.orElse(true);

if (entries == null) {
return emptyList();
}
return entries.stream()
.filter(providerIdFilter)
.map(e -> resolveReference(e.getTransferProcessId()))
.filter(Objects::nonNull)
.collect(toList());
if (entries == null) {
return emptyList();
}
return entries.stream()
.filter(providerIdFilter)
.filter(this::filterActive)
.map(e -> resolveReference(e.getTransferProcessId()))
.filter(Objects::nonNull)
.collect(toList());

});
}

@Override
@@ -102,9 +134,26 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
});
}

@Override
public void update(EndpointDataReferenceEntry entry) {
lockManager.writeLock(() -> {
acquireLease(entry.getTransferProcessId(), lockId);
var edr = edrsByTransferProcessId.get(entry.getTransferProcessId());
entriesByEdrId.put(edr.getId(), entry);
var list = entriesByAssetId.computeIfAbsent(entry.getAssetId(), k -> new ArrayList<>());
list.removeIf((edrEntry) -> edrEntry.getTransferProcessId().equals(entry.getTransferProcessId()));
list.add(entry);
freeLease(entry.getTransferProcessId());
return null;
});
}

@Override
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id) {
return lockManager.writeLock(() -> {
if (isLeased(id)) {
throw new IllegalStateException("EndpointDataReferenceEntry is leased and cannot be deleted!");
}
var edr = edrsByTransferProcessId.remove(id);
if (edr == null) {
return notFound("EDR entry not found for id: " + id);
@@ -120,12 +169,55 @@ public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String
});
}

@Override
public @NotNull List<EndpointDataReferenceEntry> nextNotLeased(int max, Criterion... criteria) {
return leaseAndGet(max, criteria);
}


private @NotNull List<EndpointDataReferenceEntry> leaseAndGet(int max, Criterion... criteria) {
return lockManager.writeLock(() -> {
var filterPredicate = Arrays.stream(criteria).map(predicateConverter::convert).reduce(x -> true, Predicate::and);
var entities = entriesByEdrId.values().stream()
.filter(filterPredicate)
.filter(e -> !isLeased(e.getId()))
.sorted(comparingLong(StatefulEntity::getStateTimestamp)) //order by state timestamp, oldest first
.limit(max)
.toList();
entities.forEach(i -> acquireLease(i.getId(), lockId));
return entities.stream().map(StatefulEntity::copy).collect(toList());
});
}

private Stream<EndpointDataReferenceEntry> filterBy(List<Criterion> criteria) {
var predicate = criteria.stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and);
return lockManager.readLock(() -> {
var predicate = criteria.stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and);

return entriesByEdrId.values().stream()
.filter(predicate);
});

}

private void freeLease(String id) {
leases.remove(id);
}

private void acquireLease(String id, String lockId) {
if (!isLeased(id) || isLeasedBy(id, lockId)) {
leases.put(id, new Lease(lockId, clock.millis(), DEFAULT_LEASE_TIME_MILLIS));
} else {
throw new IllegalStateException("Cannot acquire lease, is already leased by someone else!");
}
}

private boolean isLeased(String id) {
return leases.containsKey(id) && !leases.get(id).isExpired(clock.millis());
}

return entriesByEdrId.values().stream()
.filter(predicate);
private boolean isLeasedBy(String id, String lockId) {
return isLeased(id) && leases.get(id).getLeasedBy().equals(lockId);
}
}
Original file line number Diff line number Diff line change
@@ -14,15 +14,42 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest;
import org.eclipse.edc.spi.persistence.Lease;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheTestBase;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.BeforeEach;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest {
private final InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache();
import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheTestBase {
private final HashMap<String, Lease> leases = new HashMap<>();
private InMemoryEndpointDataReferenceCache cache;

@BeforeEach
void setUp() {
cache = new InMemoryEndpointDataReferenceCache(CONNECTOR_NAME, Clock.systemUTC(), leases);
}

@Override
protected EndpointDataReferenceCache getStore() {
return cache;
}

@Override
protected void lockEntity(String negotiationId, String owner, Duration duration) {
leases.put(negotiationId, new Lease(owner, Clock.systemUTC().millis(), duration.toMillis()));
}

@Override
protected boolean isLockedBy(String negotiationId, String owner) {
return leases.entrySet().stream().anyMatch(e -> e.getKey().equals(negotiationId) &&
e.getValue().getLeasedBy().equals(owner) &&
!isExpired(e.getValue()));
}

private boolean isExpired(Lease e) {
return e.getLeasedAt() + e.getLeaseDuration() < Clock.systemUTC().millis();
}
}
26 changes: 26 additions & 0 deletions core/edr-core/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# EDR core extension

This extension provide a base implementation of `EdrManager` and `EdrService` both
required for interacting with the EDR APIs and state machine

The EDR state machine handle the lifecycle of a negotiated EDR. The negotiation request can be submitted
via EDR APIs, and it will go through two phases:

- Contract Negotiation
- Transfer Request

Once the latter has completed the EDR entry will be saved with the associated EDR in the primordial state `NEGOTIATED`
The state machine will also manage the lifecycle and the renewal of the `EDR`. If a token is about to expire it will
transition to the `REFRESHING` state and fire off another transfer process with the same parameter of the expiring
one. Once completed the new `EDR` will be cached and the old ones, with same `assetId` and `agreementId` will transition
into the `EXPIRED` state. Then the state machine will also monitor the `EXPIRED` ones, and will delete them according to the
retention configuration.

## 1. EDR state machine Configuration

| Key | Description | Mandatory | Default |
|:--------------------------------------------|:----------------------------------------------------------------------------------------------------|-----------|---------|
| edc.edr.state-machine.iteration-wait-millis | The iteration wait time in milliseconds in the edr state machine | | 1000 |
| edc.edr.state-machine.batch-size | The batch size in the edr negotiation state machine | | 20 |
| edc.edr.state-machine.expiring-duration | The minimum duration on which the EDR token can be eligible for renewal (seconds) | | 60 |
| edc.edr.state-machine.expired-retention | The minimum duration on with the EDR token can be eligible for deletion when it's expired (seconds) | | 60 |
6 changes: 5 additions & 1 deletion core/edr-core/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -23,10 +23,14 @@ dependencies {
implementation(libs.edc.spi.aggregateservices)
implementation(libs.edc.spi.contract)
implementation(libs.edc.spi.controlplane)
implementation(libs.edc.statemachine)

implementation(project(":spi:edr-spi"))

implementation(project(":spi:core-spi"))


testImplementation(libs.edc.junit)
testImplementation(libs.awaitility)
testImplementation(testFixtures(project(":spi:edr-spi")))

}
Original file line number Diff line number Diff line change
@@ -15,40 +15,115 @@
package org.eclipse.tractusx.edc.edr.core;

import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.tractusx.edc.edr.core.manager.EdrManagerImpl;
import org.eclipse.tractusx.edc.edr.spi.EdrManager;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

import java.time.Clock;
import java.time.Duration;

/**
* Registers default services for the EDR cache.
*/
@Provides(EdrManager.class)
@Extension(value = EdrCoreExtension.NAME)
public class EdrCoreExtension implements ServiceExtension {
static final String NAME = "EDR Core";
public static final long DEFAULT_ITERATION_WAIT = 1000;

public static final int DEFAULT_BATCH_SIZE = 20;

public static final int DEFAULT_SEND_RETRY_LIMIT = 7;

public static final long DEFAULT_SEND_RETRY_BASE_DELAY = 1000L;
public static final long DEFAULT_EXPIRING_DURATION = 60;

public static final long DEFAULT_EXPIRED_RETENTION = 60;

protected static final String NAME = "EDR Core";
@Setting(value = "The iteration wait time in milliseconds in the edr state machine.", type = "long", defaultValue = "" + DEFAULT_ITERATION_WAIT)
private static final String EDR_STATE_MACHINE_ITERATION_WAIT_MILLIS = "edc.edr.state-machine.iteration-wait-millis";
@Setting(value = "The batch size in the edr negotiation state machine.", type = "int", defaultValue = "" + DEFAULT_BATCH_SIZE)
private static final String EDR_STATE_MACHINE_BATCH_SIZE = "edc.edr.state-machine.batch-size";
@Setting(value = "The minimum duration on which the EDR token can be eligible for renewal", type = "long", defaultValue = "" + DEFAULT_EXPIRING_DURATION)
private static final String EDR_STATE_MACHINE_EXPIRING_DURATION = "edc.edr.state-machine.expiring-duration";

@Setting(value = "The minimum duration on with the EDR token can be eligible for deletion when it's expired.", type = "long", defaultValue = "" + DEFAULT_EXPIRED_RETENTION)
private static final String EDR_STATE_MACHINE_EXPIRED_RETENTION = "edc.edr.state-machine.expired-retention";

@Inject
private Monitor monitor;

@Inject
private ContractNegotiationService contractNegotiationService;


@Inject
private TransferProcessService transferProcessService;
@Inject
private EndpointDataReferenceCache endpointDataReferenceCache;

@Inject
private ExecutorInstrumentation executorInstrumentation;

@Inject
private Telemetry telemetry;

@Inject
private Clock clock;
private EdrManagerImpl edrManager;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {

var iterationWaitMillis = context.getSetting(EDR_STATE_MACHINE_ITERATION_WAIT_MILLIS, DEFAULT_ITERATION_WAIT);

var expiringDuration = context.getSetting(EDR_STATE_MACHINE_EXPIRING_DURATION, DEFAULT_EXPIRING_DURATION);

var expiredRetention = context.getSetting(EDR_STATE_MACHINE_EXPIRED_RETENTION, DEFAULT_EXPIRED_RETENTION);

@Provider
public EdrService adapterTransferProcessService() {
return new EdrServiceImpl(contractNegotiationService, endpointDataReferenceCache);

edrManager = EdrManagerImpl.Builder.newInstance()
.contractNegotiationService(contractNegotiationService)
.monitor(monitor)
.waitStrategy(new ExponentialWaitStrategy(iterationWaitMillis))
.executorInstrumentation(executorInstrumentation)
.edrCache(endpointDataReferenceCache)
.transferProcessService(transferProcessService)
.telemetry(telemetry)
.batchSize(context.getSetting(EDR_STATE_MACHINE_BATCH_SIZE, DEFAULT_BATCH_SIZE))
.expiringDuration(Duration.ofSeconds(expiringDuration))
.expiredRetention(Duration.ofSeconds(expiredRetention))
.clock(clock)
.build();

context.registerService(EdrManager.class, edrManager);
}

@Override
public void start() {
edrManager.start();
}

@Override
public void shutdown() {
if (edrManager != null) {
edrManager.stop();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core;

import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl;
import org.eclipse.tractusx.edc.edr.spi.EdrManager;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

/**
* Registers default services for the EDR cache.
*/
@Extension(value = EdrCoreServiceExtension.NAME)
public class EdrCoreServiceExtension implements ServiceExtension {
protected static final String NAME = "EDR Core Service extension";

@Inject
private Monitor monitor;

@Inject
private EdrManager edrManager;

@Inject
private EndpointDataReferenceCache endpointDataReferenceCache;

@Override
public String name() {
return NAME;
}


@Provider
public EdrService edrService() {
return new EdrServiceImpl(edrManager, endpointDataReferenceCache);
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -15,48 +15,40 @@
package org.eclipse.tractusx.edc.edr.core.service;

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EdrManager;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.NegotiateEdrRequest;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;

public class EdrServiceImpl implements EdrService {

public static final String LOCAL_ADAPTER_URI = "local://adapter";
public static final Set<String> LOCAL_EVENTS = Set.of("contract.negotiation", "transfer.process");
public static final CallbackAddress LOCAL_CALLBACK = CallbackAddress.Builder.newInstance()
.transactional(true)
.uri(LOCAL_ADAPTER_URI)
.events(LOCAL_EVENTS)
.build();
private final ContractNegotiationService contractNegotiationService;
private final EdrManager edrManager;

private final EndpointDataReferenceCache endpointDataReferenceCache;

public EdrServiceImpl(ContractNegotiationService contractNegotiationService, EndpointDataReferenceCache endpointDataReferenceCache) {
this.contractNegotiationService = contractNegotiationService;
public EdrServiceImpl(EdrManager edrManager, EndpointDataReferenceCache endpointDataReferenceCache) {
this.edrManager = edrManager;
this.endpointDataReferenceCache = endpointDataReferenceCache;
}

@Override
public ServiceResult<ContractNegotiation> initiateEdrNegotiation(NegotiateEdrRequest request) {
var contractNegotiation = contractNegotiationService.initiateNegotiation(createContractRequest(request));
return ServiceResult.success(contractNegotiation);
var contractNegotiation = edrManager.initiateEdrNegotiation(request);
if (contractNegotiation.succeeded()) {
return ServiceResult.success(contractNegotiation.getContent());
} else {
return ServiceResult.badRequest(contractNegotiation.getFailureMessages());
}
}

@Override
@@ -79,19 +71,4 @@ public ServiceResult<EndpointDataReferenceEntry> deleteByTransferProcessId(Strin
return ServiceResult.from(deleted);
}

private ContractRequest createContractRequest(NegotiateEdrRequest request) {
var callbacks = Stream.concat(request.getCallbackAddresses().stream(), Stream.of(LOCAL_CALLBACK)).collect(Collectors.toList());

var requestData = ContractRequestData.Builder.newInstance()
.contractOffer(request.getOffer())
.protocol(request.getProtocol())
.counterPartyAddress(request.getConnectorAddress())
.connectorId(request.getConnectorId())
.build();

return ContractRequest.Builder.newInstance()
.requestData(requestData)
.callbackAddresses(callbacks).build();
}

}
Original file line number Diff line number Diff line change
@@ -13,3 +13,4 @@
#

org.eclipse.tractusx.edc.edr.core.EdrCoreExtension
org.eclipse.tractusx.edc.edr.core.EdrCoreServiceExtension
Original file line number Diff line number Diff line change
@@ -12,13 +12,13 @@
*
*/

package org.eclipse.tractusx.edc.edr.core.service;
package org.eclipse.tractusx.edc.edr.core;

import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.injection.ObjectFactory;
import org.eclipse.tractusx.edc.edr.core.EdrCoreExtension;
import org.eclipse.tractusx.edc.edr.core.manager.EdrManagerImpl;
import org.eclipse.tractusx.edc.edr.spi.EdrManager;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -30,21 +30,18 @@
@ExtendWith(DependencyInjectionExtension.class)
public class EdrCoreExtensionTest {

EdrCoreExtension extension;

@BeforeEach
void setUp(ObjectFactory factory, ServiceExtensionContext context) {
void setUp(ServiceExtensionContext context) {
context.registerService(ContractNegotiationService.class, mock(ContractNegotiationService.class));
context.registerService(EndpointDataReferenceCache.class, mock(EndpointDataReferenceCache.class));
extension = factory.constructInstance(EdrCoreExtension.class);
}

@Test
void shouldInitializeTheExtension(ServiceExtensionContext context) {
void shouldInitializeTheExtension(ServiceExtensionContext context, EdrCoreExtension extension) {
extension.initialize(context);
var service = extension.adapterTransferProcessService();
assertThat(service).isInstanceOf(EdrServiceImpl.class);

var service = context.getService(EdrManager.class);
assertThat(service).isInstanceOf(EdrManagerImpl.class);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core;

import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl;
import org.eclipse.tractusx.edc.edr.spi.EdrManager;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

@ExtendWith(DependencyInjectionExtension.class)
public class EdrCoreServiceExtensionTest {

@BeforeEach
void setUp(ServiceExtensionContext context) {
context.registerService(EdrManager.class, mock(EdrManager.class));
context.registerService(EndpointDataReferenceCache.class, mock(EndpointDataReferenceCache.class));
}

@Test
void shouldInitializeTheExtension(ServiceExtensionContext context, EdrCoreServiceExtension extension) {
extension.initialize(context);

var service = extension.edrService();
assertThat(service).isInstanceOf(EdrServiceImpl.class);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core.fixtures;

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractOffer;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.tractusx.edc.edr.spi.types.NegotiateEdrRequest;

import java.util.List;
import java.util.Set;

public class TestFunctions {


public static NegotiateEdrRequest getNegotiateEdrRequest() {
return NegotiateEdrRequest.Builder.newInstance()
.protocol("protocol")
.connectorAddress("http://test")
.callbackAddresses(List.of(CallbackAddress.Builder.newInstance().uri("test").events(Set.of("test")).build()))
.offer(ContractOffer.Builder.newInstance()
.id("id")
.assetId("assetId")
.policy(Policy.Builder.newInstance().build())
.providerId("provider")
.build())
.build();
}

public static ContractNegotiation getContractNegotiation() {
return ContractNegotiation.Builder.newInstance()
.id("id")
.counterPartyAddress("http://test")
.counterPartyId("provider")
.protocol("protocol")
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core.manager;

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionedResourceSet;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.CONSUMER;
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
import static org.eclipse.tractusx.edc.edr.core.EdrCoreExtension.DEFAULT_BATCH_SIZE;
import static org.eclipse.tractusx.edc.edr.core.EdrCoreExtension.DEFAULT_EXPIRING_DURATION;
import static org.eclipse.tractusx.edc.edr.core.fixtures.TestFunctions.getContractNegotiation;
import static org.eclipse.tractusx.edc.edr.core.fixtures.TestFunctions.getNegotiateEdrRequest;
import static org.eclipse.tractusx.edc.edr.core.manager.EdrManagerImpl.LOCAL_CALLBACK;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.DELETING;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.ERROR;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.EXPIRED;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.NEGOTIATED;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.REFRESHING;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class EdrManagerImplTest {

private final EndpointDataReferenceCache edrCache = mock(EndpointDataReferenceCache.class);
private final ContractNegotiationService negotiationService = mock(ContractNegotiationService.class);
private final TransferProcessService transferProcessService = mock(TransferProcessService.class);
private EdrManagerImpl edrManager;

@BeforeEach
void setup() {
edrManager = EdrManagerImpl.Builder.newInstance()
.contractNegotiationService(negotiationService)
.transferProcessService(transferProcessService)
.edrCache(edrCache)
.monitor(mock(Monitor.class))
.expiredRetention(Duration.ofSeconds(1))
.clock(Clock.systemUTC())
.build();
}

@Test
@DisplayName("Verify that EDR negotiation is initiated")
void initEdrNegotiation() {

var captor = ArgumentCaptor.forClass(ContractRequest.class);

when(negotiationService.initiateNegotiation(any())).thenReturn(getContractNegotiation());

var negotiateEdrRequest = getNegotiateEdrRequest();

var result = edrManager.initiateEdrNegotiation(negotiateEdrRequest);

assertThat(result.succeeded()).isTrue();
assertThat(result.getContent()).isNotNull();

verify(negotiationService).initiateNegotiation(captor.capture());

var msg = captor.getValue();

assertThat(msg.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().containsAll(negotiateEdrRequest.getCallbackAddresses());
assertThat(msg.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().contains(LOCAL_CALLBACK);
assertThat(msg.getRequestData().getContractOffer()).usingRecursiveComparison().isEqualTo(negotiateEdrRequest.getOffer());
assertThat(msg.getRequestData().getProtocol()).isEqualTo(negotiateEdrRequest.getProtocol());
assertThat(msg.getRequestData().getCounterPartyAddress()).isEqualTo(negotiateEdrRequest.getConnectorAddress());

}

@Test
@DisplayName("Verify that EDR state should transition to REFRESHING")
void initial_shouldTransitionRequesting() {
var edrEntry = edrEntryBuilder().state(NEGOTIATED.code()).build();
var transferProcess = createTransferProcessBuilder().build();
when(edrCache.nextNotLeased(anyInt(), stateIs(NEGOTIATED.code()))).thenReturn(List.of(edrEntry)).thenReturn(emptyList());
when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any())).thenReturn(ServiceResult.success(transferProcess));

edrManager.start();

await().untilAsserted(() -> verify(edrCache).update(argThat(p -> p.getState() == REFRESHING.code())));
}

@Test
@DisplayName("Verify that EDR state should not transition to REFRESHING when the token it's not expired")
void initial_shouldNotTransitionToRefreshing_WhenNotExpired() {
var expiration = Instant.now().atOffset(ZoneOffset.UTC).toInstant().plusSeconds(DEFAULT_EXPIRING_DURATION + 10);
var edrEntry = edrEntryBuilder().expirationTimestamp(expiration.toEpochMilli()).state(NEGOTIATED.code()).build();
var transferProcess = createTransferProcessBuilder().build();
when(edrCache.nextNotLeased(anyInt(), stateIs(NEGOTIATED.code())))
.thenReturn(List.of(edrEntry))
.thenReturn(List.of(edrEntry))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any())).thenReturn(ServiceResult.success(transferProcess));

edrManager.start();

await().untilAsserted(() -> {
verify(edrCache, atLeast(2)).nextNotLeased(anyInt(), stateIs(NEGOTIATED.code()));
verify(edrCache, times(0)).update(argThat(p -> p.getState() == REFRESHING.code()));
});
}


@Test
@DisplayName("Verify that EDR state should transition to ERROR the transfer process is not found")
void initial_shouldTransitionError_whenTransferProcessNotFound() {
var edrEntry = edrEntryBuilder().state(NEGOTIATED.code()).build();
when(edrCache.nextNotLeased(anyInt(), stateIs(NEGOTIATED.code())))
.thenReturn(List.of(edrEntry))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(null);

edrManager.start();

await().untilAsserted(() -> verify(edrCache).update(argThat(p -> p.getState() == ERROR.code())));
}


@Test
@DisplayName("Verify that EDR state should not transition to ERROR on transient errors")
void initial_shouldNotTransitionError_whenInitiatedTransferFailsOnce() {
var edrEntry = edrEntryBuilder().state(NEGOTIATED.code()).build();
var transferProcess = createTransferProcessBuilder().build();

when(edrCache.nextNotLeased(anyInt(), stateIs(NEGOTIATED.code())))
.thenReturn(List.of(edrEntry))
.thenReturn(List.of(edrEntry.copy()))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any()))
.thenReturn(ServiceResult.badRequest("bad"))
.thenReturn(ServiceResult.success(transferProcess));


edrManager.start();

await().untilAsserted(() -> {
var captor = ArgumentCaptor.forClass(EndpointDataReferenceEntry.class);
verify(edrCache, times(2)).update(captor.capture());
var states = captor.getAllValues().stream().map(EndpointDataReferenceEntry::getState).toList();
assertThat(states).containsExactly(NEGOTIATED.code(), REFRESHING.code());
});
}

@Test
@DisplayName("Verify that EDR state should transition to deleting when the retention period is over")
void initial_shouldTransitionToDeleting_whenTheRetentionPeriodIsOver() {
var expiration = Instant.now().atOffset(ZoneOffset.UTC).toInstant().minusSeconds(DEFAULT_EXPIRING_DURATION + 10);
var edrEntry = edrEntryBuilder().state(EXPIRED.code()).expirationTimestamp(expiration.toEpochMilli()).build();

when(edrCache.nextNotLeased(anyInt(), stateIs(EXPIRED.code())))
.thenReturn(List.of(edrEntry))
.thenReturn(emptyList());

edrManager.start();

await().untilAsserted(() -> verify(edrCache).update(argThat(p -> p.getState() == DELETING.code())));
}

@Test
@DisplayName("Verify that EDR is deleted when state is DELETING")
void initial_shouldDeleteTheEntry_whenTheRetentionPeriodIsOver() {
var expiration = Instant.now().atOffset(ZoneOffset.UTC).toInstant().minusSeconds(DEFAULT_EXPIRING_DURATION + 10);
var edrEntry = edrEntryBuilder().state(DELETING.code()).expirationTimestamp(expiration.toEpochMilli()).build();

var query = QuerySpec.Builder.newInstance()
.filter(hasState(DELETING.code()))
.limit(DEFAULT_BATCH_SIZE)
.build();

when(edrCache.queryForEntries(query))
.thenReturn(Stream.of(edrEntry))
.thenReturn(Stream.empty());


when(edrCache.deleteByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success());

edrManager.start();

await().untilAsserted(() -> {
verify(edrCache, times(1)).deleteByTransferProcessId(edrEntry.getTransferProcessId());
});
}


private EndpointDataReferenceEntry.Builder edrEntryBuilder() {
return EndpointDataReferenceEntry.Builder.newInstance()
.id(UUID.randomUUID().toString())
.assetId(UUID.randomUUID().toString())
.agreementId(UUID.randomUUID().toString())
.transferProcessId(UUID.randomUUID().toString())
.expirationTimestamp(Instant.now().toEpochMilli())
.stateTimestamp(Instant.now().toEpochMilli());
}

private TransferProcess.Builder createTransferProcessBuilder() {
var processId = UUID.randomUUID().toString();
var dataRequest = createDataRequestBuilder()
.processId(processId)
.protocol("protocol")
.connectorAddress("http://an/address")
.managedResources(false)
.build();

return TransferProcess.Builder.newInstance()
.provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build())
.type(CONSUMER)
.id("test-process-" + processId)
.state(TransferProcessStates.COMPLETED.code())
.dataRequest(dataRequest);
}

private DataRequest.Builder createDataRequestBuilder() {
return DataRequest.Builder.newInstance()
.id(UUID.randomUUID().toString())
.contractId(UUID.randomUUID().toString())
.assetId(UUID.randomUUID().toString())
.destinationType("test-type");
}

private Criterion[] stateIs(int state) {
return aryEq(new Criterion[]{ hasState(state) });
}

}
Original file line number Diff line number Diff line change
@@ -15,52 +15,47 @@
package org.eclipse.tractusx.edc.edr.core.service;

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractOffer;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.service.spi.result.ServiceFailure;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EdrManager;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.NegotiateEdrRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl.LOCAL_CALLBACK;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class EdrServiceImplTest {

ContractNegotiationService contractNegotiationService = mock(ContractNegotiationService.class);
EdrManager edrManager = mock(EdrManager.class);

EndpointDataReferenceCache endpointDataReferenceCache = mock(EndpointDataReferenceCache.class);

EdrServiceImpl transferService;

@BeforeEach
void setup() {
transferService = new EdrServiceImpl(contractNegotiationService, endpointDataReferenceCache);
transferService = new EdrServiceImpl(edrManager, endpointDataReferenceCache);
}

@Test
void initEdrNegotiation_shouldFireContractNegotiation_WhenUsingCallbacks() {

var captor = ArgumentCaptor.forClass(ContractRequest.class);

when(contractNegotiationService.initiateNegotiation(any())).thenReturn(getContractNegotiation());
when(edrManager.initiateEdrNegotiation(any())).thenReturn(StatusResult.success(getContractNegotiation()));

var negotiateEdrRequest = getNegotiateEdrRequest();

@@ -69,16 +64,6 @@ void initEdrNegotiation_shouldFireContractNegotiation_WhenUsingCallbacks() {
assertThat(result.succeeded()).isTrue();
assertThat(result.getContent()).isNotNull();

verify(contractNegotiationService).initiateNegotiation(captor.capture());

var msg = captor.getValue();

assertThat(msg.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().containsAll(negotiateEdrRequest.getCallbackAddresses());
assertThat(msg.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().contains(LOCAL_CALLBACK);
assertThat(msg.getRequestData().getContractOffer()).usingRecursiveComparison().isEqualTo(negotiateEdrRequest.getOffer());
assertThat(msg.getRequestData().getProtocol()).isEqualTo(negotiateEdrRequest.getProtocol());
assertThat(msg.getRequestData().getCounterPartyAddress()).isEqualTo(negotiateEdrRequest.getConnectorAddress());

}

@Test
@@ -152,7 +137,7 @@ void queryEdrs() {
.extracting(ServiceResult::getContent)
.extracting(List::size)
.isEqualTo(0);

}

private NegotiateEdrRequest getNegotiateEdrRequest() {
Original file line number Diff line number Diff line change
@@ -44,6 +44,9 @@
import java.util.stream.Collectors;

import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.exceptionMapper;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.AGREEMENT_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.ASSET_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.PROVIDER_ID;

@Consumes({ MediaType.APPLICATION_JSON })
@Produces({ MediaType.APPLICATION_JSON })
@@ -124,13 +127,13 @@ private void logIfError(Result<?> result) {
private QuerySpec querySpec(String assetId, String agreementId, String providerId) {
var queryBuilder = QuerySpec.Builder.newInstance();
if (assetId != null) {
queryBuilder.filter(fieldFilter("assetId", assetId));
queryBuilder.filter(fieldFilter(ASSET_ID, assetId));
}
if (agreementId != null) {
queryBuilder.filter(fieldFilter("agreementId", agreementId));
queryBuilder.filter(fieldFilter(AGREEMENT_ID, agreementId));
}
if (providerId != null) {
queryBuilder.filter(fieldFilter("providerId", agreementId));
queryBuilder.filter(fieldFilter(PROVIDER_ID, agreementId));
}
return queryBuilder.build();
}
Original file line number Diff line number Diff line change
@@ -25,6 +25,9 @@
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_AGREEMENT_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_ASSET_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_EXPIRATION_DATE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_PROVIDER_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_STATE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_TRANSFER_PROCESS_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_TYPE;

@@ -42,6 +45,9 @@ public JsonObjectFromEndpointDataReferenceEntryTransformer() {
.add(EDR_ENTRY_AGREEMENT_ID, dto.getAgreementId())
.add(EDR_ENTRY_TRANSFER_PROCESS_ID, dto.getTransferProcessId())
.add(EDR_ENTRY_ASSET_ID, dto.getAssetId())
.add(EDR_ENTRY_PROVIDER_ID, dto.getProviderId())
.add(EDR_ENTRY_STATE, dto.getEdrState())
.add(EDR_ENTRY_EXPIRATION_DATE, dto.getExpirationTimestamp())
.build();
}

Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@
import org.eclipse.tractusx.edc.api.edr.dto.NegotiateEdrRequestDto;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates;
import org.eclipse.tractusx.edc.edr.spi.types.NegotiateEdrRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -49,11 +50,15 @@
import static org.eclipse.tractusx.edc.api.edr.TestFunctions.negotiationRequest;
import static org.eclipse.tractusx.edc.api.edr.TestFunctions.openRequest;
import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.TX_NAMESPACE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.AGREEMENT_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.ASSET_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_AGREEMENT_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_ASSET_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_PROVIDER_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_STATE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_TRANSFER_PROCESS_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_TYPE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.PROVIDER_ID;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -154,17 +159,18 @@ void getEdr_shouldReturnDataAddress_whenFound() {

@Test
void queryEdrs_shouldReturnCachedEntries_whenAssetIdIsProvided() {
var assetId = "id";
var transferProcessId = "id";
var agreementId = "id";
var providerId = "id";
var assetId = "assetId";
var transferProcessId = "transferProcessId";
var agreementId = "agreementId";
var providerId = "providerId";


var entry = EndpointDataReferenceEntry.Builder.newInstance()
.transferProcessId(transferProcessId)
.agreementId(agreementId)
.assetId(assetId)
.providerId(providerId)
.state(EndpointDataReferenceEntryStates.NEGOTIATED.code())
.build();

var response = Json.createObjectBuilder()
@@ -173,9 +179,10 @@ void queryEdrs_shouldReturnCachedEntries_whenAssetIdIsProvided() {
.add(EDR_ENTRY_TRANSFER_PROCESS_ID, entry.getTransferProcessId())
.add(EDR_ENTRY_AGREEMENT_ID, entry.getAgreementId())
.add(EDR_ENTRY_PROVIDER_ID, entry.getProviderId())
.add(EDR_ENTRY_STATE, entry.getEdrState())
.build();

var filter = QuerySpec.Builder.newInstance().filter(fieldFilter("assetId", assetId)).build();
var filter = QuerySpec.Builder.newInstance().filter(fieldFilter(ASSET_ID, assetId)).build();

when(edrService.findBy(eq(filter))).thenReturn(ServiceResult.success(List.of(entry)));
when(transformerRegistry.transform(any(EndpointDataReferenceEntry.class), eq(JsonObject.class))).thenReturn(Result.success(response));
@@ -188,7 +195,8 @@ void queryEdrs_shouldReturnCachedEntries_whenAssetIdIsProvided() {
.body("[0].'edc:transferProcessId'", is(entry.getTransferProcessId()))
.body("[0].'edc:agreementId'", is(entry.getAgreementId()))
.body("[0].'edc:assetId'", is(entry.getAssetId()))
.body("[0].'edc:providerId'", is(entry.getProviderId()));
.body("[0].'edc:providerId'", is(entry.getProviderId()))
.body("[0].'tx:edrState'", is(entry.getEdrState()));

}

@@ -216,8 +224,8 @@ void queryEdrs_shouldReturnCachedEntries_whenAgreementIdIsProvided() {
.build();

var filter = QuerySpec.Builder.newInstance()
.filter(fieldFilter("agreementId", agreementId))
.filter(fieldFilter("providerId", entry.getProviderId()))
.filter(fieldFilter(AGREEMENT_ID, agreementId))
.filter(fieldFilter(PROVIDER_ID, entry.getProviderId()))
.build();

when(edrService.findBy(eq(filter))).thenReturn(ServiceResult.success(List.of(entry)));
Original file line number Diff line number Diff line change
@@ -16,12 +16,18 @@

import org.eclipse.edc.transform.spi.TransformerContext;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_AGREEMENT_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_ASSET_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_EXPIRATION_DATE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_PROVIDER_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_STATE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.EDR_ENTRY_TRANSFER_PROCESS_ID;
import static org.mockito.Mockito.mock;

@@ -42,6 +48,9 @@ void transform() {
.assetId("id")
.transferProcessId("tpId")
.agreementId("aId")
.providerId("providerId")
.state(EndpointDataReferenceEntryStates.NEGOTIATED.code())
.expirationTimestamp(Instant.now().toEpochMilli())
.build();

var jsonObject = transformer.transform(dto, context);
@@ -50,5 +59,9 @@ void transform() {
assertThat(jsonObject.getJsonString(EDR_ENTRY_AGREEMENT_ID).getString()).isNotNull().isEqualTo(dto.getAgreementId());
assertThat(jsonObject.getJsonString(EDR_ENTRY_ASSET_ID).getString()).isNotNull().isEqualTo(dto.getAssetId());
assertThat(jsonObject.getJsonString(EDR_ENTRY_TRANSFER_PROCESS_ID).getString()).isNotNull().isEqualTo(dto.getTransferProcessId());
assertThat(jsonObject.getJsonString(EDR_ENTRY_PROVIDER_ID).getString()).isNotNull().isEqualTo(dto.getProviderId());
assertThat(jsonObject.getJsonString(EDR_ENTRY_STATE).getString()).isNotNull().isEqualTo(dto.getEdrState());
assertThat(jsonObject.getJsonNumber(EDR_ENTRY_EXPIRATION_DATE).longValue()).isNotNull().isEqualTo(dto.getExpirationTimestamp());

}
}
2 changes: 2 additions & 0 deletions edc-extensions/edr/edr-cache-sql/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -24,11 +24,13 @@ dependencies {
implementation(libs.edc.core.sql)
implementation(libs.edc.spi.transactionspi)
implementation(libs.edc.spi.transaction.datasource)
implementation(libs.edc.sql.lease)

testImplementation(libs.edc.transaction.local)

testImplementation(testFixtures(project(":spi:edr-spi")))
testImplementation(testFixtures(libs.edc.core.sql))
testImplementation(testFixtures(libs.edc.sql.lease))

testImplementation(testFixtures(libs.edc.junit))

32 changes: 32 additions & 0 deletions edc-extensions/edr/edr-cache-sql/docs/schema.sql
Original file line number Diff line number Diff line change
@@ -11,13 +11,45 @@
-- Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
--


CREATE TABLE IF NOT EXISTS edc_lease
(
leased_by VARCHAR NOT NULL,
leased_at BIGINT,
lease_duration INTEGER DEFAULT 60000 NOT NULL,
lease_id VARCHAR NOT NULL
CONSTRAINT lease_pk
PRIMARY KEY
);

COMMENT ON COLUMN edc_lease.leased_at IS 'posix timestamp of lease';

COMMENT ON COLUMN edc_lease.lease_duration IS 'duration of lease in milliseconds';


CREATE UNIQUE INDEX IF NOT EXISTS lease_lease_id_uindex
ON edc_lease (lease_id);

CREATE TABLE IF NOT EXISTS edc_edr_cache
(
transfer_process_id VARCHAR NOT NULL PRIMARY KEY,
agreement_id VARCHAR NOT NULL,
asset_id VARCHAR NOT NULL,
edr_id VARCHAR NOT NULL,
provider_id VARCHAR,
expiration_timestamp BIGINT,
state INTEGER DEFAULT 0 NOT NULL,
state_count INTEGER DEFAULT 0,
state_timestamp BIGINT,
error_detail VARCHAR,
lease_id VARCHAR CONSTRAINT edc_edr_cache_lease_lease_id_fk REFERENCES edc_lease ON DELETE SET NULL,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL
);

CREATE INDEX IF NOT EXISTS edc_edr_asset_id_index
ON edc_edr_cache (asset_id);


CREATE INDEX IF NOT EXISTS edc_edr_agreement_id_index
ON edc_edr_cache (agreement_id);
Original file line number Diff line number Diff line change
@@ -21,7 +21,9 @@
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.ResultSetMapper;
import org.eclipse.edc.sql.lease.SqlLeaseContextBuilder;
import org.eclipse.edc.sql.store.AbstractSqlStore;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;
@@ -35,14 +37,16 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuery;
import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuerySingle;
import static java.util.stream.Collectors.toList;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.ASSET_ID;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry.PROVIDER_ID;

public class SqlEndpointDataReferenceCache extends AbstractSqlStore implements EndpointDataReferenceCache {

@@ -52,12 +56,21 @@ public class SqlEndpointDataReferenceCache extends AbstractSqlStore implements E
private final Clock clock;
private final Vault vault;

private final SqlLeaseContextBuilder leaseContext;

public SqlEndpointDataReferenceCache(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext, EdrStatements statements, ObjectMapper objectMapper, Vault vault, Clock clock) {
super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper);
private final String leaseHolder;


public SqlEndpointDataReferenceCache(DataSourceRegistry dataSourceRegistry, String dataSourceName,
TransactionContext transactionContext, EdrStatements statements,
ObjectMapper objectMapper, Vault vault, Clock clock,
QueryExecutor queryExecutor, String connectorId) {
super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor);
this.statements = statements;
this.clock = clock;
this.vault = vault;
this.leaseHolder = connectorId;
leaseContext = SqlLeaseContextBuilder.with(transactionContext, connectorId, statements, clock, queryExecutor);
}

@Override
@@ -76,9 +89,30 @@ public SqlEndpointDataReferenceCache(DataSourceRegistry dataSourceRegistry, Stri
});
}

@Override
public @Nullable EndpointDataReferenceEntry findByTransferProcessId(String transferProcessId) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
return findById(connection, transferProcessId, this::mapResultSet);
} catch (Exception exception) {
throw new EdcPersistenceException(exception);
}
});
}

@Override
public @NotNull List<EndpointDataReference> referencesForAsset(String assetId, String providerId) {
return internalQuery(queryFor("assetId", assetId), this::mapToEdrId).map(this::referenceFromEntry).collect(Collectors.toList());
var querySpec = QuerySpec.Builder.newInstance();
querySpec.filter(filterFor(ASSET_ID, assetId));

if (providerId != null) {
querySpec.filter(filterFor(PROVIDER_ID, providerId));
}

return internalQuery(querySpec.build(), this::mapToWrapper)
.filter(wrapper -> filterActive(wrapper.getEntry()))
.map(EndpointDataReferenceEntryWrapper::getEdrId)
.map(this::referenceFromEntry).collect(Collectors.toList());
}

@Override
@@ -91,54 +125,122 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
transactionContext.execute(() -> {
try (var connection = getConnection()) {
var sql = statements.getInsertTemplate();
var createdAt = clock.millis();
executeQuery(connection, sql, entry.getTransferProcessId(), entry.getAssetId(), entry.getAgreementId(), edr.getId(), entry.getProviderId(), createdAt, createdAt);
queryExecutor.execute(connection, sql,
entry.getTransferProcessId(),
entry.getAssetId(),
entry.getAgreementId(),
edr.getId(),
entry.getProviderId(),
entry.getExpirationTimestamp(),
entry.getState(),
entry.getStateCount(),
entry.getStateTimestamp(),
entry.getErrorDetail(),
entry.getCreatedAt(),
entry.getUpdatedAt());
vault.storeSecret(VAULT_PREFIX + edr.getId(), toJson(edr)).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail()));
} catch (Exception exception) {
throw new EdcPersistenceException(exception);
}
});
}

@Override
public void update(EndpointDataReferenceEntry entry) {
transactionContext.execute(() -> {
try (var connection = getConnection()) {
leaseContext.withConnection(connection).breakLease(entry.getTransferProcessId());
var sql = statements.getUpdateTemplate();
queryExecutor.execute(connection, sql,
entry.getState(),
entry.getStateCount(),
entry.getStateTimestamp(),
entry.getErrorDetail(),
entry.getUpdatedAt(),
entry.getTransferProcessId());
} catch (SQLException exception) {
throw new EdcPersistenceException(exception);
}
});
}

@Override
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
var entryWrapper = findById(connection, id, this::mapToWrapper);
if (entryWrapper != null) {
executeQuery(connection, statements.getDeleteByIdTemplate(), id);
leaseContext.withConnection(connection).acquireLease(id);
queryExecutor.execute(connection, statements.getDeleteByIdTemplate(), id);
leaseContext.withConnection(connection).breakLease(id);
vault.deleteSecret(VAULT_PREFIX + entryWrapper.getEdrId()).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail()));
return StoreResult.success(entryWrapper.getEntry());
} else {
return StoreResult.notFound(format("EDR with id %s not found", id));
}
} catch (Exception exception) {
} catch (SQLException exception) {
throw new EdcPersistenceException(exception);
}
});
}

@Override
public @NotNull List<EndpointDataReferenceEntry> nextNotLeased(int max, Criterion... criteria) {
return transactionContext.execute(() -> {
var filter = Arrays.stream(criteria).collect(toList());
var querySpec = QuerySpec.Builder.newInstance().filter(filter).limit(max).build();
var statement = statements.createQuery(querySpec);
statement.addWhereClause(statements.getNotLeasedFilter());
statement.addParameter(clock.millis());

try (
var connection = getConnection();
var stream = queryExecutor.query(getConnection(), true, this::mapResultSet, statement.getQueryAsString(), statement.getParameters())
) {
var negotiations = stream.collect(toList());
negotiations.forEach(cn -> leaseContext.withConnection(connection).acquireLease(cn.getId()));
return negotiations;
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

private <T> T findById(Connection connection, String id, ResultSetMapper<T> resultSetMapper) {
var sql = statements.getFindByTransferProcessIdTemplate();
return executeQuerySingle(connection, false, resultSetMapper, sql, id);
return queryExecutor.single(connection, false, resultSetMapper, sql, id);
}

@NotNull
private <T> Stream<T> internalQuery(QuerySpec spec, ResultSetMapper<T> resultSetMapper) {
try {
var queryStmt = statements.createQuery(spec);
return executeQuery(getConnection(), true, resultSetMapper, queryStmt.getQueryAsString(), queryStmt.getParameters());
} catch (SQLException exception) {
throw new EdcPersistenceException(exception);
}
return transactionContext.execute(() -> {
try {
var queryStmt = statements.createQuery(spec);
return queryExecutor.query(getConnection(), true, resultSetMapper, queryStmt.getQueryAsString(), queryStmt.getParameters());
} catch (SQLException exception) {
throw new EdcPersistenceException(exception);
}
});

}

private EndpointDataReferenceEntry mapResultSet(ResultSet resultSet) throws SQLException {
Long expirationTimestamp = resultSet.getLong(statements.getExpirationTimestampColumn());
if (resultSet.wasNull()) {
expirationTimestamp = null;
}
return EndpointDataReferenceEntry.Builder.newInstance()
.transferProcessId(resultSet.getString(statements.getTransferProcessIdColumn()))
.assetId(resultSet.getString(statements.getAssetIdColumn()))
.agreementId(resultSet.getString(statements.getAgreementIdColumn()))
.providerId(resultSet.getString(statements.getProviderIdColumn()))
.state(resultSet.getInt(statements.getStateColumn()))
.stateTimestamp(resultSet.getLong(statements.getStateTimestampColumn()))
.stateCount(resultSet.getInt(statements.getStateCountColumn()))
.createdAt(resultSet.getLong(statements.getCreatedAtColumn()))
.updatedAt(resultSet.getLong(statements.getUpdatedAtColumn()))
.errorDetail(resultSet.getString(statements.getErrorDetailColumn()))
.expirationTimestamp(expirationTimestamp)
.build();
}

@@ -158,14 +260,12 @@ private EndpointDataReference referenceFromEntry(String edrId) {
return null;
}

private QuerySpec queryFor(String field, String value) {
var filter = Criterion.Builder.newInstance()
private Criterion filterFor(String field, Object value) {
return Criterion.Builder.newInstance()
.operandLeft(field)
.operator("=")
.operandRight(value)
.build();

return QuerySpec.Builder.newInstance().filter(filter).build();
}

private static class EndpointDataReferenceEntryWrapper {
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
@@ -51,6 +52,9 @@ public class SqlEndpointDataReferenceCacheExtension implements ServiceExtension
@Inject
private Vault vault;

@Inject
private QueryExecutor queryExecutor;

@Override
public String name() {
return NAME;
@@ -59,7 +63,7 @@ public String name() {
@Provider
public EndpointDataReferenceCache edrCache(ServiceExtensionContext context) {
var dataSourceName = context.getConfig().getString(DATASOURCE_SETTING_NAME, DEFAULT_DATASOURCE_NAME);
return new SqlEndpointDataReferenceCache(dataSourceRegistry, dataSourceName, transactionContext, getStatementImpl(), typeManager.getMapper(), vault, clock);
return new SqlEndpointDataReferenceCache(dataSourceRegistry, dataSourceName, transactionContext, getStatementImpl(), typeManager.getMapper(), vault, clock, queryExecutor, context.getConnectorId());
}

private EdrStatements getStatementImpl() {
Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@ public String getFindByTransferProcessIdTemplate() {
return format("SELECT * FROM %s WHERE %s = ?", getEdrTable(), getTransferProcessIdColumn());
}


@Override
public SqlQueryStatement createQuery(QuerySpec querySpec) {
var select = format("SELECT * FROM %s", getEdrTable());
@@ -35,22 +34,56 @@ public SqlQueryStatement createQuery(QuerySpec querySpec) {

@Override
public String getInsertTemplate() {
return format("INSERT INTO %s (%s, %s, %s, %s,%s, %s, %s) VALUES (?, ?, ?, ?, ?, ?, ?)",
return format("INSERT INTO %s (%s, %s, %s, %s,%s, %s, %s, %s, %s, %s, %s, %s) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
getEdrTable(),
getTransferProcessIdColumn(),
getAssetIdColumn(),
getAgreementIdColumn(),
getEdrId(),
getProviderIdColumn(),
getExpirationTimestampColumn(),
getStateColumn(),
getStateCountColumn(),
getStateTimestampColumn(),
getErrorDetailColumn(),
getCreatedAtColumn(),
getUpdatedAtColumn()
);
}

@Override
public String getUpdateTemplate() {
return format("UPDATE %s SET %s=?, %s=?, %s=?, %s=?, %s=? WHERE %s = ?;",
getEdrTable(), getStateColumn(), getStateCountColumn(), getStateTimestampColumn(),
getErrorDetailColumn(), getUpdatedAtColumn(), getTransferProcessIdColumn());
}

@Override
public String getDeleteByIdTemplate() {
return format("DELETE FROM %s WHERE %s = ?",
getEdrTable(),
getTransferProcessIdColumn());
}

@Override
public String getDeleteLeaseTemplate() {
return format("DELETE FROM %s WHERE %s=?", getLeaseTableName(), getLeaseIdColumn());
}

@Override
public String getInsertLeaseTemplate() {
return format("INSERT INTO %s (%s, %s, %s, %s) VALUES (?, ?, ?, ?);",
getLeaseTableName(), getLeaseIdColumn(), getLeasedByColumn(), getLeasedAtColumn(), getLeaseDurationColumn());
}

@Override
public String getUpdateLeaseTemplate() {
return format("UPDATE %s SET %s=? WHERE %s = ?;", getEdrTable(), getLeaseIdColumn(), getTransferProcessIdColumn());
}

@Override
public String getFindLeaseByEntityTemplate() {
return format("SELECT * FROM %s WHERE %s = (SELECT lease_id FROM %s WHERE %s=? )",
getLeaseTableName(), getLeaseIdColumn(), getEdrTable(), getTransferProcessIdColumn());
}
}
Original file line number Diff line number Diff line change
@@ -26,5 +26,6 @@ public EdrMapping(EdrStatements statements) {
add("assetId", statements.getAssetIdColumn());
add("agreementId", statements.getAgreementIdColumn());
add("providerId", statements.getProviderIdColumn());
add("state", statements.getStateColumn());
}
}
Original file line number Diff line number Diff line change
@@ -15,12 +15,13 @@
package org.eclipse.tractusx.edc.edr.store.sql.schema;

import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.sql.lease.LeaseStatements;
import org.eclipse.edc.sql.translation.SqlQueryStatement;

/**
* Sql Statements for DataPlane Store
*/
public interface EdrStatements {
public interface EdrStatements extends LeaseStatements {

default String getEdrTable() {
return "edc_edr_cache";
@@ -54,13 +55,34 @@ default String getUpdatedAtColumn() {
return "updated_at";
}

default String getStateColumn() {
return "state";
}

default String getExpirationTimestampColumn() {
return "expiration_timestamp";
}

default String getStateCountColumn() {
return "state_count";
}

default String getStateTimestampColumn() {
return "state_timestamp";
}

default String getErrorDetailColumn() {
return "error_detail";
}

String getFindByTransferProcessIdTemplate();

SqlQueryStatement createQuery(QuerySpec querySpec);

String getInsertTemplate();

String getUpdateTemplate();

String getDeleteByIdTemplate();

}
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
package org.eclipse.tractusx.edc.edr.store.sql;

import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.SqlQueryExecutor;
import org.eclipse.edc.sql.testfixtures.PostgresqlLocalInstance;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.local.LocalDataSourceRegistry;
@@ -29,6 +31,7 @@
import org.junit.jupiter.api.extension.ParameterResolver;

import java.sql.Connection;
import java.util.List;
import java.util.UUID;
import javax.sql.DataSource;

@@ -46,14 +49,17 @@
public class PostgresqlTransactionalStoreSetupExtension implements BeforeEachCallback, AfterEachCallback, BeforeAllCallback, ParameterResolver {

private final String datasourceName;
private final QueryExecutor queryExecutor;
private final Monitor monitor = mock();
private DataSourceRegistry dataSourceRegistry = null;
private DataSource dataSource = null;
private Connection connection = null;
private LocalTransactionContext transactionContext = null;
private Monitor monitor = mock(Monitor.class);


public PostgresqlTransactionalStoreSetupExtension(String datasourceName) {
this.datasourceName = datasourceName;
this.queryExecutor = new SqlQueryExecutor();
}

public PostgresqlTransactionalStoreSetupExtension() {
@@ -111,7 +117,7 @@ public void beforeAll(ExtensionContext context) throws Exception {
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
var type = parameterContext.getParameter().getParameterizedType();
return type.equals(PostgresqlTransactionalStoreSetupExtension.class);
return List.of(PostgresqlTransactionalStoreSetupExtension.class, QueryExecutor.class).contains(type);
}

@Override
@@ -120,6 +126,8 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte
var type = parameterContext.getParameter().getParameterizedType();
if (type.equals(PostgresqlTransactionalStoreSetupExtension.class)) {
return this;
} else if (type.equals(QueryExecutor.class)) {
return queryExecutor;
}
return null;
}
Original file line number Diff line number Diff line change
@@ -18,8 +18,10 @@
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.lease.testfixtures.LeaseUtil;
import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheTestBase;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.store.sql.schema.EdrStatements;
import org.eclipse.tractusx.edc.edr.store.sql.schema.postgres.PostgresEdrStatements;
@@ -33,6 +35,7 @@
import java.nio.file.Paths;
import java.sql.SQLException;
import java.time.Clock;
import java.time.Duration;

import static java.util.UUID.randomUUID;
import static org.eclipse.tractusx.edc.edr.spi.TestFunctions.edr;
@@ -47,7 +50,7 @@

@PostgresqlDbIntegrationTest
@ExtendWith(PostgresqlStoreSetupExtension.class)
public class SqlEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest {
public class SqlEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheTestBase {

EdrStatements statements = new PostgresEdrStatements();
SqlEndpointDataReferenceCache cache;
@@ -58,17 +61,19 @@ public class SqlEndpointDataReferenceCacheTest extends EndpointDataReferenceCach

TypeManager typeManager = new TypeManager();

LeaseUtil leaseUtil;

@BeforeEach
void setUp(PostgresqlStoreSetupExtension extension) throws IOException {
void setUp(PostgresqlStoreSetupExtension extension, QueryExecutor queryExecutor) throws IOException {

when(vault.deleteSecret(any())).thenReturn(Result.success());
when(vault.storeSecret(any(), any())).thenReturn(Result.success());
when(vault.resolveSecret(any())).then(a -> edrJson(a.getArgument(0)));

cache = new SqlEndpointDataReferenceCache(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), statements, typeManager.getMapper(), vault, clock);
cache = new SqlEndpointDataReferenceCache(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), statements, typeManager.getMapper(), vault, clock, queryExecutor, CONNECTOR_NAME);
var schema = Files.readString(Paths.get("./docs/schema.sql"));
extension.runQuery(schema);
leaseUtil = new LeaseUtil(extension.getTransactionContext(), extension::getConnection, statements, clock);

}

@@ -96,6 +101,16 @@ protected EndpointDataReferenceCache getStore() {
return cache;
}

@Override
protected void lockEntity(String negotiationId, String owner, Duration duration) {
leaseUtil.leaseEntity(negotiationId, owner, duration);
}

@Override
protected boolean isLockedBy(String negotiationId, String owner) {
return leaseUtil.isLeased(negotiationId, owner);
}


private String edrJson(String id) {
return typeManager.writeValueAsString(edr(id.split(SEPARATOR)[1]));
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.tractusx.edc.edr.store.sql.schema.EdrStatements;
import org.eclipse.tractusx.edc.edr.store.sql.schema.postgres.PostgresEdrStatements;
import org.junit.jupiter.api.AfterEach;
@@ -38,6 +39,7 @@
import static java.util.UUID.randomUUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheTestBase.CONNECTOR_NAME;
import static org.eclipse.tractusx.edc.edr.spi.TestFunctions.edr;
import static org.eclipse.tractusx.edc.edr.spi.TestFunctions.edrEntry;
import static org.eclipse.tractusx.edc.edr.store.sql.SqlEndpointDataReferenceCache.VAULT_PREFIX;
@@ -62,12 +64,12 @@ public class SqlEndpointDataReferenceCacheTransactionalTest {
TypeManager typeManager = new TypeManager();

@BeforeEach
void setUp(PostgresqlTransactionalStoreSetupExtension extension) throws IOException {
void setUp(PostgresqlTransactionalStoreSetupExtension extension, QueryExecutor queryExecutor) throws IOException {

when(vault.deleteSecret(any())).thenReturn(Result.success());
when(vault.storeSecret(any(), any())).thenReturn(Result.success());

cache = new SqlEndpointDataReferenceCache(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), statements, typeManager.getMapper(), vault, clock);
cache = new SqlEndpointDataReferenceCache(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), statements, typeManager.getMapper(), vault, clock, queryExecutor, CONNECTOR_NAME);
var schema = Files.readString(Paths.get("./docs/schema.sql"));
extension.runQuery(schema);

3 changes: 3 additions & 0 deletions edc-extensions/edr/edr-callback/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -20,9 +20,12 @@ plugins {
dependencies {
implementation(project(":spi:callback-spi"))
implementation(project(":spi:edr-spi"))
implementation(project(":spi:core-spi"))

implementation(libs.edc.spi.core)
implementation(libs.edc.spi.transfer)
implementation(libs.edc.spi.controlplane)
implementation(libs.nimbus.jwt)

testImplementation(libs.edc.junit)
}
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ public String name() {
public void initialize(ServiceExtensionContext context) {

callbackRegistry.registerHandler(new ContractNegotiationCallback(transferProcessService, monitor));
callbackRegistry.registerHandler(new TransferProcessLocalCallback(edrCache, transferProcessStore, transformerRegistry, transactionContext));
callbackRegistry.registerHandler(new TransferProcessLocalCallback(edrCache, transferProcessStore, transformerRegistry, transactionContext, monitor));

resolverRegistry.registerResolver(this::resolveProtocol);
registry.register(new InProcessCallbackMessageDispatcher(callbackRegistry));
Original file line number Diff line number Diff line change
@@ -14,18 +14,26 @@

package org.eclipse.tractusx.edc.callback;

import com.nimbusds.jwt.SignedJWT;
import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates;
import org.eclipse.tractusx.edc.spi.callback.InProcessCallback;

import java.text.ParseException;
import java.time.ZoneOffset;

import static java.lang.String.format;

public class TransferProcessLocalCallback implements InProcessCallback {
@@ -36,11 +44,14 @@ public class TransferProcessLocalCallback implements InProcessCallback {

private final TransactionContext transactionContext;

public TransferProcessLocalCallback(EndpointDataReferenceCache edrCache, TransferProcessStore transferProcessStore, TypeTransformerRegistry transformerRegistry, TransactionContext transactionContext) {
private final Monitor monitor;

public TransferProcessLocalCallback(EndpointDataReferenceCache edrCache, TransferProcessStore transferProcessStore, TypeTransformerRegistry transformerRegistry, TransactionContext transactionContext, Monitor monitor) {
this.edrCache = edrCache;
this.transferProcessStore = transferProcessStore;
this.transformerRegistry = transformerRegistry;
this.transactionContext = transactionContext;
this.monitor = monitor;
}

@Override
@@ -59,19 +70,67 @@ private Result<Void> storeEdr(EndpointDataReference edr) {
return transactionContext.execute(() -> {
var transferProcess = transferProcessStore.findForCorrelationId(edr.getId());
if (transferProcess != null) {
var expirationTime = extractExpirationTime(edr);

if (expirationTime.failed()) {
return expirationTime.mapTo();
}
var cacheEntry = EndpointDataReferenceEntry.Builder.newInstance()
.transferProcessId(transferProcess.getId())
.assetId(transferProcess.getDataRequest().getAssetId())
.agreementId(transferProcess.getDataRequest().getContractId())
.providerId(transferProcess.getDataRequest().getConnectorId())
.state(EndpointDataReferenceEntryStates.NEGOTIATED.code())
.expirationTimestamp(expirationTime.getContent())
.build();

cleanOldEdr(transferProcess.getDataRequest().getAssetId(), transferProcess.getDataRequest().getContractId());
edrCache.save(cacheEntry, edr);

return Result.success();
} else {
return Result.failure(format("Failed to find a transfer process with correlation ID %s", edr.getId()));
}
});

}

private void cleanOldEdr(String assetId, String agreementId) {
var querySpec = QuerySpec.Builder.newInstance()
.filter(fieldFilter("agreementId", agreementId))
.filter(fieldFilter("assetId", assetId))
.build();

edrCache.queryForEntries(querySpec).forEach((entry -> {
monitor.debug(format("Expiring EDR for transfer process %s", entry.getTransferProcessId()));
entry.transitionToExpired();
edrCache.update(entry);
}));
}

private Result<Long> extractExpirationTime(EndpointDataReference edr) {
try {
if (edr.getAuthCode() != null) {
var jwt = SignedJWT.parse(edr.getAuthCode());
var expirationTime = jwt.getJWTClaimsSet().getExpirationTime();
if (expirationTime != null) {
return Result.success(expirationTime
.toInstant()
.atOffset(ZoneOffset.UTC)
.toInstant().toEpochMilli());
}
}
} catch (ParseException e) {
return Result.failure(format("Failed to parts JWT token for edr %s", edr.getId()));
}
return Result.success(0L);
}

private Criterion fieldFilter(String field, String value) {
return Criterion.Builder.newInstance()
.operandLeft(field)
.operator("=")
.operandRight(value)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -14,6 +14,14 @@

package org.eclipse.tractusx.edc.callback;

import com.nimbusds.jose.JOSEException;
import com.nimbusds.jose.JWSAlgorithm;
import com.nimbusds.jose.JWSHeader;
import com.nimbusds.jose.crypto.RSASSASigner;
import com.nimbusds.jose.jwk.KeyUse;
import com.nimbusds.jose.jwk.gen.RSAKeyGenerator;
import com.nimbusds.jwt.JWTClaimsSet;
import com.nimbusds.jwt.SignedJWT;
import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationFinalized;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage;
@@ -25,6 +33,8 @@
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;

import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -73,7 +83,7 @@ public static TransferProcessStarted getTransferProcessStartedEvent(DataAddress
public static EndpointDataReference getEdr() {
return EndpointDataReference.Builder.newInstance()
.id("dataRequestId")
.authCode("authCode")
.authCode(createToken())
.authKey("authKey")
.endpoint("http://endpoint")
.build();
@@ -93,4 +103,22 @@ public static <T extends Event> CallbackEventRemoteMessage<T> remoteMessage(T ev
.build();
return new CallbackEventRemoteMessage<T>(callback, envelope, "local");
}

private static String createToken() {
try {
var key = new RSAKeyGenerator(2048)
.keyUse(KeyUse.SIGNATURE)
.keyID(UUID.randomUUID().toString())
.generate();

var claims = new JWTClaimsSet.Builder().expirationTime(new Date(Instant.now().toEpochMilli())).build();
var header = new JWSHeader.Builder(JWSAlgorithm.RS256).keyID(UUID.randomUUID().toString()).build();

var jwt = new SignedJWT(header, claims);
jwt.sign(new RSASSASigner(key.toPrivateKey()));
return jwt.serialize();
} catch (JOSEException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
@@ -31,6 +32,7 @@
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -51,6 +53,7 @@
import static org.eclipse.tractusx.edc.callback.TestFunctions.getTransferProcessStartedEvent;
import static org.eclipse.tractusx.edc.callback.TestFunctions.remoteMessage;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -72,7 +75,7 @@ public class TransferProcessLocalCallbackTest {

@BeforeEach
void setup() {
callback = new TransferProcessLocalCallback(edrCache, transferProcessStore, transformerRegistry, transactionContext);
callback = new TransferProcessLocalCallback(edrCache, transferProcessStore, transformerRegistry, transactionContext, mock(Monitor.class));
}

@Test
@@ -97,9 +100,15 @@ void invoke_shouldStoreTheEdrInCache_whenDataAddressIsPresent() {
.dataRequest(dataRequest)
.build();

var edrEntry = EndpointDataReferenceEntry.Builder.newInstance()
.agreementId(contractId)
.transferProcessId(transferProcessId)
.assetId(assetId).build();

when(transformerRegistry.transform(any(DataAddress.class), eq(EndpointDataReference.class))).thenReturn(Result.success(edr));
when(transferProcessStore.findForCorrelationId(edr.getId())).thenReturn(transferProcess);
when(transferProcessStore.findById(transferProcessId)).thenReturn(transferProcess);
when(edrCache.queryForEntries(any())).thenReturn(Stream.of(edrEntry));


var event = getTransferProcessStartedEvent(DataAddress.Builder.newInstance().type(EDR_SIMPLE_TYPE).build());
@@ -112,7 +121,8 @@ void invoke_shouldStoreTheEdrInCache_whenDataAddressIsPresent() {
assertThat(result.succeeded()).isTrue();

verify(edrCache).save(cacheEntryCaptor.capture(), edrCaptor.capture());

verify(edrCache).update(argThat(entry -> entry.getState() == EndpointDataReferenceEntryStates.EXPIRED.code()));

assertThat(edrCaptor.getValue()).usingRecursiveComparison().isEqualTo(edr);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
--
-- Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
--
-- This program and the accompanying materials are made available under the
-- terms of the Apache License, Version 2.0 which is available at
-- https://www.apache.org/licenses/LICENSE-2.0
--
-- SPDX-License-Identifier: Apache-2.0
--
-- Contributors:
-- Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
--

-- Statements are designed for and tested with Postgres only!


-- add column
ALTER TABLE edc_edr_cache ADD COLUMN expiration_timestamp BIGINT;
ALTER TABLE edc_edr_cache ADD COLUMN state INTEGER DEFAULT 50 NOT NULL;
ALTER TABLE edc_edr_cache ADD COLUMN state_count INTEGER DEFAULT 0;
ALTER TABLE edc_edr_cache ADD COLUMN state_timestamp BIGINT;
ALTER TABLE edc_edr_cache ADD COLUMN error_detail VARCHAR;
ALTER TABLE edc_edr_cache ADD COLUMN lease_id VARCHAR CONSTRAINT edc_edr_cache_lease_lease_id_fk REFERENCES edc_lease ON DELETE SET NULL;

5 changes: 4 additions & 1 deletion edc-tests/e2e-tests/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -38,13 +38,16 @@ dependencies {
testImplementation(libs.edc.ext.jsonld)
testImplementation(libs.edc.dsp)
testImplementation(testFixtures(libs.edc.sql.core))

testImplementation(libs.awaitility)

testCompileOnly(project(":edc-tests:runtime:extensions"))
testCompileOnly(project(":edc-tests:runtime:runtime-memory"))
testCompileOnly(project(":edc-tests:runtime:runtime-memory-ssi"))
testCompileOnly(project(":edc-tests:runtime:runtime-postgresql"))
testImplementation(libs.edc.auth.oauth2.client)
testImplementation(libs.testcontainers.junit)
testImplementation(libs.testcontainers.postgres)

}

// do not publish
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.helpers;

import org.postgresql.ds.PGSimpleDataSource;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import javax.sql.DataSource;

import static java.lang.String.format;

@Deprecated(forRemoval = true)
public final class TxPostgresqlLocalInstance {
private final String password;
private final String jdbcUrlPrefix;
private final String username;
private final String databaseName;

public TxPostgresqlLocalInstance(String user, String password, String jdbcUrlPrefix, String db) {
username = user;
this.password = password;
this.jdbcUrlPrefix = jdbcUrlPrefix;
databaseName = db;
}

public void createDatabase() {
createDatabase(databaseName);
}

public void createDatabase(String name) {
try (var connection = DriverManager.getConnection(jdbcUrlPrefix + username, username, password)) {
connection.createStatement().execute(format("create database %s;", name));
} catch (SQLException e) {
e.printStackTrace();
// database could already exist
}
}

public Connection getTestConnection(String hostName, int port, String dbName) {
try {
return createTestDataSource(hostName, port, dbName).getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

public Connection getConnection() {
try {
return DriverManager.getConnection(jdbcUrlPrefix, username, password);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

public String getJdbcUrlPrefix() {
return jdbcUrlPrefix;
}

private DataSource createTestDataSource(String hostName, int port, String dbName) {
var dataSource = new PGSimpleDataSource();
dataSource.setServerNames(new String[]{ hostName });
dataSource.setPortNumbers(new int[]{ port });
dataSource.setUser(username);
dataSource.setPassword(password);
dataSource.setDatabaseName(dbName);
return dataSource;
}
}
Original file line number Diff line number Diff line change
@@ -59,6 +59,12 @@ public void clearAssetIndex() {

public void clearEdrCache() {
var edrCache = context.getService(EndpointDataReferenceCache.class);
edrCache.queryForEntries(QuerySpec.max()).forEach(entry -> edrCache.deleteByTransferProcessId(entry.getTransferProcessId()));
edrCache.queryForEntries(QuerySpec.max()).forEach(entry -> {
try {
edrCache.deleteByTransferProcessId(entry.getTransferProcessId());
} catch (Exception e) {
context.getMonitor().warning("Failed to clean up the cache", e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.restassured.response.Response;
import io.restassured.response.ValidatableResponse;
import io.restassured.specification.RequestSpecification;
import jakarta.json.Json;
import jakarta.json.JsonArray;
@@ -198,16 +199,20 @@ public String getContractNegotiationError(String negotiationId) {
}

public JsonObject getEdr(String transferProcessId) {
return baseRequest()
.when()
.get("/edrs/{id}", transferProcessId)
.then()
return getEdrRequest(transferProcessId)
.statusCode(200)
.extract()
.body()
.as(JsonObject.class);
}

public ValidatableResponse getEdrRequest(String transferProcessId) {
return baseRequest()
.when()
.get("/edrs/{id}", transferProcessId)
.then();
}

public JsonArray getEdrEntriesByAssetId(String assetId) {
return baseRequest()
.when()
Original file line number Diff line number Diff line change
@@ -22,17 +22,33 @@
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.injection.InjectionContainer;
import org.eclipse.edc.sql.testfixtures.PostgresqlLocalInstance;
import org.eclipse.tractusx.edc.helpers.TxPostgresqlLocalInstance;
import org.eclipse.tractusx.edc.token.MockDapsService;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.PostgreSQLContainer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.lang.String.format;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.DB_SCHEMA_NAME;
import static org.mockito.Mockito.mock;

public class PgParticipantRuntime extends ParticipantRuntime {

private static final String POSTGRES_IMAGE_NAME = "postgres:14.2";
private static final String USER = "postgres";
private static final String PASSWORD = "password";


private final String dbName;
public PostgreSQLContainer<?> postgreSqlContainer = new PostgreSQLContainer<>(POSTGRES_IMAGE_NAME)
.withExposedPorts(5432)
.withUsername(USER)
.withPassword(PASSWORD)
.withDatabaseName("itest");
private TxPostgresqlLocalInstance helper;

public PgParticipantRuntime(String moduleName, String runtimeName, String bpn, Map<String, String> properties) {
super(moduleName, runtimeName, bpn, properties);
@@ -41,12 +57,69 @@ public PgParticipantRuntime(String moduleName, String runtimeName, String bpn, M
this.registerServiceMock(Vault.class, new InMemoryVaultOverride(mock(Monitor.class)));
}

@Override
public void beforeAll(ExtensionContext context) throws Exception {
postgreSqlContainer.start();
var config = postgresqlConfiguration(dbName);
config.forEach(System::setProperty);
super.beforeAll(context);
}

@Override
public void afterAll(ExtensionContext context) throws Exception {
super.afterAll(context);
postgreSqlContainer.stop();
postgreSqlContainer.close();
}

@Override
protected void bootExtensions(ServiceExtensionContext context, List<InjectionContainer<ServiceExtension>> serviceExtensions) {
PostgresqlLocalInstance.createDatabase(dbName);
helper = new TxPostgresqlLocalInstance(postgreSqlContainer.getUsername(), postgreSqlContainer.getPassword(), baseJdbcUrl(), postgreSqlContainer.getDatabaseName());
helper.createDatabase(dbName);
super.bootExtensions(context, serviceExtensions);
}

public Map<String, String> postgresqlConfiguration(String name) {
var jdbcUrl = jdbcUrl(name);
return new HashMap<>() {
{
put("edc.datasource.asset.name", "asset");
put("edc.datasource.asset.url", jdbcUrl);
put("edc.datasource.asset.user", USER);
put("edc.datasource.asset.password", PASSWORD);
put("edc.datasource.contractdefinition.name", "contractdefinition");
put("edc.datasource.contractdefinition.url", jdbcUrl);
put("edc.datasource.contractdefinition.user", USER);
put("edc.datasource.contractdefinition.password", PASSWORD);
put("edc.datasource.contractnegotiation.name", "contractnegotiation");
put("edc.datasource.contractnegotiation.url", jdbcUrl);
put("edc.datasource.contractnegotiation.user", USER);
put("edc.datasource.contractnegotiation.password", PASSWORD);
put("edc.datasource.policy.name", "policy");
put("edc.datasource.policy.url", jdbcUrl);
put("edc.datasource.policy.user", USER);
put("edc.datasource.policy.password", PASSWORD);
put("edc.datasource.transferprocess.name", "transferprocess");
put("edc.datasource.transferprocess.url", jdbcUrl);
put("edc.datasource.transferprocess.user", USER);
put("edc.datasource.transferprocess.password", PASSWORD);
put("edc.datasource.edr.name", "edr");
put("edc.datasource.edr.url", jdbcUrl);
put("edc.datasource.edr.user", USER);
put("edc.datasource.edr.password", PASSWORD);
// use non-default schema name to test usage of non-default schema
put("org.eclipse.tractusx.edc.postgresql.migration.schema", DB_SCHEMA_NAME);
}
};
}

public String jdbcUrl(String name) {
return baseJdbcUrl() + name + "?currentSchema=" + DB_SCHEMA_NAME;
}

public String baseJdbcUrl() {
return format("jdbc:postgresql://%s:%s/", postgreSqlContainer.getHost(), postgreSqlContainer.getFirstMappedPort());
}

private static class InMemoryVaultOverride extends InMemoryVault {

Original file line number Diff line number Diff line change
@@ -66,54 +66,6 @@ public class TestRuntimeConfiguration {

static final String OAUTH_TOKEN_URL = "http://localhost:" + OAUTH_PORT;

public static Map<String, String> sokratesPostgresqlConfiguration() {
var baseConfiguration = sokratesConfiguration();
var postgresConfiguration = postgresqlConfiguration(SOKRATES_NAME.toLowerCase());
baseConfiguration.putAll(postgresConfiguration);
return baseConfiguration;
}

public static Map<String, String> platoPostgresqlConfiguration() {
var baseConfiguration = platoConfiguration();
var postgresConfiguration = postgresqlConfiguration(PLATO_NAME.toLowerCase());
baseConfiguration.putAll(postgresConfiguration);
return baseConfiguration;
}

public static Map<String, String> postgresqlConfiguration(String name) {
var jdbcUrl = jdbcUrl(name);
return new HashMap<>() {
{
put("edc.datasource.asset.name", "asset");
put("edc.datasource.asset.url", jdbcUrl);
put("edc.datasource.asset.user", PostgresqlLocalInstance.USER);
put("edc.datasource.asset.password", PostgresqlLocalInstance.PASSWORD);
put("edc.datasource.contractdefinition.name", "contractdefinition");
put("edc.datasource.contractdefinition.url", jdbcUrl);
put("edc.datasource.contractdefinition.user", PostgresqlLocalInstance.USER);
put("edc.datasource.contractdefinition.password", PostgresqlLocalInstance.PASSWORD);
put("edc.datasource.contractnegotiation.name", "contractnegotiation");
put("edc.datasource.contractnegotiation.url", jdbcUrl);
put("edc.datasource.contractnegotiation.user", PostgresqlLocalInstance.USER);
put("edc.datasource.contractnegotiation.password", PostgresqlLocalInstance.PASSWORD);
put("edc.datasource.policy.name", "policy");
put("edc.datasource.policy.url", jdbcUrl);
put("edc.datasource.policy.user", PostgresqlLocalInstance.USER);
put("edc.datasource.policy.password", PostgresqlLocalInstance.PASSWORD);
put("edc.datasource.transferprocess.name", "transferprocess");
put("edc.datasource.transferprocess.url", jdbcUrl);
put("edc.datasource.transferprocess.user", PostgresqlLocalInstance.USER);
put("edc.datasource.transferprocess.password", PostgresqlLocalInstance.PASSWORD);
put("edc.datasource.edr.name", "edr");
put("edc.datasource.edr.url", jdbcUrl);
put("edc.datasource.edr.user", PostgresqlLocalInstance.USER);
put("edc.datasource.edr.password", PostgresqlLocalInstance.PASSWORD);
// use non-default schema name to test usage of non-default schema
put("org.eclipse.tractusx.edc.postgresql.migration.schema", DB_SCHEMA_NAME);
}
};
}

public static Map<String, String> sokratesSsiConfiguration() {
var ssiConfiguration = new HashMap<String, String>() {
{
Original file line number Diff line number Diff line change
@@ -22,8 +22,8 @@
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.platoPostgresqlConfiguration;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.sokratesPostgresqlConfiguration;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.platoConfiguration;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.sokratesConfiguration;

@PostgresqlDbIntegrationTest
public class CatalogPostgresqlTest extends AbstractCatalogTest {
@@ -33,13 +33,13 @@ public class CatalogPostgresqlTest extends AbstractCatalogTest {
":edc-tests:runtime:runtime-postgresql",
SOKRATES_NAME,
SOKRATES_BPN,
sokratesPostgresqlConfiguration()
sokratesConfiguration()
);
@RegisterExtension
protected static final PgParticipantRuntime PLATO_RUNTIME = new PgParticipantRuntime(
":edc-tests:runtime:runtime-postgresql",
PLATO_NAME,
PLATO_BPN,
platoPostgresqlConfiguration()
platoConfiguration()
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.tests.edr;

import jakarta.json.Json;
import okhttp3.mockwebserver.MockWebServer;
import org.assertj.core.api.Condition;
import org.eclipse.tractusx.edc.lifecycle.Participant;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;

import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.EXPIRED;
import static org.eclipse.tractusx.edc.helpers.PolicyHelperFunctions.businessPartnerNumberPolicy;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.platoConfiguration;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.sokratesConfiguration;

public abstract class AbstractDeleteEdrTest {

protected static final Participant SOKRATES = new Participant(SOKRATES_NAME, SOKRATES_BPN, sokratesConfiguration());
protected static final Participant PLATO = new Participant(PLATO_NAME, PLATO_BPN, platoConfiguration());
private static final Duration ASYNC_TIMEOUT = ofSeconds(45);
MockWebServer server;

@BeforeEach
void setup() {
server = new MockWebServer();
}

@Test
@DisplayName("Verify that expired EDR are deleted")
void negotiateEdr_shouldRemoveExpiredEdrs() throws IOException {

var assetId = UUID.randomUUID().toString();

var authCodeHeaderName = "test-authkey";
var authCode = "test-authcode";
PLATO.createAsset(assetId, Json.createObjectBuilder().build(), Json.createObjectBuilder()
.add(EDC_NAMESPACE + "type", "HttpData")
.add(EDC_NAMESPACE + "contentType", "application/json")
.add(EDC_NAMESPACE + "baseUrl", "http://test:8080")
.add(EDC_NAMESPACE + "authKey", authCodeHeaderName)
.add(EDC_NAMESPACE + "authCode", authCode)
.build());

PLATO.createPolicy(businessPartnerNumberPolicy("policy-1", SOKRATES.getBpn()));
PLATO.createPolicy(businessPartnerNumberPolicy("policy-2", SOKRATES.getBpn()));
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2");

var callbacks = Json.createArrayBuilder()
.build();

SOKRATES.negotiateEdr(PLATO, assetId, callbacks);

var expired = new ArrayList<String>();

await().atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
var localExpired = edrCaches.stream()
.filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name()))
.map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString())
.toList();
assertThat(localExpired).hasSizeGreaterThan(0);
expired.add(localExpired.get(0));
});

await().atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> expired.forEach((id) -> SOKRATES.getEdrRequest(id).statusCode(404)));

}

@AfterEach
void teardown() throws IOException {
server.shutdown();
}


private Condition<String> stateCondition(String value, String description) {
return new Condition<>(m -> m.equals(value), description);
}

}
Loading