1- <<<<<<< HEAD
2- =======
31/*
42 * © Copyright IBM Corp. 2025
53 */
64
7- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
85package com .getindata .connectors .http .internal .table .lookup .querycreators ;
96
107import java .io .IOException ;
1310import java .nio .charset .StandardCharsets ;
1411import java .util .*;
1512
16- <<<<<<< HEAD
17- import lombok .Builder ;
1813import lombok .extern .slf4j .Slf4j ;
19- =======
20- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
2114import org .apache .flink .annotation .VisibleForTesting ;
2215import org .apache .flink .api .common .serialization .SerializationSchema ;
2316import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonProcessingException ;
2417import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
2518import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .ObjectNode ;
26- <<<<<<< HEAD
27- import org .apache .flink .table .data .RowData ;
28- import org .apache .flink .util .FlinkRuntimeException ;
29- import org .apache .flink .util .Preconditions ;
30- =======
3119import org .apache .flink .table .api .DataTypes .Field ;
3220import org .apache .flink .table .data .GenericRowData ;
3321import org .apache .flink .table .data .RowData ;
3624import org .apache .flink .types .Row ;
3725import org .apache .flink .util .FlinkRuntimeException ;
3826import org .apache .flink .util .Preconditions ;
39- import org .apache .logging .log4j .LogManager ;
40- import org .apache .logging .log4j .Logger ;
41- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
4227
4328import com .getindata .connectors .http .LookupArg ;
4429import com .getindata .connectors .http .LookupQueryCreator ;
4732import com .getindata .connectors .http .internal .utils .SerializationSchemaUtils ;
4833
4934/**
50- <<<<<<< HEAD
51- * <p>Generic JSON and URL query creator; in addition to be able to map columns to json requests,
52- * it allows url inserts to be mapped to column names using templating.</p>
53- * <p>For GETs, column names are mapped to query parameters. e.g. for
54- * <code>GenericJsonAndUrlQueryCreator.REQUEST_PARAM_FIELDS</code> = "id1;id2"
55- * and url of http://base. At lookup time with values of id1=1 and id2=2 a call of
56- * http/base?id1=1&id2=2 will be issued.</p>
57- * <p>For PUT and POST, parameters are mapped to the json body e.g. for
58- * REQUEST_PARAM_FIELDS = "id1;id2" and url of http://base. At lookup time with values of id1=1 and
59- * id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2}</p>
60- * <p>For all http methods, url segments can be used to include lookup up values. Using the map from
61- =======
6235 * Generic JSON and URL query creator; in addition to be able to map columns to json requests,
6336 * it allows url inserts to be mapped to column names using templating.
6437 * <br>
7245 * id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2}
7346 * <br>
7447 * For all http methods, url segments can be used to include lookup up values. Using the map from
75- >>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
7648 * <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of the insert and the
7749 * value of the associated column.
7850 * e.g. for <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> = "key1":"col1"
7951 * and url of http://base/{key1}. At lookup time with values of col1="aaaa" a call of
80- <<<<<<< HEAD
81- * http/base/aaaa will be issued.</p>
82- */
83- @ Slf4j
84- @ Builder
85- public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
86- private static final long serialVersionUID = 1L ;
87- private final String httpMethod ;
88- // not final so we can mutate for unit test
89- private SerializationSchema <RowData > serializationSchema ;
90- private final List <String > requestQueryParamsFields ;
91- private boolean schemaOpened = false ;
92- private final List <String > requestBodyFields ;
93- private final Map <String , String > requestUrlMap ;
94- private final LookupRow lookupRow ;
95- =======
9652 * http/base/aaaa will be issued.
9753 *
9854 */
55+ @ Slf4j
9956public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
10057 private static final long serialVersionUID = 1L ;
101- private static final Logger log = LogManager .getLogger (GenericJsonAndUrlQueryCreator .class );
10258
10359 // not final so we can mutate for unit test
10460 private SerializationSchema <RowData > serializationSchema ;
@@ -133,7 +89,6 @@ public GenericJsonAndUrlQueryCreator(final String httpMethod,
13389 this .requestBodyFields = requestBodyFields ;
13490 this .requestUrlMap = requestUrlMap ;
13591 }
136- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
13792 @ VisibleForTesting
13893 void setSerializationSchema (SerializationSchema <RowData >
13994 serializationSchema ) {
@@ -153,13 +108,9 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
153108 jsonObject = (ObjectNode ) ObjectMapperAdapter .instance ().readTree (
154109 serializationSchema .serialize (lookupDataRow ));
155110 } catch (IOException e ) {
156- <<<<<<< HEAD
157- throw new RuntimeException ("Unable to parse the lookup arguments to json." , e );
158- =======
159111 String message = "Unable to parse the lookup arguments to json." ;
160112 log .error (message , e );
161113 throw new RuntimeException (message , e );
162- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
163114 }
164115 // Parameters are encoded as query params for GET and none GET.
165116 // Later code will turn these query params into the body for PUTs and POSTs
@@ -180,34 +131,24 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
180131 lookupQuery = ObjectMapperAdapter .instance ()
181132 .writeValueAsString (jsonObject .retain (requestBodyFields ));
182133 } catch (JsonProcessingException e ) {
183- <<<<<<< HEAD
184- throw new RuntimeException ("Unable to convert Json Object to a string" , e );
185- =======
186134 final String message = "Unable to convert Json Object to a string" ;
187135 log .error (message , e );
188136 throw new RuntimeException (message ,e );
189- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
190137 }
191138 // body parameters
192139 // use the request json object to scope the required fields and the lookupArgs as values
193140 bodyBasedUrlQueryParams = createBodyBasedParams (lookupArgs ,
194141 jsonObjectForQueryParams );
195142 }
196143 // add the path map
197- <<<<<<< HEAD
198- final Map <String , String > pathBasedUrlParams = createURLPathBasedParams (lookupArgs ,
199- =======
200144 final Map <String , String > pathBasedUrlParams = createPathBasedParams (lookupArgs ,
201- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
202145 requestUrlMap );
203146
204147 return new LookupQueryInfo (lookupQuery , bodyBasedUrlQueryParams , pathBasedUrlParams );
205148
206149 }
207150
208151 /**
209- <<<<<<< HEAD
210- =======
211152 * Create a Row from a RowData and DataType
212153 * @param lookupRowData the lookup RowData
213154 * @param rowType the datatype
@@ -230,19 +171,14 @@ static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) {
230171 }
231172
232173 /**
233- >>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
234174 * Create map of the json key to the lookup argument
235175 * value. This is used for body based content.
236176 * @param args lookup arguments
237177 * @param objectNode object node
238178 * @return map of field content to the lookup argument value.
239179 */
240180 private Map <String , String > createBodyBasedParams (final Collection <LookupArg > args ,
241- <<<<<<< HEAD
242- ObjectNode objectNode ) {
243- =======
244181 ObjectNode objectNode ) {
245- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
246182 Map <String , String > mapOfJsonKeyToLookupArg = new LinkedHashMap <>();
247183 Iterator <Map .Entry <String , JsonNode >> iterator = objectNode .fields ();
248184 iterator .forEachRemaining (field -> {
@@ -258,45 +194,26 @@ private Map<String, String> createBodyBasedParams(final Collection<LookupArg> ar
258194 return mapOfJsonKeyToLookupArg ;
259195 }
260196 /**
261- <<<<<<< HEAD
262- * Create map of insert name to column name for path inserts
263- =======
264197 * Create map of the json key to the lookup argument
265198 * value. This is used for body based content.
266- >>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
267199 * @param args lookup arguments
268200 * @param urlMap map of insert name to column name
269201 * @return map of field content to the lookup argument value.
270202 */
271- <<<<<<< HEAD
272- private Map <String , String > createURLPathBasedParams (final Collection <LookupArg > args ,
273- Map <String ,
274- String > urlMap ) {
275- Map <String , String > mapOfinsertKeyToLookupArg = new LinkedHashMap <>();
276- =======
277203 private Map <String , String > createPathBasedParams (final Collection <LookupArg > args ,
278204 Map <String , String > urlMap ) {
279205 Map <String , String > mapOfJsonKeyToLookupArg = new LinkedHashMap <>();
280- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
281206 if (urlMap != null ) {
282207 for (String key : urlMap .keySet ()) {
283208 for (final LookupArg arg : args ) {
284209 if (arg .getArgName ().equals (key )) {
285- <<<<<<< HEAD
286- mapOfinsertKeyToLookupArg .put (
287- =======
288210 mapOfJsonKeyToLookupArg .put (
289- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
290211 urlMap .get (key ), arg .getArgValue ());
291212 }
292213 }
293214 }
294215 }
295- <<<<<<< HEAD
296- return mapOfinsertKeyToLookupArg ;
297- =======
298216 return mapOfJsonKeyToLookupArg ;
299- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
300217 }
301218 /**
302219 * Convert json object to query params string
@@ -317,23 +234,13 @@ static String convertToQueryParameters(final ObjectNode jsonObject, String enc)
317234 result .add (fieldName + "="
318235 + URLEncoder .encode (fieldValue , enc ));
319236 } catch (UnsupportedEncodingException e ) {
320- <<<<<<< HEAD
321- throw new RuntimeException ("Failed to encode the value of the query parameter name "
322- + fieldName
323- + ": "
324- + fieldValue
325- + " using encoding "
326- + enc ,
327- e );
328- =======
329237 final String message =
330238 "Failed to encode the value of the query parameter name "
331239 + fieldName
332240 + ": "
333241 + fieldValue ;
334242 log .error (message , e );
335243 throw new RuntimeException (message , e );
336- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
337244 }
338245 });
339246
@@ -349,16 +256,11 @@ private void checkOpened() {
349256 GenericJsonAndUrlQueryCreator .class ));
350257 this .schemaOpened = true ;
351258 } catch (final Exception e ) {
352- <<<<<<< HEAD
353- throw new FlinkRuntimeException ("Failed to initialize serialization schema for "
354- + GenericJsonAndUrlQueryCreator .class , e );
355- =======
356259 final String message =
357260 "Failed to initialize serialization schema for "
358261 + GenericJsonAndUrlQueryCreator .class ;
359262 log .error (message , e );
360263 throw new FlinkRuntimeException (message , e );
361- >>>>>>> 6 c68722 (HTTP -99 Generic Json url query creator )
362264 }
363265 }
364266 }
0 commit comments