Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/adc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ type Config struct {
ServerAddrs []string
Token string
TlsVerify bool
BackendType string
}

// MarshalJSON implements custom JSON marshaling for adcConfig
Expand Down
6 changes: 6 additions & 0 deletions api/v1alpha1/gatewayproxy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,13 @@ type ControlPlaneAuth struct {

// ControlPlaneProvider defines configuration for control plane provider.
// +kubebuilder:validation:XValidation:rule="has(self.endpoints) != has(self.service)"
// +kubebuilder:validation:XValidation:rule="oldSelf == null || (!has(self.mode) && !has(oldSelf.mode)) || self.mode == oldSelf.mode",message="mode is immutable"
type ControlPlaneProvider struct {
// Mode specifies the mode of control plane provider.
// Can be `apisix` or `apisix-standalone`.
//
// +kubebuilder:validation:Optional
Mode string `json:"mode,omitempty"`
// Endpoints specifies the list of control plane endpoints.
// +kubebuilder:validation:Optional
// +kubebuilder:validation:MinItems=1
Expand Down
8 changes: 8 additions & 0 deletions config/crd/bases/apisix.apache.org_gatewayproxies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ spec:
type: string
minItems: 1
type: array
mode:
description: |-
Mode specifies the mode of control plane provider.
Can be `apisix` or `apisix-standalone`.
type: string
service:
properties:
name:
Expand All @@ -150,6 +155,9 @@ spec:
type: object
x-kubernetes-validations:
- rule: has(self.endpoints) != has(self.service)
- message: mode is immutable
rule: oldSelf == null || (!has(self.mode) && !has(oldSelf.mode))
|| self.mode == oldSelf.mode
type:
description: Type specifies the type of provider. Can only be
`ControlPlane`.
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/reference/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ ControlPlaneProvider defines configuration for control plane provider.

| Field | Description |
| --- | --- |
| `mode` _string_ | Mode specifies the mode of control plane provider. Can be `apisix` or `apisix-standalone`. |
| `endpoints` _string array_ | Endpoints specifies the list of control plane endpoints. |
| `service` _[ProviderService](#providerservice)_ | |
| `tlsVerify` _boolean_ | TlsVerify specifies whether to verify the TLS certificate of the control plane. |
Expand Down
16 changes: 10 additions & 6 deletions internal/adc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ type Client struct {
mu sync.Mutex
*cache.Store

executor ADCExecutor
BackendMode string
executor ADCExecutor

ConfigManager *common.ConfigManager[types.NamespacedNameKind, adctypes.Config]
ADCDebugProvider *common.ADCDebugProvider

defaultMode string

log logr.Logger
}

func New(log logr.Logger, mode string, timeout time.Duration) (*Client, error) {
func New(log logr.Logger, defaultMode string, timeout time.Duration) (*Client, error) {
serverURL := os.Getenv("ADC_SERVER_URL")
if serverURL == "" {
serverURL = defaultHTTPADCExecutorAddr
Expand All @@ -59,15 +60,15 @@ func New(log logr.Logger, mode string, timeout time.Duration) (*Client, error) {
configManager := common.NewConfigManager[types.NamespacedNameKind, adctypes.Config]()

logger := log.WithName("client")
logger.Info("ADC client initialized", "mode", mode)
logger.Info("ADC client initialized")

return &Client{
Store: store,
executor: NewHTTPADCExecutor(log, serverURL, timeout),
BackendMode: mode,
ConfigManager: configManager,
ADCDebugProvider: common.NewADCDebugProvider(store, configManager),
log: logger,
defaultMode: defaultMode,
}, nil
}

Expand Down Expand Up @@ -254,8 +255,11 @@ func (c *Client) sync(ctx context.Context, task Task) error {
if resourceType == "" {
resourceType = "all"
}
if config.BackendType == "" {
config.BackendType = c.defaultMode
}

err := c.executor.Execute(ctx, c.BackendMode, config, args)
err := c.executor.Execute(ctx, config, args)
duration := time.Since(startTime).Seconds()

status := adctypes.StatusSuccess
Expand Down
40 changes: 20 additions & 20 deletions internal/adc/client/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,25 @@ const (
)

type ADCExecutor interface {
Execute(ctx context.Context, mode string, config adctypes.Config, args []string) error
Execute(ctx context.Context, config adctypes.Config, args []string) error
}

type DefaultADCExecutor struct {
sync.Mutex
log logr.Logger
}

func (e *DefaultADCExecutor) Execute(ctx context.Context, mode string, config adctypes.Config, args []string) error {
return e.runADC(ctx, mode, config, args)
func (e *DefaultADCExecutor) Execute(ctx context.Context, config adctypes.Config, args []string) error {
return e.runADC(ctx, config, args)
}

func (e *DefaultADCExecutor) runADC(ctx context.Context, mode string, config adctypes.Config, args []string) error {
func (e *DefaultADCExecutor) runADC(ctx context.Context, config adctypes.Config, args []string) error {
var execErrs = types.ADCExecutionError{
Name: config.Name,
}

for _, addr := range config.ServerAddrs {
if err := e.runForSingleServerWithTimeout(ctx, addr, mode, config, args); err != nil {
if err := e.runForSingleServerWithTimeout(ctx, addr, config, args); err != nil {
e.log.Error(err, "failed to run adc for server", "server", addr)
var execErr types.ADCExecutionServerAddrError
if errors.As(err, &execErr) {
Expand All @@ -81,21 +81,21 @@ func (e *DefaultADCExecutor) runADC(ctx context.Context, mode string, config adc
return nil
}

func (e *DefaultADCExecutor) runForSingleServerWithTimeout(ctx context.Context, serverAddr, mode string, config adctypes.Config, args []string) error {
func (e *DefaultADCExecutor) runForSingleServerWithTimeout(ctx context.Context, serverAddr string, config adctypes.Config, args []string) error {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return e.runForSingleServer(ctx, serverAddr, mode, config, args)
return e.runForSingleServer(ctx, serverAddr, config, args)
}

func (e *DefaultADCExecutor) runForSingleServer(ctx context.Context, serverAddr, mode string, config adctypes.Config, args []string) error {
func (e *DefaultADCExecutor) runForSingleServer(ctx context.Context, serverAddr string, config adctypes.Config, args []string) error {
cmdArgs := append([]string{}, args...)
if !config.TlsVerify {
cmdArgs = append(cmdArgs, "--tls-skip-verify")
}

cmdArgs = append(cmdArgs, "--timeout", "15s")

env := e.prepareEnv(serverAddr, mode, config.Token)
env := e.prepareEnv(serverAddr, config.BackendType, config.Token)

var stdout, stderr bytes.Buffer
cmd := exec.CommandContext(ctx, "adc", cmdArgs...)
Expand Down Expand Up @@ -250,26 +250,26 @@ func NewHTTPADCExecutor(log logr.Logger, serverURL string, timeout time.Duration
}

// Execute implements the ADCExecutor interface using HTTP calls
func (e *HTTPADCExecutor) Execute(ctx context.Context, mode string, config adctypes.Config, args []string) error {
return e.runHTTPSync(ctx, mode, config, args)
func (e *HTTPADCExecutor) Execute(ctx context.Context, config adctypes.Config, args []string) error {
return e.runHTTPSync(ctx, config, args)
}

// runHTTPSync performs HTTP sync to ADC Server for each server address
func (e *HTTPADCExecutor) runHTTPSync(ctx context.Context, mode string, config adctypes.Config, args []string) error {
func (e *HTTPADCExecutor) runHTTPSync(ctx context.Context, config adctypes.Config, args []string) error {
var execErrs = types.ADCExecutionError{
Name: config.Name,
}

serverAddrs := func() []string {
if mode == "apisix-standalone" {
if config.BackendType == "apisix-standalone" {
return []string{strings.Join(config.ServerAddrs, ",")}
}
return config.ServerAddrs
}()
e.log.V(1).Info("running http sync", "serverAddrs", serverAddrs, "mode", mode)
e.log.V(1).Info("running http sync", "serverAddrs", serverAddrs)

for _, addr := range serverAddrs {
if err := e.runHTTPSyncForSingleServer(ctx, addr, mode, config, args); err != nil {
if err := e.runHTTPSyncForSingleServer(ctx, addr, config, args); err != nil {
e.log.Error(err, "failed to run http sync for server", "server", addr)
var execErr types.ADCExecutionServerAddrError
if errors.As(err, &execErr) {
Expand All @@ -289,7 +289,7 @@ func (e *HTTPADCExecutor) runHTTPSync(ctx context.Context, mode string, config a
}

// runHTTPSyncForSingleServer performs HTTP sync to a single ADC Server
func (e *HTTPADCExecutor) runHTTPSyncForSingleServer(ctx context.Context, serverAddr, mode string, config adctypes.Config, args []string) error {
func (e *HTTPADCExecutor) runHTTPSyncForSingleServer(ctx context.Context, serverAddr string, config adctypes.Config, args []string) error {
ctx, cancel := context.WithTimeout(ctx, e.httpClient.Timeout)
defer cancel()

Expand All @@ -306,7 +306,7 @@ func (e *HTTPADCExecutor) runHTTPSyncForSingleServer(ctx context.Context, server
}

// Build HTTP request
req, err := e.buildHTTPRequest(ctx, serverAddr, mode, config, labels, types, resources)
req, err := e.buildHTTPRequest(ctx, serverAddr, config, labels, types, resources)
if err != nil {
return fmt.Errorf("failed to build HTTP request: %w", err)
}
Expand Down Expand Up @@ -379,13 +379,13 @@ func (e *HTTPADCExecutor) loadResourcesFromFile(filePath string) (*adctypes.Reso
}

// buildHTTPRequest builds the HTTP request for ADC Server
func (e *HTTPADCExecutor) buildHTTPRequest(ctx context.Context, serverAddr, mode string, config adctypes.Config, labels map[string]string, types []string, resources *adctypes.Resources) (*http.Request, error) {
func (e *HTTPADCExecutor) buildHTTPRequest(ctx context.Context, serverAddr string, config adctypes.Config, labels map[string]string, types []string, resources *adctypes.Resources) (*http.Request, error) {
// Prepare request body
tlsVerify := config.TlsVerify
reqBody := ADCServerRequest{
Task: ADCServerTask{
Opts: ADCServerOpts{
Backend: mode,
Backend: config.BackendType,
Server: strings.Split(serverAddr, ","),
Token: config.Token,
LabelSelector: labels,
Expand All @@ -407,7 +407,7 @@ func (e *HTTPADCExecutor) buildHTTPRequest(ctx context.Context, serverAddr, mode
e.log.V(1).Info("sending HTTP request to ADC Server",
"url", e.serverURL+"/sync",
"server", serverAddr,
"mode", mode,
"mode", config.BackendType,
"cacheKey", config.Name,
"labelSelector", labels,
"includeResourceType", types,
Expand Down
57 changes: 33 additions & 24 deletions internal/adc/translator/gatewayproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

types "github.com/apache/apisix-ingress-controller/api/adc"
"github.com/apache/apisix-ingress-controller/api/v1alpha1"
"github.com/apache/apisix-ingress-controller/internal/controller/config"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/utils"
)
Expand All @@ -44,47 +45,55 @@ func (t *Translator) TranslateGatewayProxyToConfig(tctx *provider.TranslateConte
if provider.Type != v1alpha1.ProviderTypeControlPlane || provider.ControlPlane == nil {
return nil, nil
}
cp := provider.ControlPlane

config := types.Config{
Name: utils.NamespacedNameKind(gatewayProxy).String(),
cfg := types.Config{
Name: utils.NamespacedNameKind(gatewayProxy).String(),
BackendType: cp.Mode,
}

if provider.ControlPlane.TlsVerify != nil {
config.TlsVerify = *provider.ControlPlane.TlsVerify
if cp.TlsVerify != nil {
cfg.TlsVerify = *cp.TlsVerify
}

if provider.ControlPlane.Auth.Type == v1alpha1.AuthTypeAdminKey && provider.ControlPlane.Auth.AdminKey != nil {
if provider.ControlPlane.Auth.AdminKey.ValueFrom != nil && provider.ControlPlane.Auth.AdminKey.ValueFrom.SecretKeyRef != nil {
secretRef := provider.ControlPlane.Auth.AdminKey.ValueFrom.SecretKeyRef
if cp.Auth.Type == v1alpha1.AuthTypeAdminKey && cp.Auth.AdminKey != nil {
if cp.Auth.AdminKey.ValueFrom != nil && cp.Auth.AdminKey.ValueFrom.SecretKeyRef != nil {
secretRef := cp.Auth.AdminKey.ValueFrom.SecretKeyRef
secret, ok := tctx.Secrets[k8stypes.NamespacedName{
// we should use gateway proxy namespace
Namespace: gatewayProxy.GetNamespace(),
Name: secretRef.Name,
}]
if ok {
if token, ok := secret.Data[secretRef.Key]; ok {
config.Token = string(token)
cfg.Token = string(token)
}
}
} else if provider.ControlPlane.Auth.AdminKey.Value != "" {
config.Token = provider.ControlPlane.Auth.AdminKey.Value
} else if cp.Auth.AdminKey.Value != "" {
cfg.Token = cp.Auth.AdminKey.Value
}
}

if config.Token == "" {
if cfg.Token == "" {
return nil, errors.New("no token found")
}

endpoints := provider.ControlPlane.Endpoints
endpoints := cp.Endpoints
if len(endpoints) > 0 {
config.ServerAddrs = endpoints
return &config, nil
cfg.ServerAddrs = endpoints
return &cfg, nil
}

if provider.ControlPlane.Service != nil {
// If Mode is empty, use the default static configuration.
// If Mode is set, resolve endpoints only when the ControlPlane is in standalone mode.
if cp.Mode != "" {
resolveEndpoints = cp.Mode == string(config.ProviderTypeStandalone)
}

if cp.Service != nil {
namespacedName := k8stypes.NamespacedName{
Namespace: gatewayProxy.Namespace,
Name: provider.ControlPlane.Service.Name,
Name: cp.Service.Name,
}
svc, ok := tctx.Services[namespacedName]
if !ok {
Expand All @@ -100,9 +109,9 @@ func (t *Translator) TranslateGatewayProxyToConfig(tctx *provider.TranslateConte
}
upstreamNodes, _, err := t.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Name: gatewayv1.ObjectName(provider.ControlPlane.Service.Name),
Name: gatewayv1.ObjectName(cp.Service.Name),
Namespace: (*gatewayv1.Namespace)(&gatewayProxy.Namespace),
Port: ptr.To(gatewayv1.PortNumber(provider.ControlPlane.Service.Port)),
Port: ptr.To(gatewayv1.PortNumber(cp.Service.Port)),
},
}, func(endpoint *discoveryv1.Endpoint) bool {
if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
Expand All @@ -115,21 +124,21 @@ func (t *Translator) TranslateGatewayProxyToConfig(tctx *provider.TranslateConte
return nil, err
}
for _, node := range upstreamNodes {
config.ServerAddrs = append(config.ServerAddrs, "http://"+net.JoinHostPort(node.Host, strconv.Itoa(node.Port)))
cfg.ServerAddrs = append(cfg.ServerAddrs, "http://"+net.JoinHostPort(node.Host, strconv.Itoa(node.Port)))
}
} else {
refPort := provider.ControlPlane.Service.Port
refPort := cp.Service.Port
var serverAddr string
if svc.Spec.Type == corev1.ServiceTypeExternalName {
serverAddr = fmt.Sprintf("http://%s:%d", svc.Spec.ExternalName, refPort)
} else {
serverAddr = fmt.Sprintf("http://%s.%s.svc:%d", provider.ControlPlane.Service.Name, gatewayProxy.Namespace, refPort)
serverAddr = fmt.Sprintf("http://%s.%s.svc:%d", cp.Service.Name, gatewayProxy.Namespace, refPort)
}
config.ServerAddrs = []string{serverAddr}
cfg.ServerAddrs = []string{serverAddr}
}

t.Log.V(1).Info("add server address to config.ServiceAddrs", "config.ServerAddrs", config.ServerAddrs)
t.Log.V(1).Info("add server address to config.ServiceAddrs", "config.ServerAddrs", cfg.ServerAddrs)
}

return &config, nil
return &cfg, nil
}
1 change: 0 additions & 1 deletion internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ func Run(ctx context.Context, logger logr.Logger) error {
SyncTimeout: config.ControllerConfig.ExecADCTimeout.Duration,
SyncPeriod: config.ControllerConfig.ProviderConfig.SyncPeriod.Duration,
InitSyncDelay: config.ControllerConfig.ProviderConfig.InitSyncDelay.Duration,
BackendMode: string(config.ControllerConfig.ProviderConfig.Type),
}
provider, err := provider.New(providerType, logger, updater.Writer(), readier, providerOptions)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions internal/provider/apisix/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ type apisixProvider struct {
func New(log logr.Logger, updater status.Updater, readier readiness.ReadinessManager, opts ...provider.Option) (provider.Provider, error) {
o := provider.Options{}
o.ApplyOptions(opts)
if o.BackendMode == "" {
o.BackendMode = ProviderTypeAPISIX
if o.DefaultBackendMode == "" {
o.DefaultBackendMode = ProviderTypeAPISIX
}

cli, err := adcclient.New(log, o.BackendMode, o.SyncTimeout)
cli, err := adcclient.New(log, o.DefaultBackendMode, o.SyncTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func (d *apisixProvider) Delete(ctx context.Context, obj client.Object) error {
func (d *apisixProvider) buildConfig(tctx *provider.TranslateContext, nnk types.NamespacedNameKind) (map[types.NamespacedNameKind]adctypes.Config, error) {
configs := make(map[types.NamespacedNameKind]adctypes.Config, len(tctx.ResourceParentRefs[nnk]))
for _, gp := range tctx.GatewayProxies {
config, err := d.translator.TranslateGatewayProxyToConfig(tctx, &gp, d.ResolveEndpoints)
config, err := d.translator.TranslateGatewayProxyToConfig(tctx, &gp, d.DefaultResolveEndpoints)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -307,7 +307,7 @@ func (d *apisixProvider) NeedLeaderElection() bool {

// updateConfigForGatewayProxy update config for all referrers of the GatewayProxy
func (d *apisixProvider) updateConfigForGatewayProxy(tctx *provider.TranslateContext, gp *v1alpha1.GatewayProxy) error {
config, err := d.translator.TranslateGatewayProxyToConfig(tctx, gp, d.ResolveEndpoints)
config, err := d.translator.TranslateGatewayProxyToConfig(tctx, gp, d.DefaultResolveEndpoints)
if err != nil {
return err
}
Expand Down
Loading
Loading