@@ -21,6 +21,10 @@ package controller
2121import (
2222 "bufio"
2323 "fmt"
24+ "net/http"
25+ "strconv"
26+ "strings"
27+
2428 "github.com/go-chi/chi"
2529 "github.com/goodrain/rainbond-operator/util/constants"
2630 "github.com/goodrain/rainbond/api/handler"
@@ -35,9 +39,6 @@ import (
3539 "github.com/sirupsen/logrus"
3640 corev1 "k8s.io/api/core/v1"
3741 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38- "net/http"
39- "strconv"
40- "strings"
4142)
4243
4344// PodController is an implementation of PodInterface
@@ -142,25 +143,43 @@ func logs(w http.ResponseWriter, r *http.Request, podName string, namespace stri
142143 lines = 100
143144 }
144145 tailLines := int64 (lines )
145- container := ""
146- if strings .HasPrefix (podName , "rbd-gateway" ) {
147- container = "ingress-apisix"
148- }
149- req := k8s .Default ().Clientset .CoreV1 ().Pods (namespace ).GetLogs (podName , & corev1.PodLogOptions {
150- Follow : true ,
151- Timestamps : true ,
152- TailLines : & tailLines ,
153- Container : container ,
154- })
155- logrus .Infof ("Opening log stream for pod %s" , podName )
156146
157- stream , err := req .Stream (r .Context ())
147+ // Get container name from query parameter
148+ container := r .URL .Query ().Get ("container" )
149+
150+ // Get pod to check how many containers it has
151+ pod , err := k8s .Default ().Clientset .CoreV1 ().Pods (namespace ).Get (r .Context (), podName , metav1.GetOptions {})
158152 if err != nil {
159- logrus .Errorf ("Error opening log stream : %v" , err )
160- http .Error (w , "Error opening log stream" , http .StatusInternalServerError )
153+ logrus .Errorf ("Error getting pod %s : %v" , podName , err )
154+ http .Error (w , fmt . Sprintf ( "Error getting pod: %v" , err ) , http .StatusInternalServerError )
161155 return
162156 }
163- defer stream .Close ()
157+
158+ // Determine which containers to stream logs from
159+ var containers []string
160+ if container != "" {
161+ // Use specified container
162+ containers = append (containers , container )
163+ logrus .Infof ("Streaming logs from specified container: %s" , container )
164+ } else {
165+ // Special handling for rbd-gateway pods
166+ if strings .HasPrefix (podName , "rbd-gateway" ) {
167+ containers = append (containers , "apisix" )
168+ logrus .Infof ("rbd-gateway pod detected, using container: ingress-apisix" )
169+ } else {
170+ // Default behavior: stream all containers
171+ for _ , c := range pod .Spec .Containers {
172+ containers = append (containers , c .Name )
173+ }
174+ logrus .Infof ("No container specified, streaming logs from all containers: %v" , containers )
175+ }
176+ }
177+
178+ if len (containers ) == 0 {
179+ http .Error (w , "No containers found in pod" , http .StatusNotFound )
180+ return
181+ }
182+
164183 // Use Flusher to send headers to the client
165184 flusher , ok := w .(http.Flusher )
166185 if ! ok {
@@ -174,8 +193,91 @@ func logs(w http.ResponseWriter, r *http.Request, podName string, namespace stri
174193 w .Header ().Set ("Cache-Control" , "no-cache" )
175194 w .Header ().Set ("Connection" , "keep-alive" )
176195
177- scanner := bufio .NewScanner (stream )
196+ // If single container, use original logic
197+ if len (containers ) == 1 {
198+ streamContainerLogs (w , r , podName , namespace , containers [0 ], tailLines , flusher )
199+ return
200+ }
178201
202+ // For multiple containers, merge streams
203+ logrus .Infof ("Opening log streams for pod %s with %d containers" , podName , len (containers ))
204+
205+ // Create a channel to merge logs from all containers
206+ logChan := make (chan string , 100 )
207+ doneChan := make (chan struct {})
208+
209+ // Start goroutine for each container
210+ for _ , containerName := range containers {
211+ go func (cName string ) {
212+ req := k8s .Default ().Clientset .CoreV1 ().Pods (namespace ).GetLogs (podName , & corev1.PodLogOptions {
213+ Follow : true ,
214+ Timestamps : true ,
215+ TailLines : & tailLines ,
216+ Container : cName ,
217+ })
218+
219+ stream , err := req .Stream (r .Context ())
220+ if err != nil {
221+ logrus .Errorf ("Error opening log stream for container %s: %v" , cName , err )
222+ return
223+ }
224+ defer stream .Close ()
225+
226+ scanner := bufio .NewScanner (stream )
227+ for scanner .Scan () {
228+ select {
229+ case <- r .Context ().Done ():
230+ return
231+ case <- doneChan :
232+ return
233+ default :
234+ // Prefix log line with container name
235+ logLine := fmt .Sprintf ("[%s] %s" , cName , scanner .Text ())
236+ logChan <- logLine
237+ }
238+ }
239+ }(containerName )
240+ }
241+
242+ // Stream merged logs to client
243+ for {
244+ select {
245+ case <- r .Context ().Done ():
246+ close (doneChan )
247+ logrus .Warningf ("Request context done: %v" , r .Context ().Err ())
248+ return
249+ case logLine := <- logChan :
250+ msg := "data: " + logLine + "\n \n "
251+ _ , err := fmt .Fprintf (w , msg )
252+ flusher .Flush ()
253+ if err != nil {
254+ logrus .Errorf ("Error writing to response: %v" , err )
255+ close (doneChan )
256+ return
257+ }
258+ }
259+ }
260+ }
261+
262+ // streamContainerLogs streams logs from a single container
263+ func streamContainerLogs (w http.ResponseWriter , r * http.Request , podName , namespace , container string , tailLines int64 , flusher http.Flusher ) {
264+ req := k8s .Default ().Clientset .CoreV1 ().Pods (namespace ).GetLogs (podName , & corev1.PodLogOptions {
265+ Follow : true ,
266+ Timestamps : true ,
267+ TailLines : & tailLines ,
268+ Container : container ,
269+ })
270+ logrus .Infof ("Opening log stream for pod %s, container %s" , podName , container )
271+
272+ stream , err := req .Stream (r .Context ())
273+ if err != nil {
274+ logrus .Errorf ("Error opening log stream: %v" , err )
275+ http .Error (w , "Error opening log stream" , http .StatusInternalServerError )
276+ return
277+ }
278+ defer stream .Close ()
279+
280+ scanner := bufio .NewScanner (stream )
179281 for scanner .Scan () {
180282 select {
181283 case <- r .Context ().Done ():
0 commit comments