Skip to content

Commit 58ea846

Browse files
committed
HTTP99 address review feedback
Signed-off-by: davidradl <[email protected]>
1 parent 7c81ffe commit 58ea846

File tree

5 files changed

+91
-159
lines changed

5 files changed

+91
-159
lines changed

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java

Lines changed: 22 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import java.nio.charset.StandardCharsets;
1111
import java.util.*;
1212

13+
import lombok.Builder;
14+
import lombok.extern.slf4j.Slf4j;
1315
import org.apache.flink.annotation.VisibleForTesting;
1416
import org.apache.flink.api.common.serialization.SerializationSchema;
1517
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -23,8 +25,6 @@
2325
import org.apache.flink.types.Row;
2426
import org.apache.flink.util.FlinkRuntimeException;
2527
import org.apache.flink.util.Preconditions;
26-
import org.apache.logging.log4j.LogManager;
27-
import org.apache.logging.log4j.Logger;
2828

2929
import com.getindata.connectors.http.LookupArg;
3030
import com.getindata.connectors.http.LookupQueryCreator;
@@ -53,9 +53,10 @@
5353
* http/base/aaaa will be issued.
5454
*
5555
*/
56+
@Slf4j
57+
@Builder
5658
public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
5759
private static final long serialVersionUID = 1L;
58-
private static final Logger log = LogManager.getLogger(GenericJsonAndUrlQueryCreator.class);
5960

6061
// not final so we can mutate for unit test
6162
private SerializationSchema<RowData> serializationSchema;
@@ -66,30 +67,6 @@ public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
6667
private final List<String> requestBodyFields;
6768
private final Map<String, String> requestUrlMap;
6869

