Skip to content

Commit c73615f

Browse files
add v3 version of nats jetstream protocol with integration tests and samples
Signed-off-by: stephen-totty-hpe <[email protected]>
1 parent 6bcc075 commit c73615f

File tree

18 files changed

+2081
-12
lines changed

18 files changed

+2081
-12
lines changed

protocol/nats_jetstream/v3/go.mod

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
module github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3
2+
3+
go 1.18
4+
5+
replace github.com/cloudevents/sdk-go/v2 => ../../../v2
6+
7+
require (
8+
github.com/cloudevents/sdk-go/v2 v2.15.2
9+
github.com/nats-io/nats.go v1.37.0
10+
)
11+
12+
require (
13+
github.com/davecgh/go-spew v1.1.1 // indirect
14+
github.com/google/go-cmp v0.5.0 // indirect
15+
github.com/json-iterator/go v1.1.12 // indirect
16+
github.com/klauspost/compress v1.17.9 // indirect
17+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
18+
github.com/modern-go/reflect2 v1.0.2 // indirect
19+
github.com/nats-io/nkeys v0.4.7 // indirect
20+
github.com/nats-io/nuid v1.0.1 // indirect
21+
github.com/pmezard/go-difflib v1.0.0 // indirect
22+
github.com/stretchr/testify v1.8.0 // indirect
23+
golang.org/x/crypto v0.27.0 // indirect
24+
golang.org/x/sys v0.25.0 // indirect
25+
golang.org/x/text v0.18.0 // indirect
26+
gopkg.in/yaml.v3 v3.0.1 // indirect
27+
)

protocol/nats_jetstream/v3/go.sum

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
5+
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
6+
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
7+
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
8+
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
9+
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
10+
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
11+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
12+
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
13+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
14+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
15+
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
16+
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
17+
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
18+
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
19+
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
20+
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
21+
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
22+
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
23+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
24+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
25+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
26+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
27+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
28+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
29+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
30+
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
31+
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
32+
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
33+
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
34+
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
35+
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
36+
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
37+
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
38+
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
39+
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
40+
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
41+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
42+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
43+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
44+
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
45+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
46+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
47+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
Copyright 2024 The CloudEvents Authors
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package nats_jetstream
7+
8+
import (
9+
"bytes"
10+
"context"
11+
"errors"
12+
"fmt"
13+
"strings"
14+
15+
"github.com/nats-io/nats.go/jetstream"
16+
17+
"github.com/cloudevents/sdk-go/v2/binding"
18+
"github.com/cloudevents/sdk-go/v2/binding/format"
19+
"github.com/cloudevents/sdk-go/v2/binding/spec"
20+
)
21+
22+
const (
23+
// see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md
24+
prefix = "ce-"
25+
contentTypeHeader = "content-type"
26+
)
27+
28+
var (
29+
specs = spec.WithPrefix(prefix)
30+
31+
// ErrNoVersion returned when no version header is found in the protocol header.
32+
ErrNoVersion = errors.New("message does not contain version header")
33+
)
34+
35+
// Message implements binding.Message by wrapping an jetstream.Msg.
36+
// This message *can* be read several times safely
37+
type Message struct {
38+
Msg jetstream.Msg
39+
encoding binding.Encoding
40+
}
41+
42+
// NewMessage wraps an *nats.Msg in a binding.Message.
43+
// The returned message *can* be read several times safely
44+
// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header.
45+
func NewMessage(msg jetstream.Msg) *Message {
46+
encoding := binding.EncodingStructured
47+
if msg.Headers() != nil {
48+
if msg.Headers().Get(specs.PrefixedSpecVersionName()) != "" {
49+
encoding = binding.EncodingBinary
50+
}
51+
}
52+
return &Message{Msg: msg, encoding: encoding}
53+
}
54+
55+
var _ binding.Message = (*Message)(nil)
56+
57+
// ReadEncoding return the type of the message Encoding.
58+
func (m *Message) ReadEncoding() binding.Encoding {
59+
return m.encoding
60+
}
61+
62+
// ReadStructured transfers a structured-mode event to a StructuredWriter.
63+
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
64+
if m.encoding != binding.EncodingStructured {
65+
return binding.ErrNotStructured
66+
}
67+
return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data()))
68+
}
69+
70+
// ReadBinary transfers a binary-mode event to an BinaryWriter.
71+
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
72+
if m.encoding != binding.EncodingBinary {
73+
return binding.ErrNotBinary
74+
}
75+
76+
version := m.GetVersion()
77+
if version == nil {
78+
return ErrNoVersion
79+
}
80+
81+
var err error
82+
for k, v := range m.Msg.Headers() {
83+
headerValue := v[0]
84+
if strings.HasPrefix(k, prefix) {
85+
attr := version.Attribute(k)
86+
if attr != nil {
87+
err = encoder.SetAttribute(attr, headerValue)
88+
} else {
89+
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue)
90+
}
91+
} else if k == contentTypeHeader {
92+
err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue)
93+
}
94+
if err != nil {
95+
return err
96+
}
97+
}
98+
99+
if m.Msg.Data() != nil {
100+
err = encoder.SetData(bytes.NewBuffer(m.Msg.Data()))
101+
}
102+
103+
return err
104+
}
105+
106+
// Finish *must* be called when message from a Receiver can be forgotten by the receiver.
107+
func (m *Message) Finish(err error) error {
108+
return nil
109+
}
110+
111+
// GetAttribute implements binding.MessageMetadataReader
112+
func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) {
113+
key := withPrefix(attributeKind.String())
114+
if m.Msg.Headers() != nil {
115+
version := m.GetVersion()
116+
headerValue := m.Msg.Headers().Get(key)
117+
if headerValue != "" {
118+
return version.Attribute(key), headerValue
119+
}
120+
return version.Attribute(key), nil
121+
}
122+
// if the headers are nil, the version is also nil. Therefore return nil.
123+
return nil, nil
124+
}
125+
126+
// GetExtension implements binding.MessageMetadataReader
127+
func (m *Message) GetExtension(name string) interface{} {
128+
key := withPrefix(name)
129+
if m.Msg.Headers() != nil {
130+
headerValue := m.Msg.Headers().Get(key)
131+
if headerValue != "" {
132+
return headerValue
133+
}
134+
}
135+
return nil
136+
}
137+
138+
// GetVersion looks for specVersion header and returns a Version object
139+
func (m *Message) GetVersion() spec.Version {
140+
if m.Msg.Headers() == nil {
141+
return nil
142+
}
143+
versionValue := m.Msg.Headers().Get(specs.PrefixedSpecVersionName())
144+
if versionValue == "" {
145+
return nil
146+
}
147+
return specs.Version(versionValue)
148+
}
149+
150+
// withPrefix prepends the prefix to the attribute name
151+
func withPrefix(attributeName string) string {
152+
return fmt.Sprintf("%s%s", prefix, attributeName)
153+
}

0 commit comments

Comments
 (0)