Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flow-translator/flow-translator-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.codice.keip</groupId>
<artifactId>flow-translator-lib</artifactId>
<version>0.1.0</version>
<version>0.2.0</version>

<packaging>jar</packaging>

Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>org.codice.keip.schemas</groupId>
<artifactId>validation</artifactId>
<version>0.1.0</version>
<version>0.2.0</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ public class GuavaGraph implements EipGraph {
private final ImmutableValueGraph<EipNode, EdgeProps> graph;

private GuavaGraph(ImmutableValueGraph<EipNode, EdgeProps> graph) {
if (Graphs.hasCycle(graph.asGraph())) {
throw new IllegalArgumentException("Graphs with cycles are not allowed");
}
this.graph = graph;
}

Expand Down Expand Up @@ -63,7 +60,12 @@ public Optional<EdgeProps> getEdgeProps(EipNode source, EipNode target) {

/** Finds the nodes in the graph that have no incoming edges (indicating the start of a flow). */
private Stream<EipNode> findRoots() {
return graph.nodes().stream().filter(node -> graph.inDegree(node) == 0);
Stream<EipNode> roots = graph.nodes().stream().filter(node -> graph.inDegree(node) == 0);
if (Graphs.hasCycle(graph.asGraph())) {
// traverse acyclic components first
return Stream.concat(roots, graph.nodes().stream());
}
return roots;
}

private static void addNodes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public enum ConnectionType {
CONTENT_BASED_ROUTER,
INBOUND_REQUEST_REPLY,
PASSTHRU,
REQUEST_REPLY,
SINK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ Map<String, Object> createChannelAttributes() {
Map<String, Object> attributes = new LinkedHashMap<>();
return switch (this.node.connectionType()) {
case CONTENT_BASED_ROUTER -> handleRouter(attributes, predecessors);
case INBOUND_REQUEST_REPLY ->
handleInboundRequestReply(attributes, predecessors, successors);
case PASSTHRU -> handlePassthru(attributes, predecessors, successors);
case REQUEST_REPLY -> handleRequestReply(attributes, predecessors, successors);
case SINK -> handleSink(attributes, predecessors, successors);
Expand All @@ -115,6 +117,21 @@ private Map<String, Object> handleRouter(
return attributes;
}

private Map<String, Object> handleInboundRequestReply(
Map<String, Object> attributes, Set<EipNode> predecessors, Set<EipNode> successors) {
if (predecessors.size() > 1 || successors.size() > 1) {
throw new IllegalArgumentException(
"'RequestReply' nodes can have at most one incoming and one outgoing edge");
}
predecessors.stream()
.findFirst()
.ifPresent(p -> attributes.put(REPLY_CHANNEL, getChannelId(p, this.node)));
successors.stream()
.findFirst()
.ifPresent(s -> attributes.put(REQUEST_CHANNEL, getChannelId(this.node, s)));
return attributes;
}

private Map<String, Object> handlePassthru(
Map<String, Object> attributes, Set<EipNode> predecessors, Set<EipNode> successors) {
if (predecessors.size() > 1 || successors.size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import static org.codice.keip.flow.xml.XmlComparisonUtil.readTestXml
class FlowToSpringIntegrationTest extends Specification {
private static final List<NamespaceSpec> NAMESPACES = [
new NamespaceSpec("jms", "http://www.springframework.org/schema/integration/jms", "https://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd"),
new NamespaceSpec("http", "http://www.springframework.org/schema/integration/http", "https://www.springframework.org/schema/integration/http/spring-integration-http.xsd")
new NamespaceSpec("http", "http://www.springframework.org/schema/integration/http", "https://www.springframework.org/schema/integration/http/spring-integration-http.xsd"),
new NamespaceSpec("ftp", "http://www.springframework.org/schema/integration/ftp", "https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd")
]

private static final JsonMapper MAPPER =
Expand Down Expand Up @@ -47,6 +48,7 @@ class FlowToSpringIntegrationTest extends Specification {
"flowGraph1.json" | Path.of("end-to-end", "spring-integration-1.xml").toString()
"flowGraph2.json" | Path.of("end-to-end", "spring-integration-2.xml").toString()
"flowGraph3.json" | Path.of("end-to-end", "spring-integration-3.xml").toString()
"flowGraph4.json" | Path.of("end-to-end", "spring-integration-4.xml").toString()
}

def "Verify transformation error list is populated on node transformation error"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.codice.keip.flow.model.FlowEdge
import org.codice.keip.flow.model.Role
import spock.lang.Specification

import java.util.stream.Collectors
import java.util.stream.Stream

class GuavaGraphTest extends Specification {
Expand Down Expand Up @@ -183,7 +184,7 @@ class GuavaGraphTest extends Specification {
graph.getEdgeProps(n1, n2).get().id() == parallelEdge.id()
}

def "graph with a cycle throws exception"() {
def "graph with a single cycle -> each node is traversed once"() {
given:
def n1 = newNode("1")
def n2 = newNode("2")
Expand All @@ -196,10 +197,43 @@ class GuavaGraphTest extends Specification {
def flow = new Flow([n1, n2, n3], [e1, e2, e3])

when:
GuavaGraph.from(flow)
def graph = GuavaGraph.from(flow)

then:
thrown(IllegalArgumentException)
graph.traverse().map { it.id() }.collect(Collectors.toSet()).size() == 3
}

def "graph with multiple cycles is allowed -> each node is traversed once"() {
given:
def n1 = newNode("1")
def n2 = newNode("2")
def n3 = newNode("3")

def n4 = newNode("4")
def n5 = newNode("5")
def n6 = newNode("6")

// cycle free connected component
def n7 = newNode("7")
def n8 = newNode("8")

def e1 = new FlowEdge("a", n1.id(), n2.id())
def e2 = new FlowEdge("b", n2.id(), n3.id())
def e3 = new FlowEdge("c", n3.id(), n1.id())

def e4 = new FlowEdge("d", n4.id(), n5.id())
def e5 = new FlowEdge("e", n5.id(), n6.id())
def e6 = new FlowEdge("f", n6.id(), n4.id())

def e7 = new FlowEdge("g", n7.id(), n8.id())

def flow = new Flow([n1, n2, n3, n4, n5, n6, n7, n8], [e1, e2, e3, e4, e5, e6, e7])

when:
def graph = GuavaGraph.from(flow)

then:
graph.traverse().toList().size() == 8
}

private static EipNode newNode(String id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ class DefaultNodeTransformerTest extends Specification {
[] | [createNodeStub("n1"), createNodeStub("n2")]
}

def "multi-input/multi-output 'inbound_request_reply' node fails validation"(Set<EipNode> predecessors, Set<EipNode> successors) {
given:
graph.predecessors(testNode) >> predecessors
graph.successors(testNode) >> successors

when:
transformer.apply(testNode, graph)

then:
testNode.connectionType() >> ConnectionType.INBOUND_REQUEST_REPLY
thrown(IllegalArgumentException)

where:
predecessors | successors
[createNodeStub("n1"), createNodeStub("n2")] | []
[] | [createNodeStub("n1"), createNodeStub("n2")]
}

def "multi-input/single-output 'sink' node fails validation"(Set<EipNode> predecessors, Set<EipNode> successors) {
given:
graph.predecessors(testNode) >> predecessors
Expand Down Expand Up @@ -283,6 +301,29 @@ class DefaultNodeTransformerTest extends Specification {
attributes[REPLY_CHANNEL] == "out"
}

def "addChannelAttributes with 'inbound_request_reply' node -> input and output channel attributes added"() {
given:
def preNode = createNodeStub("pre1")
def postNode = createNodeStub("post1")

graph.predecessors(testNode) >> [preNode]
graph.successors(testNode) >> [postNode]

graph.getEdgeProps(preNode, testNode) >> createEdgeProps("in")
graph.getEdgeProps(testNode, postNode) >> createEdgeProps("out")

when:
def transformation = new DefaultNodeTransformer.DefaultTransformation(testNode, graph)
def attributes = transformation.createChannelAttributes()

then:
testNode.connectionType() >> ConnectionType.INBOUND_REQUEST_REPLY

attributes.size() == 2
attributes[REQUEST_CHANNEL] == "out"
attributes[REPLY_CHANNEL] == "in"
}

def "addChannelAttributes with 'sink' node -> channel attribute is added"() {
given:
def preNode = createNodeStub("pre1")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
{
"nodes": [
{
"id": "testIn",
"eipId": {
"namespace": "http",
"name": "inbound-gateway"
},
"role": "endpoint",
"connectionType": "inbound_request_reply",
"attributes": {},
"children": []
},
{
"id": "testIn-reply-channel",
"eipId": {
"namespace": "integration",
"name": "channel"
},
"role": "channel",
"connectionType": "passthru",
"attributes": {},
"children": []
},
{
"id": "updatePayload",
"eipId": {
"namespace": "integration",
"name": "transformer"
},
"role": "transformer",
"connectionType": "passthru",
"attributes": {},
"children": []
},
{
"id": "httpIn",
"eipId": {
"namespace": "http",
"name": "inbound-channel-adapter"
},
"role": "endpoint",
"connectionType": "source",
"attributes": {},
"children": []
},
{
"id": "ftpOut",
"eipId": {
"namespace": "ftp",
"name": "outbound-channel-adapter"
},
"role": "endpoint",
"connectionType": "sink",
"attributes": {},
"children": []
},
{
"id": "ftpToReply",
"eipId": {
"namespace": "ftp",
"name": "inbound-channel-adapter"
},
"role": "endpoint",
"connectionType": "source",
"attributes": {},
"children": []
}
],
"edges": [
{
"id": "ch-testIn-updatePayload",
"source": "testIn",
"target": "updatePayload",
"type": "default"
},
{
"id": "ch-updatePayload-testIn-reply-channel",
"source": "updatePayload",
"target": "testIn-reply-channel",
"type": "default"
},
{
"id": "ch-httpIn-ftpOut",
"source": "httpIn",
"target": "ftpOut",
"type": "default"
},
{
"id": "ch-ftpToReply-testIn-reply-channel",
"source": "ftpToReply",
"target": "testIn-reply-channel",
"type": "default"
},
{
"id": "ch-testIn-reply-channel-testIn",
"source": "testIn-reply-channel",
"target": "testIn",
"type": "default"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:integration="http://www.springframework.org/schema/integration"
xmlns:http="http://www.springframework.org/schema/integration/http"
xmlns:ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/http https://www.springframework.org/schema/integration/http/spring-integration-http.xsd http://www.springframework.org/schema/integration/ftp https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd">

<http:inbound-channel-adapter id="httpIn" channel="ch-httpIn-ftpOut"/>

<integration:channel id="ch-httpIn-ftpOut"/>

<ftp:outbound-channel-adapter id="ftpOut" channel="ch-httpIn-ftpOut"/>

<ftp:inbound-channel-adapter id="ftpToReply" channel="testIn-reply-channel"/>

<integration:channel id="testIn-reply-channel"/>

<http:inbound-gateway id="testIn" reply-channel="testIn-reply-channel" request-channel="ch-testIn-updatePayload"/>

<integration:channel id="ch-testIn-updatePayload"/>

<integration:transformer id="updatePayload" input-channel="ch-testIn-updatePayload"
output-channel="testIn-reply-channel"/>
</beans>
4 changes: 2 additions & 2 deletions flow-translator/flow-translator-webapp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<groupId>org.codice.keip</groupId>
<artifactId>flow-translator-webapp</artifactId>
<version>0.1.0</version>
<version>0.2.0</version>

<packaging>jar</packaging>

Expand Down Expand Up @@ -43,7 +43,7 @@
<dependency>
<groupId>org.codice.keip</groupId>
<artifactId>flow-translator-lib</artifactId>
<version>0.1.0</version>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
2 changes: 1 addition & 1 deletion schemas/model/json/attributeType.schema.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/codice/keip-canvas/schemas/v0.1.0/attributeType.schema.json",
"$id": "https://github.com/codice/keip-canvas/schemas/v0.2.0/attributeType.schema.json",
"title": "AttributeType",
"description": "The attribute's value type (attribute keys are always strings)",
"oneOf": [
Expand Down
3 changes: 2 additions & 1 deletion schemas/model/json/connectionType.schema.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/codice/keip-canvas/schemas/v0.1.0/connectionType.schema.json",
"$id": "https://github.com/codice/keip-canvas/schemas/v0.2.0/connectionType.schema.json",
"title": "ConnectionType",
"description": "Defines a connection pattern for an EIP component",
"type": "string",
"enum": [
"content_based_router",
"inbound_request_reply",
"passthru",
"request_reply",
"sink",
Expand Down
2 changes: 1 addition & 1 deletion schemas/model/json/eipComponentDef.schema.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/codice/keip-canvas/schemas/v0.1.0/eipComponentDef.schema.json",
"$id": "https://github.com/codice/keip-canvas/schemas/v0.2.0/eipComponentDef.schema.json",
"title": "EipComponentDefinition",
"description": "Defines the collection of EIP components available for use",
"type": "object",
Expand Down
Loading