Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion receiver/podmanreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ on a configured interval. These stats are for container
resource usage of cpu, memory, network, and the
[blkio controller](https://www.kernel.org/doc/Documentation/cgroup-v1/blkio-controller.txt).

Supported pipeline types: metrics
This receiver also queries the Podman service API to fetch life cycle events for all running containers and pods.
These events shows that containers/pods are running/died/started etc.

Supported pipeline types: metrics, logs

> :information_source: Requires Podman API version 3.3.1+ and Windows is not supported.

Expand All @@ -20,13 +23,16 @@ The following settings are optional:

- `collection_interval` (default = `10s`): The interval at which to gather container stats.

- `max_retries` (default = `10`): A number of retry to gather container life cycle events.

Example:

```yaml
receivers:
podman_stats:
endpoint: unix://run/podman/podman.sock
collection_interval: 10s
max_retries: 10
```

The full list of settings exposed for this receiver are documented [here](./config.go)
Expand Down
1 change: 1 addition & 0 deletions receiver/podmanreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
APIVersion string `mapstructure:"api_version"`
SSHKey string `mapstructure:"ssh_key"`
SSHPassphrase string `mapstructure:"ssh_passphrase"`
MaxRetries string `mapstructure:"max_retries"`
}

func (config Config) Validate() error {
Expand Down
47 changes: 43 additions & 4 deletions receiver/podmanreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ import (
const (
typeStr = "podman_stats"
defaultAPIVersion = "3.3.1"
defaultMaxRetries = "10"
)

func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultReceiverConfig,
receiverhelper.WithMetrics(createMetricsReceiver))
receiverhelper.WithMetrics(createMetricsReceiver),
receiverhelper.WithLogs(createLogsReceiver))
}

func createDefaultConfig() *Config {
Expand All @@ -45,24 +47,61 @@ func createDefaultConfig() *Config {
},
Endpoint: "unix:///run/podman/podman.sock",
APIVersion: defaultAPIVersion,
MaxRetries: defaultMaxRetries,
}
}

func createDefaultReceiverConfig() config.Receiver {
return createDefaultConfig()
}

func createReceiver(
ctx context.Context,
params component.ReceiverCreateSettings,
config config.Receiver,
) (*receiver, error) {
podmanConfig := config.(*Config)
var err error
r := receivers[podmanConfig]
if r == nil {
r, err = newReceiver(ctx, params, podmanConfig, nil)
if err != nil {
return nil, err
}
receivers[podmanConfig] = r
}
return r, err
}

func createMetricsReceiver(
ctx context.Context,
params component.ReceiverCreateSettings,
config config.Receiver,
consumer consumer.Metrics,
) (component.MetricsReceiver, error) {
podmanConfig := config.(*Config)
dsr, err := newReceiver(ctx, params, podmanConfig, consumer, nil)
r, err := createReceiver(ctx, params, config)
if err != nil {
return nil, err
}
err = r.registerMetricsConsumer(consumer, params)
if err != nil {
return nil, err
}
return r, nil
}

return dsr, nil
func createLogsReceiver(
ctx context.Context,
params component.ReceiverCreateSettings,
config config.Receiver,
consumer consumer.Logs,
) (component.LogsReceiver, error) {
r, err := createReceiver(ctx, params, config)
if err != nil {
return nil, err
}
r.registerLogsConsumer(consumer)
return r, nil
}

var receivers = map[*Config]*receiver{}
1 change: 1 addition & 0 deletions receiver/podmanreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podman
go 1.17

