Skip to content

Commit 83f48a5

Browse files
timo95ChrizZz110
andauthored
[#1544] Make temporal grouping result temporal (#1554)
fixes #1544 Co-authored-by: timo95 <[email protected]> Co-authored-by: ChrizZz110 <[email protected]>
1 parent 3107654 commit 83f48a5

File tree

20 files changed

+334
-164
lines changed

20 files changed

+334
-164
lines changed

gradoop-examples/gradoop-examples-temporal/src/main/java/org/gradoop/examples/aggregation/TemporalAggregationExample.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ public static void main(String[] args) throws Exception {
6767
bikeGraph
6868
// apply four aggregate functions to get the earliest and latest start and end of a duration
6969
.aggregate(
70-
new MinVertexTime("earliestStart", TimeDimension.VALID_TIME, TimeDimension.Field.FROM),
71-
new MinVertexTime("earliestEnd", TimeDimension.VALID_TIME, TimeDimension.Field.TO),
72-
new MaxVertexTime("lastStart", TimeDimension.VALID_TIME, TimeDimension.Field.FROM),
73-
new MaxVertexTime("lastEnd", TimeDimension.VALID_TIME, TimeDimension.Field.TO))
70+
new MinVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, "earliestStart"),
71+
new MinVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO, "earliestEnd"),
72+
new MaxVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, "lastStart"),
73+
new MaxVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO, "lastEnd"))
7474
// since the aggregated values are 'long' values, we transform them into 'LocalDateTime' values
7575
.transformGraphHead(
7676
new TransformLongPropertiesToDateTime<>("earliestStart", "earliestEnd", "lastStart", "lastEnd"))

gradoop-examples/gradoop-examples-temporal/src/main/java/org/gradoop/examples/grouping/TemporalGroupingExample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ public static void main(String[] args) throws Exception {
8989
Arrays.asList(
9090
new Count("count"),
9191
new AverageDuration("avgDur", dim),
92-
new MinTime("firstStart", dim, TimeDimension.Field.FROM),
93-
new MaxTime("lastStart", dim, TimeDimension.Field.FROM)),
92+
new MinTime(dim, TimeDimension.Field.FROM, "firstStart"),
93+
new MaxTime(dim, TimeDimension.Field.FROM, "lastStart")),
9494
// Edge grouping keys (label)
9595
Collections.singletonList(GroupingKeys.label()),
9696
// Edge aggregation functions (count)

gradoop-flink/src/main/java/org/gradoop/flink/model/api/functions/AggregateFunction.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,17 @@ default boolean isEdgeAggregation() {
8181
default PropertyValue postAggregate(PropertyValue result) {
8282
return result;
8383
}
84+
85+
/**
86+
* Add result to aggregated element.
87+
*
88+
* @param element aggregated element to add result to
89+
* @param aggregate aggregation result
90+
* @param <E> element type
91+
* @return aggregated element
92+
*/
93+
default <E extends Element> E applyResult(E element, PropertyValue aggregate) {
94+
element.setProperty(getAggregatePropertyKey(), aggregate);
95+
return element;
96+
}
8497
}

gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/aggregation/functions/AggregateTransactions.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,6 @@ public class AggregateTransactions implements MapFunction<GraphTransaction, Grap
4444
* Set of aggregate edge functions.
4545
*/
4646
private final Set<AggregateFunction> edgeAggregateFunctions;
47-
/**
48-
* Set of aggregate default values.
49-
*/
50-
private final Map<String, PropertyValue> aggregateDefaultValues;
5147

