@@ -16,6 +16,7 @@ import (
1616 "github.com/layer5io/meshkit/logger"
1717 "github.com/layer5io/meshkit/models"
1818 "github.com/layer5io/meshkit/models/oam/core/v1alpha1"
19+ "github.com/layer5io/meshkit/utils/events"
1920 "gopkg.in/yaml.v2"
2021)
2122
@@ -25,24 +26,24 @@ type AppMesh struct {
2526}
2627
2728// New initializes AppMesh handler.
28- func New (c meshkitCfg.Handler , l logger.Handler , kc meshkitCfg.Handler ) adapter.Handler {
29+ func New (c meshkitCfg.Handler , l logger.Handler , kc meshkitCfg.Handler , e * events. EventStreamer ) adapter.Handler {
2930 return & AppMesh {
3031 Adapter : adapter.Adapter {
3132 Config : c ,
3233 Log : l ,
3334 KubeconfigHandler : kc ,
35+ EventStreamer : e ,
3436 },
3537 }
3638}
3739
3840// ApplyOperation applies the requested operation on app-mesh
39- func (appMesh * AppMesh ) ApplyOperation (ctx context.Context , opReq adapter.OperationRequest , hchan * chan interface {} ) error {
41+ func (appMesh * AppMesh ) ApplyOperation (ctx context.Context , opReq adapter.OperationRequest ) error {
4042 err := appMesh .CreateKubeconfigs (opReq .K8sConfigs )
4143 if err != nil {
4244 return err
4345 }
4446 kubeConfigs := opReq .K8sConfigs
45- appMesh .SetChannel (hchan );
4647
4748 operations := make (adapter.Operations )
4849 err = appMesh .Config .GetObject (adapter .OperationsKey , & operations )
@@ -51,10 +52,10 @@ func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.Operat
5152 }
5253
5354 e := & meshes.EventsResponse {
54- OperationId : opReq .OperationID ,
55- Summary : status .Deploying ,
56- Details : "Operation is not supported" ,
57- Component : internalconfig .ServerConfig ["type" ],
55+ OperationId : opReq .OperationID ,
56+ Summary : status .Deploying ,
57+ Details : "Operation is not supported" ,
58+ Component : internalconfig .ServerConfig ["type" ],
5859 ComponentName : internalconfig .ServerConfig ["name" ],
5960 }
6061 stat := status .Deploying
@@ -65,12 +66,12 @@ func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.Operat
6566 version := string (operations [opReq .OperationName ].Versions [0 ])
6667 if stat , err = hh .installAppMesh (opReq .IsDeleteOperation , version , opReq .Namespace , kubeConfigs ); err != nil {
6768 summary := fmt .Sprintf ("Error while %s AWS App mesh" , stat )
68- hh .streamErr (summary , e , err )
69+ hh .streamErr (summary , ee , err )
6970 return
7071 }
7172 ee .Summary = fmt .Sprintf ("App mesh %s successfully" , stat )
7273 ee .Details = fmt .Sprintf ("The App mesh is now %s." , stat )
73- hh .StreamInfo (e )
74+ hh .StreamInfo (ee )
7475 }(appMesh , e )
7576
7677 case internalconfig .LabelNamespace :
@@ -116,25 +117,25 @@ func (appMesh *AppMesh) ApplyOperation(ctx context.Context, opReq adapter.Operat
116117 stat , err := hh .installSampleApp (opReq .Namespace , opReq .IsDeleteOperation , operations [opReq .OperationName ].Templates , kubeConfigs )
117118 if err != nil {
118119 summary := fmt .Sprintf ("Error while %s %s application" , stat , appName )
119- hh .streamErr (summary , e , err )
120+ hh .streamErr (summary , ee , err )
120121 return
121122 }
122123 ee .Summary = fmt .Sprintf ("%s application %s successfully" , appName , stat )
123124 ee .Details = fmt .Sprintf ("The %s application is now %s." , appName , stat )
124- hh .StreamInfo (e )
125+ hh .StreamInfo (ee )
125126 }(appMesh , e )
126127
127128 case common .CustomOperation :
128129 go func (hh * AppMesh , ee * meshes.EventsResponse ) {
129130 stat , err := hh .applyCustomOperation (opReq .Namespace , opReq .CustomBody , opReq .IsDeleteOperation , kubeConfigs )
130131 if err != nil {
131132 summary := fmt .Sprintf ("Error while %s custom operation" , stat )
132- hh .streamErr (summary , e , err )
133+ hh .streamErr (summary , ee , err )
133134 return
134135 }
135136 ee .Summary = fmt .Sprintf ("Manifest %s successfully" , status .Deployed )
136137 ee .Details = ""
137- hh .StreamInfo (e )
138+ hh .StreamInfo (ee )
138139 }(appMesh , e )
139140 default :
140141 appMesh .streamErr ("Invalid operation" , e , ErrOpInvalid )
@@ -189,8 +190,8 @@ func (appMesh *AppMesh) CreateKubeconfigs(kubeconfigs []string) error {
189190}
190191
191192// ProcessOAM handles the grpc invocation for handling OAM objects
192- func (appMesh * AppMesh ) ProcessOAM (ctx context.Context , oamReq adapter.OAMRequest , hchan * chan interface {} ) (string , error ) {
193- appMesh . SetChannel ( hchan )
193+ func (appMesh * AppMesh ) ProcessOAM (ctx context.Context , oamReq adapter.OAMRequest ) (string , error ) {
194+
194195 err := appMesh .CreateKubeconfigs (oamReq .K8sConfigs )
195196 if err != nil {
196197 return "" , err
@@ -245,11 +246,11 @@ func (appMesh *AppMesh) ProcessOAM(ctx context.Context, oamReq adapter.OAMReques
245246 return msg1 + "\n " + msg2 , nil
246247}
247248
248- func (appMesh * AppMesh ) streamErr (summary string , e * meshes.EventsResponse , err error ) {
249+ func (appMesh * AppMesh ) streamErr (summary string , e * meshes.EventsResponse , err error ) {
249250 e .Summary = summary
250251 e .Details = err .Error ()
251252 e .ErrorCode = errors .GetCode (err )
252253 e .ProbableCause = errors .GetCause (err )
253254 e .SuggestedRemediation = errors .GetRemedy (err )
254255 appMesh .StreamErr (e , err )
255- }
256+ }
0 commit comments