Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade to EDC 0.2.0 #674

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 119 additions & 146 deletions DEPENDENCIES

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Comparator.comparingLong;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -86,13 +87,25 @@ public InMemoryEndpointDataReferenceCache(String lockId, Clock clock, Map<String
}

@Override
public @Nullable EndpointDataReferenceEntry findByTransferProcessId(String transferProcessId) {
public @Nullable StoreResult<EndpointDataReferenceEntry> findByIdAndLease(String transferProcessId) {
return lockManager.readLock(() -> {
var edr = edrsByTransferProcessId.get(transferProcessId);
return entriesByEdrId.get(edr.getId());
var edrEntry = entriesByEdrId.get(edr.getId());
return edrEntry == null ? StoreResult.notFound(format("EndpointDataReferenceEntry %s not found", transferProcessId)) :
StoreResult.success(edrEntry);
});
}

@Override
public StoreResult<EndpointDataReferenceEntry> findByCorrelationIdAndLease(String correlationId) {
return findByIdAndLease(correlationId);
}

@Override
public void save(EndpointDataReferenceEntry entity) {
throw new UnsupportedOperationException("Please use save(EndpointDataReferenceEntry, EndpointDataReference) instead!");
}

@Override
@NotNull
public List<EndpointDataReference> referencesForAsset(String assetId, String providerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
Expand Down Expand Up @@ -48,7 +46,6 @@
import java.time.ZoneOffset;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -152,7 +149,7 @@ private void update(EndpointDataReferenceEntry edrEntry) {


private StateProcessorImpl<EndpointDataReferenceEntry> processEdrInState(EndpointDataReferenceEntryStates state, Function<EndpointDataReferenceEntry, Boolean> function) {
var filter = new Criterion[]{ hasState(state.code()) };
var filter = new Criterion[] {hasState(state.code())};
return new StateProcessorImpl<>(() -> edrCache.nextNotLeased(batchSize, filter), telemetry.contextPropagationMiddleware(function));
}

Expand All @@ -169,15 +166,11 @@ private StateProcessorImpl<EndpointDataReferenceEntry> processDeletingEdr(Functi
private ContractRequest createContractRequest(NegotiateEdrRequest request) {
var callbacks = Stream.concat(request.getCallbackAddresses().stream(), Stream.of(LOCAL_CALLBACK)).collect(Collectors.toList());

var requestData = ContractRequestData.Builder.newInstance()
return ContractRequest.Builder.newInstance()
.counterPartyAddress(request.getConnectorAddress())
.contractOffer(request.getOffer())
.protocol(request.getProtocol())
.counterPartyAddress(request.getConnectorAddress())
.connectorId(request.getConnectorId())
.build();

return ContractRequest.Builder.newInstance()
.requestData(requestData)
.providerId(request.getConnectorId())
.callbackAddresses(callbacks).build();
}

Expand Down Expand Up @@ -244,21 +237,13 @@ private StatusResult<Void> fireTransferProcess(EndpointDataReferenceEntry entry)
}
var dataRequest = transferProcess.getDataRequest();

var newDataRequest = DataRequest.Builder.newInstance()
.id(UUID.randomUUID().toString())
var transferRequest = TransferRequest.Builder.newInstance()
.assetId(dataRequest.getAssetId())
.connectorId(dataRequest.getConnectorId())
.contractId(dataRequest.getContractId())
.protocol(dataRequest.getProtocol())
.connectorAddress(dataRequest.getConnectorAddress())
.dataDestination(dataRequest.getDataDestination())
.destinationType(dataRequest.getDestinationType())
.processId(dataRequest.getProcessId())
.managedResources(dataRequest.isManagedResources())
.build();

var transferRequest = TransferRequest.Builder.newInstance()
.dataRequest(newDataRequest)
.callbackAddresses(transferProcess.getCallbackAddresses())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public static NegotiateEdrRequest getNegotiateEdrRequest() {
.id("id")
.assetId("assetId")
.policy(Policy.Builder.newInstance().build())
.providerId("provider")
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ void initEdrNegotiation() {

assertThat(msg.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().containsAll(negotiateEdrRequest.getCallbackAddresses());
assertThat(msg.getCallbackAddresses()).usingRecursiveFieldByFieldElementComparator().contains(LOCAL_CALLBACK);
assertThat(msg.getRequestData().getContractOffer()).usingRecursiveComparison().isEqualTo(negotiateEdrRequest.getOffer());
assertThat(msg.getRequestData().getProtocol()).isEqualTo(negotiateEdrRequest.getProtocol());
assertThat(msg.getRequestData().getCounterPartyAddress()).isEqualTo(negotiateEdrRequest.getConnectorAddress());
assertThat(msg.getContractOffer()).usingRecursiveComparison().isEqualTo(negotiateEdrRequest.getOffer());
assertThat(msg.getProtocol()).isEqualTo(negotiateEdrRequest.getProtocol());
assertThat(msg.getCounterPartyAddress()).isEqualTo(negotiateEdrRequest.getConnectorAddress());

}

Expand All @@ -118,7 +118,7 @@ void initial_shouldTransitionRequesting() {
var edrEntry = edrEntryBuilder().state(NEGOTIATED.code()).build();
var transferProcess = createTransferProcessBuilder().build();
when(edrCache.nextNotLeased(anyInt(), stateIs(NEGOTIATED.code()))).thenReturn(List.of(edrEntry)).thenReturn(emptyList());
when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(edrCache.findByCorrelationIdAndLease(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any())).thenReturn(ServiceResult.success(transferProcess));

Expand All @@ -138,7 +138,7 @@ void initial_shouldNotTransitionToRefreshing_WhenNotExpired() {
.thenReturn(List.of(edrEntry))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(edrCache.findByCorrelationIdAndLease(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any())).thenReturn(ServiceResult.success(transferProcess));

Expand All @@ -159,7 +159,7 @@ void initial_shouldTransitionError_whenTransferProcessNotFound() {
.thenReturn(List.of(edrEntry))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(edrCache.findByCorrelationIdAndLease(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(null);

edrManager.start();
Expand All @@ -179,7 +179,7 @@ void initial_shouldNotTransitionError_whenInitiatedTransferFailsOnce() {
.thenReturn(List.of(edrEntry.copy()))
.thenReturn(emptyList());

when(edrCache.findByTransferProcessId(edrEntry.getTransferProcessId())).thenReturn(edrEntry);
when(edrCache.findByCorrelationIdAndLease(edrEntry.getTransferProcessId())).thenReturn(StoreResult.success(edrEntry));
when(transferProcessService.findById(edrEntry.getTransferProcessId())).thenReturn(transferProcess);
when(transferProcessService.initiateTransfer(any()))
.thenReturn(ServiceResult.badRequest("bad"))
Expand Down Expand Up @@ -221,7 +221,7 @@ void initial_shouldDeleteTheEntry_whenTheRetentionPeriodIsOver() {
.filter(hasState(DELETING.code()))
.limit(DEFAULT_BATCH_SIZE)
.build();

when(edrCache.queryForEntries(query))
.thenReturn(Stream.of(edrEntry))
.thenReturn(Stream.empty());
Expand Down Expand Up @@ -253,7 +253,6 @@ private TransferProcess.Builder createTransferProcessBuilder() {
.processId(processId)
.protocol("protocol")
.connectorAddress("http://an/address")
.managedResources(false)
.build();

return TransferProcess.Builder.newInstance()
Expand All @@ -273,7 +272,7 @@ private DataRequest.Builder createDataRequestBuilder() {
}

private Criterion[] stateIs(int state) {
return aryEq(new Criterion[]{ hasState(state) });
return aryEq(new Criterion[] {hasState(state)});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ private NegotiateEdrRequest getNegotiateEdrRequest() {
.id("id")
.assetId("assetId")
.policy(Policy.Builder.newInstance().build())
.providerId("provider")
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.net.URI;

import static org.eclipse.tractusx.edc.jsonld.JsonLdExtension.CREDENTIALS_SUMMARY_V_1;
import static org.eclipse.tractusx.edc.jsonld.JsonLdExtension.CREDENTIALS_V_1;
import static org.eclipse.tractusx.edc.jsonld.JsonLdExtension.SECURITY_ED25519_V1;
Expand All @@ -46,10 +48,10 @@ void setup(ObjectFactory factory, ServiceExtensionContext context) {
@Test
void initialize(ServiceExtensionContext context) {
extension.initialize(context);
jsonLdService.registerCachedDocument(eq(CREDENTIALS_V_1), any());
jsonLdService.registerCachedDocument(eq(CREDENTIALS_SUMMARY_V_1), any());
jsonLdService.registerCachedDocument(eq(SECURITY_JWS_V1), any());
jsonLdService.registerCachedDocument(eq(SECURITY_ED25519_V1), any());
jsonLdService.registerCachedDocument(eq(CREDENTIALS_V_1), any(URI.class));
jsonLdService.registerCachedDocument(eq(CREDENTIALS_SUMMARY_V_1), any(URI.class));
jsonLdService.registerCachedDocument(eq(SECURITY_JWS_V1), any(URI.class));
jsonLdService.registerCachedDocument(eq(SECURITY_ED25519_V1), any(URI.class));

}
}
1 change: 1 addition & 0 deletions edc-controlplane/edc-controlplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
runtimeOnly(libs.edc.auth.tokenbased)

runtimeOnly(libs.edc.api.management)
runtimeOnly(libs.edc.api.management.config)
runtimeOnly(libs.edc.api.observability)
runtimeOnly(libs.edc.dsp)
runtimeOnly(libs.edc.spi.jwt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ plugins {

dependencies {
runtimeOnly(project(":edc-controlplane:edc-controlplane-base"))
runtimeOnly(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(libs.edc.vault.hashicorp)
runtimeOnly(libs.edc.core.controlplane)
runtimeOnly(libs.edc.dpf.transfer)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ dependencies {
runtimeOnly(libs.edc.controlplane.callback.dispatcher.http)

runtimeOnly(project(":edc-extensions:postgresql-migration"))
runtimeOnly(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(libs.edc.vault.hashicorp)
runtimeOnly(project(":edc-extensions:edr:edr-cache-sql"))
runtimeOnly(libs.bundles.edc.sqlstores)
runtimeOnly(libs.edc.transaction.local)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ plugins {
dependencies {
runtimeOnly(project(":edc-controlplane:edc-controlplane-base"))
runtimeOnly(project(":edc-extensions:postgresql-migration"))
runtimeOnly(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(libs.edc.vault.hashicorp)
runtimeOnly(project(":edc-extensions:edr:edr-cache-sql"))
runtimeOnly(libs.bundles.edc.sqlstores)
runtimeOnly(libs.edc.transaction.local)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ plugins {

dependencies {
implementation(project(":edc-dataplane:edc-dataplane-base"))
implementation(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(libs.edc.vault.hashicorp)
runtimeOnly(project(":edc-extensions:edr:edr-cache-sql"))
runtimeOnly(libs.edc.transaction.local)
runtimeOnly(libs.edc.sql.pool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OTEL_AGENT_LOCATION "https://github.com/open-telemetry/opentelemetry-java-in

HEALTHCHECK NONE

RUN apk update && apk add curl=8.2.0-r1 --no-cache
RUN apk update && apk add curl=8.2.1-r0 --no-cache
RUN curl -L --proto "=https" -sSf ${OTEL_AGENT_LOCATION} --output /tmp/opentelemetry-javaagent.jar

FROM eclipse-temurin:17.0.6_10-jre-alpine
Expand Down
1 change: 0 additions & 1 deletion edc-extensions/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dependencies {
implementation(project(":edc-extensions:cx-oauth2"))
implementation(project(":edc-extensions:data-encryption"))
implementation(project(":edc-extensions:dataplane-selector-configuration"))
implementation(project(":edc-extensions:hashicorp-vault"))
implementation(project(":edc-extensions:postgresql-migration"))
implementation(project(":edc-extensions:provision-additional-headers"))
implementation(project(":edc-extensions:transferprocess-sftp-client"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.eclipse.tractusx.edc.validation.businesspartner.functions.BusinessPartnerPermissionFunction;
import org.eclipse.tractusx.edc.validation.businesspartner.functions.BusinessPartnerProhibitionFunction;

import static org.eclipse.edc.policy.engine.spi.PolicyEngine.ALL_SCOPES;
import static org.eclipse.edc.connector.contract.spi.offer.ContractDefinitionResolver.CATALOGING_SCOPE;
import static org.eclipse.edc.connector.contract.spi.validation.ContractValidationService.NEGOTIATION_SCOPE;
import static org.eclipse.edc.connector.contract.spi.validation.ContractValidationService.TRANSFER_SCOPE;

public class BusinessPartnerValidationExtension implements ServiceExtension {

Expand Down Expand Up @@ -93,15 +95,18 @@ public void initialize(ServiceExtensionContext context) {
final BusinessPartnerProhibitionFunction prohibitionFunction =
new BusinessPartnerProhibitionFunction(monitor, logAgreementEvaluation);

ruleBindingRegistry.bind("USE", ALL_SCOPES);
ruleBindingRegistry.bind(BUSINESS_PARTNER_CONSTRAINT_KEY, ALL_SCOPES);
bindToScope(dutyFunction, permissionFunction, prohibitionFunction, TRANSFER_SCOPE);
bindToScope(dutyFunction, permissionFunction, prohibitionFunction, NEGOTIATION_SCOPE);
bindToScope(dutyFunction, permissionFunction, prohibitionFunction, CATALOGING_SCOPE);
}

private void bindToScope(BusinessPartnerDutyFunction dutyFunction, BusinessPartnerPermissionFunction permissionFunction, BusinessPartnerProhibitionFunction prohibitionFunction, String scope) {
ruleBindingRegistry.bind("USE", scope);
ruleBindingRegistry.bind(BUSINESS_PARTNER_CONSTRAINT_KEY, scope);

policyEngine.registerFunction(
ALL_SCOPES, Duty.class, BUSINESS_PARTNER_CONSTRAINT_KEY, dutyFunction);
policyEngine.registerFunction(
ALL_SCOPES, Permission.class, BUSINESS_PARTNER_CONSTRAINT_KEY, permissionFunction);
policyEngine.registerFunction(
ALL_SCOPES, Prohibition.class, BUSINESS_PARTNER_CONSTRAINT_KEY, prohibitionFunction);
policyEngine.registerFunction(scope, Duty.class, BUSINESS_PARTNER_CONSTRAINT_KEY, dutyFunction);
policyEngine.registerFunction(scope, Permission.class, BUSINESS_PARTNER_CONSTRAINT_KEY, permissionFunction);
policyEngine.registerFunction(scope, Prohibition.class, BUSINESS_PARTNER_CONSTRAINT_KEY, prohibitionFunction);
}

private Boolean logAgreementEvaluationSetting(ServiceExtensionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected boolean evaluate(
return false;
}

final ParticipantAgent participantAgent = policyContext.getParticipantAgent();
final ParticipantAgent participantAgent = policyContext.getContextData(ParticipantAgent.class);

if (participantAgent == null) {
return false;
Expand Down Expand Up @@ -149,7 +149,7 @@ private boolean isBusinessPartnerNumber(String referringConnectorClaim, Object b
policyContext.reportProblem(message);
return false;
}
if (!(businessPartnerNumber instanceof String)) {
if (!(businessPartnerNumber instanceof String businessPartnerNumberStr)) {
final String message =
format(
FAIL_EVALUATION_BECAUSE_RIGHT_VALUE_NOT_STRING,
Expand All @@ -159,7 +159,6 @@ private boolean isBusinessPartnerNumber(String referringConnectorClaim, Object b
return false;
}

var businessPartnerNumberStr = (String) businessPartnerNumber;
var agreement = policyContext.getContextData(ContractAgreement.class);
var isCorrectBusinessPartner = isCorrectBusinessPartner(referringConnectorClaim, businessPartnerNumberStr);

Expand Down
Loading