5248
/**
5349
* Creates a new instance of a AggregateTransactions map function.
@@ -65,12 +61,6 @@ public AggregateTransactions(Set<AggregateFunction> aggregateFunctions) {
6561
edgeAggregateFunctions = aggregateFunctions.stream()
6662
.filter(AggregateFunction::isEdgeAggregation)
6763
.collect(Collectors.toSet());
68-
69-
aggregateDefaultValues = new HashMap<>();
70-
for (AggregateFunction func : aggregateFunctions) {
71-
aggregateDefaultValues.put(func.getAggregatePropertyKey(),
72-
AggregateUtil.getDefaultAggregate(func));
73-
}
7464
}
7565

7666
@Override
@@ -82,10 +72,9 @@ public GraphTransaction map(GraphTransaction graphTransaction) throws Exception
8272
for (AggregateFunction function : aggregateFunctions) {
8373
aggregate.computeIfPresent(function.getAggregatePropertyKey(),
8474
(k, v) -> function.postAggregate(v));
75+
function.applyResult(graphTransaction.getGraphHead(), aggregate
76+
.getOrDefault(function.getAggregatePropertyKey(), AggregateUtil.getDefaultAggregate(function)));
8577
}
86-
aggregateDefaultValues.forEach(aggregate::putIfAbsent);
87-
88-
aggregate.forEach(graphTransaction.getGraphHead()::setProperty);
8978
return graphTransaction;
9079
}
9180

gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/aggregation/functions/BaseAggregateFunction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
*/
2525
public abstract class BaseAggregateFunction implements AggregateFunction {
2626
/**
27-
* Key of the aggregate property.
27+
* The key of the property where the aggregated result is saved.
2828
*/
2929
private String aggregatePropertyKey;
3030

3131
/**
3232
* Creates a new instance of a base aggregate function.
3333
*
34-
* @param aggregatePropertyKey aggregate property key
34+
* @param aggregatePropertyKey the aggregate property key
3535
*/
3636
public BaseAggregateFunction(String aggregatePropertyKey) {
3737
setAggregatePropertyKey(aggregatePropertyKey);

gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/aggregation/functions/SetAggregateProperties.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.gradoop.common.model.impl.properties.PropertyValue;
2424
import org.gradoop.flink.model.api.functions.AggregateFunction;
2525

26-
import java.util.HashMap;
2726
import java.util.Map;
2827
import java.util.Objects;
2928
import java.util.Set;
@@ -36,11 +35,6 @@
3635
public class SetAggregateProperties<G extends GraphHead>
3736
implements CoGroupFunction<G, Tuple2<GradoopId, Map<String, PropertyValue>>, G> {
3837

39-
/**
40-
* default values used to replace aggregate values in case of NULL.
41-
*/
42-
private final Map<String, PropertyValue> defaultValues;
43-
4438
/**
4539
* Aggregate functions from the aggregation step.
4640
*/
@@ -52,14 +46,7 @@ public class SetAggregateProperties<G extends GraphHead>
5246
* @param aggregateFunctions aggregate functions
5347
*/
5448
public SetAggregateProperties(final Set<AggregateFunction> aggregateFunctions) {
55-
56-
defaultValues = new HashMap<>();
5749
this.aggregateFunctions = Objects.requireNonNull(aggregateFunctions);
58-
59-
for (AggregateFunction func : aggregateFunctions) {
60-
Objects.requireNonNull(func);
61-
defaultValues.put(func.getAggregatePropertyKey(), AggregateUtil.getDefaultAggregate(func));
62-
}
6350
}
6451

6552
@Override
@@ -74,13 +61,17 @@ public void coGroup(Iterable<G> left, Iterable<Tuple2<GradoopId, Map<String, Pro
7461
for (AggregateFunction function : aggregateFunctions) {
7562
values.computeIfPresent(function.getAggregatePropertyKey(),
7663
(k, v) -> function.postAggregate(v));
64+
function.applyResult(leftElem, values.getOrDefault(function.getAggregatePropertyKey(),
65+
AggregateUtil.getDefaultAggregate(function)));
7766
}
78-
values.forEach(leftElem::setProperty);
7967
out.collect(leftElem);
8068
rightEmpty = false;
8169
}
70+
// For example if the graph is empty
8271
if (rightEmpty) {
83-
defaultValues.forEach(leftElem::setProperty);
72+
for (AggregateFunction function : aggregateFunctions) {
73+
function.applyResult(leftElem, AggregateUtil.getDefaultAggregate(function));
74+
}
8475
out.collect(leftElem);
8576
}
8677
}

gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/aggregation/functions/SetAggregateProperty.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,6 @@ public class SetAggregateProperty<G extends GraphHead>
5151
*/
5252
private Map<String, PropertyValue> aggregateValues;
5353

54-
/**
55-
* map from aggregate property key to its default value
56-
* used to replace aggregate value in case of NULL.
57-
*/
58-
private final Map<String, PropertyValue> defaultValues;
59-
6054

6155
/**
6256
* Creates a new instance of a SetAggregateProperty rich map function.
@@ -65,13 +59,6 @@ public class SetAggregateProperty<G extends GraphHead>
6559
*/
6660
public SetAggregateProperty(Set<AggregateFunction> aggregateFunctions) {
6761
this.aggregateFunctions = Objects.requireNonNull(aggregateFunctions);
68-
69-
defaultValues = new HashMap<>();
70-
71-
for (AggregateFunction func : aggregateFunctions) {
72-
Objects.requireNonNull(func);
73-
defaultValues.put(func.getAggregatePropertyKey(), AggregateUtil.getDefaultAggregate(func));
74-
}
7562
}
7663

7764
@SuppressWarnings("unchecked")
@@ -80,7 +67,7 @@ public void open(Configuration parameters) throws Exception {
8067
super.open(parameters);
8168

8269
if (getRuntimeContext().getBroadcastVariable(VALUE).isEmpty()) {
83-
aggregateValues = defaultValues;
70+
aggregateValues = new HashMap<>();
8471
} else {
8572
aggregateValues = (Map<String, PropertyValue>) getRuntimeContext()
8673
.getBroadcastVariable(VALUE).get(0);
@@ -89,13 +76,13 @@ public void open(Configuration parameters) throws Exception {
8976
aggregateValues.computeIfPresent(function.getAggregatePropertyKey(),
9077
(k, v) -> function.postAggregate(v));
9178
}
92-
defaultValues.forEach(aggregateValues::putIfAbsent);
9379
}
9480
}
9581

9682
@Override
9783
public G map(G graphHead) throws Exception {
98-
aggregateValues.forEach(graphHead::setProperty);
84+
aggregateFunctions.forEach(f -> f.applyResult(graphHead,
85+
aggregateValues.getOrDefault(f.getAggregatePropertyKey(), AggregateUtil.getDefaultAggregate(f))));
9986
return graphHead;
10087
}
10188
}

gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/keyedgrouping/functions/BuildSuperElementFromTuple.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ E setAggregatePropertiesAndKeys(E element, T tupleData) {
8989
final PropertyValue postAggregateValue = function.postAggregate(
9090
tupleData.getField(tupleDataOffset + keyFunctions.size() + i));
9191
if (postAggregateValue != null) {
92-
element.setProperty(function.getAggregatePropertyKey(), postAggregateValue.isNull() ?
92+
element = function.applyResult(element, postAggregateValue.isNull() ?
9393
AggregateUtil.getDefaultAggregate(function) : postAggregateValue);
9494
}
9595
}

gradoop-temporal/src/main/java/org/gradoop/temporal/model/api/TemporalGraphOperators.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
package org.gradoop.temporal.model.api;
1717

1818
import org.gradoop.flink.model.api.epgm.BaseGraphOperators;
19+
import org.gradoop.flink.model.api.functions.AggregateFunction;
20+
import org.gradoop.flink.model.api.functions.KeyFunction;
1921
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
22+
import org.gradoop.flink.model.impl.operators.keyedgrouping.KeyedGrouping;
2023
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
2124
import org.gradoop.temporal.model.api.functions.TemporalPredicate;
2225
import org.gradoop.temporal.model.impl.TemporalGraph;
@@ -28,6 +31,10 @@
2831
import org.gradoop.temporal.model.impl.functions.predicates.DeletedIn;
2932
import org.gradoop.temporal.model.impl.functions.predicates.FromTo;
3033
import org.gradoop.temporal.model.impl.functions.predicates.ValidDuring;
34+
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MaxEdgeTime;
35+
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MaxVertexTime;
36+
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MinEdgeTime;
37+
import org.gradoop.temporal.model.impl.operators.aggregation.functions.MinVertexTime;
3138
import org.gradoop.temporal.model.impl.operators.diff.Diff;
3239
import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.CNFPostProcessing;
3340
import org.gradoop.temporal.model.impl.operators.matching.common.statistics.TemporalGraphStatistics;
@@ -39,6 +46,9 @@
3946
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
4047
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
4148

49+
import java.util.ArrayList;
50+
import java.util.List;
51+
4252
/**
4353
* Defines the operators that are available on a {@link TemporalGraph}.
4454
*/
@@ -336,6 +346,66 @@ default TemporalGraphCollection temporalQuery(String temporalGdlQuery, String co
336346
new CypherTemporalPatternMatching(temporalGdlQuery, constructionPattern, attachData, vertexStrategy,
337347
edgeStrategy, stats, new CNFPostProcessing()));
338348
}
349+
350+
/**
351+
* Grouping operator that aggregates valid times per group and sets it as new valid time.
352+
* The grouped validFrom value will be computed by min over all validFrom values.
353+
* The grouped validTo value will be computed by max over all validTo values.
354+
*
355+
* @param vertexGroupingKeys property keys to group vertices
356+
* @return summary graph
357+
* @see KeyedGrouping
358+
*/
359+
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys) {
360+
return temporalGroupBy(vertexGroupingKeys, null);
361+
}
362+
363+
/**
364+
* Grouping operator that aggregates valid times per group and sets it as new valid time.
365+
* The grouped validFrom value will be computed by min over all validFrom values.
366+
* The grouped validTo value will be computed by max over all validTo values.
367+
*
368+
* @param vertexGroupingKeys property keys to group vertices
369+
* @param edgeGroupingKeys property keys to group edges
370+
* @return summary graph
371+
* @see KeyedGrouping
372+
*/
373+
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys,
374+
List<KeyFunction<TemporalEdge, ?>> edgeGroupingKeys) {
375+
return temporalGroupBy(vertexGroupingKeys, new ArrayList<>(), edgeGroupingKeys, new ArrayList<>());
376+
}
377+
378+
/**
379+
* Grouping operator that aggregates valid times per group and sets it as new valid time.
380+
* The grouped validFrom value will be computed by min over all validFrom values.
381+
* The grouped validTo value will be computed by max over all validTo values.
382+
*
383+
* @param vertexGroupingKeys property keys to group vertices
384+
* @param vertexAggregateFunctions aggregate functions to apply on super vertices
385+
* @param edgeGroupingKeys property keys to group edges
386+
* @param edgeAggregateFunctions aggregate functions to apply on super edges
387+
* @return summary graph
388+
* @see KeyedGrouping
389+
*/
390+
default TemporalGraph temporalGroupBy(List<KeyFunction<TemporalVertex, ?>> vertexGroupingKeys,
391+
List<AggregateFunction> vertexAggregateFunctions, List<KeyFunction<TemporalEdge, ?>> edgeGroupingKeys,
392+
List<AggregateFunction> edgeAggregateFunctions) {
393+
// Add min/max valid time aggregations that will result in the new valid times
394+
List<AggregateFunction> tempVertexAgg = new ArrayList<>(vertexAggregateFunctions);
395+
tempVertexAgg.add(new MinVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM)
396+
.setAsValidTime(TimeDimension.Field.FROM));
397+
tempVertexAgg.add(new MaxVertexTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO)
398+
.setAsValidTime(TimeDimension.Field.TO));
399+
List<AggregateFunction> tempEdgeAgg = new ArrayList<>(edgeAggregateFunctions);
400+
tempEdgeAgg.add(new MinEdgeTime(TimeDimension.VALID_TIME, TimeDimension.Field.FROM)
401+
.setAsValidTime(TimeDimension.Field.FROM));
402+
tempEdgeAgg.add(new MaxEdgeTime(TimeDimension.VALID_TIME, TimeDimension.Field.TO)
403+
.setAsValidTime(TimeDimension.Field.TO));
404+
405+
return callForGraph(new KeyedGrouping<>(vertexGroupingKeys, tempVertexAgg, edgeGroupingKeys,
406+
tempEdgeAgg));
407+
}
408+
339409
//----------------------------------------------------------------------------
340410
// Utilities
341411
//----------------------------------------------------------------------------

0 commit comments

Comments
 (0)