Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<jacoco.version>0.8.8</jacoco.version>
<jsonwebtoken.version>0.11.5</jsonwebtoken.version>
<jackson.databind.version>2.13.4</jackson.databind.version>
<log4j2.version>2.17.1</log4j2.version>
<log4j2.version>2.17.2</log4j2.version>
</properties>
<dependencyManagement>
<dependencies>
Expand All @@ -40,7 +40,7 @@
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.33</version>
<version>2.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -82,7 +82,22 @@
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<version>1.3.0</version>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-spring</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-http-basic</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
Expand Down Expand Up @@ -122,6 +137,10 @@
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
Expand Down Expand Up @@ -245,7 +264,7 @@
<dependency>
<groupId>io.boomerang</groupId>
<artifactId>lib-eventing</artifactId>
<version>0.2.5</version>
<version>0.4.3</version>
</dependency>
<dependency>
<groupId>io.boomerang</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.boomerang.aspect;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import io.boomerang.mongo.entity.ActivityEntity;
import io.boomerang.mongo.repository.FlowWorkflowActivityRepository;
import io.boomerang.service.NATSEventingService;

@Aspect
@Component
@ConditionalOnProperty(value = "nats.eventing.enabled", havingValue = "true",
matchIfMissing = false)
public class ActivityEntityUpdateInterceptor {

private static final Logger logger = LogManager.getLogger(ActivityEntityUpdateInterceptor.class);

@Autowired
FlowWorkflowActivityRepository activityRepository;

@Autowired
private NATSEventingService eventingService;

@Before("execution(* io.boomerang.mongo.repository.FlowWorkflowActivityRepository.save(..))"
+ " && args(entityToBeSaved)")
public void beforeSaveInvoked(JoinPoint thisJoinPoint, Object entityToBeSaved) {

logger.info("Intercepted save action on entity {} from {}", entityToBeSaved,
thisJoinPoint.getSignature().getDeclaringTypeName());

if (entityToBeSaved instanceof ActivityEntity) {
activityEntityToBeUpdated((ActivityEntity) entityToBeSaved);
}
}

private void activityEntityToBeUpdated(ActivityEntity newActivityEntity) {

// Check if activity and workflow IDs are not empty
if (StringUtils.isNotBlank(newActivityEntity.getWorkflowId())
&& StringUtils.isNotBlank(newActivityEntity.getId())) {

// Retrieve old entity and compare the statuses
activityRepository.findById(newActivityEntity.getId()).ifPresent(oldActivityEntity -> {
if (oldActivityEntity.getStatus() != newActivityEntity.getStatus()) {

// Status has changed, publish status update CloudEvent
eventingService.publishStatusCloudEvent(newActivityEntity);
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.boomerang.aspect;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import io.boomerang.mongo.entity.ActivityEntity;
import io.boomerang.mongo.entity.TaskExecutionEntity;
import io.boomerang.mongo.repository.FlowWorkflowActivityTaskRepository;
import io.boomerang.mongo.service.FlowWorkflowActivityService;
import io.boomerang.service.NATSEventingService;

@Aspect
@Component
@ConditionalOnProperty(value = "nats.eventing.enabled", havingValue = "true",
matchIfMissing = false)
public class TaskExecutionUpdateInterceptor {

private static final Logger logger = LogManager.getLogger(TaskExecutionUpdateInterceptor.class);

@Autowired
FlowWorkflowActivityTaskRepository taskRepository;

@Autowired
private FlowWorkflowActivityService activityService;

@Autowired
private NATSEventingService eventingService;

@Before("execution(* io.boomerang.mongo.repository.FlowWorkflowActivityTaskRepository.save(..))"
+ " && args(entityToBeSaved)")
public void beforeSaveInvoked(JoinPoint thisJoinPoint, Object entityToBeSaved) {

logger.info("Intercepted save action on entity {} from {}", entityToBeSaved,
thisJoinPoint.getSignature().getDeclaringTypeName());

if (entityToBeSaved instanceof TaskExecutionEntity) {
taskExecutionToBeUpdated((TaskExecutionEntity) entityToBeSaved);
}
}

private void taskExecutionToBeUpdated(TaskExecutionEntity newTaskExecution) {

// Check if task and activity IDs are not empty
if (StringUtils.isNotBlank(newTaskExecution.getActivityId())
&& StringUtils.isNotBlank(newTaskExecution.getId())) {

// Retrieve old entity and compare the statuses
taskRepository.findById(newTaskExecution.getId()).ifPresent(oldTaskExecution -> {

if (oldTaskExecution.getFlowTaskStatus() != newTaskExecution.getFlowTaskStatus()) {

// Status has changed, publish status update CloudEvent
ActivityEntity activityEntity =
activityService.findWorkflowActivtyById(newTaskExecution.getActivityId());
eventingService.publishStatusCloudEvent(newTaskExecution, activityEntity);
}
});
}
}
}
108 changes: 0 additions & 108 deletions src/main/java/io/boomerang/client/EventingSubscriberClient.java

This file was deleted.

Loading