Skip to content

Commit 11fb36b

Browse files
authored
[#1459] Added data sink/source for Parquet (TPGM+EPGM) (#1586)
[#1459] DataSource and -Sink for Parquet and Parquet+Protobuf added for EPGM and TPGM
1 parent 83f48a5 commit 11fb36b

File tree

72 files changed

+4347
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+4347
-1
lines changed

gradoop-checkstyle/src/main/resources/gradoop/checkstyle-suppressions.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,8 @@
5252
files="[/\\]src[/\\]test[/\\]java[/\\]"/>
5353
<suppress checks="StaticVariableName"
5454
files="[/\\]src[/\\]test[/\\]java[/\\]"/>
55+
56+
<!-- no checkstyle for auto-generated code -->
57+
<suppress checks=".*"
58+
files="[/\\]target[/\\]generated-sources[/\\]"/>
5559
</suppressions>

gradoop-flink/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@
5454

5555
<build>
5656
<plugins>
57+
<plugin>
58+
<groupId>com.github.os72</groupId>
59+
<artifactId>protoc-jar-maven-plugin</artifactId>
60+
</plugin>
5761
<plugin>
5862
<groupId>org.apache.maven.plugins</groupId>
5963
<artifactId>maven-compiler-plugin</artifactId>
@@ -124,6 +128,12 @@
124128
<artifactId>flink-shaded-hadoop-2</artifactId>
125129
</dependency>
126130

131+
<!-- Parquet -->
132+
<dependency>
133+
<groupId>org.apache.parquet</groupId>
134+
<artifactId>parquet-protobuf</artifactId>
135+
</dependency>
136+
127137
<!-- Test dependencies -->
128138

129139
<!-- Gradoop -->
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.flink.io.impl.parquet.common;
17+
18+
import org.apache.flink.api.common.typeinfo.TypeInformation;
19+
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase;
20+
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
21+
import org.apache.hadoop.mapreduce.InputFormat;
22+
import org.apache.hadoop.mapreduce.Job;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* InputFormat implementation allowing to use Hadoop (mapreduce) InputFormats which don't provide a key with
28+
* Flink.
29+
*
30+
* @param <T> Value Type
31+
*/
32+
public class HadoopValueInputFormat<T> extends HadoopInputFormatBase<Void, T, T> implements
33+
ResultTypeQueryable<T> {
34+
35+
/**
36+
* Creates a new Flink input format.
37+
*
38+
* @param mapreduceInputFormat Hadoop (mapreduce) input format
39+
* @param value value type class
40+
* @param job job instance for configuration
41+
*/
42+
public HadoopValueInputFormat(InputFormat<Void, T> mapreduceInputFormat, Class<T> value, Job job) {
43+
super(mapreduceInputFormat, Void.class, value, job);
44+
}
45+
46+
@Override
47+
public T nextRecord(T record) throws IOException {
48+
if (!this.fetched) {
49+
fetchNext();
50+
}
51+
if (!this.hasNext) {
52+
return null;
53+
}
54+
try {
55+
record = recordReader.getCurrentValue();
56+
} catch (InterruptedException e) {
57+
throw new IOException("Could not get KeyValue pair.", e);
58+
}
59+
this.fetched = false;
60+
61+
return record;
62+
}
63+
64+
@Override
65+
public TypeInformation<T> getProducedType() {
66+
return TypeInformation.of(this.valueClass);
67+
}
68+
69+
@Override
70+
public String toString() {
71+
String jobName = this.getConfiguration().get("mapreduce.job.name");
72+
if (jobName != null) {
73+
return String.format("HadoopValueInputFormat[%s]", jobName);
74+
}
75+
return super.toString();
76+
}
77+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.flink.io.impl.parquet.common;
17+
18+
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase;
19+
import org.apache.hadoop.mapreduce.Job;
20+
import org.apache.hadoop.mapreduce.OutputFormat;
21+
22+
import java.io.IOException;
23+
24+
/**
25+
* OutputFormat implementation allowing to use Hadoop (mapreduce) OutputFormats which don't provide a key
26+
* with Flink.
27+
*
28+
* @param <T> Value Type
29+
*/
30+
public class HadoopValueOutputFormat<T> extends HadoopOutputFormatBase<Void, T, T> {
31+
32+
/**
33+
* Creates a new Flink output format.
34+
*
35+
* @param mapreduceOutputFormat Hadoop (mapreduce) output format
36+
* @param job job instance for configuration
37+
*/
38+
public HadoopValueOutputFormat(OutputFormat<Void, T> mapreduceOutputFormat, Job job) {
39+
super(mapreduceOutputFormat, job);
40+
}
41+
42+
@Override
43+
public void writeRecord(T record) throws IOException {
44+
try {
45+
this.recordWriter.write(null, record);
46+
} catch (InterruptedException e) {
47+
throw new IOException("Could not write Record.", e);
48+
}
49+
}
50+
51+
@Override
52+
public String toString() {
53+
String jobName = this.getConfiguration().get("mapreduce.job.name");
54+
if (jobName != null) {
55+
return String.format("HadoopValueOutputFormat[%s]", jobName);
56+
}
57+
return super.toString();
58+
}
59+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.gradoop.flink.io.impl.parquet.common;
17+
18+
import org.apache.hadoop.mapreduce.JobContext;
19+
import org.apache.hadoop.mapreduce.RecordWriter;
20+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
21+
import org.apache.parquet.hadoop.ParquetFileWriter;
22+
import org.apache.parquet.hadoop.ParquetOutputFormat;
23+
import org.apache.parquet.hadoop.util.ContextUtil;
24+
25+
import java.io.IOException;
26+
27+
/**
28+
* Extension of the {@link ParquetOutputFormat<T>} that allows specifying a file creation mode.
29+
*
30+
* @param <T> the type of the materialized records
31+
*/
32+
public class ParquetOutputFormatWithMode<T> extends ParquetOutputFormat<T> {
33+
34+
/**
35+
* File creation mode configuration key
36+
*/
37+
private static final String PARQUET_WRITER_MODE = "parquet.writer.mode";
38+
39+
/**
40+
* Sets the file creation mode of a {@link JobContext}.
41+
*
42+
* @param context the job's context
43+
* @param mode the file creation mode
44+
*/
45+
public static void setFileCreationMode(JobContext context, ParquetFileWriter.Mode mode) {
46+
ContextUtil.getConfiguration(context).setEnum(PARQUET_WRITER_MODE, mode);
47+
}
48+
49+
/**
50+
* Gets the file creation mode of a {@link JobContext}.
51+
*
52+
* @param context the job's context
53+
* @return the specified mode or defaults to {@link ParquetFileWriter.Mode#CREATE} if not found
54+
*/
55+
public static ParquetFileWriter.Mode getFileCreationMode(JobContext context) {
56+
return ContextUtil.getConfiguration(context).getEnum(PARQUET_WRITER_MODE, ParquetFileWriter.Mode.CREATE);
57+
}
58+
59+
@Override
60+
public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException,
61+
InterruptedException {
62+
return super.getRecordWriter(taskAttemptContext, getFileCreationMode(taskAttemptContext));
63+
}
64+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Contains commonly used classes for Parquet input and output with Flink.
19+
*/
20+
package org.gradoop.flink.io.impl.parquet.common;
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
/**
17+
* Contains all classes related to Parquet input and output with Flink.
18+
*/
19+
package org.gradoop.flink.io.impl.parquet;

0 commit comments

Comments
 (0)