require (
github.com/cenkalti/backoff/v4 v4.1.2
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.40.1-0.20211202221455-42566a660aac
go.opentelemetry.io/collector/model v0.40.1-0.20211202221455-42566a660aac
Expand Down
125 changes: 125 additions & 0 deletions receiver/podmanreceiver/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows
// +build !windows

package podmanreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver"

import (
"errors"
"sort"

"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
)

func traslateEventsToLogs(logger *zap.Logger, event event) (pdata.Logs, error) {
ld := pdata.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
ill := rl.InstrumentationLibraryLogs().AppendEmpty()

logRecord := ill.Logs().AppendEmpty()
logRecord.SetName(event.Action)
logRecord.SetTimestamp(pdata.Timestamp(event.TimeNano))
logRecord.Attributes().InsertString("contianer.id", event.ID)

body, err := convertInterfaceToAttributeValue(logger, "podman "+event.Type+"("+event.Actor.Attributes["name"]+") "+event.Action)
if err != nil {
return pdata.NewLogs(), err
}
body.CopyTo(logRecord.Body())

keys := make([]string, 0, len(event.Actor.Attributes))
for k := range event.Actor.Attributes {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
val := event.Actor.Attributes[key]
attrValue, err := convertInterfaceToAttributeValue(logger, val)
if err != nil {
return pdata.NewLogs(), err
}
if key == "image" {
logRecord.Attributes().Insert("container.image.name", attrValue)
}
if key == "name" {
logRecord.Attributes().Insert("container.name", attrValue)
}
}
return ld, nil
}

func convertInterfaceToAttributeValue(logger *zap.Logger, originalValue interface{}) (pdata.AttributeValue, error) {
if originalValue == nil {
return pdata.NewAttributeValueEmpty(), nil
} else if value, ok := originalValue.(string); ok {
return pdata.NewAttributeValueString(value), nil
} else if value, ok := originalValue.(int64); ok {
return pdata.NewAttributeValueInt(value), nil
} else if value, ok := originalValue.(float64); ok {
return pdata.NewAttributeValueDouble(value), nil
} else if value, ok := originalValue.(bool); ok {
return pdata.NewAttributeValueBool(value), nil
} else if value, ok := originalValue.(map[string]interface{}); ok {
mapValue, err := convertToAttributeMap(logger, value)
if err != nil {
return pdata.NewAttributeValueEmpty(), err
}
return mapValue, nil
} else if value, ok := originalValue.([]interface{}); ok {
arrValue, err := convertToSliceVal(logger, value)
if err != nil {
return pdata.NewAttributeValueEmpty(), err
}
return arrValue, nil
} else {
logger.Debug("Unsupported value conversion", zap.Any("value", originalValue))
return pdata.NewAttributeValueEmpty(), errors.New("cannot convert field value to attribute")
}
}

func convertToSliceVal(logger *zap.Logger, value []interface{}) (pdata.AttributeValue, error) {
attrVal := pdata.NewAttributeValueArray()
arr := attrVal.SliceVal()
for _, elt := range value {
translatedElt, err := convertInterfaceToAttributeValue(logger, elt)
if err != nil {
return attrVal, err
}
tgt := arr.AppendEmpty()
translatedElt.CopyTo(tgt)
}
return attrVal, nil
}

func convertToAttributeMap(logger *zap.Logger, value map[string]interface{}) (pdata.AttributeValue, error) {
attrVal := pdata.NewAttributeValueMap()
attrMap := attrVal.MapVal()
keys := make([]string, 0, len(value))
for k := range value {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := value[k]
translatedElt, err := convertInterfaceToAttributeValue(logger, v)
if err != nil {
return attrVal, err
}
attrMap.Insert(k, translatedElt)
}
return attrVal, nil
}
51 changes: 51 additions & 0 deletions receiver/podmanreceiver/podman_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"net/http"
"net/url"
"strconv"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -56,11 +60,31 @@ type containerStatsReport struct {
Error string
Stats []containerStats
}
type actor struct {
ID string
Attributes map[string]string
}

type event struct {
Status string `json:"status,omitempty"`
ID string `json:"id,omitempty"`
From string `json:"from,omitempty"`

Type string
Action string
Actor actor

Scope string `json:"scope,omitempty"`

Time int64 `json:"time,omitempty"`
TimeNano int64 `json:"timeNano,omitempty"`
}

type clientFactory func(logger *zap.Logger, cfg *Config) (client, error)

type client interface {
stats() ([]containerStats, error)
events(*zap.Logger, chan event, chan error) error
}

type podmanClient struct {
Expand Down Expand Up @@ -122,6 +146,33 @@ func (c *podmanClient) stats() ([]containerStats, error) {
return report.Stats, nil
}

func (c *podmanClient) events(logger *zap.Logger, eventChan chan event, errorChan chan error) error {
params := url.Values{}
params.Add("stream", "true")
params.Add("since", "0m")

response, err := c.request(context.Background(), "/events", params)
if err != nil {
return err
}

go func() {
dec := json.NewDecoder(response.Body)

for {
var eventToDecode event
if err = dec.Decode(&eventToDecode); err != nil {
errorChan <- err
errWhileClose := response.Body.Close()
logger.Error("Error while closing the body", zap.Error(errWhileClose))
return
}
eventChan <- eventToDecode
}
}()
return nil
}

func (c *podmanClient) ping() error {
resp, err := c.request(context.Background(), "/_ping", nil)
if err != nil {
Expand Down
Loading