Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public AbstractStream(final AbstractStream<K, V> stream) {
this.valueSerde = valueSerde;
this.subTopologySourceNodes = subTopologySourceNodes;
this.graphNode = graphNode;

this.graphNode.setValueSerde(valueSerde);
}

// This method allows to expose the InternalTopologyBuilder instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
if (node.isKeyChangingOperation()) {
keyChangingOperationsToOptimizableRepartitionNodes.put(node, new LinkedHashSet<>());
} else if (node instanceof OptimizableRepartitionNode) {
final GraphNode parentNode = getKeyChangingParentNode(node);
final GraphNode parentNode = findParentNodeMatching(node, GraphNode::isKeyChangingOperation);
if (parentNode != null) {
keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode<?, ?>) node);
}
Expand Down Expand Up @@ -488,7 +488,8 @@ private void mergeRepartitionTopics() {
continue;
}

final GroupedInternal<?, ?> groupedInternal = new GroupedInternal<>(getRepartitionSerdes(entry.getValue()));
// Resolve key value serdes for merging repartition nodes under the common key changing node
final GroupedInternal<?, ?> groupedInternal = getRepartitionSerdes(entry.getKey(), entry.getValue());

final String repartitionTopicName = getFirstRepartitionTopicName(entry.getValue());
//passing in the name of the first repartition topic, re-used to create the optimized repartition topic
Expand All @@ -503,19 +504,19 @@ private void mergeRepartitionTopics() {

final GraphNode keyChangingNodeChild = findParentNodeMatching(repartitionNodeToBeReplaced, gn -> gn.parentNodes().contains(keyChangingNode));

if (keyChangingNodeChild == null) {
throw new StreamsException(String.format("Found a null keyChangingChild node for %s", repartitionNodeToBeReplaced));
}

LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced);
// Remove any child to the key-changing node
// Such child may be common to all corresponding repartition nodes, so only remove it once
if (keyChangingNodeChild != null) {
LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced);

// need to add children of key-changing node as children of optimized repartition
// in order to process records from re-partitioning
optimizedSingleRepartition.addChild(keyChangingNodeChild);
// need to add children of key-changing node as children of optimized repartition
// in order to process records from re-partitioning
optimizedSingleRepartition.addChild(keyChangingNodeChild);

LOG.debug("Removing {} from {} children {}", keyChangingNodeChild, keyChangingNode, keyChangingNode.children());
// now remove children from key-changing node
keyChangingNode.removeChild(keyChangingNodeChild);
LOG.debug("Removing {} from {} children {}", keyChangingNodeChild, keyChangingNode, keyChangingNode.children());
// now remove children from key-changing node
keyChangingNode.removeChild(keyChangingNodeChild);
}

// now need to get children of repartition node so we can remove repartition node
final Collection<GraphNode> repartitionNodeToBeReplacedChildren = repartitionNodeToBeReplaced.children();
Expand Down Expand Up @@ -607,39 +608,26 @@ private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(final Stri

}

private GraphNode getKeyChangingParentNode(final GraphNode repartitionNode) {
final GraphNode shouldBeKeyChangingNode = findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || n.isValueChangingOperation());

final GraphNode keyChangingNode = findParentNodeMatching(repartitionNode, GraphNode::isKeyChangingOperation);
if (shouldBeKeyChangingNode != null && shouldBeKeyChangingNode.equals(keyChangingNode)) {
return keyChangingNode;
}
return null;
}

private String getFirstRepartitionTopicName(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
return repartitionNodes.iterator().next().repartitionTopic();
}

