Skip to content

Commit

Permalink
Fix activity variable size limit bug
Browse files Browse the repository at this point in the history
The activity json string input variable might be too big
to be stored in DB (4000 characters limit), e.g. the activity
json string contains a variable being resolved at runtime with
a more than 4000 characters string value, when camunda persists
this new activity json string as variable in DB, it fails with
an exception.

This commit fixed this issue by definiting the input activity
string in a map instead of string, the map object is stored in
bytearray table as BLOB type without limit.
  • Loading branch information
yinan-symphony authored and ystxn committed Jun 5, 2023
1 parent a91fc19 commit 60fd1fa
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class CamundaExecutor implements JavaDelegate {

public static final String EXECUTOR = "executor";
public static final String ACTIVITY = "activity";

public static final String SERIALISED_ACTIVITY = "serialisedActivity";
public static final ObjectMapper OBJECT_MAPPER;

// set MDC entries so that executors can produce log that we can contextualize
Expand All @@ -61,12 +61,9 @@ public class CamundaExecutor implements JavaDelegate {
SimpleModule module = new SimpleModule();
module.addDeserializer(List.class, new EscapedJsonVariableDeserializer<>(List.class));
module.addDeserializer(Map.class, new EscapedJsonVariableDeserializer<>(Map.class));
OBJECT_MAPPER = JsonMapper.builder()
.addModule(module)
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
OBJECT_MAPPER = JsonMapper.builder().addModule(module).configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
// to escape # or $ in message received content and still serialize it to JSON
.configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)
.build();
.configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true).build();
OBJECT_MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE);
// serialized properties must be annotated explicitly with @JsonProperty
OBJECT_MAPPER.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE);
Expand Down Expand Up @@ -106,13 +103,14 @@ public void execute(DelegateExecution execution) throws Exception {
} catch (NoSuchBeanDefinitionException noSuchBeanDefinitionException) {
executor = (ActivityExecutor<?>) implClass.getDeclaredConstructor().newInstance();
}


Type type =
((ParameterizedType) (implClass.getGenericInterfaces()[0])).getActualTypeArguments()[0];
Type type = ((ParameterizedType) (implClass.getGenericInterfaces()[0])).getActualTypeArguments()[0];

// escape break line and new line characters
String activityAsJsonString = ((String) execution.getVariable(ACTIVITY)).replaceAll("(\\r|\\n|\\r\\n)+", "\\\\n");
String activityAsJsonString =
((Map) execution.getVariable(SERIALISED_ACTIVITY)).get(execution.getVariable(ACTIVITY))
.toString()
.replaceAll("(\\r|\\n|\\r\\n)+", "\\\\n");

BaseActivity activity =
(BaseActivity) OBJECT_MAPPER.readValue(activityAsJsonString, Class.forName(type.getTypeName()));

