Skip to content

Commit 2d2d1a7

Browse files
authored
fix duplicate when using self-service (#227)
* fix duplicate when using self-service Signed-off-by: May Zhang <[email protected]> * fix duplicate when using self-service Signed-off-by: May Zhang <[email protected]> * fix tests Signed-off-by: May Zhang <[email protected]> * -s Signed-off-by: May Zhang <[email protected]> * add test cases Signed-off-by: May Zhang <[email protected]> * remove not used method Signed-off-by: May Zhang <[email protected]> --------- Signed-off-by: May Zhang <[email protected]>
1 parent c7e26ba commit 2d2d1a7

File tree

7 files changed

+193
-20
lines changed

7 files changed

+193
-20
lines changed

pkg/api/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Config struct {
3030
// ServiceDefaultTriggers holds list of default triggers per service
3131
ServiceDefaultTriggers map[string][]string
3232
Namespace string
33+
IsSelfServiceConfig bool
3334
}
3435

3536
// Returns list of destinations for the specified trigger

pkg/api/factory.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,10 @@ func (f *apiFactory) getApiFromConfigmapAndSecret(cm *v1.ConfigMap, secret *v1.S
184184
if err != nil {
185185
return nil, err
186186
}
187+
188+
if cm.Namespace != f.Settings.DefaultNamespace {
189+
cfg.IsSelfServiceConfig = true
190+
}
187191
getVars, err := f.InitGetVars(cfg, cm, secret)
188192
if err != nil {
189193
return nil, err

pkg/controller/controller.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) {
182182
log.Warn("Controller has stopped.")
183183
}
184184

185+
// check if an api is a self-service API
186+
func (c *notificationController) isSelfServiceConfigureApi(api api.API) bool {
187+
return c.namespaceSupport && api.GetConfig().IsSelfServiceConfig
188+
}
189+
185190
func (c *notificationController) processResourceWithAPI(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) {
186191
apiNamespace := api.GetConfig().Namespace
187192
notificationsState := NewStateFromRes(resource)
@@ -209,13 +214,13 @@ func (c *notificationController) processResourceWithAPI(api api.API, resource v1
209214

210215
if !cr.Triggered {
211216
for _, to := range destinations {
212-
notificationsState.SetAlreadyNotified(trigger, cr, to, false)
217+
notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, false)
213218
}
214219
continue
215220
}
216221

217222
for _, to := range destinations {
218-
if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed {
223+
if changed := notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, true); !changed {
219224
logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace)
220225
eventSequence.addDelivered(NotificationDelivery{
221226
Trigger: trigger,
@@ -227,7 +232,7 @@ func (c *notificationController) processResourceWithAPI(api api.API, resource v1
227232
if err := api.Send(un.Object, cr.Templates, to); err != nil {
228233
logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s",
229234
to, resource.GetNamespace(), resource.GetName(), err, apiNamespace)
230-
notificationsState.SetAlreadyNotified(trigger, cr, to, false)
235+
notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, false)
231236
c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false)
232237
eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v using the configuration in namespace %s", trigger, to, err, apiNamespace))
233238
} else {
@@ -320,6 +325,24 @@ func (c *notificationController) processQueueItem() (processNext bool) {
320325
}
321326
for _, api := range apisWithNamespace {
322327
c.processResource(api, resource, logEntry, &eventSequence)
328+
329+
//refresh
330+
obj, exists, err := c.informer.GetIndexer().GetByKey(key.(string))
331+
if err != nil {
332+
log.Errorf("Failed to get resource '%s' from informer index: %+v", key, err)
333+
eventSequence.addError(err)
334+
return
335+
}
336+
if !exists {
337+
// This happens after resource was deleted, but the work queue still had an entry for it.
338+
return
339+
}
340+
resource, ok = obj.(v1.Object)
341+
if !ok {
342+
log.Errorf("Failed to get resource '%s' from informer index: %+v", key, err)
343+
eventSequence.addError(err)
344+
return
345+
}
323346
}
324347
}
325348
logEntry.Info("Processing completed")

pkg/controller/controller_test.go

Lines changed: 142 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
kubetesting "k8s.io/client-go/testing"
2323
"k8s.io/client-go/tools/cache"
2424

25-
"github.com/argoproj/notifications-engine/pkg/api"
25+
notificationApi "github.com/argoproj/notifications-engine/pkg/api"
2626
"github.com/argoproj/notifications-engine/pkg/mocks"
2727
"github.com/argoproj/notifications-engine/pkg/services"
2828
"github.com/argoproj/notifications-engine/pkg/subscriptions"
@@ -72,7 +72,6 @@ func newController(t *testing.T, ctx context.Context, client dynamic.Interface,
7272
mockCtrl.Finish()
7373
}()
7474
mockAPI := mocks.NewMockAPI(mockCtrl)
75-
mockAPI.EXPECT().GetConfig().Return(api.Config{}).AnyTimes()
7675
resourceClient := client.Resource(testGVR)
7776
informer := cache.NewSharedIndexInformer(
7877
&cache.ListWatch{
@@ -98,6 +97,45 @@ func newController(t *testing.T, ctx context.Context, client dynamic.Interface,
9897
return c, mockAPI, nil
9998
}
10099

100+
func newControllerWithNamespaceSupport(t *testing.T, ctx context.Context, client dynamic.Interface, opts ...Opts) (*notificationController, map[string]notificationApi.API, error) {
101+
mockCtrl := gomock.NewController(t)
102+
go func() {
103+
<-ctx.Done()
104+
mockCtrl.Finish()
105+
}()
106+
107+
resourceClient := client.Resource(testGVR)
108+
informer := cache.NewSharedIndexInformer(
109+
&cache.ListWatch{
110+
ListFunc: func(options v1.ListOptions) (object runtime.Object, err error) {
111+
return resourceClient.List(context.Background(), options)
112+
},
113+
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
114+
return resourceClient.Watch(context.Background(), options)
115+
},
116+
},
117+
&unstructured.Unstructured{},
118+
time.Minute,
119+
cache.Indexers{},
120+
)
121+
122+
go informer.Run(ctx.Done())
123+
124+
apiMap := make(map[string]notificationApi.API)
125+
mockAPIDefault := mocks.NewMockAPI(mockCtrl)
126+
apiMap["default"] = mockAPIDefault
127+
128+
mockAPISelfService := mocks.NewMockAPI(mockCtrl)
129+
apiMap["selfservice_namespace"] = mockAPISelfService
130+
131+
c := NewControllerWithNamespaceSupport(resourceClient, informer, &mocks.FakeFactory{ApiMap: apiMap}, opts...)
132+
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
133+
return nil, nil, errors.New("failed to sync informers")
134+
}
135+
136+
return c, apiMap, nil
137+
}
138+
101139
func TestSendsNotificationIfTriggered(t *testing.T) {
102140
ctx, cancel := context.WithCancel(context.TODO())
103141
defer cancel()
@@ -109,6 +147,7 @@ func TestSendsNotificationIfTriggered(t *testing.T) {
109147
assert.NoError(t, err)
110148

111149
receivedObj := map[string]interface{}{}
150+
api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
112151
api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
113152
api.EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool {
114153
receivedObj = obj
@@ -123,22 +162,23 @@ func TestSendsNotificationIfTriggered(t *testing.T) {
123162
assert.NoError(t, err)
124163

125164
state := NewState(annotations[notifiedAnnotationKey])
126-
assert.NotNil(t, state[StateItemKey("mock", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"})])
165+
assert.NotNil(t, state[StateItemKey(false, "", "mock", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"})])
127166
assert.Equal(t, app.Object, receivedObj)
128167
}
129168

130169
func TestDoesNotSendNotificationIfAnnotationPresent(t *testing.T) {
131170
ctx, cancel := context.WithCancel(context.TODO())
132171
defer cancel()
133172
state := NotificationsState{}
134-
_ = state.SetAlreadyNotified("my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
173+
_ = state.SetAlreadyNotified(false, "", "my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
135174
app := newResource("test", withAnnotations(map[string]string{
136175
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
137176
notifiedAnnotationKey: mustToJson(state),
138177
}))
139178
ctrl, api, err := newController(t, ctx, newFakeClient(app))
140179
assert.NoError(t, err)
141180

181+
api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
142182
api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
143183

144184
_, err = ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{})
@@ -153,14 +193,15 @@ func TestRemovesAnnotationIfNoTrigger(t *testing.T) {
153193
defer cancel()
154194

155195
state := NotificationsState{}
156-
_ = state.SetAlreadyNotified("my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
196+
_ = state.SetAlreadyNotified(false, "", "my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
157197
app := newResource("test", withAnnotations(map[string]string{
158198
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
159199
notifiedAnnotationKey: mustToJson(state),
160200
}))
161201
ctrl, api, err := newController(t, ctx, newFakeClient(app))
162202
assert.NoError(t, err)
163203

204+
api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
164205
api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: false}}, nil)
165206

166207
annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{})
@@ -177,7 +218,7 @@ func TestUpdatedAnnotationsSavedAsPatch(t *testing.T) {
177218
defer cancel()
178219

179220
state := NotificationsState{}
180-
_ = state.SetAlreadyNotified("my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
221+
_ = state.SetAlreadyNotified(false, "", "my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
181222

182223
app := newResource("test", withAnnotations(map[string]string{
183224
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
@@ -193,6 +234,7 @@ func TestUpdatedAnnotationsSavedAsPatch(t *testing.T) {
193234
})
194235
ctrl, api, err := newController(t, ctx, client)
195236
assert.NoError(t, err)
237+
api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
196238
api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: false}}, nil)
197239

198240
go ctrl.Run(1, ctx.Done())
@@ -324,6 +366,7 @@ func TestWithEventCallback(t *testing.T) {
324366

325367
ctrl, api, err := newController(t, ctx, newFakeClient(app), WithEventCallback(mockEventCallback))
326368
ctrl.namespaceSupport = false
369+
api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
327370
assert.NoError(t, err)
328371
ctrl.apiFactory = &mocks.FakeFactory{Api: api, Err: tc.apiErr}
329372

@@ -349,3 +392,96 @@ func TestWithEventCallback(t *testing.T) {
349392
})
350393
}
351394
}
395+
396+
// verify annotations after calling processResourceWithAPI when using self-service
397+
func TestProcessResourceWithAPIWithSelfService(t *testing.T) {
398+
ctx, cancel := context.WithCancel(context.TODO())
399+
defer cancel()
400+
app := newResource("test", withAnnotations(map[string]string{
401+
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
402+
}))
403+
404+
ctrl, api, err := newController(t, ctx, newFakeClient(app))
405+
assert.NoError(t, err)
406+
ctrl.namespaceSupport = true
407+
408+
trigger := "my-trigger"
409+
namespace := "my-namespace"
410+
411+
receivedObj := map[string]interface{}{}
412+
413+
//SelfService API: config has IsSelfServiceConfig set to true
414+
api.EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: true, Namespace: namespace}).AnyTimes()
415+
api.EXPECT().RunTrigger(trigger, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
416+
api.EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool {
417+
receivedObj = obj
418+
return true
419+
}), []string{"test"}, services.Destination{Service: "mock", Recipient: "recipient"}).Return(nil)
420+
421+
annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{})
422+
if err != nil {
423+
logEntry.Errorf("Failed to process: %v", err)
424+
}
425+
426+
assert.NoError(t, err)
427+
428+
state := NewState(annotations[notifiedAnnotationKey])
429+
assert.NotZero(t, state[StateItemKey(true, namespace, trigger, triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"})])
430+
assert.Equal(t, app.Object, receivedObj)
431+
}
432+
433+
// verify notification sent to both default and self-service configuration after calling processResourceWithAPI when using self-service
434+
func TestProcessItemsWithSelfService(t *testing.T) {
435+
const triggerName = "my-trigger"
436+
destination := services.Destination{Service: "mock", Recipient: "recipient"}
437+
438+
var actualSequence *NotificationEventSequence
439+
mockEventCallback := func(eventSequence NotificationEventSequence) {
440+
actualSequence = &eventSequence
441+
}
442+
443+
ctx, cancel := context.WithCancel(context.TODO())
444+
defer cancel()
445+
app := newResource("test", withAnnotations(map[string]string{
446+
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
447+
}))
448+
449+
ctrl, apiMap, err := newControllerWithNamespaceSupport(t, ctx, newFakeClient(app), WithEventCallback(mockEventCallback))
450+
assert.NoError(t, err)
451+
452+
ctrl.namespaceSupport = true
453+
//SelfService API: config has IsSelfServiceConfig set to true
454+
apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: true, Namespace: "selfservice_namespace"}).Times(3)
455+
apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().RunTrigger(triggerName, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
456+
apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool {
457+
return true
458+
}), []string{"test"}, destination).Return(nil).AnyTimes()
459+
460+
apiMap["default"].(*mocks.MockAPI).EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: false, Namespace: "default"}).Times(3)
461+
apiMap["default"].(*mocks.MockAPI).EXPECT().RunTrigger(triggerName, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
462+
apiMap["default"].(*mocks.MockAPI).EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool {
463+
return true
464+
}), []string{"test"}, destination).Return(nil).AnyTimes()
465+
466+
ctrl.apiFactory = &mocks.FakeFactory{ApiMap: apiMap}
467+
468+
ctrl.processQueueItem()
469+
470+
assert.Equal(t, app, actualSequence.Resource)
471+
472+
expectedDeliveries := []NotificationDelivery{
473+
{
474+
Trigger: triggerName,
475+
Destination: destination,
476+
},
477+
{
478+
Trigger: triggerName,
479+
Destination: destination,
480+
},
481+
}
482+
for i, event := range actualSequence.Delivered {
483+
assert.Equal(t, expectedDeliveries[i].Trigger, event.Trigger)
484+
assert.Equal(t, expectedDeliveries[i].Destination, event.Destination)
485+
}
486+
487+
}

pkg/controller/state.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@ const (
1717
notifiedHistoryMaxSize = 100
1818
)
1919

20-
func StateItemKey(trigger string, conditionResult triggers.ConditionResult, dest services.Destination) string {
21-
key := fmt.Sprintf("%s:%s:%s:%s", trigger, conditionResult.Key, dest.Service, dest.Recipient)
20+
func StateItemKey(isSelfConfig bool, apiNamespace, trigger string, conditionResult triggers.ConditionResult, dest services.Destination) string {
21+
var key string
22+
if isSelfConfig {
23+
key = fmt.Sprintf("%s:%s:%s:%s:%s", apiNamespace, trigger, conditionResult.Key, dest.Service, dest.Recipient)
24+
} else {
25+
key = fmt.Sprintf("%s:%s:%s:%s", trigger, conditionResult.Key, dest.Service, dest.Recipient)
26+
}
2227
if conditionResult.OncePer != "" {
2328
key = conditionResult.OncePer + ":" + key
2429
}
@@ -47,8 +52,8 @@ func (s NotificationsState) truncate(maxSize int) {
4752
}
4853

4954
// SetAlreadyNotified set the state of given trigger/destination and return if state has been changed
50-
func (s NotificationsState) SetAlreadyNotified(trigger string, result triggers.ConditionResult, dest services.Destination, isNotified bool) bool {
51-
key := StateItemKey(trigger, result, dest)
55+
func (s NotificationsState) SetAlreadyNotified(isSelfConfig bool, apiNamespace, trigger string, result triggers.ConditionResult, dest services.Destination, isNotified bool) bool {
56+
key := StateItemKey(isSelfConfig, apiNamespace, trigger, result, dest)
5257
if _, alreadyNotified := s[key]; alreadyNotified == isNotified {
5358
return false
5459
}

pkg/controller/state_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@ func TestSetAlreadyNotified(t *testing.T) {
2626
dest := services.Destination{Service: "slack", Recipient: "my-channel"}
2727

2828
state := NotificationsState{}
29-
changed := state.SetAlreadyNotified("app-synced", triggers.ConditionResult{Key: "0"}, dest, true)
29+
changed := state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{Key: "0"}, dest, true)
3030

3131
assert.True(t, changed)
3232
_, ok := state["app-synced:0:slack:my-channel"]
3333
assert.True(t, ok)
3434

35-
changed = state.SetAlreadyNotified("app-synced", triggers.ConditionResult{Key: "0"}, dest, true)
35+
changed = state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{Key: "0"}, dest, true)
3636
assert.False(t, changed)
3737

38-
changed = state.SetAlreadyNotified("app-synced", triggers.ConditionResult{Key: "0"}, dest, false)
38+
changed = state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{Key: "0"}, dest, false)
3939
assert.True(t, changed)
4040
_, ok = state["app-synced:0:slack:my-channel"]
4141
assert.False(t, ok)
@@ -45,13 +45,13 @@ func TestSetAlreadyNotified_OncePerItem(t *testing.T) {
4545
dest := services.Destination{Service: "slack", Recipient: "my-channel"}
4646

4747
state := NotificationsState{}
48-
changed := state.SetAlreadyNotified("app-synced", triggers.ConditionResult{OncePer: "abc", Key: "0"}, dest, true)
48+
changed := state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{OncePer: "abc", Key: "0"}, dest, true)
4949

5050
assert.True(t, changed)
5151
_, ok := state["abc:app-synced:0:slack:my-channel"]
5252
assert.True(t, ok)
5353

54-
changed = state.SetAlreadyNotified("app-synced", triggers.ConditionResult{OncePer: "abc", Key: "0"}, dest, false)
54+
changed = state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{OncePer: "abc", Key: "0"}, dest, false)
5555
assert.False(t, changed)
5656
_, ok = state["abc:app-synced:0:slack:my-channel"]
5757
assert.True(t, ok)

0 commit comments

Comments
 (0)