@SuppressWarnings("unchecked")
private <K, V> GroupedInternal<K, V> getRepartitionSerdes(final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
private <K, V> GroupedInternal<K, V> getRepartitionSerdes(final GraphNode keyChangingNode, final Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
Serde<K> keySerde = null;
Serde<V> valueSerde = null;

for (final OptimizableRepartitionNode<?, ?> repartitionNode : repartitionNodes) {
if (keySerde == null && repartitionNode.keySerde() != null) {
keySerde = (Serde<K>) repartitionNode.keySerde();
}

if (valueSerde == null && repartitionNode.valueSerde() != null) {
valueSerde = (Serde<V>) repartitionNode.valueSerde();
}

if (keySerde != null && valueSerde != null) {
break;
}
}

// Resolve repartition nodes' value serde for any value-changing nodes upstream until key changing node
Serde<V> valueSerde = (Serde<V>) keyChangingNode.valueSerde();
if (valueSerde == null) {
final GraphNode parent = findParentNodeMatching(keyChangingNode, gn -> gn.valueSerde() != null);
valueSerde = parent == null ? null : (Serde<V>) parent.valueSerde();
}
return new GroupedInternal<>(Grouped.with(keySerde, valueSerde));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals.graph;


import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

import java.util.Arrays;
Expand All @@ -42,6 +43,7 @@ public abstract class GraphNode {
// explicitly materialized (as either a versioned or an unversioned store) and therefore
// whether the output is to be considered versioned or not depends on its parent(s)
private Optional<Boolean> outputVersioned = Optional.empty();
private Serde<?> valueSerde = null;

public GraphNode(final String nodeName) {
this.nodeName = nodeName;
Expand Down Expand Up @@ -143,6 +145,14 @@ public void setOutputVersioned(final boolean outputVersioned) {
this.outputVersioned = Optional.of(outputVersioned);
}

public void setValueSerde(final Serde<?> valueSerde) {
this.valueSerde = valueSerde;
}

public Serde<?> valueSerde() {
return valueSerde;
}

@Override
public String toString() {
final String[] parentNames = parentNodeNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public Serde<K> keySerde() {
return keySerde;
}

public Serde<V> valueSerde() {
return valueSerde;
}

public String repartitionTopic() {
return repartitionTopic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,17 +331,16 @@ public void shouldPartiallyOptimizeWithValueOrKeyChangingOperatorsAfterInitialKe
" <-- KTABLE-TOSTREAM-0000000024\n\n",
noOptimization.describe().toString()
);
assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
assertEquals(1, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
}

@Test
public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() {
public void shouldOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() {
final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, true);
final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, true);

assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString());
assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
assertEquals(1, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
}

Expand Down Expand Up @@ -578,13 +577,13 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) {
" --> KSTREAM-MERGE-0000000022\n" +
" <-- KSTREAM-PEEK-0000000020\n" +
" Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" +
" --> KSTREAM-FILTER-0000000024\n" +
" --> KSTREAM-MERGE-0000000022-repartition-filter\n" +
" <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n" +
" Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" +
" --> KSTREAM-SINK-0000000023\n" +
" Processor: KSTREAM-MERGE-0000000022-repartition-filter (stores: [])\n" +
" --> KSTREAM-MERGE-0000000022-repartition-sink\n" +
" <-- KSTREAM-MERGE-0000000022\n" +
" Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n" +
" <-- KSTREAM-FILTER-0000000024\n" +
" Sink: KSTREAM-MERGE-0000000022-repartition-sink (topic: KSTREAM-MERGE-0000000022-repartition)\n" +
" <-- KSTREAM-MERGE-0000000022-repartition-filter\n" +
"\n" +
" Sub-topology: 2\n" +
" Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n" +
Expand All @@ -599,11 +598,11 @@ private int getCountOfRepartitionTopicsFound(final String topologyString) {
" <-- KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition-filter\n" +
"\n" +
" Sub-topology: 3\n" +
" Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n" +
" Source: KSTREAM-MERGE-0000000022-repartition-source (topics: [KSTREAM-MERGE-0000000022-repartition])\n" +
" --> KSTREAM-LEFTJOIN-0000000026\n" +
" Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" +
" --> KSTREAM-BRANCH-0000000027\n" +
" <-- KSTREAM-SOURCE-0000000025\n" +
" <-- KSTREAM-MERGE-0000000022-repartition-source\n" +
" Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" +
" --> KSTREAM-BRANCH-00000000270, KSTREAM-BRANCH-00000000271\n" +
" <-- KSTREAM-LEFTJOIN-0000000026\n" +
Expand Down
Loading