Expand All @@ -138,9 +136,8 @@ private static void logErrorVariables(DelegateExecution execution, BaseActivity
innerMap.put("message", e.getCause() == null ? e.getMessage() : e.getCause().getMessage());
innerMap.put("activityInstId", execution.getActivityInstanceId());
innerMap.put("activityId", activity.getId());
ObjectValue objectValue = Variables.objectValue(innerMap)
.serializationDataFormat(Variables.SerializationDataFormats.JSON)
.create();
ObjectValue objectValue =
Variables.objectValue(innerMap).serializationDataFormat(Variables.SerializationDataFormats.JSON).create();
execution.getProcessEngineServices()
.getRuntimeService()
.setVariable(execution.getId(), ActivityExecutorContext.ERROR, objectValue);
Expand Down Expand Up @@ -183,9 +180,8 @@ public void setOutputVariables(Map<String, Object> variables) {

Map<String, Object> outer = new HashMap<>();
outer.put(ActivityExecutorContext.OUTPUTS, innerMap);
ObjectValue objectValue = Variables.objectValue(outer)
.serializationDataFormat(Variables.SerializationDataFormats.JSON)
.create();
ObjectValue objectValue =
Variables.objectValue(outer).serializationDataFormat(Variables.SerializationDataFormats.JSON).create();

// flatten outputs for message correlation
Map<String, Object> flattenOutputs = new HashMap<>();
Expand Down Expand Up @@ -267,6 +263,5 @@ public File getResourceFile(Path resourcePath) throws IOException {
public Path saveResource(Path resourcePath, byte[] content) throws IOException {
return resourceLoader.saveResource(resourcePath, content);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import com.symphony.bdk.workflow.engine.WorkflowDirectedGraph.NodeChildren;
import com.symphony.bdk.workflow.engine.WorkflowNode;
import com.symphony.bdk.workflow.engine.WorkflowNodeType;
import com.symphony.bdk.workflow.engine.camunda.CamundaExecutor;
import com.symphony.bdk.workflow.engine.camunda.CamundaTranslatedWorkflowContext;
import com.symphony.bdk.workflow.engine.camunda.WorkflowDirectedGraphService;
import com.symphony.bdk.workflow.engine.camunda.bpmn.builder.WorkflowNodeBpmnBuilderRegistry;
import com.symphony.bdk.workflow.engine.camunda.variable.VariablesListener;
import com.symphony.bdk.workflow.swadl.v1.Activity;
import com.symphony.bdk.workflow.swadl.v1.Workflow;
import com.symphony.bdk.workflow.swadl.v1.activity.BaseActivity;

import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.RequiredArgsConstructor;
Expand All @@ -29,10 +32,18 @@
import org.camunda.bpm.model.bpmn.builder.ExclusiveGatewayBuilder;
import org.camunda.bpm.model.bpmn.builder.ProcessBuilder;
import org.camunda.bpm.model.bpmn.builder.SubProcessBuilder;
import org.camunda.bpm.model.bpmn.instance.camunda.CamundaEntry;
import org.camunda.bpm.model.bpmn.instance.camunda.CamundaInputOutput;
import org.camunda.bpm.model.bpmn.instance.camunda.CamundaInputParameter;
import org.camunda.bpm.model.bpmn.instance.camunda.CamundaMap;
import org.camunda.bpm.model.xml.ModelValidationException;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Events are created with async before to make sure they are not blocking the dispatch of events (starting or
Expand Down Expand Up @@ -79,7 +90,8 @@ public Deployment deployWorkflow(CamundaTranslatedWorkflowContext context) {

private DeploymentBuilder setWorkflowTokenIfExists(DeploymentBuilder deploymentBuilder, Workflow workflow) {
workflow.getActivities().forEach(activity -> {
Optional<String> token = activity.getEvents().getEvents()
Optional<String> token = activity.getEvents()
.getEvents()
.stream()
.filter(event -> event.getRequestReceived() != null && event.getRequestReceived().getToken() != null)
.map(event -> event.getRequestReceived().getToken())
Expand All @@ -101,11 +113,61 @@ private CamundaTranslatedWorkflowContext workflowToBpmn(Workflow workflow) throw
BuildProcessContext context = new BuildProcessContext(workflowDirectedGraph, process);
buildWorkflowInDfs(new NodeChildren(context.getStartEvents()), "", context);
AbstractFlowNodeBuilder<?, ?> builder = closeUpSubProcessesIfAny(context, context.getLastNodeBuilder());

BpmnModelInstance instance = builder.done();
process.addExtensionElement(VariablesListener.create(instance, workflow.getVariables()));
injectActivityDefAsInput(instance, workflow.getActivities());
return new CamundaTranslatedWorkflowContext(workflow, workflowDirectedGraph, instance);
}

/**
* Fix bug where the activity definition contains a variable, which could be resolved, during the workflow process,
* with a big size exceeding the Camunda DB text size limit. The fix is to make the activity definition an map object
* as the input parameter, instead of a simple string previously. An object parameter variable is stored in bytearray table
* as BLOB type, while a string has a limit of 4000 characters.
*
* @param instance the bpmn model instance being built
* @param activities the swadl activity list
* @throws JsonProcessingException json serialisation exception
*/
private void injectActivityDefAsInput(BpmnModelInstance instance, List<Activity> activities)
throws JsonProcessingException {
Map<String, BaseActivity> activityMap =
activities.stream().collect(Collectors.toMap(a -> a.getActivity().getId(), Activity::getActivity));

Collection<CamundaInputOutput> activityInputOutputElements =
instance.getModelElementsByType(CamundaInputOutput.class);
for (CamundaInputOutput inputOutput : activityInputOutputElements) {
CamundaInputParameter activityNameInputParam = extractActivityNameInputParam(inputOutput);
addSerialisedActivityInputParam(instance, inputOutput, activityNameInputParam, activityMap);
}
}

private void addSerialisedActivityInputParam(BpmnModelInstance instance, CamundaInputOutput inputOutput,
CamundaInputParameter activityNameInputParam, Map<String, BaseActivity> activityMap)
throws JsonProcessingException {
CamundaMap map = instance.newInstance(CamundaMap.class);
CamundaEntry entry = instance.newInstance(CamundaEntry.class);
CamundaInputParameter inputParameter = instance.newInstance(CamundaInputParameter.class);

String activityName = activityNameInputParam.getTextContent();
entry.setCamundaKey(activityName);
entry.setTextContent(CamundaExecutor.OBJECT_MAPPER.writeValueAsString(activityMap.get(activityName)));
map.getCamundaEntries().add(entry);

inputParameter.setCamundaName(CamundaExecutor.SERIALISED_ACTIVITY);
inputParameter.setValue(map);
inputOutput.addChildElement(inputParameter);
}

private static CamundaInputParameter extractActivityNameInputParam(CamundaInputOutput inputOutput) {
return inputOutput.getChildElementsByType(CamundaInputParameter.class)
.stream()
.filter(input -> CamundaExecutor.ACTIVITY.equals(input.getCamundaName()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Activity missing its name"));
}

private AbstractFlowNodeBuilder<?, ?> closeUpSubProcessesIfAny(BuildProcessContext context,
AbstractFlowNodeBuilder<?, ?> builder) {
while (context.hasEventSubProcess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.symphony.bdk.workflow.engine.WorkflowNode;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractGatewayBuilder;
import org.camunda.bpm.model.bpmn.builder.SubProcessBuilder;
Expand All @@ -15,7 +14,7 @@ public abstract class AbstractNodeBpmnBuilder implements WorkflowNodeBpmnBuilder

@Override
public AbstractFlowNodeBuilder<?, ?> connect(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
String nodeId = element.getId();
if (context.isAlreadyBuilt(nodeId)) {
if (element.isConditional()) {
Expand Down Expand Up @@ -63,5 +62,5 @@ protected void connectToExistingNode(String nodeId, AbstractFlowNodeBuilder<?, ?
}

protected abstract AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException;
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;
import com.symphony.bdk.workflow.swadl.v1.EventWithTimeout;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractCatchEventBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractGatewayBuilder;
Expand All @@ -17,7 +16,7 @@ public class ActivityExpiredNodeBuilder extends ActivityNodeBuilder {

@Override
public AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
if (hasNoExclusiveFormReplyParent(element, context)) {
if (context.hasTimeoutSubProcess()) {
builder = context.removeLastSubProcessTimeoutBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.symphony.bdk.workflow.engine.WorkflowNodeType;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractActivityBuilder;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.springframework.stereotype.Component;
Expand All @@ -14,17 +13,13 @@ public class ActivityFailedNodeBuilder extends ActivityNodeBuilder {

@Override
protected void connectToExistingNode(String nodeId, AbstractFlowNodeBuilder<?, ?> builder) {
((AbstractActivityBuilder<?, ?>) builder).boundaryEvent()
.name("error_" + nodeId)
.error().connectTo(nodeId);
((AbstractActivityBuilder<?, ?>) builder).boundaryEvent().name("error_" + nodeId).error().connectTo(nodeId);
}

@Override
public AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
builder = ((AbstractActivityBuilder<?, ?>) builder).boundaryEvent()
.name("error_" + element.getId())
.error();
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
builder = ((AbstractActivityBuilder<?, ?>) builder).boundaryEvent().name("error_" + element.getId()).error();
return addTask(builder, element.getActivity());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.symphony.bdk.workflow.swadl.v1.activity.BaseActivity;
import com.symphony.bdk.workflow.swadl.v1.activity.ExecuteScript;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.springframework.stereotype.Component;
Expand All @@ -19,7 +18,7 @@ public class ActivityNodeBuilder extends AbstractNodeBpmnBuilder {

@Override
public AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
return addTask(builder, element.getActivity());
}

Expand All @@ -28,8 +27,7 @@ public WorkflowNodeType type() {
return WorkflowNodeType.ACTIVITY;
}

protected AbstractFlowNodeBuilder<?, ?> addTask(AbstractFlowNodeBuilder<?, ?> eventBuilder, BaseActivity activity)
throws JsonProcessingException {
protected AbstractFlowNodeBuilder<?, ?> addTask(AbstractFlowNodeBuilder<?, ?> eventBuilder, BaseActivity activity) {
// hardcoded so we can rely on Camunda's script task instead of a service task
if (activity instanceof ExecuteScript) {
return addScriptTask(eventBuilder, (ExecuteScript) activity);
Expand All @@ -48,15 +46,14 @@ public WorkflowNodeType type() {
.camundaExecutionListenerClass(ExecutionListener.EVENTNAME_START, ScriptTaskAuditListener.class);
}

private AbstractFlowNodeBuilder<?, ?> addServiceTask(AbstractFlowNodeBuilder<?, ?> builder, BaseActivity activity)
throws JsonProcessingException {
private AbstractFlowNodeBuilder<?, ?> addServiceTask(AbstractFlowNodeBuilder<?, ?> builder, BaseActivity activity) {
return builder.serviceTask()
.id(activity.getId())
.name(activity.getId())
.camundaAsyncAfter()
.camundaClass(CamundaExecutor.class)
.camundaInputParameter(CamundaExecutor.EXECUTOR,
ActivityRegistry.getActivityExecutors().get(activity.getClass()).getName())
.camundaInputParameter(CamundaExecutor.ACTIVITY, CamundaExecutor.OBJECT_MAPPER.writeValueAsString(activity));
.camundaInputParameter(CamundaExecutor.ACTIVITY, activity.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;
import com.symphony.bdk.workflow.engine.camunda.variable.FormVariableListener;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;
import org.springframework.stereotype.Component;
Expand All @@ -14,7 +13,7 @@
public class JoinActivityNodeBuilder extends AbstractNodeBpmnBuilder {
@Override
protected AbstractFlowNodeBuilder<?, ?> build(WorkflowNode element, String parentId,
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) throws JsonProcessingException {
AbstractFlowNodeBuilder<?, ?> builder, BuildProcessContext context) {
return builder.parallelGateway(element.getId())
.camundaExecutionListenerClass(ExecutionListener.EVENTNAME_START, FormVariableListener.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import com.symphony.bdk.workflow.engine.WorkflowNodeType;
import com.symphony.bdk.workflow.engine.camunda.bpmn.BuildProcessContext;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.camunda.bpm.model.bpmn.builder.AbstractFlowNodeBuilder;

public interface WorkflowNodeBpmnBuilder {
String ERROR_CODE = "408";
String DEFAULT_FORM_REPLIED_EVENT_TIMEOUT = "PT24H";

AbstractFlowNodeBuilder<?, ?> connect(WorkflowNode element, String parentId, AbstractFlowNodeBuilder<?, ?> builder,
BuildProcessContext context) throws JsonProcessingException;
BuildProcessContext context);

WorkflowNodeType type();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.symphony.bdk.workflow.engine.executor.request;

import static com.symphony.bdk.workflow.engine.camunda.CamundaExecutor.OBJECT_MAPPER;

import com.symphony.bdk.workflow.engine.executor.ActivityExecutor;
import com.symphony.bdk.workflow.engine.executor.ActivityExecutorContext;
import com.symphony.bdk.workflow.engine.executor.request.client.HttpClient;
Expand Down Expand Up @@ -45,12 +43,10 @@ public void execute(ActivityExecutorContext<ExecuteRequest> execution) throws IO
headersToString(activity.getHeaders()));

log.info("Received response {}", response.getCode());
String valueAsString = OBJECT_MAPPER.writeValueAsString(response.getContent());

Map<String, Object> outputs = new HashMap<>();
outputs.put(OUTPUT_STATUS_KEY, response.getCode());
outputs.put(OUTPUT_BODY_KEY,
valueAsString.length() > 1000 ? valueAsString.substring(0, 1000) : response.getContent());
outputs.put(OUTPUT_BODY_KEY, response.getContent());
execution.setOutputVariables(outputs);
}

Expand Down

0 comments on commit 60fd1fa

Please sign in to comment.