Skip to content

Commit 09b9d01

Browse files
authored
[#1569] Added vertex-centric min/max/avg degree operator (#1581)
fixes #1569
1 parent 5f1b5d5 commit 09b9d01

File tree

13 files changed

+883
-0
lines changed

13 files changed

+883
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.flink.model.impl.functions.epgm;
17+
18+
import org.gradoop.common.model.api.entities.Edge;
19+
import org.gradoop.common.model.impl.id.GradoopId;
20+
import org.gradoop.flink.model.impl.functions.filters.CombinableFilter;
21+
22+
import java.util.Objects;
23+
24+
/**
25+
* Filters edges having the specified vertex id as source or target.
26+
*
27+
* @param <E> the edge type
28+
*/
29+
public class BySourceOrTargetId<E extends Edge> implements CombinableFilter<E> {
30+
/**
31+
* Vertex id to filter on
32+
*/
33+
private final GradoopId vertexId;
34+
35+
/**
36+
* Creates a new filter instance.
37+
*
38+
* @param vertexId vertex id to filter on
39+
*/
40+
public BySourceOrTargetId(GradoopId vertexId) {
41+
this.vertexId = Objects.requireNonNull(vertexId);
42+
}
43+
44+
@Override
45+
public boolean filter(E e) throws Exception {
46+
return e.getSourceId().equals(vertexId) || e.getTargetId().equals(vertexId);
47+
}
48+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.flink.model.impl.functions.tuple;
17+
18+
import org.apache.flink.api.common.functions.MapFunction;
19+
import org.apache.flink.api.java.tuple.Tuple1;
20+
21+
/**
22+
* A map function casting each Tuple1 from Integer to Double.
23+
*/
24+
public class CastTuple1IntToDouble implements MapFunction<Tuple1<Integer>, Tuple1<Double>> {
25+
@Override
26+
public Tuple1<Double> map(Tuple1<Integer> tuple1) throws Exception {
27+
return new Tuple1<>((double) tuple1.f0);
28+
}
29+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.temporal.model.impl.operators.metric;
17+
18+
import org.apache.flink.api.java.DataSet;
19+
import org.apache.flink.api.java.aggregation.Aggregations;
20+
import org.apache.flink.api.java.tuple.Tuple1;
21+
import org.apache.flink.api.java.tuple.Tuple4;
22+
import org.gradoop.common.model.impl.id.GradoopId;
23+
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
24+
import org.gradoop.flink.model.impl.functions.epgm.BySourceOrTargetId;
25+
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
26+
import org.gradoop.temporal.model.api.TimeDimension;
27+
import org.gradoop.temporal.model.impl.TemporalGraph;
28+
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
29+
import org.gradoop.temporal.model.impl.operators.metric.functions.FilterEdgesInInterval;
30+
import org.gradoop.flink.model.impl.functions.tuple.CastTuple1IntToDouble;
31+
import org.gradoop.temporal.model.impl.operators.metric.functions.MapCalculatePartialAverageDegree;
32+
import org.gradoop.temporal.model.impl.operators.metric.functions.MapCalculateAverageDegree;
33+
34+
import java.util.Objects;
35+
36+
/**
37+
* An abstract base class which calculates the minimum, maximum or average degree of a given vertex
38+
* referenced via its {@code vertexId} within a given time interval: start {@code queryFrom},
39+
* end {@code queryTo}. The result is a single value (Double) in a DataSet. This class has three
40+
* subclasses for each aggregation type (min, max and average).
41+
* <p>
42+
* The type of the degree (IN, OUT, BOTH) can be chosen by the arguments.
43+
*/
44+
public abstract class BaseVertexCentricDegreeEvolution
45+
implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple1<Double>>> {
46+
/**
47+
* The time dimension that will be considered.
48+
*/
49+
private final TimeDimension dimension;
50+
/**
51+
* The degree type (IN, OUT, BOTH);
52+
*/
53+
private final VertexDegree degreeType;
54+
/**
55+
* The vertex to be considered.
56+
*/
57+
private final GradoopId vertexId;
58+
/**
59+
* The start of the interval specified by the user.
60+
*/
61+
private final Long queryFrom;
62+
/**
63+
* The end of the interval specified by the user.
64+
*/
65+
private final Long queryTo;
66+
/**
67+
* The type of aggregation to be performed (min or max)
68+
*/
69+
private final AggregationType aggregationType;
70+
71+
/**
72+
* Creates an instance of this temporal vertex degree aggregation operator.
73+
*
74+
* @param degreeType the degree type to consider
75+
* @param dimension the time dimension to consider
76+
* @param vertexId the id of the vertex to consider
77+
* @param queryFrom the start of the interval
78+
* @param queryTo the end of the interval
79+
* @param aggregationType the type of aggregation (min, max or avg)
80+
*/
81+
public BaseVertexCentricDegreeEvolution(VertexDegree degreeType, TimeDimension dimension,
82+
GradoopId vertexId, Long queryFrom, Long queryTo, AggregationType aggregationType) {
83+
this.degreeType = Objects.requireNonNull(degreeType);
84+
this.dimension = Objects.requireNonNull(dimension);
85+
this.vertexId = Objects.requireNonNull(vertexId);
86+
this.queryFrom = Objects.requireNonNull(queryFrom);
87+
this.queryTo = Objects.requireNonNull(queryTo);
88+
this.aggregationType = Objects.requireNonNull(aggregationType);
89+
}
90+
91+
@Override
92+
public DataSet<Tuple1<Double>> execute(TemporalGraph graph) {
93+
// Find relevant subgraph (vertex and all its edges)
94+
TemporalGraph subGraph = graph
95+
.edgeInducedSubgraph(new BySourceOrTargetId<>(vertexId));
96+
97+
// Apply TemporalVertexDegree on subgraph
98+
TemporalVertexDegree temporalVertexDegree = new TemporalVertexDegree(degreeType, dimension);
99+
temporalVertexDegree.setIncludeVertexTime(true);
100+
DataSet<Tuple4<GradoopId, Long, Long, Integer>> filteredEdges = temporalVertexDegree.execute(subGraph)
101+
// Find relevant edges which exist within the given time
102+
.filter(new FilterEdgesInInterval(queryFrom, queryTo, vertexId));
103+
104+
switch (aggregationType) {
105+
case MIN:
106+
return filteredEdges
107+
// Group dataset and find minimum degree
108+
.groupBy(0)
109+
.aggregate(Aggregations.MIN, 3)
110+
// get field 3 which contains the minimum degree
111+
.<Tuple1<Integer>>project(3)
112+
// Convert to Double
113+
.map(new CastTuple1IntToDouble());
114+
case MAX:
115+
return filteredEdges
116+
// group dataset and find maximum degree
117+
.groupBy(0)
118+
.aggregate(Aggregations.MAX, 3)
119+
// get field 3 which contains the maximum degree
120+
.<Tuple1<Integer>>project(3)
121+
// Convert to Double
122+
.map(new CastTuple1IntToDouble());
123+
case AVG:
124+
return filteredEdges
125+
// Map each tuple to an interim result from which we can calculate the overall average degree
126+
.map(new MapCalculatePartialAverageDegree(queryFrom, queryTo))
127+
// Group dataset and sum all the interim results from before
128+
.groupBy(0)
129+
.aggregate(Aggregations.SUM, 1)
130+
// Now divide this sum by the length of the time interval
131+
.map(new MapCalculateAverageDegree(queryFrom, queryTo));
132+
default:
133+
throw new IllegalArgumentException("Aggregate type not specified.");
134+
}
135+
}
136+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.temporal.model.impl.operators.metric;
17+
18+
import org.gradoop.common.model.impl.id.GradoopId;
19+
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
20+
import org.gradoop.temporal.model.api.TimeDimension;
21+
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
22+
23+
/**
24+
* A TPGM operator which calculates the average degree of a given vertex referenced via its {@code vertexId}
25+
* within a given time interval: start {@code queryFrom}, end {@code queryTo}. The result is a single
26+
* value (Double). The logic is implemented in the superclass {@link BaseVertexCentricDegreeEvolution}.
27+
* <p>
28+
* The type of the degree (IN, OUT, BOTH) can be chosen by the arguments.
29+
*/
30+
public class VertexCentricAverageDegreeEvolution extends BaseVertexCentricDegreeEvolution {
31+
/**
32+
* Creates an instance of this temporal average vertex degree operator.
33+
*
34+
* @param degreeType the degree type to consider
35+
* @param dimension the time dimension to consider
36+
* @param vertexId the id of the vertex to consider
37+
* @param queryFrom the start of the interval
38+
* @param queryTo the end of the interval
39+
*/
40+
public VertexCentricAverageDegreeEvolution(VertexDegree degreeType, TimeDimension dimension,
41+
GradoopId vertexId, Long queryFrom, Long queryTo) {
42+
super(degreeType, dimension, vertexId, queryFrom, queryTo, AggregationType.AVG);
43+
}
44+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.temporal.model.impl.operators.metric;
17+
18+
import org.gradoop.common.model.impl.id.GradoopId;
19+
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
20+
import org.gradoop.temporal.model.api.TimeDimension;
21+
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
22+
23+
/**
24+
* A TPGM operator calculating the maximum degree of a given vertex referenced via its {@code vertexId}
25+
* within a given time interval: start {@code queryFrom}, end {@code queryTo}. The result is a single
26+
* value (Double). The logic is implemented in the superclass {@link BaseVertexCentricDegreeEvolution}.
27+
* <p>
28+
* The type of the degree (IN, OUT, BOTH) can be chosen by the arguments.
29+
*/
30+
public class VertexCentricMaxDegreeEvolution extends BaseVertexCentricDegreeEvolution {
31+
/**
32+
* Creates an instance of this temporal vertex-centric maximum degree aggregation operator.
33+
*
34+
* @param degreeType the degree type to consider
35+
* @param dimension the time dimension to consider
36+
* @param vertexId the id of the vertex to consider
37+
* @param queryFrom the start of the interval
38+
* @param queryTo the end of the interval
39+
*/
40+
public VertexCentricMaxDegreeEvolution(VertexDegree degreeType, TimeDimension dimension,
41+
GradoopId vertexId, Long queryFrom, Long queryTo) {
42+
super(degreeType, dimension, vertexId, queryFrom, queryTo, AggregationType.MAX);
43+
}
44+
}
45+
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.temporal.model.impl.operators.metric;
17+
18+
import org.gradoop.common.model.impl.id.GradoopId;
19+
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
20+
import org.gradoop.temporal.model.api.TimeDimension;
21+
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
22+
23+
/**
24+
* A TPGM operator calculating the minimum degree of a given vertex referenced via its {@code vertexId}
25+
* within a given time interval: start {@code queryFrom}, end {@code queryTo}. The result is a single
26+
* value (Double). The logic is implemented in the superclass {@link BaseVertexCentricDegreeEvolution}.
27+
* <p>
28+
* The type of the degree (IN, OUT, BOTH) can be chosen by the arguments.
29+
*/
30+
public class VertexCentricMinDegreeEvolution extends BaseVertexCentricDegreeEvolution {
31+
/**
32+
* Creates an instance of this temporal vertex-centric minimum degree aggregation operator.
33+
*
34+
* @param degreeType the degree type to consider
35+
* @param dimension the time dimension to consider
36+
* @param vertexId the id of the vertex to consider
37+
* @param queryFrom the start of the interval
38+
* @param queryTo the end of the interval
39+
*/
40+
public VertexCentricMinDegreeEvolution(VertexDegree degreeType, TimeDimension dimension,
41+
GradoopId vertexId, Long queryFrom, Long queryTo) {
42+
super(degreeType, dimension, vertexId, queryFrom, queryTo, AggregationType.MIN);
43+
}
44+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.temporal.model.impl.operators.metric.functions;
17+
18+
/**
19+
* Enum which holds three types of aggregation.
20+
*/
21+
public enum AggregationType {
22+
/**
23+
* Aggregate for minimum.
24+
*/
25+
MIN,
26+
/**
27+
* Aggregate for maximum.
28+
*/
29+
MAX,
30+
/**
31+
* Aggregate for average.
32+
*/
33+
AVG
34+
}

0 commit comments

Comments
 (0)