Skip to content

Commit 66fae0b

Browse files
committed
address type changes
1 parent f94e94f commit 66fae0b

File tree

3 files changed

+193
-39
lines changed

3 files changed

+193
-39
lines changed

controllers/spec/function_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func TestJavaFunctionCommandWithConnectorOverrides(t *testing.T) {
173173
"initialPositionInStream": "TRIM_HORIZON",
174174
})
175175
function.Spec.SourceConfig = &v1alpha1.SourceConnectorSpec{
176-
Archive: "builtin://kinesis",
176+
SourceType: "kinesis",
177177
Configs: &sourceConfigs,
178178
TypeClassName: "java.lang.Object",
179179
}

controllers/spec/utils.go

Lines changed: 148 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -118,25 +118,123 @@ func encodeConnectorConfigs(config *v1alpha1.Config) string {
118118
return payload
119119
}
120120

121+
func stringFromConfig(config *v1alpha1.Config, key string) string {
122+
if config == nil || config.Data == nil {
123+
return ""
124+
}
125+
if val, ok := config.Data[key]; ok && val != nil {
126+
switch typed := val.(type) {
127+
case string:
128+
return typed
129+
default:
130+
return fmt.Sprint(typed)
131+
}
132+
}
133+
return ""
134+
}
135+
136+
func addConfigEntries(dst map[string]interface{}, value interface{}) {
137+
switch typed := value.(type) {
138+
case map[string]interface{}:
139+
for k, v := range typed {
140+
if k == "" || v == nil {
141+
continue
142+
}
143+
dst[k] = v
144+
}
145+
case map[interface{}]interface{}:
146+
for k, v := range typed {
147+
if k == nil || v == nil {
148+
continue
149+
}
150+
key := fmt.Sprint(k)
151+
if key == "" {
152+
continue
153+
}
154+
dst[key] = v
155+
}
156+
}
157+
}
158+
159+
func extractConnectorConfigs(config *v1alpha1.Config, reservedKeys map[string]struct{}) map[string]interface{} {
160+
if config == nil || config.Data == nil {
161+
return nil
162+
}
163+
164+
result := make(map[string]interface{})
165+
if nested, ok := config.Data["configs"]; ok {
166+
addConfigEntries(result, nested)
167+
}
168+
for key, value := range config.Data {
169+
if value == nil || key == "configs" {
170+
continue
171+
}
172+
if reservedKeys != nil {
173+
if _, skip := reservedKeys[key]; skip {
174+
continue
175+
}
176+
}
177+
result[key] = value
178+
}
179+
if len(result) == 0 {
180+
return nil
181+
}
182+
return result
183+
}
184+
185+
func resolveBuiltinFromConfig(config *v1alpha1.Config) string {
186+
if builtin := stringFromConfig(config, "builtin"); builtin != "" {
187+
return builtin
188+
}
189+
if archive := stringFromConfig(config, "archive"); strings.HasPrefix(archive, builtinURLPrefix) {
190+
return strings.TrimPrefix(archive, builtinURLPrefix)
191+
}
192+
return ""
193+
}
194+
121195
func buildSourceConnectorDetails(cfg *v1alpha1.SourceConnectorSpec) *connectorConfigDetails {
122196
if cfg == nil {
123197
return nil
124198
}
125199

126-
details := &connectorConfigDetails{
127-
className: cfg.ClassName,
128-
typeClassName: cfg.TypeClassName,
200+
details := &connectorConfigDetails{}
201+
202+
className := cfg.ClassName
203+
if className == "" {
204+
className = stringFromConfig(cfg.Configs, "className")
205+
}
206+
if className != "" {
207+
details.className = className
129208
}
130209

131-
builtin := cfg.Builtin
132-
if builtin == "" {
133-
if archive := cfg.Archive; strings.HasPrefix(archive, builtinURLPrefix) {
134-
builtin = strings.TrimPrefix(archive, builtinURLPrefix)
135-
}
210+
typeClassName := cfg.TypeClassName
211+
if typeClassName == "" {
212+
typeClassName = stringFromConfig(cfg.Configs, "typeClassName")
213+
}
214+
if typeClassName != "" {
215+
details.typeClassName = typeClassName
136216
}
137-
details.builtin = builtin
138217

139-
if configsJSON := encodeConnectorConfigs(cfg.Configs); configsJSON != "" {
218+
builtin := cfg.SourceType
219+
if builtin == "" {
220+
builtin = resolveBuiltinFromConfig(cfg.Configs)
221+
}
222+
if builtin != "" {
223+
details.builtin = builtin
224+
}
225+
226+
configMap := extractConnectorConfigs(cfg.Configs, map[string]struct{}{
227+
"archive": {},
228+
"builtin": {},
229+
"className": {},
230+
"typeClassName": {},
231+
})
232+
if len(configMap) > 0 {
233+
tmp := v1alpha1.NewConfig(configMap)
234+
if configsJSON := encodeConnectorConfigs(&tmp); configsJSON != "" {
235+
details.configs = configsJSON
236+
}
237+
} else if configsJSON := encodeConnectorConfigs(cfg.Configs); configsJSON != "" {
140238
details.configs = configsJSON
141239
}
142240

@@ -151,39 +249,54 @@ func buildSinkConnectorDetails(cfg *v1alpha1.SinkConnectorSpec) *connectorConfig
151249
return nil
152250
}
153251

154-
details := &connectorConfigDetails{
155-
className: cfg.ClassName,
156-
typeClassName: cfg.TypeClassName,
252+
details := &connectorConfigDetails{}
253+
254+
className := cfg.ClassName
255+
if className == "" {
256+
className = stringFromConfig(cfg.Configs, "className")
257+
}
258+
if className != "" {
259+
details.className = className
260+
}
261+
262+
typeClassName := cfg.TypeClassName
263+
if typeClassName == "" {
264+
typeClassName = stringFromConfig(cfg.Configs, "typeClassName")
265+
}
266+
if typeClassName != "" {
267+
details.typeClassName = typeClassName
157268
}
158269

159-
builtin := cfg.Builtin
270+
builtin := cfg.SinkType
160271
if builtin == "" {
161-
if archive := cfg.Archive; strings.HasPrefix(archive, builtinURLPrefix) {
162-
builtin = strings.TrimPrefix(archive, builtinURLPrefix)
163-
} else if sinkType := cfg.SinkType; sinkType != "" {
164-
builtin = sinkType
165-
}
272+
builtin = resolveBuiltinFromConfig(cfg.Configs)
273+
}
274+
if builtin != "" {
275+
details.builtin = builtin
166276
}
167-
details.builtin = builtin
168277

169-
if cfg.Configs != nil || cfg.SinkType != "" {
170-
merged := map[string]interface{}{}
171-
if cfg.Configs != nil && cfg.Configs.Data != nil {
172-
for k, v := range cfg.Configs.Data {
173-
merged[k] = v
174-
}
278+
configMap := extractConnectorConfigs(cfg.Configs, map[string]struct{}{
279+
"archive": {},
280+
"builtin": {},
281+
"className": {},
282+
"typeClassName": {},
283+
})
284+
if builtin != "" {
285+
if configMap == nil {
286+
configMap = map[string]interface{}{}
175287
}
176-
if cfg.SinkType != "" {
177-
if _, ok := merged["sinkType"]; !ok {
178-
merged["sinkType"] = cfg.SinkType
179-
}
288+
if _, ok := configMap["sinkType"]; !ok {
289+
configMap["sinkType"] = builtin
180290
}
181-
if len(merged) > 0 {
182-
tmp := v1alpha1.NewConfig(merged)
183-
if configsJSON := encodeConnectorConfigs(&tmp); configsJSON != "" {
184-
details.configs = configsJSON
185-
}
291+
}
292+
293+
if len(configMap) > 0 {
294+
tmp := v1alpha1.NewConfig(configMap)
295+
if configsJSON := encodeConnectorConfigs(&tmp); configsJSON != "" {
296+
details.configs = configsJSON
186297
}
298+
} else if configsJSON := encodeConnectorConfigs(cfg.Configs); configsJSON != "" {
299+
details.configs = configsJSON
187300
}
188301

189302
if details.builtin == "" && details.className == "" && details.typeClassName == "" && details.configs == "" {

controllers/spec/utils_test.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ func TestGenerateFunctionInputSpecWithConnector(t *testing.T) {
118118
},
119119
}
120120
function.Spec.SourceConfig = &v1alpha1.SourceConnectorSpec{
121-
Archive: "builtin://kafka",
122-
ClassName: "org.apache.pulsar.io.kafka.KafkaSource",
123-
Configs: configs,
121+
SourceType: "kafka",
122+
ClassName: "org.apache.pulsar.io.kafka.KafkaSource",
123+
Configs: configs,
124124
}
125125

126126
sourceSpec := generateFunctionInputSpec(function)
@@ -149,3 +149,44 @@ func TestGenerateFunctionOutputSpecWithConnector(t *testing.T) {
149149
assert.Equal(t, "org.apache.pulsar.io.kafka.KafkaSink", sinkSpec.ClassName)
150150
assert.Equal(t, `{"bootstrapServers":"kafka:9092","sinkType":"kafka","topic":"kafka-output"}`, sinkSpec.Configs)
151151
}
152+
153+
func TestBuildSourceConnectorDetailsFromConfig(t *testing.T) {
154+
connectorConfig := v1alpha1.NewConfig(map[string]interface{}{
155+
"archive": "builtin://filesystem",
156+
"className": "org.apache.pulsar.io.fs.FileSource",
157+
"typeClassName": "java.lang.String",
158+
"configs": map[string]interface{}{
159+
"inputDirectory": "/var/data",
160+
"recurse": true,
161+
},
162+
})
163+
164+
details := buildSourceConnectorDetails(&v1alpha1.SourceConnectorSpec{
165+
Configs: &connectorConfig,
166+
})
167+
168+
assert.NotNil(t, details)
169+
assert.Equal(t, "filesystem", details.builtin)
170+
assert.Equal(t, "org.apache.pulsar.io.fs.FileSource", details.className)
171+
assert.Equal(t, "java.lang.String", details.typeClassName)
172+
assert.Equal(t, `{"inputDirectory":"/var/data","recurse":true}`, details.configs)
173+
}
174+
175+
func TestBuildSinkConnectorDetailsFromConfig(t *testing.T) {
176+
connectorConfig := v1alpha1.NewConfig(map[string]interface{}{
177+
"archive": "builtin://jms",
178+
"className": "org.apache.pulsar.io.jms.JMSSink",
179+
"configs": map[string]interface{}{
180+
"queueName": "demo-queue",
181+
},
182+
})
183+
184+
details := buildSinkConnectorDetails(&v1alpha1.SinkConnectorSpec{
185+
Configs: &connectorConfig,
186+
})
187+
188+
assert.NotNil(t, details)
189+
assert.Equal(t, "jms", details.builtin)
190+
assert.Equal(t, "org.apache.pulsar.io.jms.JMSSink", details.className)
191+
assert.Equal(t, `{"queueName":"demo-queue","sinkType":"jms"}`, details.configs)
192+
}

0 commit comments

Comments
 (0)