69-
/**
70-
* Construct a Generic JSON and URL query creator.
71-
*
72-
* @param httpMethod the requested http method
73-
* @param serializationSchema serialization schema for RowData
74-
* @param requestQueryParamsFields query param fields
75-
* @param requestBodyFields body fields used for PUT and POSTs
76-
* @param requestUrlMap url map
77-
* @param lookupRow lookup row itself.
78-
*/
79-
public GenericJsonAndUrlQueryCreator(final String httpMethod,
80-
final SerializationSchema<RowData>
81-
serializationSchema,
82-
final List<String> requestQueryParamsFields,
83-
final List<String> requestBodyFields,
84-
final Map<String, String> requestUrlMap,
85-
final LookupRow lookupRow) {
86-
this.httpMethod = httpMethod;
87-
this.serializationSchema = serializationSchema;
88-
this.lookupRow = lookupRow;
89-
this.requestQueryParamsFields = requestQueryParamsFields;
90-
this.requestBodyFields = requestBodyFields;
91-
this.requestUrlMap = requestUrlMap;
92-
}
9370
@VisibleForTesting
9471
void setSerializationSchema(SerializationSchema<RowData>
9572
serializationSchema) {
@@ -109,9 +86,7 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
10986
jsonObject = (ObjectNode) ObjectMapperAdapter.instance().readTree(
11087
serializationSchema.serialize(lookupDataRow));
11188
} catch (IOException e) {
112-
String message = "Unable to parse the lookup arguments to json.";
113-
log.error(message, e);
114-
throw new RuntimeException(message, e);
89+
throw new RuntimeException("Unable to parse the lookup arguments to json.", e);
11590
}
11691
// Parameters are encoded as query params for GET and none GET.
11792
// Later code will turn these query params into the body for PUTs and POSTs
@@ -132,17 +107,15 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
132107
lookupQuery = ObjectMapperAdapter.instance()
133108
.writeValueAsString(jsonObject.retain(requestBodyFields));
134109
} catch (JsonProcessingException e) {
135-
final String message = "Unable to convert Json Object to a string";
136-
log.error(message, e);
137-
throw new RuntimeException(message,e);
110+
throw new RuntimeException("Unable to convert Json Object to a string", e);
138111
}
139112
// body parameters
140113
// use the request json object to scope the required fields and the lookupArgs as values
141114
bodyBasedUrlQueryParams = createBodyBasedParams(lookupArgs,
142115
jsonObjectForQueryParams);
143116
}
144117
// add the path map
145-
final Map<String, String> pathBasedUrlParams = createPathBasedParams(lookupArgs,
118+
final Map<String, String> pathBasedUrlParams = createURLPathBasedParams(lookupArgs,
146119
requestUrlMap);
147120

148121
return new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParams, pathBasedUrlParams);
@@ -195,26 +168,25 @@ private Map<String, String> createBodyBasedParams(final Collection<LookupArg> ar
195168
return mapOfJsonKeyToLookupArg;
196169
}
197170
/**
198-
* Create map of the json key to the lookup argument
199-
* value. This is used for body based content.
171+
* Create map of insert name to column name for path inserts
200172
* @param args lookup arguments
201173
* @param urlMap map of insert name to column name
202174
* @return map of field content to the lookup argument value.
203175
*/
204-
private Map<String, String> createPathBasedParams(final Collection<LookupArg> args,
205-
Map<String, String> urlMap ) {
206-
Map<String, String> mapOfJsonKeyToLookupArg = new LinkedHashMap<>();
176+
private Map<String, String> createURLPathBasedParams(final Collection<LookupArg> args,
177+
Map<String, String> urlMap ) {
178+
Map<String, String> mapOfinsertKeyToLookupArg = new LinkedHashMap<>();
207179
if (urlMap != null) {
208180
for (String key: urlMap.keySet()) {
209181
for (final LookupArg arg : args) {
210182
if (arg.getArgName().equals(key)) {
211-
mapOfJsonKeyToLookupArg.put(
183+
mapOfinsertKeyToLookupArg.put(
212184
urlMap.get(key), arg.getArgValue());
213185
}
214186
}
215187
}
216188
}
217-
return mapOfJsonKeyToLookupArg;
189+
return mapOfinsertKeyToLookupArg;
218190
}
219191
/**
220192
* Convert json object to query params string
@@ -235,13 +207,13 @@ static String convertToQueryParameters(final ObjectNode jsonObject, String enc)
235207
result.add(fieldName + "="
236208
+ URLEncoder.encode(fieldValue, enc));
237209
} catch (UnsupportedEncodingException e) {
238-
final String message =
239-
"Failed to encode the value of the query parameter name "
240-
+ fieldName
241-
+ ": "
242-
+ fieldValue;
243-
log.error(message, e);
244-
throw new RuntimeException(message, e);
210+
throw new RuntimeException("Failed to encode the value of the query parameter name "
211+
+ fieldName
212+
+ ": "
213+
+ fieldValue
214+
+ " using encoding "
215+
+ enc,
216+
e);
245217
}
246218
});
247219

@@ -257,11 +229,8 @@ private void checkOpened() {
257229
GenericJsonAndUrlQueryCreator.class));
258230
this.schemaOpened = true;
259231
} catch (final Exception e) {
260-
final String message =
261-
"Failed to initialize serialization schema for "
262-
+ GenericJsonAndUrlQueryCreator.class;
263-
log.error(message, e);
264-
throw new FlinkRuntimeException(message, e);
232+
throw new FlinkRuntimeException("Failed to initialize serialization schema for "
233+
+ GenericJsonAndUrlQueryCreator.class, e);
265234
}
266235
}
267236
}

src/main/java/com/getindata/connectors/http/internal/utils/ApiLookupConstants.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomFormatFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class CustomFormatFactory implements SerializationFormatFactory {
2020
public static final String REQUIRED_OPTION = "required-option-one";
2121

2222
/**
23-
* Consider removing this static only used for testing only
23+
* Used for testing custom format.
2424
*/
2525
static boolean requiredOptionsWereUsed = false;
2626

src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java

Lines changed: 66 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.flink.table.types.DataType;
2121
import org.apache.flink.types.Row;
2222
import org.junit.jupiter.api.Test;
23+
import org.junit.jupiter.params.ParameterizedTest;
24+
import org.junit.jupiter.params.provider.ValueSource;
2325
import static org.assertj.core.api.Assertions.assertThat;
2426
import static org.junit.jupiter.api.Assertions.assertThrows;
2527

@@ -30,84 +32,67 @@
3032
import static com.getindata.connectors.http.internal.table.lookup.querycreators.QueryCreatorUtils.getTableContext;
3133

3234
class GenericJsonAndUrlQueryCreatorTest {
33-
enum Operation {
34-
Get("GET"),
35-
Put("PUT"),
36-
Post("POST")
37-
;
38-
public final String label;
39-
Operation() {
40-
this("GET");
41-
}
4235

43-
Operation(String label) {
44-
this.label = label;
45-
}
46-
}
47-
@Test
48-
public void createLookupQueryTestStrAllOps() {
49-
for (Operation op: Operation.values()) {
50-
String operation = op.label;
51-
String key = "key1";
52-
String value = "val1";
53-
// for GET this is the minimum config
54-
List<String> query_params = List.of(key);
55-
// Path param ArgPath required a stringified json object. As we have PersonBean
56-
// we can use that.
57-
Map<String, String> url_params = Map.of(key, key);
58-
LookupRow lookupRow = new LookupRow()
59-
.addLookupEntry(
60-
new RowDataSingleValueLookupSchemaEntry(
61-
key,
62-
RowData.createFieldGetter(
63-
DataTypes.STRING().getLogicalType(), 0)
64-
));
65-
DataType dataType = row(List.of(
66-
DataTypes.FIELD(key, DataTypes.STRING())
67-
));
68-
lookupRow.setLookupPhysicalRowDataType(dataType);
69-
ResolvedSchema resolvedSchema = ResolvedSchema.of(Column.physical(key,
70-
DataTypes.STRING()));
71-
Configuration config = new Configuration();
72-
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS,
36+
@ParameterizedTest
37+
@ValueSource(strings = {"GET", "PUT", "POST" })
38+
public void createLookupQueryTestStrAllOps(String operation) {
39+
String key = "key1";
40+
String value = "val1";
41+
// for GET this is the minimum config
42+
List<String> query_params = List.of(key);
43+
// Path param ArgPath required a stringified json object. As we have PersonBean
44+
// we can use that.
45+
Map<String, String> url_params = Map.of(key, key);
46+
LookupRow lookupRow = new LookupRow()
47+
.addLookupEntry(
48+
new RowDataSingleValueLookupSchemaEntry(
49+
key,
50+
RowData.createFieldGetter(
51+
DataTypes.STRING().getLogicalType(), 0)
52+
));
53+
DataType dataType = row(List.of(
54+
DataTypes.FIELD(key, DataTypes.STRING())
55+
));
56+
lookupRow.setLookupPhysicalRowDataType(dataType);
57+
ResolvedSchema resolvedSchema = ResolvedSchema.of(Column.physical(key,
58+
DataTypes.STRING()));
59+
Configuration config = new Configuration();
60+
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS,
61+
query_params);
62+
if (!operation.equals("GET")) {
63+
// add the body content for PUT and POST
64+
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS,
7365
query_params);
74-
if (!op.equals("GET")) {
75-
// add the body content for PUT and POST
76-
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS,
77-
query_params);
78-
}
79-
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, url_params);
80-
config.setString(LOOKUP_METHOD, operation);
81-
// GIVEN
82-
83-
GenericJsonAndUrlQueryCreator universalJsonQueryCreator =
84-
(GenericJsonAndUrlQueryCreator) new GenericJsonAndUrlQueryCreatorFactory()
85-
.createLookupQueryCreator(
86-
config,
87-
lookupRow,
88-
getTableContext(config,
89-
resolvedSchema)
90-
);
91-
var row = new GenericRowData(1);
92-
row.setField(0, StringData.fromString(value));
93-
var createdQuery = universalJsonQueryCreator.createLookupQuery(row);
94-
// THEN
95-
if (operation.equals("GET")) {
96-
assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEmpty();
97-
assertThat(createdQuery.getLookupQuery()).isEqualTo(key + "=" + value);
98-
} else {
99-
assertThat(createdQuery
100-
.getBodyBasedUrlQueryParameters())
101-
.isEqualTo(key + "=" + value);
102-
assertThat(createdQuery.getLookupQuery()).isEqualTo(
103-
"{\""
104-
+ key
105-
+ "\":\"" + value
106-
+ "\"}");
107-
}
108-
assertThat(createdQuery.getPathBasedUrlParameters().size() == 1).isTrue();
109-
assertThat(createdQuery.getPathBasedUrlParameters().get(key)).isEqualTo(value);
11066
}
67+
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, url_params);
68+
config.setString(LOOKUP_METHOD, operation);
69+
GenericJsonAndUrlQueryCreator universalJsonQueryCreator =
70+
(GenericJsonAndUrlQueryCreator) new GenericJsonAndUrlQueryCreatorFactory()
71+
.createLookupQueryCreator(
72+
config,
73+
lookupRow,
74+
getTableContext(config,
75+
resolvedSchema)
76+
);
77+
var row = new GenericRowData(1);
78+
row.setField(0, StringData.fromString(value));
79+
var createdQuery = universalJsonQueryCreator.createLookupQuery(row);
80+
// THEN
81+
if (operation.equals("GET")) {
82+
assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEmpty();
83+
assertThat(createdQuery.getLookupQuery()).isEqualTo(key + "=" + value);
84+
} else {
85+
assertThat(createdQuery
86+
.getBodyBasedUrlQueryParameters())
87+
.isEqualTo(key + "=" + value);
88+
assertThat(createdQuery.getLookupQuery()).isEqualTo(
89+
"{\""
90+
+ key
91+
+ "\":\"" + value
92+
+ "\"}");
93+
}
94+
assertThat(createdQuery.getPathBasedUrlParameters().size() == 1).isTrue();
95+
assertThat(createdQuery.getPathBasedUrlParameters().get(key)).isEqualTo(value);
11196
}
11297
@Test
11398
public void createLookupQueryTest() {
@@ -167,7 +152,6 @@ public void failserializationOpenTest() {
167152
List<String> paths_config =List.of("key1");
168153
String operation = "GET";
169154
String key = "key1";
170-
String value = "val1";
171155

172156
LookupRow lookupRow = new LookupRow()
173157
.addLookupEntry(
@@ -183,7 +167,6 @@ public void failserializationOpenTest() {
183167
Configuration config = new Configuration();
184168
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS, paths_config);
185169
config.setString(LOOKUP_METHOD, operation);
186-
// GIVEN
187170

188171
lookupRow.setLookupPhysicalRowDataType(dataType);
189172

@@ -213,15 +196,19 @@ public byte[] serialize(RowData element) {
213196
});
214197
}
215198
@Test void convertToQueryParametersUnsupportedEncodingTest() {
199+
// GIVEN
216200
ObjectMapper mapper = ObjectMapperAdapter.instance();
217201
PersonBean person = new PersonBean("aaa", "bbb");
202+
// WHEN
218203
JsonNode personNode = mapper.valueToTree(person);
204+
// THEN
219205
assertThrows(RuntimeException.class, () -> {
220206
GenericJsonAndUrlQueryCreator.convertToQueryParameters(
221207
(ObjectNode) personNode, "bad encoding");
222208
});
223209
}
224210
@Test void rowDataToRowTest() {
211+
// GIVEN
225212
// String
226213
String KEY1 = "key1";
227214
String KEY2 = "key2";
@@ -238,7 +225,9 @@ public byte[] serialize(RowData element) {
238225
DataTypes.FIELD(KEY2, DataTypes.DATE()),
239226
DataTypes.FIELD(KEY3, DataTypes.TIMESTAMP_LTZ())
240227
));
228+
// WHEN
241229
Row row = GenericJsonAndUrlQueryCreator.rowDataToRow(rowData, dataType);
230+
// THEN
242231
assertThat(row.getField(KEY1).equals(VALUE));
243232
assertThat(row.getField(KEY2).equals("1970-01-01T00:00:00.010"));
244233
assertThat(row.getField(KEY3).equals("1970-01-01T00:00:00.010Z"));

0 commit comments

Comments
 (0)