Skip to content

Commit b1b552f

Browse files
committed
feat(flow-translator): handle nodes with 'inbound_request_reply' connectionType
1 parent e62cb6d commit b1b552f

File tree

10 files changed

+235
-12
lines changed

10 files changed

+235
-12
lines changed

flow-translator/flow-translator-lib/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>org.codice.keip</groupId>
88
<artifactId>flow-translator-lib</artifactId>
9-
<version>0.1.0</version>
9+
<version>0.2.0</version>
1010

1111
<packaging>jar</packaging>
1212

@@ -48,7 +48,7 @@
4848
<dependency>
4949
<groupId>org.codice.keip.schemas</groupId>
5050
<artifactId>validation</artifactId>
51-
<version>0.1.0</version>
51+
<version>0.2.0</version>
5252
<scope>test</scope>
5353
</dependency>
5454

flow-translator/flow-translator-lib/src/main/java/org/codice/keip/flow/graph/GuavaGraph.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@ public class GuavaGraph implements EipGraph {
2525
private final ImmutableValueGraph<EipNode, EdgeProps> graph;
2626

2727
private GuavaGraph(ImmutableValueGraph<EipNode, EdgeProps> graph) {
28-
if (Graphs.hasCycle(graph.asGraph())) {
29-
throw new IllegalArgumentException("Graphs with cycles are not allowed");
30-
}
3128
this.graph = graph;
3229
}
3330

@@ -63,7 +60,12 @@ public Optional<EdgeProps> getEdgeProps(EipNode source, EipNode target) {
6360

6461
/** Finds the nodes in the graph that have no incoming edges (indicating the start of a flow). */
6562
private Stream<EipNode> findRoots() {
66-
return graph.nodes().stream().filter(node -> graph.inDegree(node) == 0);
63+
Stream<EipNode> roots = graph.nodes().stream().filter(node -> graph.inDegree(node) == 0);
64+
if (Graphs.hasCycle(graph.asGraph())) {
65+
// traverse acyclic components first
66+
return Stream.concat(roots, graph.nodes().stream());
67+
}
68+
return roots;
6769
}
6870

6971
private static void addNodes(

flow-translator/flow-translator-lib/src/main/java/org/codice/keip/flow/model/ConnectionType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
public enum ConnectionType {
44
CONTENT_BASED_ROUTER,
5+
INBOUND_REQUEST_REPLY,
56
PASSTHRU,
67
REQUEST_REPLY,
78
SINK,

flow-translator/flow-translator-lib/src/main/java/org/codice/keip/flow/xml/spring/DefaultNodeTransformer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ Map<String, Object> createChannelAttributes() {
9595
Map<String, Object> attributes = new LinkedHashMap<>();
9696
return switch (this.node.connectionType()) {
9797
case CONTENT_BASED_ROUTER -> handleRouter(attributes, predecessors);
98+
case INBOUND_REQUEST_REPLY ->
99+
handleInboundRequestReply(attributes, predecessors, successors);
98100
case PASSTHRU -> handlePassthru(attributes, predecessors, successors);
99101
case REQUEST_REPLY -> handleRequestReply(attributes, predecessors, successors);
100102
case SINK -> handleSink(attributes, predecessors, successors);
@@ -115,6 +117,21 @@ private Map<String, Object> handleRouter(
115117
return attributes;
116118
}
117119

120+
private Map<String, Object> handleInboundRequestReply(
121+
Map<String, Object> attributes, Set<EipNode> predecessors, Set<EipNode> successors) {
122+
if (predecessors.size() > 1 || successors.size() > 1) {
123+
throw new IllegalArgumentException(
124+
"'RequestReply' nodes can have at most one incoming and one outgoing edge");
125+
}
126+
predecessors.stream()
127+
.findFirst()
128+
.ifPresent(p -> attributes.put(REPLY_CHANNEL, getChannelId(p, this.node)));
129+
successors.stream()
130+
.findFirst()
131+
.ifPresent(s -> attributes.put(REQUEST_CHANNEL, getChannelId(this.node, s)));
132+
return attributes;
133+
}
134+
118135
private Map<String, Object> handlePassthru(
119136
Map<String, Object> attributes, Set<EipNode> predecessors, Set<EipNode> successors) {
120137
if (predecessors.size() > 1 || successors.size() > 1) {

flow-translator/flow-translator-lib/src/test/groovy/org/codice/keip/flow/FlowToSpringIntegrationTest.groovy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import static org.codice.keip.flow.xml.XmlComparisonUtil.readTestXml
1919
class FlowToSpringIntegrationTest extends Specification {
2020
private static final List<NamespaceSpec> NAMESPACES = [
2121
new NamespaceSpec("jms", "http://www.springframework.org/schema/integration/jms", "https://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd"),
22-
new NamespaceSpec("http", "http://www.springframework.org/schema/integration/http", "https://www.springframework.org/schema/integration/http/spring-integration-http.xsd")
22+
new NamespaceSpec("http", "http://www.springframework.org/schema/integration/http", "https://www.springframework.org/schema/integration/http/spring-integration-http.xsd"),
23+
new NamespaceSpec("ftp", "http://www.springframework.org/schema/integration/ftp", "https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd")
2324
]
2425

2526
private static final JsonMapper MAPPER =
@@ -47,6 +48,7 @@ class FlowToSpringIntegrationTest extends Specification {
4748
"flowGraph1.json" | Path.of("end-to-end", "spring-integration-1.xml").toString()
4849
"flowGraph2.json" | Path.of("end-to-end", "spring-integration-2.xml").toString()
4950
"flowGraph3.json" | Path.of("end-to-end", "spring-integration-3.xml").toString()
51+
"flowGraph4.json" | Path.of("end-to-end", "spring-integration-4.xml").toString()
5052
}
5153

5254
def "Verify transformation error list is populated on node transformation error"() {

flow-translator/flow-translator-lib/src/test/groovy/org/codice/keip/flow/graph/GuavaGraphTest.groovy

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.codice.keip.flow.model.FlowEdge
99
import org.codice.keip.flow.model.Role
1010
import spock.lang.Specification
1111

12+
import java.util.stream.Collectors
1213
import java.util.stream.Stream
1314

1415
class GuavaGraphTest extends Specification {
@@ -183,7 +184,7 @@ class GuavaGraphTest extends Specification {
183184
graph.getEdgeProps(n1, n2).get().id() == parallelEdge.id()
184185
}
185186

186-
def "graph with a cycle throws exception"() {
187+
def "graph with a single cycle -> each node is traversed once"() {
187188
given:
188189
def n1 = newNode("1")
189190
def n2 = newNode("2")
@@ -196,10 +197,43 @@ class GuavaGraphTest extends Specification {
196197
def flow = new Flow([n1, n2, n3], [e1, e2, e3])
197198

198199
when:
199-
GuavaGraph.from(flow)
200+
def graph = GuavaGraph.from(flow)
200201

201202
then:
202-
thrown(IllegalArgumentException)
203+
graph.traverse().map { it.id() }.collect(Collectors.toSet()).size() == 3
204+
}
205+
206+
def "graph with multiple cycles is allowed -> each node is traversed once"() {
207+
given:
208+
def n1 = newNode("1")
209+
def n2 = newNode("2")
210+
def n3 = newNode("3")
211+
212+
def n4 = newNode("4")
213+
def n5 = newNode("5")
214+
def n6 = newNode("6")
215+
216+
// cycle free connected component
217+
def n7 = newNode("7")
218+
def n8 = newNode("8")
219+
220+
def e1 = new FlowEdge("a", n1.id(), n2.id())
221+
def e2 = new FlowEdge("b", n2.id(), n3.id())
222+
def e3 = new FlowEdge("c", n3.id(), n1.id())
223+
224+
def e4 = new FlowEdge("d", n4.id(), n5.id())
225+
def e5 = new FlowEdge("e", n5.id(), n6.id())
226+
def e6 = new FlowEdge("f", n6.id(), n4.id())
227+
228+
def e7 = new FlowEdge("g", n7.id(), n8.id())
229+
230+
def flow = new Flow([n1, n2, n3, n4, n5, n6, n7, n8], [e1, e2, e3, e4, e5, e6, e7])
231+
232+
when:
233+
def graph = GuavaGraph.from(flow)
234+
235+
then:
236+
graph.traverse().toList().size() == 8
203237
}
204238

205239
private static EipNode newNode(String id) {

flow-translator/flow-translator-lib/src/test/groovy/org/codice/keip/flow/xml/spring/DefaultNodeTransformerTest.groovy

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,24 @@ class DefaultNodeTransformerTest extends Specification {
7777
[] | [createNodeStub("n1"), createNodeStub("n2")]
7878
}
7979

80+
def "multi-input/multi-output 'inbound_request_reply' node fails validation"(Set<EipNode> predecessors, Set<EipNode> successors) {
81+
given:
82+
graph.predecessors(testNode) >> predecessors
83+
graph.successors(testNode) >> successors
84+
85+
when:
86+
transformer.apply(testNode, graph)
87+
88+
then:
89+
testNode.connectionType() >> ConnectionType.INBOUND_REQUEST_REPLY
90+
thrown(IllegalArgumentException)
91+
92+
where:
93+
predecessors | successors
94+
[createNodeStub("n1"), createNodeStub("n2")] | []
95+
[] | [createNodeStub("n1"), createNodeStub("n2")]
96+
}
97+
8098
def "multi-input/single-output 'sink' node fails validation"(Set<EipNode> predecessors, Set<EipNode> successors) {
8199
given:
82100
graph.predecessors(testNode) >> predecessors
@@ -283,6 +301,29 @@ class DefaultNodeTransformerTest extends Specification {
283301
attributes[REPLY_CHANNEL] == "out"
284302
}
285303

304+
def "addChannelAttributes with 'inbound_request_reply' node -> input and output channel attributes added"() {
305+
given:
306+
def preNode = createNodeStub("pre1")
307+
def postNode = createNodeStub("post1")
308+
309+
graph.predecessors(testNode) >> [preNode]
310+
graph.successors(testNode) >> [postNode]
311+
312+
graph.getEdgeProps(preNode, testNode) >> createEdgeProps("in")
313+
graph.getEdgeProps(testNode, postNode) >> createEdgeProps("out")
314+
315+
when:
316+
def transformation = new DefaultNodeTransformer.DefaultTransformation(testNode, graph)
317+
def attributes = transformation.createChannelAttributes()
318+
319+
then:
320+
testNode.connectionType() >> ConnectionType.INBOUND_REQUEST_REPLY
321+
322+
attributes.size() == 2
323+
attributes[REQUEST_CHANNEL] == "out"
324+
attributes[REPLY_CHANNEL] == "in"
325+
}
326+
286327
def "addChannelAttributes with 'sink' node -> channel attribute is added"() {
287328
given:
288329
def preNode = createNodeStub("pre1")
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
{
2+
"nodes": [
3+
{
4+
"id": "testIn",
5+
"eipId": {
6+
"namespace": "http",
7+
"name": "inbound-gateway"
8+
},
9+
"role": "endpoint",
10+
"connectionType": "inbound_request_reply",
11+
"attributes": {},
12+
"children": []
13+
},
14+
{
15+
"id": "testIn-reply-channel",
16+
"eipId": {
17+
"namespace": "integration",
18+
"name": "channel"
19+
},
20+
"role": "channel",
21+
"connectionType": "passthru",
22+
"attributes": {},
23+
"children": []
24+
},
25+
{
26+
"id": "updatePayload",
27+
"eipId": {
28+
"namespace": "integration",
29+
"name": "transformer"
30+
},
31+
"role": "transformer",
32+
"connectionType": "passthru",
33+
"attributes": {},
34+
"children": []
35+
},
36+
{
37+
"id": "httpIn",
38+
"eipId": {
39+
"namespace": "http",
40+
"name": "inbound-channel-adapter"
41+
},
42+
"role": "endpoint",
43+
"connectionType": "source",
44+
"attributes": {},
45+
"children": []
46+
},
47+
{
48+
"id": "ftpOut",
49+
"eipId": {
50+
"namespace": "ftp",
51+
"name": "outbound-channel-adapter"
52+
},
53+
"role": "endpoint",
54+
"connectionType": "sink",
55+
"attributes": {},
56+
"children": []
57+
},
58+
{
59+
"id": "ftpToReply",
60+
"eipId": {
61+
"namespace": "ftp",
62+
"name": "inbound-channel-adapter"
63+
},
64+
"role": "endpoint",
65+
"connectionType": "source",
66+
"attributes": {},
67+
"children": []
68+
}
69+
],
70+
"edges": [
71+
{
72+
"id": "ch-testIn-updatePayload",
73+
"source": "testIn",
74+
"target": "updatePayload",
75+
"type": "default"
76+
},
77+
{
78+
"id": "ch-updatePayload-testIn-reply-channel",
79+
"source": "updatePayload",
80+
"target": "testIn-reply-channel",
81+
"type": "default"
82+
},
83+
{
84+
"id": "ch-httpIn-ftpOut",
85+
"source": "httpIn",
86+
"target": "ftpOut",
87+
"type": "default"
88+
},
89+
{
90+
"id": "ch-ftpToReply-testIn-reply-channel",
91+
"source": "ftpToReply",
92+
"target": "testIn-reply-channel",
93+
"type": "default"
94+
},
95+
{
96+
"id": "ch-testIn-reply-channel-testIn",
97+
"source": "testIn-reply-channel",
98+
"target": "testIn",
99+
"type": "default"
100+
}
101+
]
102+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xmlns:integration="http://www.springframework.org/schema/integration"
4+
xmlns:http="http://www.springframework.org/schema/integration/http"
5+
xmlns:ftp="http://www.springframework.org/schema/integration/ftp"
6+
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/http https://www.springframework.org/schema/integration/http/spring-integration-http.xsd http://www.springframework.org/schema/integration/ftp https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd">
7+
8+
<http:inbound-channel-adapter id="httpIn" channel="ch-httpIn-ftpOut"/>
9+
10+
<integration:channel id="ch-httpIn-ftpOut"/>
11+
12+
<ftp:outbound-channel-adapter id="ftpOut" channel="ch-httpIn-ftpOut"/>
13+
14+
<ftp:inbound-channel-adapter id="ftpToReply" channel="testIn-reply-channel"/>
15+
16+
<integration:channel id="testIn-reply-channel"/>
17+
18+
<http:inbound-gateway id="testIn" reply-channel="testIn-reply-channel" request-channel="ch-testIn-updatePayload"/>
19+
20+
<integration:channel id="ch-testIn-updatePayload"/>
21+
22+
<integration:transformer id="updatePayload" input-channel="ch-testIn-updatePayload"
23+
output-channel="testIn-reply-channel"/>
24+
</beans>

flow-translator/flow-translator-webapp/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
<groupId>org.codice.keip</groupId>
1313
<artifactId>flow-translator-webapp</artifactId>
14-
<version>0.1.0</version>
14+
<version>0.2.0</version>
1515

1616
<packaging>jar</packaging>
1717

@@ -43,7 +43,7 @@
4343
<dependency>
4444
<groupId>org.codice.keip</groupId>
4545
<artifactId>flow-translator-lib</artifactId>
46-
<version>0.1.0</version>
46+
<version>0.2.0</version>
4747
</dependency>
4848
<dependency>
4949
<groupId>org.springframework.boot</groupId>

0 commit comments

Comments
 (0)