
The following example shows the `kafka` source in a Data Prepper pipeline:

```yaml
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - 127.0.0.1:9092
      topics:
        - name: Topic1
          group_id: groupID1
        - name: Topic2
          group_id: groupID1
  sink:
    - stdout: {}
```

## Configuration

### Topics

Use the following options for each topic in the `topics` array.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`retry_backoff` | No | Integer | The amount of time to wait before attempting to retry a failed request to a given topic partition. Default is `10s`.
`max_poll_interval` | No | Integer | The maximum delay between invocations of a `poll()` when using group management through Kafka's `max.poll.interval.ms` option. Default is `300s`.
`consumer_max_poll_records` | No | Integer | The maximum number of records returned in a single `poll()` call through Kafka's `max.poll.records` setting. Default is `500`.
`key_mode` | No | String | Indicates how the key field of the Kafka message should be handled. The default setting is `include_as_field`, which includes the key in the `kafka_key` event. The `include_as_metadata` setting includes the key in the event's metadata. The `discard` setting discards the key.
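
The following sketch shows several of these options configured for a single topic; the topic name and group ID are placeholders:

```yaml
topics:
  - name: my-topic
    group_id: my-consumer-group
    auto_offset_reset: earliest
    consumer_max_poll_records: 500
    key_mode: include_as_metadata
```
{% include copy.html %}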

### Schema

The `schema` configuration has the following options.

Option | Type | Required | Description
:--- | :--- | :--- | :---
`type` | String | Yes | Sets the type of schema based on your registry, either the AWS Glue schema registry, `aws_glue`, or the Confluent schema registry, `confluent`. When using the `aws_glue` registry, set any [AWS](#aws) configuration options.
`basic_auth_credentials_source` | String | No | The source of the schema registry credentials. Use `USER_INFO` when providing `api_key` and `api_secret`. Other valid values are `URL` and `SASL_INHERIT`. If not set, the default is determined by the underlying client.
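
For example, a minimal AWS Glue configuration sets only the registry type and relies on the [AWS](#aws) settings for credentials:

```yaml
schema:
  type: aws_glue
```
{% include copy.html %}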

The following configuration options apply only when using a `confluent` registry.

Option | Type | Description
:--- | :--- | :---
`registry_url` | String | Base URL of the schema registry, for example, `http://schema-registry:8081` or `https://sr.example.com`.
`version` | String | Schema version to use per subject. Use an integer or `"latest"`.
`api_key` | String | The schema registry API key.
`api_secret` | String | The schema registry API secret.

The following example shows how to configure a Confluent schema registry:

```yaml
schema:
  type: confluent
  registry_url: "http://schema-registry:8081"
  api_key: "<optional if using basic/key auth>"
  api_secret: "<optional if using basic/key auth>"
  version: "latest"
```
{% include copy.html %}

#### Schema registry over TLS

The Kafka source uses the JVM truststore when connecting to the schema registry over HTTPS. If the schema registry certificate is signed by a custom certificate authority (CA), add that CA to the Data Prepper JVM truststore or provide a custom truststore using environment variables.

You can use the following command to build a truststore with your CA certificate:

```bash
keytool -importcert -noprompt -alias sr-ca -file sr-ca.pem -keystore /usr/share/data-prepper/certs/sr.truststore.jks -storepass changeit
```
{% include copy.html %}

The following example configures Data Prepper to use the truststore by setting the `JAVA_TOOL_OPTIONS` environment variable:

```bash
export JAVA_TOOL_OPTIONS="-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit"
```
{% include copy.html %}

You can also configure Data Prepper in `docker-compose.yaml` using the following settings:

```yaml
environment:
  - JAVA_TOOL_OPTIONS=-Djavax.net.ssl.trustStore=/usr/share/data-prepper/certs/sr.truststore.jks -Djavax.net.ssl.trustStorePassword=changeit
volumes:
  - ./certs:/usr/share/data-prepper/certs:ro
```
{% include copy.html %}

### Authentication

The `authentication` section configures Simple Authentication and Security Layer (SASL) authentication.

```yaml
authentication:
  sasl:
    plain:
      username: alice
      password: secret
```
{% include copy.html %}

Option | Type | Description
:--- | :--- | :---
`sasl` | JSON object | The SASL authentication configuration.

#### SASL

Use one of the following options when configuring SASL authentication.

Option | Type | Description
:--- | :--- | :---
`plain` | JSON object | The SASL/PLAIN authentication configuration. See [SASL plaintext](#sasl-plaintext) for more information.
`aws_msk_iam` | String | The Amazon MSK AWS Identity and Access Management (IAM) configuration. If set to `role`, the `sts_role_arn` set in the `aws` configuration is used. Default is `default`.
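
The following sketch shows IAM-based authentication for Amazon MSK; the AWS Region and role Amazon Resource Name (ARN) are placeholders and assume that the corresponding `aws` settings are configured:

```yaml
authentication:
  sasl:
    aws_msk_iam: role
aws:
  region: us-east-1
  sts_role_arn: arn:aws:iam::123456789012:role/data-prepper-msk-role
```
{% include copy.html %}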

##### SASL plaintext

The following options are required when using the [SASL/PLAIN](https://kafka.apache.org/10/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html) mechanism.

Option | Type | Description
:--- | :--- | :---
`username` | String | The SASL/PLAIN username.
`password` | String | The SASL/PLAIN password.

### Encryption

Use the following options when setting SSL encryption.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`type` | No | String | The encryption type. Use `none` to disable encryption. Default is `ssl`.
`insecure` | No | Boolean | A Boolean flag used to turn off SSL certificate verification. If set to `true`, certificate authority (CA) certificate verification is turned off and insecure HTTP requests are sent. Default is `false`.
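
The following example enables SSL encryption: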

```yaml
encryption:
  type: ssl
  # With a public CA: no extra configuration is needed.
  # With a private CA: trust the CA using the JVM truststore.
```
{% include copy.html %}

#### AWS

Option | Required | Type | Description
:--- | :--- | :--- | :---
`arn` | Yes | String | The [MSK ARN](https://docs.aws.amazon.com/msk/1.0/apireference/configurations-arn.html) to use.
`broker_connection_type` | No | String | The type of connection to use with the MSK broker, either `public`, `single_vpc`, or `multi_vpc`. Default is `single_vpc`.
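
The following sketch shows these options inside the `aws` block; the Region, role, and cluster ARN are placeholders:

```yaml
aws:
  region: us-east-1
  sts_role_arn: arn:aws:iam::123456789012:role/data-prepper-role
  msk:
    arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster-name/uuid
    broker_connection_type: public
```
{% include copy.html %}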

## Configuration examples

This section demonstrates different pipeline configuration options.

### Basic Kafka source

The following example pipeline reads JSON messages from a single plaintext Kafka topic with multiple consumer workers, parses them, and indexes them into OpenSearch:

```yaml
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - localhost:9092
      topics:
        - name: my-topic
          group_id: data-prepper-group
          workers: 4
  processor:
    - parse_json:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index: kafka-data
```
{% include copy.html %}

### Kafka source with SSL encryption

The following example pipeline connects to a Kafka broker over TLS, consumes from a secure topic, and writes the results to OpenSearch:

```yaml
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - kafka-broker.example.com:9093
      topics:
        - name: secure-topic
          group_id: secure-group
      encryption:
        type: ssl
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index: secure-kafka-data
```
{% include copy.html %}

### Kafka source with SASL PLAIN authentication

The following example pipeline authenticates to Kafka using SASL/PLAIN over TLS, consumes from a topic, and indexes the data into OpenSearch:

```yaml
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - kafka-broker.example.com:9094
      topics:
        - name: authenticated-topic
          group_id: auth-group
      encryption:
        type: ssl
      authentication:
        sasl:
          plain:
            username: kafka-user
            password: kafka-password
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index: authenticated-kafka-data
```
{% include copy.html %}

### Amazon MSK with AWS Glue schema registry

The following example pipeline consumes from an Amazon MSK cluster using the `aws` settings, deserializes the payload using the AWS Glue schema registry, normalizes timestamps, and writes the results to an Amazon OpenSearch Service domain:

```yaml
msk-pipeline:
  source:
    kafka:
      acknowledgments: true
      topics:
        - name: my-msk-topic
          group_id: msk-consumer-group
          auto_offset_reset: earliest
      aws:
        region: us-east-1
        sts_role_arn: arn:aws:iam::123456789012:role/data-prepper-role
        msk:
          arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster-name/uuid
      schema:
        type: aws_glue
        registry_name: my-glue-registry
  processor:
    - date:
        match:
          - key: timestamp
            patterns: ["epoch_milli"]
        destination: "@timestamp"
  sink:
    - opensearch:
        hosts: ["https://search-my-domain.us-east-1.opensearch.amazonaws.com"]
        aws:
          region: us-east-1
          sts_role_arn: arn:aws:iam::123456789012:role/opensearch-role
        index: msk-data
        index_type: custom
```
{% include copy.html %}

### Confluent Kafka with schema registry

The following example pipeline connects to Confluent Cloud over TLS using SASL and Confluent schema registry credentials, decodes the payloads, and indexes them into OpenSearch:

```yaml
confluent-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
      topics:
        - name: confluent-topic
          group_id: confluent-group
          auto_offset_reset: earliest
      encryption:
        type: ssl
      authentication:
        sasl:
          plain:
            username: confluent-api-key
            password: confluent-api-secret
      schema:
        type: confluent
        registry_url: https://psrc-xxxxx.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: USER_INFO
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        username: admin
        password: admin_password
        index_type: custom
        index: confluent-data
```
{% include copy.html %}
