Skip to content

Commit 07f0707

Browse files
authored
feat: add real-time log streaming endpoint (#55)
1 parent 6db9ae0 commit 07f0707

File tree

9 files changed

+287
-18
lines changed

9 files changed

+287
-18
lines changed

cmd/container_logs.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cmd
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/urfave/cli/v2"
7+
containerlogs "github.com/w-h-a/workflow/internal/container_logs"
8+
)
9+
10+
func StreamContainerLogs(ctx *cli.Context) error {
11+
taskID := ctx.String("task_id")
12+
13+
containerlogs.RunStreamerClient(&http.Client{}, taskID)
14+
15+
return nil
16+
}

cmd/engine.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/w-h-a/workflow/internal/engine/clients/runner/docker"
2020
"github.com/w-h-a/workflow/internal/engine/config"
2121
"github.com/w-h-a/workflow/internal/engine/services/coordinator"
22+
"github.com/w-h-a/workflow/internal/engine/services/streamer"
2223
"github.com/w-h-a/workflow/internal/engine/services/worker"
2324
"go.opentelemetry.io/contrib/bridges/otelslog"
2425
"go.opentelemetry.io/contrib/instrumentation/host"
@@ -141,6 +142,17 @@ func StartEngine(ctx *cli.Context) error {
141142
stopChannels := map[string]chan struct{}{}
142143
numServices := 0
143144

145+
// streamer
146+
var streamerServer serverv2.Server
147+
var s *streamer.Service
148+
if config.Mode() == "standalone" || config.Mode() == "streamer" {
149+
streamerServer, s = engine.NewStreamer(
150+
brokerClient,
151+
)
152+
153+
numServices += 2
154+
}
155+
144156
// worker
145157
var workerServer serverv2.Server
146158
var w *worker.Service
@@ -172,6 +184,26 @@ func StartEngine(ctx *cli.Context) error {
172184
// error chan
173185
ch := make(chan error, numServices)
174186

187+
// start streamer
188+
if s != nil {
189+
stop := make(chan struct{})
190+
stopChannels["streamer"] = stop
191+
192+
wg.Add(1)
193+
go func() {
194+
defer wg.Done()
195+
slog.InfoContext(ctx.Context, "starting streamer")
196+
ch <- s.Start(stop)
197+
}()
198+
199+
wg.Add(1)
200+
go func() {
201+
defer wg.Done()
202+
slog.InfoContext(ctx.Context, "starting streamer http server", "address", config.StreamerHttp())
203+
ch <- streamerServer.Start()
204+
}()
205+
}
206+
175207
// start worker
176208
if w != nil {
177209
stop := make(chan struct{})
@@ -222,6 +254,10 @@ func StartEngine(ctx *cli.Context) error {
222254
// graceful shutdown
223255
slog.InfoContext(ctx.Context, "stopping...")
224256

257+
if stop, ok := stopChannels["streamer"]; ok {
258+
close(stop)
259+
}
260+
225261
if stop, ok := stopChannels["worker"]; ok {
226262
close(stop)
227263
}

docker-compose.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,31 @@ services:
5858
retries: 5
5959
start_period: 30s
6060

61+
streamer:
62+
container_name: streamer
63+
build: .
64+
command: engine
65+
ports:
66+
- "4002:4002"
67+
environment:
68+
- ENV=dev
69+
- NAME=streamer
70+
- VERSION=0.1.0-alpha.0
71+
- MODE=streamer
72+
- HTTP_ADDRESS=:4002
73+
- TRACES_ADDRESS=jaeger:4318
74+
- METRICS_ADDRESS=prometheus:9090
75+
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://prometheus:9090/api/v1/otlp/v1/metrics
76+
- BROKER=rabbit
77+
- BROKER_LOCATION=amqp://guest:guest@rabbitmq:5672
78+
depends_on:
79+
jaeger:
80+
condition: service_healthy
81+
prometheus:
82+
condition: service_healthy
83+
rabbitmq:
84+
condition: service_healthy
85+
6186
worker:
6287
container_name: worker
6388
build: .
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package containerlogs
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"net/http"
7+
)
8+
9+
func RunStreamerClient(client *http.Client, id string) {
10+
url := fmt.Sprintf("http://localhost:4002/logs/%s", id)
11+
12+
resp, err := client.Get(url)
13+
if err != nil {
14+
fmt.Println("Error making request:", err)
15+
return
16+
}
17+
18+
defer resp.Body.Close()
19+
20+
if resp.StatusCode != http.StatusOK {
21+
fmt.Printf("Server returned non-200 status: %s\n", resp.Status)
22+
return
23+
}
24+
25+
scanner := bufio.NewScanner(resp.Body)
26+
27+
for scanner.Scan() {
28+
fmt.Println(scanner.Text())
29+
}
30+
31+
if err := scanner.Err(); err != nil {
32+
fmt.Println("Error reading from stream:", err)
33+
}
34+
35+
fmt.Println("Connection closed.")
36+
}

internal/engine/config/config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type config struct {
2626
mode string
2727
coordinatorHttp string
2828
workerHttp string
29+
streamerHttp string
2930
workerQueues map[string]int
3031
tracesAddress string
3132
metricsAddress string
@@ -50,6 +51,7 @@ func New() {
5051
mode: "standalone",
5152
coordinatorHttp: ":4000",
5253
workerHttp: ":4001",
54+
streamerHttp: ":4002",
5355
workerQueues: map[string]int{string(task.Scheduled): 1, string(task.Cancelled): 1},
5456
tracesAddress: "localhost:4318",
5557
metricsAddress: "localhost:4318",
@@ -93,6 +95,9 @@ func New() {
9395
if instance.mode == "worker" {
9496
instance.workerHttp = httpAddress
9597
}
98+
if instance.mode == "streamer" {
99+
instance.streamerHttp = httpAddress
100+
}
96101
}
97102

98103
qs := os.Getenv("QUEUES")
@@ -247,6 +252,14 @@ func WorkerHttp() string {
247252
return instance.workerHttp
248253
}
249254

255+
func StreamerHttp() string {
256+
if instance == nil {
257+
panic("cfg is nil")
258+
}
259+
260+
return instance.streamerHttp
261+
}
262+
250263
func WorkerQueues() map[string]int {
251264
if instance == nil {
252265
panic("cfg is nil")

internal/engine/engine.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import (
1212
"github.com/w-h-a/workflow/internal/engine/config"
1313
httphandlers "github.com/w-h-a/workflow/internal/engine/handlers/http"
1414
"github.com/w-h-a/workflow/internal/engine/services/coordinator"
15+
"github.com/w-h-a/workflow/internal/engine/services/streamer"
1516
"github.com/w-h-a/workflow/internal/engine/services/worker"
17+
"github.com/w-h-a/workflow/internal/log"
1618
"github.com/w-h-a/workflow/internal/task"
1719
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
1820
"go.opentelemetry.io/otel"
@@ -122,3 +124,53 @@ func NewWorker(
122124

123125
return httpServer, workerService
124126
}
127+
128+
func NewStreamer(
129+
brokerClient broker.Broker,
130+
) (serverv2.Server, *streamer.Service) {
131+
// services
132+
streamerService := streamer.New(
133+
brokerClient,
134+
map[string]int{
135+
string(log.Queue): 1,
136+
},
137+
)
138+
139+
// base server options
140+
opts := []serverv2.ServerOption{
141+
serverv2.ServerWithNamespace(config.Env()),
142+
serverv2.ServerWithName(config.Name()),
143+
serverv2.ServerWithVersion(config.Version()),
144+
}
145+
146+
// create http router
147+
router := mux.NewRouter()
148+
149+
httpStatus := httphandlers.NewStatusHandler(streamerService)
150+
router.Methods(http.MethodGet).Path("/status").HandlerFunc(httpStatus.GetStatus)
151+
152+
httpLogs := httphandlers.NewLogsHandler(streamerService)
153+
router.Methods(http.MethodGet).Path("/logs/{id}").HandlerFunc(httpLogs.StreamLogs)
154+
155+
// create http server
156+
httpOpts := []serverv2.ServerOption{
157+
serverv2.ServerWithAddress(config.StreamerHttp()),
158+
}
159+
160+
httpOpts = append(httpOpts, opts...)
161+
162+
httpServer := httpserver.NewServer(httpOpts...)
163+
164+
handler := otelhttp.NewHandler(
165+
router,
166+
"",
167+
otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string { return r.URL.Path }),
168+
otelhttp.WithTracerProvider(otel.GetTracerProvider()),
169+
otelhttp.WithPropagators(otel.GetTextMapPropagator()),
170+
otelhttp.WithFilter(func(r *http.Request) bool { return r.URL.Path != "/status" }),
171+
)
172+
173+
httpServer.Handle(handler)
174+
175+
return httpServer, streamerService
176+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package http
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/w-h-a/workflow/internal/engine/services/streamer"
7+
)
8+
9+
type Logs struct {
10+
parser *Parser
11+
streamer *streamer.Service
12+
}
13+
14+
func (l *Logs) StreamLogs(w http.ResponseWriter, r *http.Request) {
15+
ctx := reqToCtx(r)
16+
17+
taskId, err := l.parser.ParseTaskId(ctx, r)
18+
if err != nil {
19+
wrtRsp(w, http.StatusBadRequest, map[string]any{"error": err.Error()})
20+
return
21+
}
22+
23+
w.Header().Set("Content-Type", "text/event-stream")
24+
w.Header().Set("Cache-Control", "no-cache")
25+
w.Header().Set("Connection", "keep-alive")
26+
27+
flusher, ok := w.(http.Flusher)
28+
if !ok {
29+
http.Error(w, "streaming not supported", http.StatusInternalServerError)
30+
return
31+
}
32+
33+
logStream, err := l.streamer.StreamLogs(ctx, taskId)
34+
if err != nil {
35+
http.Error(w, "failed to stream logs", http.StatusInternalServerError)
36+
return
37+
}
38+
39+
defer close(logStream.Stop)
40+
41+
for {
42+
select {
43+
case <-ctx.Done():
44+
return
45+
case logLine, ok := <-logStream.Logs:
46+
if !ok {
47+
return
48+
}
49+
50+
data := []byte("data: " + logLine + "\n\n")
51+
if _, err := w.Write(data); err != nil {
52+
return
53+
}
54+
flusher.Flush()
55+
}
56+
}
57+
}
58+
59+
func NewLogsHandler(streamerService *streamer.Service) *Logs {
60+
return &Logs{
61+
parser: &Parser{},
62+
streamer: streamerService,
63+
}
64+
}

0 commit comments

Comments
 (0)