
Commit db3642c

dborowitz authored and rahulpinto19 committed

feat(serverless-spark): add create_spark_batch tool

This tool is almost identical to create_pyspark_batch, but for Java Spark batches. There are some minor differences in how the main files and args are provided.

1 parent b925536 commit db3642c
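For context, the "minor differences" show up at the proto level: a PySpark batch takes a single main-file URI, while a Spark batch takes a oneof driver, either a jar URI or a class name. A minimal sketch, not part of this commit, assuming the `dataprocpb` field names used in the code below (the `PySparkBatch` side is an assumption based on the same generated package):

```go
package main

import (
	"fmt"

	dataproc "cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
)

func main() {
	// PySpark: the entry point is a single Python file URI.
	pyspark := &dataproc.Batch{
		BatchConfig: &dataproc.Batch_PysparkBatch{
			PysparkBatch: &dataproc.PySparkBatch{
				MainPythonFileUri: "gs://bucket/main.py",
				Args:              []string{"--input", "gs://bucket/data"},
			},
		},
	}

	// Spark: the entry point is a oneof driver (jar URI or class name),
	// with the jars for a class supplied separately via JarFileUris.
	spark := &dataproc.Batch{
		BatchConfig: &dataproc.Batch_SparkBatch{
			SparkBatch: &dataproc.SparkBatch{
				Driver:      &dataproc.SparkBatch_MainClass{MainClass: "com.example.Main"},
				JarFileUris: []string{"gs://bucket/app.jar"},
				Args:        []string{"--input", "gs://bucket/data"},
			},
		},
	}
	fmt.Println(pyspark, spark)
}
```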

File tree

10 files changed: +455 -5 lines

cmd/root.go

Lines changed: 1 addition & 0 deletions
@@ -199,6 +199,7 @@ import (
 	_ "github.com/googleapis/genai-toolbox/internal/tools/redis"
 	_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkcancelbatch"
 	_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkcreatepysparkbatch"
+	_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkcreatesparkbatch"
 	_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkgetbatch"
 	_ "github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparklistbatches"
 	_ "github.com/googleapis/genai-toolbox/internal/tools/singlestore/singlestoreexecutesql"

cmd/root_test.go

Lines changed: 1 addition & 1 deletion
@@ -1558,7 +1558,7 @@ func TestPrebuiltTools(t *testing.T) {
 	wantToolset: server.ToolsetConfigs{
 		"serverless_spark_tools": tools.ToolsetConfig{
 			Name: "serverless_spark_tools",
-			ToolNames: []string{"list_batches", "get_batch", "cancel_batch", "create_pyspark_batch"},
+			ToolNames: []string{"list_batches", "get_batch", "cancel_batch", "create_pyspark_batch", "create_spark_batch"},
 		},
 	},
 },

docs/en/resources/sources/serverless-spark.md

Lines changed: 2 additions & 0 deletions
@@ -23,6 +23,8 @@ Apache Spark.
   Cancel a running Serverless Spark batch operation.
 - [`serverless-spark-create-pyspark-batch`](../tools/serverless-spark/serverless-spark-create-pyspark-batch.md)
   Create a Serverless Spark PySpark batch operation.
+- [`serverless-spark-create-spark-batch`](../tools/serverless-spark/serverless-spark-create-spark-batch.md)
+  Create a Serverless Spark Java batch operation.

 ## Requirements

docs/en/resources/tools/serverless-spark/_index.md

Lines changed: 1 addition & 0 deletions
@@ -10,3 +10,4 @@ description: >
 - [serverless-spark-list-batches](./serverless-spark-list-batches.md)
 - [serverless-spark-cancel-batch](./serverless-spark-cancel-batch.md)
 - [serverless-spark-create-pyspark-batch](./serverless-spark-create-pyspark-batch.md)
+- [serverless-spark-create-spark-batch](./serverless-spark-create-spark-batch.md)
docs/en/resources/tools/serverless-spark/serverless-spark-create-spark-batch.md

Lines changed: 97 additions & 0 deletions

@@ -0,0 +1,97 @@
---
title: "serverless-spark-create-spark-batch"
type: docs
weight: 2
description: >
  A "serverless-spark-create-spark-batch" tool submits a Spark batch to run asynchronously.
aliases:
- /resources/tools/serverless-spark-create-spark-batch
---

## About

A `serverless-spark-create-spark-batch` tool submits a Java Spark batch to a
Google Cloud Serverless for Apache Spark source. The workload runs
asynchronously and typically takes around a minute to start; its status can be
polled using the [get batch](serverless-spark-get-batch.md) tool.

It's compatible with the following sources:

- [serverless-spark](../../sources/serverless-spark.md)

`serverless-spark-create-spark-batch` accepts the following parameters:

- **`mainJarFile`**: Optional. The gs:// URI of the jar file that contains the
  main class. Exactly one of mainJarFile or mainClass must be specified.
- **`mainClass`**: Optional. The name of the driver's main class. Exactly one of
  mainJarFile or mainClass must be specified.
- **`jarFiles`**: Optional. A list of gs:// URIs of jar files to add to the
  CLASSPATHs of the Spark driver and tasks. Required when mainClass is
  specified.
- **`args`**: Optional. A list of arguments passed to the driver.
- **`version`**: Optional. The Serverless [runtime
  version](https://docs.cloud.google.com/dataproc-serverless/docs/concepts/versions/dataproc-serverless-versions)
  to execute with.

## Custom Configuration

This tool supports custom
[`runtimeConfig`](https://docs.cloud.google.com/dataproc-serverless/docs/reference/rest/v1/RuntimeConfig)
and
[`environmentConfig`](https://docs.cloud.google.com/dataproc-serverless/docs/reference/rest/v1/EnvironmentConfig)
settings, which can be specified in a `tools.yaml` file. These configurations
are parsed as YAML and passed to the Dataproc API.

**Note:** If your project requires custom runtime or environment configuration,
you must write a custom `tools.yaml`; you cannot use the `serverless-spark`
prebuilt config.

### Example `tools.yaml`

```yaml
tools:
  - name: "serverless-spark-create-spark-batch"
    kind: "serverless-spark-create-spark-batch"
    source: "my-serverless-spark-source"
    runtimeConfig:
      properties:
        spark.driver.memory: "1024m"
    environmentConfig:
      executionConfig:
        networkUri: "my-network"
```

## Response Format

The response is an
[operation](https://docs.cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.operations#resource:-operation)
metadata JSON object corresponding to [batch operation
metadata](https://pkg.go.dev/cloud.google.com/go/dataproc/v2/apiv1/dataprocpb#BatchOperationMetadata).
Example:

```json
{
  "batch": "projects/myproject/locations/us-central1/batches/aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
  "batchUuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
  "createTime": "2025-11-19T16:36:47.607119Z",
  "description": "Batch",
  "labels": {
    "goog-dataproc-batch-uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    "goog-dataproc-location": "us-central1"
  },
  "operationType": "BATCH",
  "warnings": [
    "No runtime version specified. Using the default runtime version."
  ]
}
```

## Reference

| **field**         | **type** | **required** | **description**                                                                                                                                          |
| ----------------- | :------: | :----------: | -------------------------------------------------------------------------------------------------------------------------------------------------------- |
| kind              | string   | true         | Must be "serverless-spark-create-spark-batch".                                                                                                           |
| source            | string   | true         | Name of the source the tool should use.                                                                                                                  |
| description       | string   | false        | Description of the tool that is passed to the LLM.                                                                                                       |
| runtimeConfig     | map      | false        | [Runtime config](https://docs.cloud.google.com/dataproc-serverless/docs/reference/rest/v1/RuntimeConfig) for all batches created with this tool.         |
| environmentConfig | map      | false        | [Environment config](https://docs.cloud.google.com/dataproc-serverless/docs/reference/rest/v1/EnvironmentConfig) for all batches created with this tool. |
| authRequired      | string[] | false        | List of auth services required to invoke this tool.                                                                                                      |
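The parameter rules documented above are easy to pin down in code. A hypothetical standalone mirror of the checks, not part of the commit (the real ones live in `SparkBatchBuilder.BuildBatch`, later in this diff):

```go
package main

import (
	"errors"
	"fmt"
)

// validateDriver mirrors the documented driver-selection rules: exactly one
// of mainJarFile or mainClass, and jarFiles required alongside mainClass.
func validateDriver(mainJarFile, mainClass string, jarFiles []string) error {
	switch {
	case mainJarFile == "" && mainClass == "":
		return errors.New("must provide either mainJarFile or mainClass")
	case mainJarFile != "" && mainClass != "":
		return errors.New("cannot provide both mainJarFile and mainClass")
	case mainClass != "" && len(jarFiles) == 0:
		// A bare class name needs jarFiles to put it on the classpath.
		return errors.New("jarFiles is required when mainClass is provided")
	}
	return nil
}

func main() {
	fmt.Println(validateDriver("gs://bucket/app.jar", "", nil))                          // <nil>
	fmt.Println(validateDriver("", "com.example.Main", []string{"gs://bucket/app.jar"})) // <nil>
	fmt.Println(validateDriver("", "com.example.Main", nil))                             // error
}
```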

internal/prebuiltconfigs/tools/serverless-spark.yaml

Lines changed: 4 additions & 0 deletions
@@ -31,10 +31,14 @@ tools:
   create_pyspark_batch:
     kind: serverless-spark-create-pyspark-batch
     source: serverless-spark-source
+  create_spark_batch:
+    kind: serverless-spark-create-spark-batch
+    source: serverless-spark-source

 toolsets:
   serverless_spark_tools:
   - list_batches
   - get_batch
   - cancel_batch
   - create_pyspark_batch
+  - create_spark_batch

internal/tools/serverlessspark/createbatch/tool.go

Lines changed: 4 additions & 4 deletions
@@ -89,12 +89,12 @@ func (t *Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, par
 		return nil, fmt.Errorf("failed to build batch: %w", err)
 	}

-	if t.Config.RuntimeConfig != nil {
-		batch.RuntimeConfig = proto.Clone(t.Config.RuntimeConfig).(*dataproc.RuntimeConfig)
+	if t.RuntimeConfig != nil {
+		batch.RuntimeConfig = proto.Clone(t.RuntimeConfig).(*dataproc.RuntimeConfig)
 	}

-	if t.Config.EnvironmentConfig != nil {
-		batch.EnvironmentConfig = proto.Clone(t.Config.EnvironmentConfig).(*dataproc.EnvironmentConfig)
+	if t.EnvironmentConfig != nil {
+		batch.EnvironmentConfig = proto.Clone(t.EnvironmentConfig).(*dataproc.EnvironmentConfig)
 	}

 	// Common override for version if present in params
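The change above drops the explicit `.Config` selector, which works when the tool struct embeds its config, since Go promotes an embedded struct's fields. A small sketch of the mechanism; the `Tool` definition itself is not shown in this diff, so the embedding is an assumption:

```go
package main

import "fmt"

// Config stands in for createbatch.Config; the string field stands in for
// *dataproc.RuntimeConfig.
type Config struct {
	RuntimeConfig string
}

// Tool embeds Config, so Config's fields are promoted onto Tool.
type Tool struct {
	Config
}

func main() {
	t := Tool{Config{RuntimeConfig: "spark.driver.memory=1024m"}}
	// Both selectors reach the same field; the short form is the new style.
	fmt.Println(t.RuntimeConfig == t.Config.RuntimeConfig) // true
}
```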
internal/tools/serverlessspark/serverlesssparkcreatesparkbatch/tool.go

Lines changed: 113 additions & 0 deletions

@@ -0,0 +1,113 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serverlesssparkcreatesparkbatch

import (
	"context"
	"fmt"

	dataproc "cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
	"github.com/goccy/go-yaml"
	"github.com/googleapis/genai-toolbox/internal/sources"
	"github.com/googleapis/genai-toolbox/internal/tools"
	"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/createbatch"
	"github.com/googleapis/genai-toolbox/internal/util/parameters"
)

const kind = "serverless-spark-create-spark-batch"

func init() {
	if !tools.Register(kind, newConfig) {
		panic(fmt.Sprintf("tool kind %q already registered", kind))
	}
}

func newConfig(ctx context.Context, name string, decoder *yaml.Decoder) (tools.ToolConfig, error) {
	baseCfg, err := createbatch.NewConfig(ctx, name, decoder)
	if err != nil {
		return nil, err
	}
	return Config{baseCfg}, nil
}

type Config struct {
	createbatch.Config
}

// validate interface
var _ tools.ToolConfig = Config{}

// ToolConfigKind returns the unique name for this tool.
func (cfg Config) ToolConfigKind() string {
	return kind
}

// Initialize creates a new Tool instance.
func (cfg Config) Initialize(srcs map[string]sources.Source) (tools.Tool, error) {
	return createbatch.NewTool(cfg.Config, cfg, srcs, &SparkBatchBuilder{})
}

type SparkBatchBuilder struct{}

func (b *SparkBatchBuilder) Parameters() parameters.Parameters {
	return parameters.Parameters{
		parameters.NewStringParameterWithRequired("mainJarFile", "Optional. The gs:// URI of the jar file that contains the main class. Exactly one of mainJarFile or mainClass must be specified.", false),
		parameters.NewStringParameterWithRequired("mainClass", "Optional. The name of the driver's main class. Exactly one of mainJarFile or mainClass must be specified.", false),
		parameters.NewArrayParameterWithRequired("jarFiles", "Optional. A list of gs:// URIs of jar files to add to the CLASSPATHs of the Spark driver and tasks.", false, parameters.NewStringParameter("jarFile", "A jar file URI.")),
		parameters.NewArrayParameterWithRequired("args", "Optional. A list of arguments passed to the driver.", false, parameters.NewStringParameter("arg", "An argument.")),
		parameters.NewStringParameterWithRequired("version", "Optional. The Serverless runtime version to execute with.", false),
	}
}

func (b *SparkBatchBuilder) BuildBatch(params parameters.ParamValues) (*dataproc.Batch, error) {
	paramMap := params.AsMap()

	mainJar, _ := paramMap["mainJarFile"].(string)
	mainClass, _ := paramMap["mainClass"].(string)

	if mainJar == "" && mainClass == "" {
		return nil, fmt.Errorf("must provide either mainJarFile or mainClass")
	}
	if mainJar != "" && mainClass != "" {
		return nil, fmt.Errorf("cannot provide both mainJarFile and mainClass")
	}

	sparkBatch := &dataproc.SparkBatch{}
	if mainJar != "" {
		sparkBatch.Driver = &dataproc.SparkBatch_MainJarFileUri{MainJarFileUri: mainJar}
	} else {
		sparkBatch.Driver = &dataproc.SparkBatch_MainClass{MainClass: mainClass}
	}

	if jarFileUris, ok := paramMap["jarFiles"].([]any); ok {
		for _, uri := range jarFileUris {
			sparkBatch.JarFileUris = append(sparkBatch.JarFileUris, fmt.Sprintf("%v", uri))
		}
	} else if mainClass != "" {
		return nil, fmt.Errorf("jarFiles is required when mainClass is provided")
	}

	if args, ok := paramMap["args"].([]any); ok {
		for _, arg := range args {
			sparkBatch.Args = append(sparkBatch.Args, fmt.Sprintf("%v", arg))
		}
	}

	return &dataproc.Batch{
		BatchConfig: &dataproc.Batch_SparkBatch{
			SparkBatch: sparkBatch,
		},
	}, nil
}
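As a usage note, the oneof driver that `BuildBatch` sets is read back with a type switch. A minimal sketch using the same `dataprocpb` types (the `GetDriver` accessor is the standard protoc-generated one for a oneof field, assumed here):

```go
package main

import (
	"fmt"

	dataproc "cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
)

// describeDriver shows the idiomatic way to consume the oneof driver field.
func describeDriver(b *dataproc.SparkBatch) string {
	switch d := b.GetDriver().(type) {
	case *dataproc.SparkBatch_MainJarFileUri:
		return "main jar: " + d.MainJarFileUri
	case *dataproc.SparkBatch_MainClass:
		return "main class: " + d.MainClass
	default:
		return "no driver set"
	}
}

func main() {
	b := &dataproc.SparkBatch{
		Driver: &dataproc.SparkBatch_MainClass{MainClass: "com.example.Main"},
	}
	fmt.Println(describeDriver(b)) // main class: com.example.Main
}
```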
internal/tools/serverlessspark/serverlesssparkcreatesparkbatch/tool_test.go

Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serverlesssparkcreatesparkbatch_test

import (
	"testing"

	"github.com/googleapis/genai-toolbox/internal/tools"
	"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/createbatch"
	"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/serverlesssparkcreatesparkbatch"
	"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/testutils"
)

func TestParseFromYaml(t *testing.T) {
	testutils.RunParseFromYAMLTests(t, "serverless-spark-create-spark-batch", func(c createbatch.Config) tools.ToolConfig {
		return serverlesssparkcreatesparkbatch.Config{Config: c}
	})
}
