diff --git a/cmd/tracee/cmd/analyze.go b/cmd/tracee/cmd/analyze.go index 1f92f60b5935..211c618d5bb3 100644 --- a/cmd/tracee/cmd/analyze.go +++ b/cmd/tracee/cmd/analyze.go @@ -101,9 +101,12 @@ func command(cmd *cobra.Command, args []string) { outputArg := viper.GetString("output") // placeholder printer for legacy mode - p, err := printer.New(config.PrinterConfig{ - OutFile: os.Stdout, - Kind: "ignore", + p, err := printer.New([]config.Destination{ + { + Name: "ignore", + File: os.Stdout, + Type: "ignore", + }, }) if err != nil { @@ -136,7 +139,7 @@ func command(cmd *cobra.Command, args []string) { if err != nil { logger.Fatalw("Failed to prepare output configuration", "error", err) } - p, err = printer.New(printerCfg) + p, err = printer.New([]config.Destination{printerCfg}) if err != nil { logger.Fatalw("Failed to create printer", "error", err) } diff --git a/docs/docs/outputs/index.md b/docs/docs/outputs/index.md index 22906778fa8f..7673a40f3d68 100644 --- a/docs/docs/outputs/index.md +++ b/docs/docs/outputs/index.md @@ -6,5 +6,6 @@ For examples on each configuration, please refer to: * Documentation on configuring [Tracee logs](./logging.md) * Documentation on configuring [output options](./output-options.md) * Documentation on configuring [output formats](./output-formats.md) +* Documentation on configuring [output streams](./streams.md) Note that example configuration for each can be found in the Tracee root directory within the examples folder < [examples/config/global_config.yaml](https://github.com/aquasecurity/tracee/tree/main/examples/config) \ No newline at end of file diff --git a/docs/docs/outputs/output-formats.md b/docs/docs/outputs/output-formats.md index f4e64b9c8c9b..9790bcf8cbcc 100644 --- a/docs/docs/outputs/output-formats.md +++ b/docs/docs/outputs/output-formats.md @@ -6,20 +6,47 @@ Note that only one output format can be used in the Tracee configuration. ## Available Formats -The following examples will have to be added into a Tracee configuration file. +The following examples will have to be added into a Tracee configuration file or CLI flags. ### JSON Displays output events in json format. The default path to a file is stdout. +**yaml** + ```yaml output: - json: - files: - - stdout + destinations: + - name: stdout_destination + type: file + format: json + path: stdout +``` + +**cli** + +```bash +tracee --output destinations.stdout_destination.type=file \ + --output destinations.stdout_destination.format=json \ + --output destinations.stdout_destination.path=stdout ``` -Note: the `files: key` must also be defined, even if it's just for stdout. This is mandatory for the parser. +Note: the `name` is mandatory. `type` has `file` as a default value. `format` has `table` as a default value. `path` has `stdout` as a default value. The following configuration is valid as well + +**yaml** + +```yaml +output: + destinations: + - name: stdout_destination + format: json +``` + +**cli** + +```bash +tracee --output destinations.stdout_destination.format=json +``` !!! Tip A good tip is to pipe **tracee** json output to [jq](https://jqlang.github.io/jq/) tool, this way @@ -31,26 +58,35 @@ This sends events in json format to the webhook url Below is an example for configuring webhooks in the Tracee output section: -``` +**yaml** + +```yaml output: - # webhook: - # - webhook1: - # protocol: http - # host: localhost - # port: 8000 - # timeout: 5s - # gotemplate: /path/to/template/test.tmpl - # content-type: application/json - # - webhook2: - # protocol: http - # host: localhost - # port: 9000 - # timeout: 3s - # gotemplate: /path/to/template/test.tmpl - # content-type: application/json -``` - -Note: Please ensure that the respective fields will have to be uncommented. + destinations: + - name: webhook1 + type: webhook + url: http://localhost:8080?timeout=5s + format: gotemplate=/path/to/template/test.tmpl + + - name: webhook2 + type: webhook + url: http://localhost:9000 + format: gotemplate=/path/to/template/test.tmpl +``` + +**cli** + +```bash +tracee --output destinations.webhook1.type=webhook \ + --output destinations.webhook1.url=http://localhost:8080?timeout=5s \ + --output destinations.webhook1.format=gotemplate=/path/to/template/test.tmpl \ + --output destinations.webhook2.type=webhook \ + --output destinations.webhook2.url=http://localhost:9000 \ + --output destinations.webhook2.format=gotemplate=/path/to/template/test.tmpl +``` + +Note: `gotemplate=/path/to/template.tmpl` can be specified in `format` and as a parameter in the webhook url as well. +Be aware that the url parameters has the priority on the format. ### Forward @@ -58,54 +94,111 @@ This sends events to a FluentBit receiver. More information on FluentBit can be Below is an example for forwarding Tracee output: -``` +**yaml** + +```yaml output: - # forward: - # - forward1: - # protocol: tcp - # user: user - # password: pass - # host: 127.0.0.1 - # port: 24224 - # tag: tracee1 - # - forward2: - # protocol: udp - # user: user - # password: pass - # host: 127.0.0.1 - # port: 24225 - # tag: tracee2 -``` - -Note: Please ensure that the respective fields will have to be uncommented. + destinations: + - name: forward1 + type: forward + url: tpc://user:password@localhost:24224?tag=tracee1 + format: gotemplate=/path/to/template/test.tmpl + + - name: forward2 + type: forward + url: http://localhost:24224?tag=tracee2 + format: json +``` + +**cli** + +```bash +tracee --output destinations.forward1.type=forward \ + --output destinations.forward1.url=tpc://user:password@localhost:24224?tag=tracee1 \ + --output destinations.forward1.format=gotemplate=/path/to/template/test.tmpl \ + --output destinations.forward2.type=forward \ + --output destinations.forward2.url=http://localhost:24224?tag=tracee2 \ + --output destinations.forward2.format=json +``` ### Table Displays output events in table format. The default path to a file is stdout. +**yaml** + +```yaml +output: + destinations: + - name: stdout_table_destination + type: file + format: table + path: stdout +``` + +**cli** + +```bash +tracee --output destinations.stdout_table_destination.type=file \ + --output destinations.stdout_table_destination.path=stdout \ + --output destinations.stdout_table_destination.format=table +``` + +or + +**yaml** + ```yaml output: - table: - files: - - /path/to/table1.out - - /path/to/table2.out + destinations: + - name: stdout_table_destination ``` -Note: the `files: key` must also be defined, even if it's just for stdout. This is mandatory for the parser. +**cli** + +```bash +tracee --output destinations.stdout_table_destination.format=table +``` ### Table (Verbose) Displays the output events in table format with extra fields per event. The default path to a file is stdout. +**yaml** + +```yaml +output: + destinations: + - name: stdout_table_verbose_destination + type: file + format: table-verbose + path: stdout +``` + +**cli** + +```bash +tracee --output destinations.stdout_table_verbose_destination.type=file \ + --output destinations.stdout_table_verbose_destination.path=stdout \ + --output destinations.stdout_table_verbose_destination.format=table-verbose +``` + +or a smaller version without default values explicitly declared + +**yaml** ```yaml output: - table-verbose: - files: - - stdout + destinations: + - name: stdout_table_verbose_destination + format: table-verbose ``` -Note: the `files: key` must also be defined, even if it's just for stdout. This is mandatory for the parser. +**cli** + +```bash +tracee --output destinations.stdout_table_verbose_destination.format=table-verbose +``` ### GOTEMPLATE @@ -117,11 +210,42 @@ For example templates, see the templates directory in the source repository. The following sections can be specified as part of go templates: -``` +```yaml output: - # gotemplate: - # template: /path/to/my_template1.tmpl - # files: - # - /path/to/output1.out - # - /path/to/output2.out + destinations: + - name: file_destination_1 + type: file + format: gotemplate=/path/to/template_1.tmpl + path: /path/to/file.log + + - name: file_destination_2 + type: file + format: gotemplate=/path/to/template_2.tmpl + path: /path/to/file_2.log ``` + +or the following flags can be used: + +```bash +tracee --output destinations.stdout_destination_1.type=file \ + --output destinations.stdout_destination_1.format=gotemplate=/path/to/template_1.tmpl \ + --output destinations.stdout_destination_1.path=/path/to/file.log \ + --output destinations.stdout_destination_2.type=file \ + --output destinations.stdout_destination_2.format=gotemplate=/path/to/template_2.tmpl \ + --output destinations.stdout_destination_2.path=/path/to/file_2.log \ +``` + +## CLI flags + +A destination can be configured using CLI flags as well. The format of a flag is `--output destinations..=`. + +### Available fields + +| Field | Usage | Default | +| :------: | -------------------------------------------------------------------------------------------------- | ------------------------------------------------ | +| type | type of the destination. One of `file`, `webhook` or `forward`. | file | +| format | format of the event. One of `json`, `table`, `table-verbose` or gotemplate=/path/to/template.yaml. | `table` for file, `json` for webhook and forward | +| url | only for `webhook` and `forward` specify the destination url. | | +| path | only for `file` specify the file path to create, default to `stdout`. | | + +Note: not specifying the `type` of destination will result in default value `file` which invalidates the presence of `url` field \ No newline at end of file diff --git a/docs/docs/outputs/streams.md b/docs/docs/outputs/streams.md new file mode 100644 index 000000000000..a6c5818da809 --- /dev/null +++ b/docs/docs/outputs/streams.md @@ -0,0 +1,123 @@ +# Streams + +Streams are a Tracee mechanism that let you redirect events to different destinations with custom filters. + +Using streams allows you to build complex event-routing pipelines, applying filters based on events and policies. Stream configuration is straightforward: + +**yaml** + +```yaml +output: + destinations: + - name: json_destination + format: json + + streams: + - name: tracing + destinations: ["json_destination"] + filters: + events: [] + policies: [] + buffer: + mode: block # or drop + size: 1024 +``` + +**cli** + +```bash +tracee --output destinations.json_destination.format=json \ + --output streams.tracing.destinations=json_destination \ + --output streams.tracing.buffer.mode=block \ + --output streams.tracing.buffer.size=1024 +``` + +A stream must always reference one or more destinations. +When a stream contains multiple destinations, every event emitted by the stream is broadcast to **all** destinations. +Be aware that if one destination is slow, it may delay all others. + +**Important:** A stream can be use multiple destinations, but a destination cannot be reused in multiple streams. + +Streams are first-class citizens in Tracee. If you [declare a destination](./output-formats.md) without explicitly creating a stream, Tracee automatically creates an implicit filter-less stream to emit events to that destination. + +!!! Tip + A common pattern is to use streams to send different verbosity levels or event types to different destinations. + +## Example + +**yaml** + +```yaml +output: + destinations: + - name: json_destination + format: json + + - name: webhook_destination + format: json + url: http://localhost:8080/processes?timeout=5s + + - name: fluent_destination + format: json + url: http://localhost:2222?tag=severe + + streams: + - name: tracing + destinations: + - json_destination + + - name: processes + destinations: + - webhook_destination + filters: + events: + - sched_process_exec + - sched_process_fork + - sched_process_exit + buffer: + mode: drop + + - name: severe_events + destinations: + - fluent_destination + filters: + policies: + - severe-events-policy + buffer: + mode: drop # using drop to avoid losing events +``` + +**cli** + +```bash +tracee --output destinations.json_destination.format=json \ + --output destinations.webhook_destination.format=json \ + --output destinations.webhook_destination.type=webhook \ + --output destinations.webhook_destination.url=http://localhost:8080/processes?timeout=5s \ + --output destinations.fluent_destination.format=json \ + --output destinations.fluent_destination.type=forward \ + --output destinations.fluent_destination.url=http://localhost:2222?tag=severe \ + --output streams.tracing.destinations=json_destination \ + --output streams.processes.destinations=webhook_destination \ + --output streams.processes.filters.events=sched_process_exec,sched_process_fork,sched_process_exit \ + --output streams.processes.buffer.mode=drop \ + --output streams.severe_events.destinations=fluent_destination \ + --output streams.severe_events.filters.policies=severe-events-policy \ + --output streams.severe_events.buffer.mode=drop +``` + +## CLI flags format + +CLI flags have a the following structure: `--output streams..[.]=`. The following table describes all the available fields and subfields + +### Available fields + +| Field / Subfield | Usage | Default | +| :--------------: | ------------------------------------------------------------------------------------------------------------------- | ------- | +| destinations | list of destinations' names the stream refers to. | [] | +| filters | filters applied to the stream. | {} | +| filters.events | list of events' names to filters. | [] | +| filters.policies | list of policies' names to filter. | [] | +| buffer | buffer settings. | {} | +| buffer.size | number of maximum elements allowed in the stream buffer queue. | 1024 | +| buffer.mode | `drop` of `block`. `drop` allows dropping events when the `buffer` is full. `block` does not allow events dropping. | block | diff --git a/examples/config/global_config.json b/examples/config/global_config.json index 1cbdf7a5af98..f1ad9deb17af 100644 --- a/examples/config/global_config.json +++ b/examples/config/global_config.json @@ -15,11 +15,35 @@ "info" ], "metrics": false, - "output": [ - "json" - ], + "output": { + "destinations": [ + { + "name": "default_destination", + "type": "file", + "path": "stdout", + "url": "", + "format": "table" + } + ], + "streams": [ + { + "name": "default_stream", + "destinations": [ + "default_destination" + ], + "filters": { + "events": [], + "policies": [] + }, + "buffer": { + "mode": "block", + "size": 1024 + } + } + ] + }, "perf-buffer-size": 1024, "pprof": false, "pyroscope": false, "signatures-dir": "" -} +} \ No newline at end of file diff --git a/examples/config/global_config.yaml b/examples/config/global_config.yaml index 8cfba796c2a7..4ea12037ea94 100644 --- a/examples/config/global_config.yaml +++ b/examples/config/global_config.yaml @@ -112,56 +112,25 @@ scope: metrics: false output: - json: - files: - - stdout - - # table: - # files: - # - /path/to/table1.out - # - /path/to/table2.out - - # table-verbose: - # files: - # - stdout - - # gotemplate: - # template: /path/to/my_template1.tmpl - # files: - # - /path/to/output1.out - # - /path/to/output2.out - - # forward: - # - forward1: - # protocol: tcp - # user: user - # password: pass - # host: 127.0.0.1 - # port: 24224 - # tag: tracee1 - # - forward2: - # protocol: udp - # user: user - # password: pass - # host: 127.0.0.1 - # port: 24225 - # tag: tracee2 - - # webhook: - # - webhook1: - # protocol: http - # host: localhost - # port: 8000 - # timeout: 5s - # gotemplate: /path/to/template/test.tmpl - # content-type: application/json - # - webhook2: - # protocol: http - # host: localhost - # port: 9000 - # timeout: 3s - # gotemplate: /path/to/template/test.tmpl - # content-type: application/json + # destinations: + # - name: destination_name + # type: file | webhook | forward + # path: path in case of type == file + # url: url in case of type == webhook or type == forward + # format: json | gotemplate=/path/to/template.tpl | table | table-verbose + + # streams: + # - name: stream_name + # destinations: + # - destination_name + # filters: + # events: + # - event_to_filter + # policies: + # - policy_to_filter + # buffer: + # mode: drop + # size: 1024 # options: # none: false diff --git a/pkg/cmd/cobra/cobra.go b/pkg/cmd/cobra/cobra.go index e203aa8c435a..34f263e51fad 100644 --- a/pkg/cmd/cobra/cobra.go +++ b/pkg/cmd/cobra/cobra.go @@ -14,7 +14,6 @@ import ( "github.com/aquasecurity/tracee/pkg/cmd/flags/server" "github.com/aquasecurity/tracee/pkg/cmd/initialize" "github.com/aquasecurity/tracee/pkg/cmd/initialize/sigs" - "github.com/aquasecurity/tracee/pkg/cmd/printer" "github.com/aquasecurity/tracee/pkg/config" "github.com/aquasecurity/tracee/pkg/events" "github.com/aquasecurity/tracee/pkg/k8s" @@ -253,21 +252,7 @@ func GetTraceeRunner(c *cobra.Command, version string) (cmd.Runner, error) { } cfg.InitialPolicies = ps - // Output command line flags - - outputFlags, err := flags.GetFlagsFromViper("output") - if err != nil { - return runner, err - } - - output, err := flags.PrepareOutput(outputFlags) - if err != nil { - return runner, err - } - cfg.Output = output.TraceeConfig - - // Create printer - + // Output containerFilterEnabled := func() bool { for _, p := range initialPolicies { if p.ContainerFilterEnabled() { @@ -278,16 +263,22 @@ func GetTraceeRunner(c *cobra.Command, version string) (cmd.Runner, error) { return false } - p, err := printer.NewBroadcast( - output.PrinterConfigs, - cmd.GetContainerMode(containerFilterEnabled(), cfg.NoContainersEnrich), - ) + outputFlags, err := flags.GetFlagsFromViper("output") if err != nil { return runner, err } - // Check kernel lockdown + containerMode := cmd.GetContainerMode( + containerFilterEnabled(), cfg.NoContainersEnrich) + output, err := flags.PrepareOutput(outputFlags, containerMode) + if err != nil { + return runner, err + } + + cfg.Output = output + + // Check kernel lockdown lockdown, err := environment.Lockdown() if err != nil { logger.Debugw("OSInfo", "lockdown", err) @@ -339,7 +330,6 @@ func GetTraceeRunner(c *cobra.Command, version string) (cmd.Runner, error) { cfg.MetricsEnabled = runner.HTTP.MetricsEndpointEnabled() cfg.HealthzEnabled = runner.HTTP.HealthzEnabled() runner.TraceeConfig = cfg - runner.Printer = p runner.InstallPath = traceeInstallPath noSignaturesMode := viper.GetBool("no-signatures") diff --git a/pkg/cmd/flags/config.go b/pkg/cmd/flags/config.go index 474642c10b98..2aeed29b6b64 100644 --- a/pkg/cmd/flags/config.go +++ b/pkg/cmd/flags/config.go @@ -345,14 +345,52 @@ func getLogFilterAttrFlags(filterOut bool, attrs LogFilterAttributes) []string { // output flag // +type StreamBufferMode string + +const ( + StreamBufferBlock StreamBufferMode = "block" + StreamBufferDrop StreamBufferMode = "drop" +) + +type StreamFiltersConfig struct { + Policies []string `mapstructure:"policies"` + Events []string `mapstructure:"events"` +} + +type StreamBufferConfig struct { + Size int `mapstructure:"size"` + Mode StreamBufferMode `mapstructure:"mode"` +} + +type StreamConfig struct { + Name string `mapstructure:"name"` + Destinations []string `mapstructure:"destinations"` + Filters StreamFiltersConfig `mapstructure:"filters"` + Buffer StreamBufferConfig `mapstructure:"buffer"` +} + +type DestinationsConfig struct { + Name string `mapstructure:"name"` + Type string `mapstructure:"type"` + Format string `mapstructure:"format"` + Path string `mapstructure:"path"` + Url string `mapstructure:"url"` +} + +type OutputOptsConfig struct { + None bool `mapstructure:"none"` + StackAddresses bool `mapstructure:"stack-addresses"` + ExecEnv bool `mapstructure:"exec-env"` + ExecHash string `mapstructure:"exec-hash"` + ParseArguments bool `mapstructure:"parse-arguments"` + ParseArgumentsFDs bool `mapstructure:"parse-arguments-fds"` + SortEvents bool `mapstructure:"sort-events"` +} + type OutputConfig struct { - Options OutputOptsConfig `mapstructure:"options"` - Table OutputFormatConfig `mapstructure:"table"` - TableVerbose OutputFormatConfig `mapstructure:"table-verbose"` - JSON OutputFormatConfig `mapstructure:"json"` - GoTemplate OutputGoTemplateConfig `mapstructure:"gotemplate"` - Forwards map[string]OutputForwardConfig `mapstructure:"forward"` - Webhooks map[string]OutputWebhookConfig `mapstructure:"webhook"` + Options OutputOptsConfig `mapstructure:"options"` + Destinations []DestinationsConfig `mapstructure:"destinations"` + Streams []StreamConfig `mapstructure:"streams"` } func (c *OutputConfig) flags() []string { @@ -381,102 +419,47 @@ func (c *OutputConfig) flags() []string { flags = append(flags, "option:sort-events") } - // formats with files - formatFilesMap := map[string][]string{ - "table": c.Table.Files, - "table-verbose": c.TableVerbose.Files, - "json": c.JSON.Files, - } - for format, files := range formatFilesMap { - for _, file := range files { - flags = append(flags, fmt.Sprintf("%s:%s", format, file)) + // destinations + for _, destination := range c.Destinations { + if destination.Format != "" { + flags = append(flags, fmt.Sprintf("destinations.%s.format=%s", destination.Name, destination.Format)) } - } - // gotemplate - if c.GoTemplate.Template != "" { - templateFlag := fmt.Sprintf("gotemplate=%s", c.GoTemplate.Template) - if len(c.GoTemplate.Files) > 0 { - templateFlag += ":" + strings.Join(c.GoTemplate.Files, ",") + if destination.Type != "" { + flags = append(flags, fmt.Sprintf("destinations.%s.type=%s", destination.Name, destination.Type)) } - flags = append(flags, templateFlag) - } - - // forward - for forwardName, forward := range c.Forwards { - _ = forwardName - url := fmt.Sprintf("%s://", forward.Protocol) - - if forward.User != "" && forward.Password != "" { - url += fmt.Sprintf("%s:%s@", forward.User, forward.Password) + if destination.Path != "" { + flags = append(flags, fmt.Sprintf("destinations.%s.path=%s", destination.Name, destination.Path)) } - url += fmt.Sprintf("%s:%d", forward.Host, forward.Port) - - if forward.Tag != "" { - url += fmt.Sprintf("?tag=%s", forward.Tag) + if destination.Url != "" { + flags = append(flags, fmt.Sprintf("destinations.%s.url=%s", destination.Name, destination.Url)) } - - flags = append(flags, fmt.Sprintf("forward:%s", url)) } - // webhook - for webhookName, webhook := range c.Webhooks { - _ = webhookName - delim := "?" - url := fmt.Sprintf("%s://%s:%d", webhook.Protocol, webhook.Host, webhook.Port) - if webhook.Timeout != "" { - url += fmt.Sprintf("%stimeout=%s", delim, webhook.Timeout) - delim = "&" + // streams + for _, stream := range c.Streams { + if stream.Buffer.Mode != "" { + flags = append(flags, fmt.Sprintf("streams.%s.buffer.mode=%s", stream.Name, stream.Buffer.Mode)) } - if webhook.GoTemplate != "" { - url += fmt.Sprintf("%sgotemplate=%s", delim, webhook.GoTemplate) - delim = "&" + + if stream.Buffer.Size >= 0 { + flags = append(flags, fmt.Sprintf("streams.%s.buffer.size=%d", stream.Name, stream.Buffer.Size)) } - if webhook.ContentType != "" { - url += fmt.Sprintf("%scontentType=%s", delim, webhook.ContentType) + + if len(stream.Destinations) > 0 { + flags = append(flags, fmt.Sprintf("streams.%s.destinations=%s", stream.Name, strings.Join(stream.Destinations, ","))) } - flags = append(flags, fmt.Sprintf("webhook:%s", url)) + if len(stream.Filters.Events) > 0 { + flags = append(flags, fmt.Sprintf("streams.%s.filters.events=%s", stream.Name, strings.Join(stream.Filters.Events, ","))) + } + + if len(stream.Filters.Policies) > 0 { + flags = append(flags, fmt.Sprintf("streams.%s.filters.policies=%s", stream.Name, strings.Join(stream.Filters.Policies, ","))) + } } return flags } - -type OutputOptsConfig struct { - None bool `mapstructure:"none"` - StackAddresses bool `mapstructure:"stack-addresses"` - ExecEnv bool `mapstructure:"exec-env"` - ExecHash string `mapstructure:"exec-hash"` - ParseArguments bool `mapstructure:"parse-arguments"` - ParseArgumentsFDs bool `mapstructure:"parse-arguments-fds"` - SortEvents bool `mapstructure:"sort-events"` -} - -type OutputFormatConfig struct { - Files []string `mapstructure:"files"` -} - -type OutputGoTemplateConfig struct { - Template string `mapstructure:"template"` - Files []string `mapstructure:"files"` -} - -type OutputForwardConfig struct { - Protocol string `mapstructure:"protocol"` - User string `mapstructure:"user"` - Password string `mapstructure:"password"` - Host string `mapstructure:"host"` - Port int `mapstructure:"port"` - Tag string `mapstructure:"tag"` -} - -type OutputWebhookConfig struct { - Protocol string `mapstructure:"protocol"` - Host string `mapstructure:"host"` - Port int `mapstructure:"port"` - Timeout string `mapstructure:"timeout"` - GoTemplate string `mapstructure:"gotemplate"` - ContentType string `mapstructure:"content-type"` -} diff --git a/pkg/cmd/flags/config_test.go b/pkg/cmd/flags/config_test.go index 6b080447b064..20349c1ea8c3 100644 --- a/pkg/cmd/flags/config_test.go +++ b/pkg/cmd/flags/config_test.go @@ -243,78 +243,44 @@ output: }, }, { - name: "Test output configuration (structured flags)", + name: "Test output configuration (config file)", yamlContent: ` output: - options: - none: false - stack-addresses: true - exec-env: true - exec-hash: dev-inode - parse-arguments: true - parse-arguments-fds: true - sort-events: true - table: - files: - - file1 - table-verbose: - files: - - stdout - json: - files: - - /path/to/json1.out - gotemplate: - template: template1 - files: - - file3 - - file4 - forward: - - forward1: - protocol: tcp - user: user - password: pass - host: 127.0.0.1 - port: 24224 - tag: tracee1 - - forward2: - protocol: udp - user: user - password: pass - host: 127.0.0.1 - port: 24225 - tag: tracee2 - webhook: - - webhook1: - protocol: http - host: localhost - port: 8000 - timeout: 5s - gotemplate: /path/to/template1 - content-type: application/json - - webhook2: - protocol: http - host: localhost - port: 9000 - timeout: 3s - gotemplate: /path/to/template2 - content-type: application/ld+json + destinations: + - name: d1 + type: file + format: json + path: stdout + - name: d2 + type: webhook + format: json + url: http://localhost:8080 + streams: + - name: s1 + destinations: + - d1 + buffer: + size: 1024 + mode: drop + filters: + events: + - e1 + policies: + - p1 `, key: "output", expectedFlags: []string{ - "option:stack-addresses", - "option:exec-env", - "option:exec-hash=dev-inode", - "option:parse-arguments", - "option:parse-arguments-fds", - "option:sort-events", - "table:file1", - "table-verbose:stdout", - "json:/path/to/json1.out", - "gotemplate=template1:file3,file4", - "forward:tcp://user:pass@127.0.0.1:24224?tag=tracee1", - "forward:udp://user:pass@127.0.0.1:24225?tag=tracee2", - "webhook:http://localhost:8000?timeout=5s&gotemplate=/path/to/template1&contentType=application/json", - "webhook:http://localhost:9000?timeout=3s&gotemplate=/path/to/template2&contentType=application/ld+json", + "destinations.d1.type=file", + "destinations.d1.format=json", + "destinations.d1.path=stdout", + "destinations.d2.type=webhook", + "destinations.d2.format=json", + "destinations.d2.url=http://localhost:8080", + "streams.s1.destinations=d1", + "streams.s1.buffer.size=1024", + "streams.s1.buffer.mode=drop", + "streams.s1.filters.events=e1", + "streams.s1.filters.policies=p1", }, }, { @@ -863,165 +829,6 @@ func TestLogConfigFlags(t *testing.T) { // output // -func TestOutputConfigFlags(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - config OutputConfig - expected []string - }{ - { - name: "empty config", - config: OutputConfig{}, - expected: []string{}, - }, - { - name: "options set", - config: OutputConfig{ - Options: OutputOptsConfig{ - None: true, - StackAddresses: true, - ExecEnv: true, - ExecHash: "dev-inode", - ParseArguments: true, - ParseArgumentsFDs: true, - SortEvents: true, - }, - }, - expected: []string{ - "none", - "option:stack-addresses", - "option:exec-env", - "option:exec-hash=dev-inode", - "option:parse-arguments", - "option:parse-arguments-fds", - "option:sort-events", - }, - }, - { - name: "formats set", - config: OutputConfig{ - Table: OutputFormatConfig{ - Files: []string{"file1"}, - }, - JSON: OutputFormatConfig{ - Files: []string{"file2"}, - }, - }, - expected: []string{ - "table:file1", - "json:file2", - }, - }, - { - name: "gotemplate set", - config: OutputConfig{ - GoTemplate: OutputGoTemplateConfig{ - Template: "template1", - Files: []string{"file3", "file4"}, - }, - }, - expected: []string{ - "gotemplate=template1:file3,file4", - }, - }, - { - name: "test forward with tag", - config: OutputConfig{ - Forwards: map[string]OutputForwardConfig{ - "example1": { - Protocol: "tcp", - User: "", - Password: "", - Host: "example.com", - Port: 8080, - Tag: "sample", - }, - }, - }, - expected: []string{ - "forward:tcp://example.com:8080?tag=sample", - }, - }, - { - name: "test forward with user and password", - config: OutputConfig{ - Forwards: map[string]OutputForwardConfig{ - "example2": { - Protocol: "tcp", - User: "user123", - Password: "pass123", - Host: "secure.com", - Port: 443, - Tag: "", - }, - }, - }, - expected: []string{ - "forward:tcp://user123:pass123@secure.com:443", - }, - }, - { - name: "test webhook with all fields", - config: OutputConfig{ - Webhooks: map[string]OutputWebhookConfig{ - "example3": { - Protocol: "http", - Host: "webhook.com", - Port: 9090, - Timeout: "5s", - GoTemplate: "/path/to/template1", - ContentType: "application/json", - }, - }, - }, - expected: []string{ - "webhook:http://webhook.com:9090?timeout=5s&gotemplate=/path/to/template1&contentType=application/json", - }, - }, - { - name: "test combined forward and webhook", - config: OutputConfig{ - Forwards: map[string]OutputForwardConfig{ - "example4": { - Protocol: "http", - User: "", - Password: "", - Host: "combined.com", - Port: 8000, - Tag: "taggy", - }, - }, - Webhooks: map[string]OutputWebhookConfig{ - "example5": { - Protocol: "http", - Host: "hooky.com", - Port: 8088, - Timeout: "10s", - }, - }, - }, - expected: []string{ - "forward:http://combined.com:8000?tag=taggy", - "webhook:http://hooky.com:8088?timeout=10s", - }, - }, - } - - for _, tt := range tests { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - got := tt.config.flags() - if !slicesEqualIgnoreOrder(got, tt.expected) { - t.Errorf("flags() = %v, want %v", got, tt.expected) - } - }) - } -} func TestServerConfigFlags(t *testing.T) { t.Parallel() diff --git a/pkg/cmd/flags/errors.go b/pkg/cmd/flags/errors.go index 4a6ef2f6240b..e1b2ce25867c 100644 --- a/pkg/cmd/flags/errors.go +++ b/pkg/cmd/flags/errors.go @@ -77,3 +77,30 @@ func UnrecognizedOutputFormatError(format string) error { func UnsupportedContainerRuntimeError() error { return errors.New("unsupported container runtime in sockets flag (see 'tracee man containers' for supported runtimes)") } + +func DestinationFlagIncorrectError(flag string) error { + return fmt.Errorf("destination flag format incorrect %s", flag) +} + +func WrongFunctionInvocationError(functionName, flag string) error { + return fmt.Errorf("%s function called on wrong flag %s", functionName, flag) +} + +func InvalidDestinationFieldError(field, value, destinationName string) error { + return fmt.Errorf("validation error: destination %s %s not valid for destination %s", + field, value, destinationName) +} + +func MandatoryDestinationFieldError(destinationType, destinationName string) error { + return fmt.Errorf("validation error: url is mandatory for %s in destination %s", destinationName, destinationType) +} + +func StreamFlagIncorrect(flag string) error { + return fmt.Errorf("stream flag format incorrect %s", flag) +} + +func DestinationNotFoundError(destinationName, streamName string) error { + return fmt.Errorf( + "destination %s references in stream %s was not declared in destinations", + destinationName, streamName) +} diff --git a/pkg/cmd/flags/output.go b/pkg/cmd/flags/output.go index a37007c6436f..f7da49d73076 100644 --- a/pkg/cmd/flags/output.go +++ b/pkg/cmd/flags/output.go @@ -4,6 +4,7 @@ import ( "net/url" "os" "path/filepath" + "strconv" "strings" "github.com/aquasecurity/tracee/common/digest" @@ -11,25 +12,39 @@ import ( "github.com/aquasecurity/tracee/pkg/config" ) -type PrepareOutputResult struct { - TraceeConfig *config.OutputConfig - PrinterConfigs []config.PrinterConfig -} - -func PrepareOutput(outputSlice []string) (PrepareOutputResult, error) { - outConfig := PrepareOutputResult{} +func PrepareOutput(outputSlice []string, containerMode config.ContainerMode) (*config.OutputConfig, error) { traceeConfig := &config.OutputConfig{} // outpath:format - printerMap := make(map[string]string) + destinationMap := make(map[string]string) + streamsFlags := []string{} + declaredDestinations := map[string]*config.Destination{} + // This for loop handle the simple output cases. Backward compatible with --output flags for _, o := range outputSlice { + if strings.HasPrefix(o, "streams.") { + // skip streams computation because we need to have all + // the destinations ready before processing them + + streamsFlags = append(streamsFlags, o) + continue + } + + if strings.HasPrefix(o, "destinations.") { + err := parseDestinationFlag(o, declaredDestinations) + if err != nil { + return nil, err + } + + continue + } + outputParts := strings.SplitN(o, ":", 2) if strings.HasPrefix(outputParts[0], "gotemplate=") { - err := parseFormat(outputParts, printerMap) + err := parseFormat(outputParts, destinationMap) if err != nil { - return outConfig, err + return nil, err } continue } @@ -37,74 +52,325 @@ func PrepareOutput(outputSlice []string) (PrepareOutputResult, error) { switch outputParts[0] { case "none": if len(outputParts) > 1 { - return outConfig, NoneOutputPathError() + return nil, NoneOutputPathError() } - printerMap["stdout"] = "ignore" + destinationMap["stdout"] = "ignore" case "table", "table-verbose", "json": - err := parseFormat(outputParts, printerMap) + err := parseFormat(outputParts, destinationMap) if err != nil { - return outConfig, err + return nil, err } case "forward": err := validateURL(outputParts, "forward") if err != nil { - return outConfig, err + return nil, err } - printerMap[outputParts[1]] = "forward" + destinationMap[outputParts[1]] = "forward" case "webhook": err := validateURL(outputParts, "webhook") if err != nil { - return outConfig, err + return nil, err } - printerMap[outputParts[1]] = "webhook" + destinationMap[outputParts[1]] = "webhook" case "option": err := parseOption(outputParts, traceeConfig) if err != nil { - return outConfig, err + return nil, err } default: - return outConfig, InvalidOutputFlagError(outputParts[0]) + return nil, InvalidOutputFlagError(outputParts[0]) } } - // default - if len(printerMap) == 0 { - printerMap["stdout"] = "table" + if err := validateOrDefaults(declaredDestinations); err != nil { + return nil, err } - printerConfigs, err := getPrinterConfigs(printerMap, traceeConfig) + declaredStreams := map[string]*config.Stream{} + for _, flag := range streamsFlags { + err := parseStreamFlag(flag, declaredStreams, declaredDestinations) + if err != nil { + return nil, err + } + } + + for _, stream := range declaredStreams { + traceeConfig.Streams = append(traceeConfig.Streams, *stream) + } + + // Create streams for destinations without one + usedDestinations := map[string]struct{}{} + for _, s := range declaredStreams { + for _, streamDest := range s.Destinations { + usedDestinations[streamDest.Name] = struct{}{} + } + } + + unusedDestinations := []config.Destination{} + for _, dd := range declaredDestinations { + _, ok := usedDestinations[dd.Name] + if ok { + continue + } + + unusedDestinations = append(unusedDestinations, *dd) + } + + if len(declaredDestinations) == 0 && len(destinationMap) == 0 { + destinationMap["stdout"] = "table" + } + + destinationConfigs, err := getDestinationConfigs(destinationMap, traceeConfig, containerMode) if err != nil { - return outConfig, err + return nil, err } - outConfig.TraceeConfig = traceeConfig - outConfig.PrinterConfigs = printerConfigs + if len(destinationConfigs) > 0 || len(unusedDestinations) > 0 { + traceeConfig.Streams = append(traceeConfig.Streams, config.Stream{ + Name: "default-stream", + Destinations: append(destinationConfigs, unusedDestinations...), + }) + } - return outConfig, nil + return traceeConfig, nil } -func PreparePrinterConfig(printerKind string, outputPath string) (config.PrinterConfig, error) { +func parseDestinationFlag(flag string, existing map[string]*config.Destination) error { + parts := strings.SplitN(flag, "=", 2) + if len(parts) < 2 { + return DestinationFlagIncorrectError(flag) + } + flagValue := parts[1] + + flagNameParts := strings.Split(parts[0], ".") + + if flagNameParts[0] != "destinations" { + return WrongFunctionInvocationError("parseDestinationFlag()", flag) + } + + if len(flagNameParts) != 3 { + return DestinationFlagIncorrectError(flag) + } + + destinationName := flagNameParts[1] + if destinationName == "" { + return DestinationFlagIncorrectError(flag) + } + + if _, ok := existing[destinationName]; !ok { + conf := config.Destination{ + Name: destinationName, + } + + existing[destinationName] = &conf + } + + destinationConfig := existing[destinationName] + + destinationField := flagNameParts[2] + switch destinationField { + case "type": + destinationConfig.Type = flagValue + case "format": + destinationConfig.Format = flagValue + case "path": + destinationConfig.Path = flagValue + case "url": + destinationConfig.Url = flagValue + default: + return DestinationFlagIncorrectError(flag) + } + + return nil +} + +func validateOrDefaults(destinations map[string]*config.Destination) error { + for _, d := range destinations { + if d.Type == "" { + d.Type = "file" + } + + if d.Type == "file" && d.Format == "" { + d.Format = "table" + } + + if d.Type == "file" && d.Path == "" { + d.Path = "stdout" + } + + if (d.Type == "webhook" || d.Type == "forward") && + d.Format == "" { + d.Format = "json" + } + + if (d.Type == "webhook" || d.Type == "forward") && + d.Url == "" { + return MandatoryDestinationFieldError(d.Type, d.Name) + } + + if d.Format != "json" && d.Format != "table" && + d.Format != "table-verbose" && !strings.HasPrefix(d.Format, "gotemplate=") { + return InvalidDestinationFieldError("format", d.Format, d.Name) + } + + if d.Type != "file" && d.Type != "webhook" && d.Type != "forward" { + return InvalidDestinationFieldError("type", d.Type, d.Name) + } + + if d.Type == "file" { + d.File = os.Stdout + + if d.Path != "stdout" && d.Path != "" { + outputFile, err := CreateOutputFile(d.Path) + if err != nil { + return err + } + + d.File = outputFile + } + } + } + + return nil +} + +func parseStreamFlag(flag string, existing map[string]*config.Stream, + destinations map[string]*config.Destination) error { + parts := strings.SplitN(flag, "=", 2) + if len(parts) < 2 { + return StreamFlagIncorrect(flag) + } + flagValue := parts[1] + + flagNameParts := strings.Split(parts[0], ".") + if len(flagNameParts) < 3 || len(flagNameParts) > 4 { + return StreamFlagIncorrect(flag) + } + + if flagNameParts[0] != "streams" { + return WrongFunctionInvocationError("parseStreamFlag()", flag) + } + + if _, ok := existing[flagNameParts[1]]; !ok { + conf := config.Stream{ + Name: flagNameParts[1], + } + + existing[flagNameParts[1]] = &conf + } + + streamConfig := existing[flagNameParts[1]] + + switch flagNameParts[2] { + case "destinations": + destinationNames := strings.Split(flagValue, ",") + for _, destinationName := range destinationNames { + destinationConfig, ok := destinations[destinationName] + if !ok { + return DestinationNotFoundError(destinationName, streamConfig.Name) + } + + streamConfig.Destinations = append(streamConfig.Destinations, *destinationConfig) + } + case "filters": + if len(flagNameParts) != 4 { + return StreamFlagIncorrect(flag) + } + + switch flagNameParts[3] { + case "events": + streamConfig.Filters.Events = strings.Split(flagValue, ",") + case "policies": + streamConfig.Filters.Policies = strings.Split(flagValue, ",") + } + case "buffer": + if len(flagNameParts) != 4 { + return StreamFlagIncorrect(flag) + } + + switch flagNameParts[3] { + case "mode": + if flagValue != string(config.StreamBufferBlock) && flagValue != string(config.StreamBufferDrop) { + return StreamFlagIncorrect(flag) + } + + streamConfig.Buffer.Mode = config.StreamBufferMode(flagValue) + case "size": + size, err := strconv.Atoi(flagValue) + if err != nil { + return StreamFlagIncorrect(flag) + } + + streamConfig.Buffer.Size = size + } + default: + return StreamFlagIncorrect(flag) + } + + return nil +} + +func getWebhookFormat(webhookUrl string) string { + urlParts := strings.Split(webhookUrl, "?") + if len(urlParts) == 1 || urlParts[1] == "" { + return "json" + } + + queryParams := strings.SplitSeq(urlParts[1], "&") + for part := range queryParams { + if strings.HasPrefix(part, "gotemplate") { + return part + } + } + + return "json" +} + +func PreparePrinterConfig(printerKind string, outputPath string) (config.Destination, error) { + if printerKind == "ignore" { + return config.Destination{ + Name: "ignore", + Type: printerKind, + Path: "stdout", // here because tests expect `stdout` as a default value but I believe it can be removed + }, nil + } + + var dest config.Destination outFile := os.Stdout var err error - if outputPath != "stdout" && outputPath != "" && printerKind != "forward" && printerKind != "webhook" { - outFile, err = CreateOutputFile(outputPath) - if err != nil { - return config.PrinterConfig{}, err + isFile := outputPath != "" && printerKind != "forward" && printerKind != "webhook" + + if printerKind == "webhook" { + dest.Format = getWebhookFormat(outputPath) + } + + dest.Type = printerKind + dest.Url = outputPath + dest.Name = outputPath + printerKind + + if isFile { + if outputPath != "stdout" { + outFile, err = CreateOutputFile(outputPath) + if err != nil { + return config.Destination{}, err + } } + + dest.File = outFile + dest.Format = printerKind + dest.Path = outputPath + dest.Type = "file" + dest.Url = "" // clear unused URL } - return config.PrinterConfig{ - Kind: printerKind, - OutPath: outputPath, - OutFile: outFile, - }, nil + return dest, nil } -// setOption sets the given option in the given config -func setOption(cfg *config.OutputConfig, option string) error { +// SetOption sets the given option in the given config +func SetOption(cfg *config.OutputConfig, option string) error { switch option { case "stack-addresses": cfg.StackAddresses = true @@ -154,20 +420,29 @@ func setOption(cfg *config.OutputConfig, option string) error { return nil } -// getPrinterConfigs returns a slice of printer.Configs based on the given printerMap -func getPrinterConfigs(printerMap map[string]string, traceeConfig *config.OutputConfig) ([]config.PrinterConfig, error) { - printerConfigs := make([]config.PrinterConfig, 0, len(printerMap)) +// getDestinationConfigs returns a slice of printer.Configs based on the given printerMap +func getDestinationConfigs(printerMap map[string]string, traceeConfig *config.OutputConfig, + containerMode config.ContainerMode) ([]config.Destination, error) { + printerConfigs := make([]config.Destination, 0, len(printerMap)) for outPath, printerKind := range printerMap { + if printerKind == "ignore" { + continue + } + if printerKind == "table" { - if err := setOption(traceeConfig, "parse-arguments"); err != nil { + if err := SetOption(traceeConfig, "parse-arguments"); err != nil { return nil, err } } + printerCfg, err := PreparePrinterConfig(printerKind, outPath) if err != nil { return nil, err } + + printerCfg.ContainerMode = containerMode + printerConfigs = append(printerConfigs, printerCfg) } @@ -202,7 +477,7 @@ func parseOption(outputParts []string, traceeConfig *config.OutputConfig) error } for _, option := range strings.Split(outputParts[1], ",") { - err := setOption(traceeConfig, option) + err := SetOption(traceeConfig, option) if err != nil { return err } diff --git a/pkg/cmd/flags/output_test.go b/pkg/cmd/flags/output_test.go index e15086a0975d..a6ad05626145 100644 --- a/pkg/cmd/flags/output_test.go +++ b/pkg/cmd/flags/output_test.go @@ -2,6 +2,7 @@ package flags import ( "os" + "path" "strings" "testing" @@ -14,11 +15,12 @@ import ( func TestPrepareOutput(t *testing.T) { t.Parallel() + tempDir := t.TempDir() testCases := []struct { testName string outputSlice []string - expectedOutput PrepareOutputResult + expectedOutput config.OutputConfig expectedError error }{ // validations @@ -51,102 +53,134 @@ func TestPrepareOutput(t *testing.T) { { testName: "default format", outputSlice: []string{}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - ParseArguments: true, + expectedOutput: config.OutputConfig{ + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + }, + }, }, }, }, { testName: "table to stdout", outputSlice: []string{"table"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - ParseArguments: true, + expectedOutput: config.OutputConfig{ + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + }, + }, }, }, }, { testName: "table to /tmp/table", outputSlice: []string{"table:/tmp/table"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "/tmp/table"}, - }, - TraceeConfig: &config.OutputConfig{ - ParseArguments: true, + expectedOutput: config.OutputConfig{ + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "/tmp/tabletable", Type: "file", Format: "table", Path: "/tmp/table"}, + }, + }, }, }, }, { testName: "table to stdout, and to /tmp/table", outputSlice: []string{"table", "table:/tmp/table"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - {Kind: "table", OutPath: "/tmp/table"}, - }, - TraceeConfig: &config.OutputConfig{ - ParseArguments: true, + expectedOutput: config.OutputConfig{ + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + {Name: "/tmp/tabletable", Type: "file", Format: "table", Path: "/tmp/table"}, + }, + }, }, }, }, { testName: "json to stdout", outputSlice: []string{"json"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "json", OutPath: "stdout"}, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdoutjson", Type: "file", Format: "json", Path: "stdout"}, + }, + }, }, - TraceeConfig: &config.OutputConfig{}, }, }, { testName: "json to /tmp/json, and json to /tmp/json2", outputSlice: []string{"json:/tmp/json", "json:/tmp/json2"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "json", OutPath: "/tmp/json"}, - {Kind: "json", OutPath: "/tmp/json2"}, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "/tmp/jsonjson", Type: "file", Format: "json", Path: "/tmp/json"}, + {Name: "/tmp/json2json", Type: "file", Format: "json", Path: "/tmp/json2"}, + }, + }, }, - TraceeConfig: &config.OutputConfig{}, }, }, { testName: "table-verbose to stdout", outputSlice: []string{"table-verbose"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table-verbose", OutPath: "stdout"}, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable-verbose", Type: "file", Format: "table-verbose", Path: "stdout"}, + }, + }, }, - TraceeConfig: &config.OutputConfig{}, }, }, { testName: "gotemplate to stdout", outputSlice: []string{"gotemplate=template.tmpl"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "gotemplate=template.tmpl", OutPath: "stdout"}, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdoutgotemplate=template.tmpl", Type: "file", Format: "gotemplate=template.tmpl", Path: "stdout"}, + }, + }, }, - TraceeConfig: &config.OutputConfig{}, }, }, { testName: "gotemplate to multiple files", outputSlice: []string{"gotemplate=template.tmpl:/tmp/gotemplate1,/tmp/gotemplate2"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "gotemplate=template.tmpl", OutPath: "/tmp/gotemplate1"}, - {Kind: "gotemplate=template.tmpl", OutPath: "/tmp/gotemplate2"}, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "/tmp/gotemplate1gotemplate=template.tmpl", Type: "file", Format: "gotemplate=template.tmpl", Path: "/tmp/gotemplate1"}, + {Name: "/tmp/gotemplate2gotemplate=template.tmpl", Type: "file", Format: "gotemplate=template.tmpl", Path: "/tmp/gotemplate2"}, + }, + }, }, - TraceeConfig: &config.OutputConfig{}, }, }, { @@ -156,15 +190,18 @@ func TestPrepareOutput(t *testing.T) { "json:/tmp/json,/tmp/json2", "gotemplate=template.tmpl:/tmp/gotemplate1", }, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - {Kind: "json", OutPath: "/tmp/json"}, - {Kind: "json", OutPath: "/tmp/json2"}, - {Kind: "gotemplate=template.tmpl", OutPath: "/tmp/gotemplate1"}, - }, - TraceeConfig: &config.OutputConfig{ - ParseArguments: true, + expectedOutput: config.OutputConfig{ + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + {Name: "/tmp/jsonjson", Type: "file", Format: "json", Path: "/tmp/json"}, + {Name: "/tmp/json2json", Type: "file", Format: "json", Path: "/tmp/json2"}, + {Name: "/tmp/gotemplate1gotemplate=template.tmpl", Type: "file", Format: "gotemplate=template.tmpl", Path: "/tmp/gotemplate1"}, + }, + }, }, }, }, @@ -184,14 +221,9 @@ func TestPrepareOutput(t *testing.T) { expectedError: DuplicateOutputPathError("/tmp/test"), }, { - testName: "none", - outputSlice: []string{"none"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "ignore", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{}, - }, + testName: "none", + outputSlice: []string{"none"}, + expectedOutput: config.OutputConfig{}, }, { testName: "invalid value for none format", @@ -222,11 +254,15 @@ func TestPrepareOutput(t *testing.T) { { testName: "forward", outputSlice: []string{"forward:tcp://localhost:1234"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "forward", OutPath: "tcp://localhost:1234"}, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "tcp://localhost:1234forward", Type: "forward", Url: "tcp://localhost:1234"}, + }, + }, }, - TraceeConfig: &config.OutputConfig{}, }, }, // webhook @@ -248,63 +284,94 @@ func TestPrepareOutput(t *testing.T) { { testName: "webhook", outputSlice: []string{"webhook:http://localhost:8080"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "webhook", OutPath: "http://localhost:8080"}, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "http://localhost:8080webhook", Type: "webhook", Url: "http://localhost:8080", Format: "json"}, + }, + }, + }, + }, + }, + { + testName: "webhook with gotemplate", + outputSlice: []string{"webhook:http://localhost:8080?gotemplate=/path/to/template"}, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "http://localhost:8080?gotemplate=/path/to/templatewebhook", Type: "webhook", + Url: "http://localhost:8080?gotemplate=/path/to/template", Format: "gotemplate=/path/to/template"}, + }, + }, }, - TraceeConfig: &config.OutputConfig{}, }, }, // options { testName: "option stack-addresses", outputSlice: []string{"option:stack-addresses"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - StackAddresses: true, - ParseArguments: true, + expectedOutput: config.OutputConfig{ + StackAddresses: true, + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + }, + }, }, }, }, { testName: "option exec-env", outputSlice: []string{"option:exec-env"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - ExecEnv: true, - ParseArguments: true, + expectedOutput: config.OutputConfig{ + ExecEnv: true, + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + }, + }, }, }, }, { testName: "option exec-hash", outputSlice: []string{"option:exec-hash"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - CalcHashes: digest.CalcHashesDevInode, - ParseArguments: true, + expectedOutput: config.OutputConfig{ + CalcHashes: digest.CalcHashesDevInode, + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + }, + }, }, }, }, { testName: "option exec-hash=inode", outputSlice: []string{"option:exec-hash=inode"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - CalcHashes: digest.CalcHashesInode, - ParseArguments: true, + expectedOutput: config.OutputConfig{ + CalcHashes: digest.CalcHashesInode, + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + }, + }, }, }, }, @@ -321,38 +388,47 @@ func TestPrepareOutput(t *testing.T) { { testName: "option parse-arguments", outputSlice: []string{"json", "option:parse-arguments"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "json", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - ParseArguments: true, + expectedOutput: config.OutputConfig{ + ParseArguments: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdoutjson", Type: "file", Format: "json", Path: "stdout"}, + }, + }, }, }, }, { testName: "option parse-arguments-fds", outputSlice: []string{"json", "option:parse-arguments-fds"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "json", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - ParseArguments: true, - ParseArgumentsFDs: true, + expectedOutput: config.OutputConfig{ + ParseArguments: true, + ParseArgumentsFDs: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdoutjson", Type: "file", Format: "json", Path: "stdout"}, + }, + }, }, }, }, { testName: "option sort-events", outputSlice: []string{"option:sort-events"}, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "table", OutPath: "stdout"}, - }, - TraceeConfig: &config.OutputConfig{ - ParseArguments: true, - EventsSorting: true, + expectedOutput: config.OutputConfig{ + ParseArguments: true, + EventsSorting: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdouttable", Type: "file", Format: "table", Path: "stdout"}, + }, + }, }, }, }, @@ -367,62 +443,381 @@ func TestPrepareOutput(t *testing.T) { "option:parse-arguments-fds", "option:sort-events", }, - expectedOutput: PrepareOutputResult{ - PrinterConfigs: []config.PrinterConfig{ - {Kind: "json", OutPath: "stdout"}, + expectedOutput: config.OutputConfig{ + StackAddresses: true, + ExecEnv: true, + CalcHashes: digest.CalcHashesDevInode, + ParseArguments: true, + ParseArgumentsFDs: true, + EventsSorting: true, + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "stdoutjson", Type: "file", Format: "json", Path: "stdout"}, + }, + }, }, - TraceeConfig: &config.OutputConfig{ - StackAddresses: true, - ExecEnv: true, - CalcHashes: digest.CalcHashesDevInode, - ParseArguments: true, - ParseArgumentsFDs: true, - EventsSorting: true, + }, + }, + { + testName: "define a json file destination", + outputSlice: []string{ + "destinations.d1.format=json", + "destinations.d1.type=file", + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "d1", Type: "file", Format: "json", Path: "stdout"}, + }, + }, }, }, }, + { + testName: "define a table file destination", + outputSlice: []string{ + "destinations.d2.format=table", + "destinations.d2.type=file", + "destinations.d2.path=" + path.Join(tempDir, "tablefile"), + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "d2", Type: "file", Format: "table", Path: path.Join(tempDir, "tablefile")}, + }, + }, + }, + }, + }, + { + testName: "define a table file destination", + outputSlice: []string{ + "destinations.d2.format=table", + "destinations.d2.type=file", + "destinations.d2.path=" + path.Join(tempDir, "tablefile"), + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "d2", Type: "file", Format: "table", Path: path.Join(tempDir, "tablefile")}, + }, + }, + }, + }, + }, + { + testName: "define a webhook destination", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=webhook", + "destinations.d2.url=http://localhost:8080", + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "d2", Type: "webhook", Format: "json", Url: "http://localhost:8080"}, + }, + }, + }, + }, + }, + { + testName: "define a forward destination", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=forward", + "destinations.d2.url=tcp://localhost:8080", + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "d2", Type: "forward", Format: "json", Url: "tcp://localhost:8080"}, + }, + }, + }, + }, + }, + { + testName: "define multiple destinations without streams", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=forward", + "destinations.d2.url=tcp://localhost:8080", + "destinations.d3.format=json", + "destinations.d3.path=" + path.Join(tempDir, "jsonfilemultdest"), + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "d2", Type: "forward", Format: "json", Url: "tcp://localhost:8080"}, + {Name: "d3", Type: "file", Format: "json", Path: path.Join(tempDir, "jsonfilemultdest")}, + }, + }, + }, + }, + }, + { + testName: "define multiple destinations with different streams", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=forward", + "destinations.d2.url=tcp://localhost:8080", + "destinations.d3.format=json", + "destinations.d3.path=" + path.Join(tempDir, "jsonfilemultdest"), + "streams.s2.destinations=d2", + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "s2", + Destinations: []config.Destination{ + {Name: "d2", Type: "forward", Format: "json", Url: "tcp://localhost:8080"}, + }, + }, + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "d3", Type: "file", Format: "json", Path: path.Join(tempDir, "jsonfilemultdest")}, + }, + }, + }, + }, + }, + { + testName: "no default-stream", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=forward", + "destinations.d2.url=tcp://localhost:8080", + "destinations.d3.format=json", + "destinations.d3.path=" + path.Join(tempDir, "jsonfilemultdest"), + "streams.s2.destinations=d2", + "streams.s3.destinations=d3", + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "s2", + Destinations: []config.Destination{ + {Name: "d2", Type: "forward", Format: "json", Url: "tcp://localhost:8080"}, + }, + }, + { + Name: "s3", + Destinations: []config.Destination{ + {Name: "d3", Type: "file", Format: "json", Path: path.Join(tempDir, "jsonfilemultdest")}, + }, + }, + }, + }, + }, + { + testName: "single stream with multiple destinations", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=forward", + "destinations.d2.url=tcp://localhost:8080", + "destinations.d3.format=json", + "destinations.d3.path=" + path.Join(tempDir, "jsonfilemultdest"), + "streams.stream.destinations=d2,d3", + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "stream", + Destinations: []config.Destination{ + {Name: "d2", Type: "forward", Format: "json", Url: "tcp://localhost:8080"}, + {Name: "d3", Type: "file", Format: "json", Path: path.Join(tempDir, "jsonfilemultdest")}, + }, + }, + }, + }, + }, + { + testName: "webhook without url", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=webhook", + }, + expectedError: MandatoryDestinationFieldError("webhook", "d2"), + }, + { + testName: "forward without url", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=forward", + }, + expectedError: MandatoryDestinationFieldError("forward", "d2"), + }, + { + testName: "forward without url", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=forward", + }, + expectedError: MandatoryDestinationFieldError("forward", "d2"), + }, + { + testName: "webhook without url", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=webhook", + }, + expectedError: MandatoryDestinationFieldError("webhook", "d2"), + }, + { + testName: "invalid destination format field", + outputSlice: []string{ + "destinations.d2.format=invalid", + "destinations.d2.type=file", + }, + expectedError: InvalidDestinationFieldError("format", "invalid", "d2"), + }, + { + testName: "invalid destination type field", + outputSlice: []string{ + "destinations.d2.format=json", + "destinations.d2.type=invalid", + }, + expectedError: InvalidDestinationFieldError("type", "invalid", "d2"), + }, + { + testName: "all valid types", + outputSlice: []string{ + "destinations.d1.format=json", + "destinations.d2.type=webhook", + "destinations.d2.url=http://localhost:8080", + "destinations.d3.type=forward", + "destinations.d3.url=tcp://localhost:8080", + "destinations.d4.format=table", + "destinations.d5.format=table-verbose", + }, + expectedOutput: config.OutputConfig{ + Streams: []config.Stream{ + { + Name: "default-stream", + Destinations: []config.Destination{ + {Name: "d1", Format: "json", Path: "stdout", Type: "file"}, + {Name: "d2", Format: "json", Url: "http://localhost:8080", Type: "webhook"}, + {Name: "d3", Format: "json", Url: "tcp://localhost:8080", Type: "forward"}, + {Name: "d4", Format: "table", Path: "stdout", Type: "file"}, + {Name: "d5", Format: "table-verbose", Path: "stdout", Type: "file"}, + }, + }, + }, + }, + }, + { + testName: "wrong destination flag format", + outputSlice: []string{ + "destinations.d1.type", + }, + expectedError: DestinationFlagIncorrectError("destinations.d1.type"), + }, + { + testName: "wrong destination flag format", + outputSlice: []string{ + "destinations.d1.type.wrong=invalid", + }, + expectedError: DestinationFlagIncorrectError("destinations.d1.type.wrong=invalid"), + }, + { + testName: "wrong destination field", + outputSlice: []string{ + "destinations.d1.invalid", + }, + expectedError: DestinationFlagIncorrectError("destinations.d1.invalid"), + }, + { + testName: "wrong stream field", + outputSlice: []string{ + "destinations.d1.type=file", + "streams.s1.invalid=123", + }, + expectedError: StreamFlagIncorrect("streams.s1.invalid=123"), + }, } + for _, testcase := range testCases { // testcase := testcase t.Run(testcase.testName, func(t *testing.T) { - // t.Parallel() + t.Parallel() defer func() { - for _, printer := range testcase.expectedOutput.PrinterConfigs { - if strings.HasPrefix(printer.OutPath, "/tmp") { - _ = os.Remove(printer.OutPath) + for _, stream := range testcase.expectedOutput.Streams { + for _, destination := range stream.Destinations { + if strings.HasPrefix(destination.Path, "/tmp") { + _ = os.Remove(destination.Path) + } } } }() - output, err := PrepareOutput(testcase.outputSlice) + output, err := PrepareOutput(testcase.outputSlice, config.ContainerModeDisabled) if err != nil { require.NotNil(t, testcase.expectedError) assert.Contains(t, err.Error(), testcase.expectedError.Error()) } else { - assert.Equal(t, testcase.expectedOutput.TraceeConfig, output.TraceeConfig) + assert.Equal(t, testcase.expectedOutput.CalcHashes, output.CalcHashes) + assert.Equal(t, testcase.expectedOutput.EventsSorting, output.EventsSorting) + assert.Equal(t, testcase.expectedOutput.ExecEnv, output.ExecEnv) + assert.Equal(t, testcase.expectedOutput.ParseArguments, output.ParseArguments) + assert.Equal(t, testcase.expectedOutput.ParseArgumentsFDs, output.ParseArgumentsFDs) + assert.Equal(t, testcase.expectedOutput.StackAddresses, output.StackAddresses) + assert.Equal(t, len(testcase.expectedOutput.Streams), len(output.Streams)) - assertPrinterConfigs(t, testcase.expectedOutput.PrinterConfigs, output.PrinterConfigs) + assertPrinterConfigs(t, testcase.expectedOutput.Streams, output.Streams) } }) } } -func assertPrinterConfigs(t *testing.T, expected []config.PrinterConfig, actual []config.PrinterConfig) { +func assertPrinterConfigs(t *testing.T, expected []config.Stream, actual []config.Stream) { // use a map to compare because the order of the printers is not guaranteed - printersMap := make(map[string]config.PrinterConfig) + expectedStreamsMap := make(map[string]config.Stream) - for _, p := range expected { - printersMap[p.OutPath] = p + for _, stream := range expected { + expectedStreamsMap[stream.Name] = stream } - for _, p := range actual { - expectedPrinter, ok := printersMap[p.OutPath] + for _, actualStream := range actual { + expectedDestsMap := map[string]config.Destination{} + var expectedDest config.Destination + var ok bool + + expectedStream, ok := expectedStreamsMap[actualStream.Name] assert.True(t, ok) + for _, expectedDest := range expectedStream.Destinations { + expectedDestsMap[expectedDest.Name] = expectedDest + } + + for _, actualDest := range actualStream.Destinations { + expectedDest, ok = expectedDestsMap[actualDest.Name] + assert.True(t, ok) - assert.Equal(t, expectedPrinter.Kind, p.Kind) - assert.Equal(t, expectedPrinter.OutPath, p.OutPath) - assert.Equal(t, expectedPrinter.ContainerMode, p.ContainerMode) + assert.Equal(t, expectedDest.Type, actualDest.Type) + assert.Equal(t, expectedDest.Path, actualDest.Path) + assert.Equal(t, expectedDest.Url, actualDest.Url) + if expectedDest.Format != actualDest.Format { + t.Errorf("%+v", actual) + } + assert.Equal(t, expectedDest.Format, actualDest.Format) + assert.Equal(t, expectedDest.ContainerMode, actualDest.ContainerMode) + } } } diff --git a/pkg/cmd/printer/broadcast.go b/pkg/cmd/printer/broadcast.go index 4b6fb1d1ccea..ff32db517e69 100644 --- a/pkg/cmd/printer/broadcast.go +++ b/pkg/cmd/printer/broadcast.go @@ -1,37 +1,33 @@ package printer import ( - "sync" + "context" "github.com/aquasecurity/tracee/pkg/config" "github.com/aquasecurity/tracee/pkg/metrics" + "github.com/aquasecurity/tracee/pkg/streams" "github.com/aquasecurity/tracee/types/trace" ) // Broadcast is a printer that broadcasts events to multiple printers type Broadcast struct { - PrinterConfigs []config.PrinterConfig - printers []EventPrinter - wg *sync.WaitGroup - eventsChan []chan trace.Event - done chan struct{} - containerMode config.ContainerMode + DestinationConfigs []config.Destination + printers []EventPrinter + eventsChan []chan trace.Event + containerMode config.ContainerMode } -// NewBroadcast creates a new Broadcast printer -func NewBroadcast(printerConfigs []config.PrinterConfig, containerMode config.ContainerMode) (*Broadcast, error) { - b := &Broadcast{PrinterConfigs: printerConfigs, containerMode: containerMode} +// newBroadcast creates a new Broadcast printer +func newBroadcast(destinationConfigs []config.Destination) (*Broadcast, error) { + b := &Broadcast{DestinationConfigs: destinationConfigs} return b, b.Init() } func (b *Broadcast) Init() error { - printers := make([]EventPrinter, 0, len(b.PrinterConfigs)) - wg := &sync.WaitGroup{} + printers := make([]EventPrinter, 0, len(b.DestinationConfigs)) - for _, pConfig := range b.PrinterConfigs { - pConfig.ContainerMode = b.containerMode - - p, err := New(pConfig) + for _, dstConfig := range b.DestinationConfigs { + p, err := newSinglePrinter(dstConfig) if err != nil { return err } @@ -39,23 +35,7 @@ func (b *Broadcast) Init() error { printers = append(printers, p) } - eventsChan := make([]chan trace.Event, 0, len(printers)) - done := make(chan struct{}) - - for _, printer := range printers { - // we use a buffered channel to avoid blocking the event channel, - // we match the size of ChanEvents buffer - eventChan := make(chan trace.Event, 1000) - eventsChan = append(eventsChan, eventChan) - - wg.Add(1) - go startPrinter(wg, done, eventChan, printer) - } - b.printers = printers - b.eventsChan = eventsChan - b.wg = wg - b.done = done return nil } @@ -68,67 +48,36 @@ func (b *Broadcast) Preamble() { // Print broadcasts the event to all printers func (b *Broadcast) Print(event trace.Event) { - for _, c := range b.eventsChan { + for _, p := range b.printers { // we are blocking here if the printer is not consuming events fast enough - c <- event + p.Print(event) } } func (b *Broadcast) Epilogue(stats metrics.Stats) { - // if you execute epilogue no other events should be sent to the printers, - // so we finish the events goroutines - close(b.done) - - b.wg.Wait() - for _, p := range b.printers { p.Epilogue(stats) } } -// Close closes Broadcast printer -func (b *Broadcast) Close() { - for _, p := range b.printers { - p.Close() - } -} - -// Active reports whether the broadcast has meaningful kinds to process. -// -// It returns true if there is at least one printer kind and it's not solely "ignore". -// If no printer configurations are present or if the only kind is "ignore", -// the broadcast is considered inactive. -func (b *Broadcast) Active() bool { - kinds := b.Kinds() - - if len(kinds) == 0 || (len(kinds) == 1 && kinds[0] == "ignore") { - return false +func (b *Broadcast) FromStream(ctx context.Context, stream *streams.Stream) { + for { + select { + case <-ctx.Done(): + return + case e := <-stream.ReceiveEvents(): + b.Print(e) + } } - - return true } -// Kinds returns a list of all printer kinds configured in the broadcast. -// -// Each kind corresponds to a specific printer configuration. -func (b *Broadcast) Kinds() []string { - kinds := make([]string, 0, len(b.PrinterConfigs)) - - for _, p := range b.PrinterConfigs { - kinds = append(kinds, p.Kind) - } - - return kinds +func (b *Broadcast) Kind() string { + return "broadcast" } -func startPrinter(wg *sync.WaitGroup, done chan struct{}, c chan trace.Event, p EventPrinter) { - for { - select { - case <-done: - wg.Done() - return - case event := <-c: - p.Print(event) - } +// Close closes Broadcast printer +func (b *Broadcast) Close() { + for _, p := range b.printers { + p.Close() } } diff --git a/pkg/cmd/printer/printer.go b/pkg/cmd/printer/printer.go index c65c678e9db3..68e7f09d2aab 100644 --- a/pkg/cmd/printer/printer.go +++ b/pkg/cmd/printer/printer.go @@ -2,6 +2,7 @@ package printer import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -21,6 +22,7 @@ import ( "github.com/aquasecurity/tracee/common/logger" "github.com/aquasecurity/tracee/pkg/config" "github.com/aquasecurity/tracee/pkg/metrics" + "github.com/aquasecurity/tracee/pkg/streams" "github.com/aquasecurity/tracee/types/trace" ) @@ -33,55 +35,78 @@ type EventPrinter interface { Epilogue(stats metrics.Stats) // Print prints a single event Print(event trace.Event) + // Receive events from stram + FromStream(ctx context.Context, stream *streams.Stream) + // Mainly created for testing purposes. Might be useful also in other context + Kind() string // dispose of resources Close() } -func New(cfg config.PrinterConfig) (EventPrinter, error) { +func New(destinations []config.Destination) (EventPrinter, error) { + if len(destinations) == 0 { + return nil, errfmt.Errorf("destinations can't be empty") + } + + if len(destinations) > 1 { + return newBroadcast(destinations) + } + + return newSinglePrinter(destinations[0]) +} + +func newSinglePrinter(dst config.Destination) (EventPrinter, error) { var res EventPrinter - kind := cfg.Kind + kind := dst.Type + format := dst.Format - if cfg.OutFile == nil { + if dst.Type == "file" && dst.File == nil { return res, errfmt.Errorf("out file is not set") } switch { case kind == "ignore": res = &ignoreEventPrinter{} - case kind == "table": - res = &tableEventPrinter{ - out: cfg.OutFile, - verbose: false, - containerMode: cfg.ContainerMode, - } - case kind == "table-verbose": - res = &tableEventPrinter{ - out: cfg.OutFile, - verbose: true, - containerMode: cfg.ContainerMode, - } - case kind == "json": - res = &jsonEventPrinter{ - out: cfg.OutFile, + case kind == "file": + switch { + case format == "table": + res = &tableEventPrinter{ + out: dst.File, + verbose: false, + containerMode: dst.ContainerMode, + } + case format == "table-verbose": + res = &tableEventPrinter{ + out: dst.File, + verbose: true, + containerMode: dst.ContainerMode, + } + case format == "json": + res = &jsonEventPrinter{ + out: dst.File, + } + case strings.HasPrefix(format, "gotemplate="): + res = &templateEventPrinter{ + out: dst.File, + templatePath: strings.Split(format, "=")[1], + } } case kind == "forward": res = &forwardEventPrinter{ - outPath: cfg.OutPath, + outPath: dst.Url, } case kind == "webhook": res = &webhookEventPrinter{ - outPath: cfg.OutPath, - } - case strings.HasPrefix(kind, "gotemplate="): - res = &templateEventPrinter{ - out: cfg.OutFile, - templatePath: strings.Split(kind, "=")[1], + outPath: dst.Url, + format: dst.Format, } } + err := res.Init() if err != nil { return nil, err } + return res, nil } @@ -92,7 +117,9 @@ type tableEventPrinter struct { relativeTS bool } -func (p tableEventPrinter) Init() error { return nil } +func (p tableEventPrinter) Init() error { + return nil +} func (p tableEventPrinter) Preamble() { if p.verbose { @@ -349,6 +376,23 @@ func (p tableEventPrinter) Epilogue(stats metrics.Stats) { fmt.Fprintf(p.out, "%s\n", string(jsonStats)) } +func (p *tableEventPrinter) FromStream(ctx context.Context, stream *streams.Stream) { + eventChan := stream.ReceiveEvents() + + for { + select { + case <-ctx.Done(): + return + case e := <-eventChan: + p.Print(e) + } + } +} + +func (p *tableEventPrinter) Kind() string { + return "table" +} + func (p tableEventPrinter) Close() { // Sync flushes buffered data, ensuring events aren't lost on process exit if f, ok := p.out.(*os.File); ok { @@ -393,6 +437,23 @@ func (p templateEventPrinter) Print(event trace.Event) { func (p templateEventPrinter) Epilogue(stats metrics.Stats) {} +func (p *templateEventPrinter) FromStream(ctx context.Context, stream *streams.Stream) { + eventChan := stream.ReceiveEvents() + + for { + select { + case <-ctx.Done(): + return + case e := <-eventChan: + p.Print(e) + } + } +} + +func (p *templateEventPrinter) Kind() string { + return "template" +} + func (p templateEventPrinter) Close() { // Sync flushes buffered data, ensuring events aren't lost on process exit if f, ok := p.out.(*os.File); ok { @@ -404,7 +465,9 @@ type jsonEventPrinter struct { out io.WriteCloser } -func (p jsonEventPrinter) Init() error { return nil } +func (p jsonEventPrinter) Init() error { + return nil +} func (p jsonEventPrinter) Preamble() {} @@ -418,6 +481,23 @@ func (p jsonEventPrinter) Print(event trace.Event) { func (p jsonEventPrinter) Epilogue(stats metrics.Stats) {} +func (p jsonEventPrinter) FromStream(ctx context.Context, stream *streams.Stream) { + eventChan := stream.ReceiveEvents() + + for { + select { + case <-ctx.Done(): + return + case e := <-eventChan: + p.Print(e) + } + } +} + +func (p *jsonEventPrinter) Kind() string { + return "json" +} + func (p jsonEventPrinter) Close() { // Sync flushes buffered data, ensuring events aren't lost on process exit if f, ok := p.out.(*os.File); ok { @@ -438,6 +518,12 @@ func (p *ignoreEventPrinter) Print(event trace.Event) {} func (p *ignoreEventPrinter) Epilogue(stats metrics.Stats) {} +func (p *ignoreEventPrinter) FromStream(ctx context.Context, stream *streams.Stream) {} + +func (p *ignoreEventPrinter) Kind() string { + return "ignore" +} + func (p ignoreEventPrinter) Close() {} // forwardEventPrinter sends events over the Fluent Forward protocol to a receiver @@ -530,6 +616,7 @@ func (p *forwardEventPrinter) Init() error { // The destination may not be available but may appear later so do not return an error here and just connect later. logger.Errorw("Error connecting to Forward destination", "url", p.url.String(), "error", err) } + return nil } @@ -573,6 +660,23 @@ func (p *forwardEventPrinter) Print(event trace.Event) { func (p *forwardEventPrinter) Epilogue(stats metrics.Stats) {} +func (p *forwardEventPrinter) FromStream(ctx context.Context, stream *streams.Stream) { + eventChan := stream.ReceiveEvents() + + for { + select { + case <-ctx.Done(): + return + case e := <-eventChan: + p.Print(e) + } + } +} + +func (p *forwardEventPrinter) Kind() string { + return "forward" +} + func (p forwardEventPrinter) Close() { if p.client != nil { logger.Infow("Disconnecting from Forward destination", "url", p.url.Host, "tag", p.tag) @@ -586,6 +690,7 @@ type webhookEventPrinter struct { outPath string url *url.URL timeout time.Duration + format string templateObj *template.Template contentType string } @@ -606,8 +711,7 @@ func (ws *webhookEventPrinter) Init() error { } ws.timeout = t - gotemplate := getParameterValue(parameters, "gotemplate", "") - if gotemplate != "" { + if gotemplate, ok := strings.CutPrefix(ws.format, "gotemplate="); ok { tmpl, err := template.New(filepath.Base(gotemplate)). Funcs(sprig.TxtFuncMap()). ParseFiles(gotemplate) @@ -672,5 +776,21 @@ func (ws *webhookEventPrinter) Print(event trace.Event) { func (ws *webhookEventPrinter) Epilogue(stats metrics.Stats) {} -func (ws *webhookEventPrinter) Close() { +func (ws *webhookEventPrinter) FromStream(ctx context.Context, stream *streams.Stream) { + eventChan := stream.ReceiveEvents() + + for { + select { + case <-ctx.Done(): + return + case e := <-eventChan: + ws.Print(e) + } + } } + +func (ws *webhookEventPrinter) Kind() string { + return "webhook" +} + +func (ws *webhookEventPrinter) Close() {} diff --git a/pkg/cmd/printer/printer_test.go b/pkg/cmd/printer/printer_test.go index 832fd3804ffe..0fa436c59473 100644 --- a/pkg/cmd/printer/printer_test.go +++ b/pkg/cmd/printer/printer_test.go @@ -2,9 +2,12 @@ package printer_test import ( "bytes" + "context" "os" + "path" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -12,6 +15,9 @@ import ( "github.com/aquasecurity/tracee/pkg/cmd/flags" "github.com/aquasecurity/tracee/pkg/cmd/printer" "github.com/aquasecurity/tracee/pkg/config" + "github.com/aquasecurity/tracee/pkg/events" + "github.com/aquasecurity/tracee/pkg/policy" + "github.com/aquasecurity/tracee/pkg/streams" "github.com/aquasecurity/tracee/types/trace" ) @@ -38,13 +44,14 @@ func TestTemplateEventPrinterSprigFunctions(t *testing.T) { buf := &bufferWriteCloser{Buffer: &bytes.Buffer{}} // Create printer config - cfg := config.PrinterConfig{ - Kind: "gotemplate=" + templatePath, - OutFile: buf, + cfg := config.Destination{ + Type: "file", + Format: "gotemplate=" + templatePath, + File: buf, } // Create and initialize the printer - p, err := printer.New(cfg) + p, err := printer.New([]config.Destination{cfg}) require.NoError(t, err) err = p.Init() @@ -85,16 +92,19 @@ func TestPrinterCloseFlushesData(t *testing.T) { t.Parallel() testCases := []struct { - name string - printerKind string + name string + format string + typ string }{ { - name: "json printer", - printerKind: "json", + name: "json printer", + format: "json", + typ: "file", }, { - name: "table printer", - printerKind: "table", + name: "table printer", + format: "table", + typ: "file", }, } @@ -113,13 +123,14 @@ func TestPrinterCloseFlushesData(t *testing.T) { defer file.Close() // We close it since we created it // Create printer config - cfg := config.PrinterConfig{ - Kind: tc.printerKind, - OutFile: file, + cfg := config.Destination{ + Type: tc.typ, + Format: tc.format, + File: file, } // Create and initialize the printer - p, err := printer.New(cfg) + p, err := printer.New([]config.Destination{cfg}) require.NoError(t, err) // Create a sample event @@ -168,13 +179,14 @@ func TestTemplateEventPrinterCloseFlushesData(t *testing.T) { defer file.Close() // We close it since we created it // Create printer config - cfg := config.PrinterConfig{ - Kind: "gotemplate=" + templatePath, - OutFile: file, + cfg := config.Destination{ + Type: "file", + Format: "gotemplate=" + templatePath, + File: file, } // Create and initialize the printer - p, err := printer.New(cfg) + p, err := printer.New([]config.Destination{cfg}) require.NoError(t, err) // Create a sample event @@ -197,3 +209,206 @@ func TestTemplateEventPrinterCloseFlushesData(t *testing.T) { assert.Contains(t, string(content), "test_process", "File should contain event data") assert.Contains(t, string(content), "test_event", "File should contain event name") } + +func TestPrinterFromStream(t *testing.T) { + t.Parallel() + + sm := streams.NewStreamsManager() + stream := sm.Subscribe(policy.PolicyAll, map[events.ID]struct{}{}, config.StreamBuffer{}) + outPath := path.Join(t.TempDir(), "file1") + + file, err := flags.CreateOutputFile(outPath) + require.NoError(t, err) + defer file.Close() + + destination := config.Destination{ + Type: "file", + Format: "json", + Path: outPath, + File: file, + } + + p, err := printer.New([]config.Destination{destination}) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + + go func() { + p.FromStream(ctx, stream) + }() + + sm.Publish(t.Context(), trace.Event{ + ProcessName: "process_from_stream", + EventName: "event_from_stream", + MatchedPoliciesUser: policy.PolicyAll, + }) + + time.Sleep(time.Millisecond * 10) + + cancel() + p.Close() + sm.Close() + + content, err := os.ReadFile(outPath) + require.NoError(t, err) + + assert.NotEmpty(t, content, "file must not be empty") + assert.Contains(t, string(content), "process_from_stream", "file must contain the pushed process name") + assert.Contains(t, string(content), "event_from_stream", "file must contain the pushed event name") +} + +func TestPrinterCreation(t *testing.T) { + t.Parallel() + + templateFilePath := path.Join(t.TempDir(), "template1.tpl") + file, err := flags.CreateOutputFile(templateFilePath) + require.NoError(t, err) + file.Close() // close it immediately, we don't need to use it here + + testCases := []struct { + testName string + destinations []config.Destination + expectedKind string + expectedError string + }{ + { + testName: "table_printer_creation_error_no_file", + destinations: []config.Destination{ + { + Type: "file", + Format: "table", + Path: "stdout", + }, + }, + expectedError: "out file is not set", + }, + { + testName: "table_printer_creation", + destinations: []config.Destination{ + { + Type: "file", + Format: "table", + Path: "stdout", + File: os.Stdout, + }, + }, + expectedKind: "table", + }, + { + testName: "table_printer_creation", + destinations: []config.Destination{ + { + Type: "file", + Format: "table-verbose", + Path: "stdout", + File: os.Stdout, + }, + }, + expectedKind: "table", + }, + { + testName: "json_printer_creation", + destinations: []config.Destination{ + { + Type: "file", + Format: "json", + Path: "stdout", + File: os.Stdout, + }, + }, + expectedKind: "json", + }, + { + testName: "template_printer_creation", + destinations: []config.Destination{ + { + Type: "file", + Format: "gotemplate=" + templateFilePath, + Path: "stdout", + File: os.Stdout, + }, + }, + expectedKind: "template", + }, + { + testName: "webhook_json_printer_creation", + destinations: []config.Destination{ + { + Type: "webhook", + Format: "json", + Url: "http://1.1.1.1/webhook", + }, + }, + expectedKind: "webhook", + }, + { + testName: "webhook_template_printer_creation", + destinations: []config.Destination{ + { + Type: "webhook", + Format: "gotemplate=" + templateFilePath, + Url: "http://1.1.1.1/webhook", + }, + }, + expectedKind: "webhook", + }, + { + testName: "forward_printer_creation", + destinations: []config.Destination{ + { + Type: "forward", + Format: "json", + Url: "udp://1.1.1.1/fluent", + }, + }, + expectedKind: "forward", + }, + { + testName: "ignore_printer_creation", + destinations: []config.Destination{ + { + Type: "ignore", + }, + }, + expectedKind: "ignore", + }, + { + testName: "broadcast_printer_creation", + destinations: []config.Destination{ + { + Type: "forward", + Format: "json", + Url: "tcp://1.1.1.1/fluent", + }, + { + Type: "webhook", + Format: "json", + Url: "http://1.1.1.1/fluent", + }, + }, + expectedKind: "broadcast", + }, + { + testName: "broadcast_printer_creation_empty_destination_error", + destinations: []config.Destination{}, + expectedError: "destinations can't be empty", + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + t.Parallel() + + printer, err := printer.New(tc.destinations) + if tc.expectedError != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expectedError) + + return + } + require.NoError(t, err) + assert.Equal(t, tc.expectedKind, printer.Kind()) + }) + } + +} diff --git a/pkg/cmd/tracee.go b/pkg/cmd/tracee.go index 811fcc53f14c..37c827be6613 100644 --- a/pkg/cmd/tracee.go +++ b/pkg/cmd/tracee.go @@ -14,11 +14,11 @@ import ( tracee "github.com/aquasecurity/tracee/pkg/ebpf" "github.com/aquasecurity/tracee/pkg/server/grpc" "github.com/aquasecurity/tracee/pkg/server/http" + "github.com/aquasecurity/tracee/pkg/streams" ) type Runner struct { TraceeConfig config.Config - Printer *printer.Broadcast InstallPath string HTTP *http.Server GRPC *grpc.Server @@ -89,7 +89,7 @@ func (r Runner) Run(ctx context.Context) error { // Run Tracee - if r.Printer.Active() { + if r.shouldRunWithPrinter() { // Run Tracee with event subscription and printing. return r.runWithPrinter(ctx, t) // blocks until ctx is done } @@ -98,41 +98,75 @@ func (r Runner) Run(ctx context.Context) error { return t.Run(ctx) // blocks until ctx is done } +// shouldRunWithPrinter returns true only if there is at least one +// stream with a destination which is not "ignore" +func (r Runner) shouldRunWithPrinter() bool { + streamConfigs := r.TraceeConfig.Output.Streams + if len(streamConfigs) == 0 { + return false + } + + // It should never happen + if len(streamConfigs) == 1 && len(streamConfigs[0].Destinations) == 0 { + return false + } + + // If the only stream existing has a single destination which is + // ignore we ignore it and do not even jump to r.runWithPrinter() + if len(streamConfigs) == 1 && len(streamConfigs[0].Destinations) == 1 && + streamConfigs[0].Destinations[0].Type == "ignore" { + return false + } + + return true +} + // runWithPrinter runs Tracee with event subscription and printing enabled. // // It wraps Tracee's Run method to handle event subscription and printing, and ensures // that any remaining events are drained when the context is cancelled. // -// NOTE: This should only be called if a printer is active. +// NOTE: This should only be called if at least a stream with a destination exists. func (r Runner) runWithPrinter(ctx context.Context, t *tracee.Tracee) error { - stream := t.SubscribeAll() - defer t.Unsubscribe(stream) - - r.Printer.Preamble() - - // Start goroutine to print incoming events - go func() { - for { - select { - case event := <-stream.ReceiveEvents(): - r.Printer.Print(event) - case <-ctx.Done(): - return - } + streamList := make([]*streams.Stream, 0) + printers := []printer.EventPrinter{} + + for _, s := range r.TraceeConfig.Output.Streams { + var p printer.EventPrinter + var err error + + p, err = printer.New(s.Destinations) + if err != nil { + return err } - }() + printers = append(printers, p) + + var stream *streams.Stream + stream, err = t.Subscribe(s) + if err != nil { + return err + } + + go func() { + // blocks + p.FromStream(ctx, stream) + }() + + streamList = append(streamList, stream) + } // Blocks until ctx is done err := t.Run(ctx) - // Drain remaining channel events (sent during shutdown) - for event := range stream.ReceiveEvents() { - r.Printer.Print(event) + for _, s := range streamList { + t.Unsubscribe(s) } stats := t.Stats() - r.Printer.Epilogue(*stats) - r.Printer.Close() + for _, p := range printers { + p.Epilogue(*stats) + p.Close() + } return err } diff --git a/pkg/config/config.go b/pkg/config/config.go index 4930bf1241a2..a7f6bbf63849 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -69,6 +69,13 @@ func (c Config) Validate() error { } } + // Streams + for _, s := range c.Output.Streams { + if len(s.Destinations) == 0 { + return errfmt.Errorf("each stream must have at least 1 destination %s", s.Name) + } + } + // BPF if c.BPFObjBytes == nil { return errfmt.Errorf("nil bpf object in memory") @@ -149,6 +156,8 @@ type OutputConfig struct { ParseArguments bool ParseArgumentsFDs bool EventsSorting bool + + Streams []Stream } type ContainerMode int @@ -159,9 +168,36 @@ const ( ContainerModeEnriched ) -type PrinterConfig struct { - Kind string - OutPath string - OutFile io.WriteCloser +type Destination struct { + Name string + Type string + Format string + Path string + Url string + File io.WriteCloser ContainerMode ContainerMode } + +type StreamBufferMode string + +const ( + StreamBufferBlock StreamBufferMode = "block" + StreamBufferDrop StreamBufferMode = "drop" +) + +type StreamFilters struct { + Policies []string + Events []string +} + +type StreamBuffer struct { + Size int + Mode StreamBufferMode +} + +type Stream struct { + Name string + Destinations []Destination + Filters StreamFilters + Buffer StreamBuffer +} diff --git a/pkg/ebpf/tracee.go b/pkg/ebpf/tracee.go index 3ed73967600c..1cde385ed175 100644 --- a/pkg/ebpf/tracee.go +++ b/pkg/ebpf/tracee.go @@ -2097,30 +2097,44 @@ func (t *Tracee) ready(ctx gocontext.Context) { } } -// SubscribeAll returns a stream subscribed to all policies -func (t *Tracee) SubscribeAll() *streams.Stream { - return t.subscribe(policy.PolicyAll) -} - // Subscribe returns a stream subscribed to selected policies -func (t *Tracee) Subscribe(policyNames []string) (*streams.Stream, error) { - var policyMask uint64 +func (t *Tracee) Subscribe(stream config.Stream) (*streams.Stream, error) { + policyMask := policy.PolicyAll + eventMap := map[events.ID]struct{}{} - for _, policyName := range policyNames { - p, err := t.policyManager.LookupByName(policyName) - if err != nil { - return nil, err + if len(stream.Filters.Policies) > 0 { + policyMask = 0 + + for _, policyName := range stream.Filters.Policies { + p, err := t.policyManager.LookupByName(policyName) + if err != nil { + return nil, err + } + bitwise.SetBit(&policyMask, uint(p.ID)) } - bitwise.SetBit(&policyMask, uint(p.ID)) } - return t.subscribe(policyMask), nil + if len(stream.Filters.Events) > 0 { + for _, eventName := range stream.Filters.Events { + id, found := events.Core.GetDefinitionIDByName(eventName) + if !found { + return nil, errfmt.Errorf("error event not found: %s", eventName) + } + + eventMap[id] = struct{}{} + } + } + + return t.subscribe(policyMask, eventMap, stream.Buffer), nil } -func (t *Tracee) subscribe(policyMask uint64) *streams.Stream { - // TODO: the channel size matches the pipeline channel size, - // but we should make it configurable in the future. - return t.streamsManager.Subscribe(policyMask, t.config.PipelineChannelSize) +func (t *Tracee) subscribe(policyMask uint64, eventMap map[events.ID]struct{}, bufferConfig config.StreamBuffer) *streams.Stream { + // To keep old behavior in case of streams created from GRPC server + if bufferConfig.Size <= 0 { + bufferConfig.Size = t.config.PipelineChannelSize + } + + return t.streamsManager.Subscribe(policyMask, eventMap, bufferConfig) } // Unsubscribe unsubscribes stream diff --git a/pkg/server/grpc/tracee.go b/pkg/server/grpc/tracee.go index eeec12b79e3c..59dec6faf467 100644 --- a/pkg/server/grpc/tracee.go +++ b/pkg/server/grpc/tracee.go @@ -13,6 +13,7 @@ import ( pb "github.com/aquasecurity/tracee/api/v1beta1" "github.com/aquasecurity/tracee/common/errfmt" "github.com/aquasecurity/tracee/common/logger" + "github.com/aquasecurity/tracee/pkg/config" tracee "github.com/aquasecurity/tracee/pkg/ebpf" "github.com/aquasecurity/tracee/pkg/events" "github.com/aquasecurity/tracee/pkg/streams" @@ -604,14 +605,17 @@ func (s *TraceeService) StreamEvents(in *pb.StreamEventsRequest, grpcStream pb.T var stream *streams.Stream var err error - if len(in.Policies) == 0 { - stream = s.tracee.SubscribeAll() - } else { - stream, err = s.tracee.Subscribe(in.Policies) - if err != nil { - return err - } + streamConfig := config.Stream{ + Filters: config.StreamFilters{ + Policies: in.Policies, + }, } + + stream, err = s.tracee.Subscribe(streamConfig) + if err != nil { + return err + } + defer s.tracee.Unsubscribe(stream) mask := fmutils.NestedMaskFromPaths(in.GetMask().GetPaths()) diff --git a/pkg/streams/streams.go b/pkg/streams/streams.go index fd6ae53c08f0..503fe0967588 100644 --- a/pkg/streams/streams.go +++ b/pkg/streams/streams.go @@ -4,6 +4,9 @@ import ( "context" "sync" + "github.com/aquasecurity/tracee/common/logger" + "github.com/aquasecurity/tracee/pkg/config" + "github.com/aquasecurity/tracee/pkg/events" "github.com/aquasecurity/tracee/types/trace" ) @@ -11,8 +14,13 @@ import ( type Stream struct { // policy mask is a bitmap of policies that this stream is interested in policyMask uint64 + // event to filter + eventMap map[events.ID]struct{} + // true if there is at least one element in the eventMap + eventFilter bool // events is a channel that is used to receive events from the stream - events chan trace.Event + events chan trace.Event + strategyPush func(context.Context, trace.Event) } // ReceiveEvents returns a read-only channel for receiving events from the stream @@ -20,21 +28,24 @@ func (s *Stream) ReceiveEvents() <-chan trace.Event { return s.events } -// Publish publishes an event to the stream, -// but first check if this stream is interested in this event, -// by checking the event's policy mask against the stream's policy mask. func (s *Stream) publish(ctx context.Context, event trace.Event) { if s.shouldIgnorePolicy(event) { return } - // Currently, the behavior is to block when the channel is full. - // However, there is a consideration to modify this behavior to drop events instead. - // This change is based on the notion that with multiple streams, one stream's events - // should not impede another stream's event reception if they aren't processed rapidly. - // It is worth noting that there are scenarios where blocking may be preferred. - // To accommodate both scenarios, the plan is to introduce configurability for this behavior in the future. - // TODO: allow this to be configurable (drop/block) (josedonizetti) + if s.eventFilter { + if _, ok := s.eventMap[events.ID(event.EventID)]; !ok { + return + } + } + + // Due to the dynamic nature of this function the compile doesn't + // inline this. A condition is faster (and cheaper) than a function call + // should we change it with a condition? + s.strategyPush(ctx, event) +} + +func (s *Stream) blockPublish(ctx context.Context, event trace.Event) { select { case s.events <- event: case <-ctx.Done(): @@ -42,6 +53,17 @@ func (s *Stream) publish(ctx context.Context, event trace.Event) { } } +func (s *Stream) dropPublish(ctx context.Context, event trace.Event) { + select { + case s.events <- event: + case <-ctx.Done(): + return + default: + // Probably this is going to be too verbose + logger.Debugw("stream channel full, dropping message") + } +} + // shouldIgnorePolicy checks if the stream should ignore the event func (s *Stream) shouldIgnorePolicy(event trace.Event) bool { return s.policyMask&event.MatchedPoliciesUser == 0 @@ -67,13 +89,22 @@ func NewStreamsManager() *StreamsManager { } // Subscribe adds a stream to the manager -func (sm *StreamsManager) Subscribe(policyMask uint64, chanSize int) *Stream { +func (sm *StreamsManager) Subscribe(policyMask uint64, eventMap map[events.ID]struct{}, bufferConfig config.StreamBuffer) *Stream { sm.mutex.Lock() defer sm.mutex.Unlock() stream := &Stream{ - policyMask: policyMask, - events: make(chan trace.Event, chanSize), + policyMask: policyMask, + events: make(chan trace.Event, bufferConfig.Size), + eventMap: eventMap, + eventFilter: len(eventMap) > 0, + } + + switch bufferConfig.Mode { + case "", config.StreamBufferBlock: + stream.strategyPush = stream.blockPublish + case config.StreamBufferDrop: + stream.strategyPush = stream.dropPublish } sm.subscribers[stream] = struct{}{} diff --git a/pkg/streams/streams_test.go b/pkg/streams/streams_test.go index f507155982f4..bcf3e7af4ded 100644 --- a/pkg/streams/streams_test.go +++ b/pkg/streams/streams_test.go @@ -7,6 +7,8 @@ import ( "gotest.tools/assert" + "github.com/aquasecurity/tracee/pkg/config" + "github.com/aquasecurity/tracee/pkg/events" "github.com/aquasecurity/tracee/types/trace" ) @@ -37,13 +39,13 @@ func TestStreamManager(t *testing.T) { sm := NewStreamsManager() // stream for policy1 - stream1 := sm.Subscribe(policy1Mask, 0) + stream1 := sm.Subscribe(policy1Mask, map[events.ID]struct{}{}, config.StreamBuffer{}) // stream for policy1 and policy2 - stream2 := sm.Subscribe(policy1And2Mask, 0) + stream2 := sm.Subscribe(policy1And2Mask, map[events.ID]struct{}{}, config.StreamBuffer{}) // stream for all policies - stream3 := sm.Subscribe(allPoliciesMask, 0) + stream3 := sm.Subscribe(allPoliciesMask, map[events.ID]struct{}{}, config.StreamBuffer{}) // consumers consumersWG := &sync.WaitGroup{} @@ -159,7 +161,7 @@ func Test_shouldIgnorePolicy(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - stream := sm.Subscribe(tt.policyMask, 0) + stream := sm.Subscribe(tt.policyMask, map[events.ID]struct{}{}, config.StreamBuffer{}) assert.Equal(t, tt.expected, stream.shouldIgnorePolicy(tt.event)) }) } diff --git a/tests/compatibility/compatibility_test.go b/tests/compatibility/compatibility_test.go index 41af4e102ba1..e7295781427b 100644 --- a/tests/compatibility/compatibility_test.go +++ b/tests/compatibility/compatibility_test.go @@ -79,7 +79,8 @@ func TestCompatibility(t *testing.T) { debugProbeAttachments(t, traceeInstance) debugEventDependencies(t, traceeInstance) - eventStream := traceeInstance.SubscribeAll() + eventStream, err := traceeInstance.Subscribe(config.Stream{}) + require.NoError(t, err) defer traceeInstance.Unsubscribe(eventStream) go func() { diff --git a/tests/integration/container_events_test.go b/tests/integration/container_events_test.go index c0d20fe55280..7cc8d0f07837 100644 --- a/tests/integration/container_events_test.go +++ b/tests/integration/container_events_test.go @@ -71,7 +71,8 @@ func Test_ContainerCreateRemove(t *testing.T) { require.NoError(t, err, "Tracee failed to start") // Subscribe to events - eventStream := traceeInstance.SubscribeAll() + eventStream, err := traceeInstance.Subscribe(config.Stream{}) + require.NoError(t, err) defer traceeInstance.Unsubscribe(eventStream) // Start event collection goroutine @@ -259,9 +260,9 @@ func Test_ExistingContainers(t *testing.T) { // Subscribe to events BEFORE waiting for Tracee to start // (ExistingContainer events are emitted during Tracee's initialization) - eventStream := traceeInstance.SubscribeAll() + eventStream, err := traceeInstance.Subscribe(config.Stream{}) + require.NoError(t, err) defer traceeInstance.Unsubscribe(eventStream) - // Start event collection goroutine go func() { for { diff --git a/tests/integration/dependencies_test.go b/tests/integration/dependencies_test.go index 2f46f7e08af8..bd6f73969aa5 100644 --- a/tests/integration/dependencies_test.go +++ b/tests/integration/dependencies_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "go.uber.org/goleak" "github.com/aquasecurity/tracee/common/environment" @@ -280,7 +281,8 @@ func Test_EventsDependencies(t *testing.T) { t.Fatal(err) } - stream := trc.SubscribeAll() + stream, err := trc.Subscribe(config.Stream{}) + require.NoError(t, err) defer trc.Unsubscribe(stream) // start a goroutine to read events from the channel into the buffer diff --git a/tests/integration/event_filters_test.go b/tests/integration/event_filters_test.go index c5cc567fc870..5c513ad62d1c 100644 --- a/tests/integration/event_filters_test.go +++ b/tests/integration/event_filters_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/goleak" "github.com/aquasecurity/tracee/common/bitwise" @@ -2311,8 +2312,8 @@ func Test_EventFilters(t *testing.T) { // wait for the previous test to cool down coolDown(t, tc.coolDown) - // prepare tracee config - config := config.Config{ + // prepare tracee traceeConfig + traceeConfig := config.Config{ Capabilities: &config.CapabilitiesConfig{ BypassCaps: true, }, @@ -2323,7 +2324,7 @@ func Test_EventFilters(t *testing.T) { for _, p := range ps { initialPolicies = append(initialPolicies, p) } - config.InitialPolicies = initialPolicies + traceeConfig.InitialPolicies = initialPolicies traceeTimeout := 60 * time.Second ctx, cancel := context.WithTimeout(context.Background(), traceeTimeout) @@ -2339,7 +2340,7 @@ func Test_EventFilters(t *testing.T) { }() // start tracee - trc, err := testutils.StartTracee(ctx, t, config, nil, nil) + trc, err := testutils.StartTracee(ctx, t, traceeConfig, nil, nil) if err != nil { t.Fatal(err) } @@ -2350,7 +2351,8 @@ func Test_EventFilters(t *testing.T) { t.Fatal(err) } - stream := trc.SubscribeAll() + stream, err := trc.Subscribe(config.Stream{}) + require.NoError(t, err) defer trc.Unsubscribe(stream) // start a goroutine to read events from the channel into the buffer