Skip to content

Commit 5f01fd2

Browse files
authored
refactor: simplify some file structure (#72)
1 parent 983f36c commit 5f01fd2

File tree

4 files changed

+42
-74
lines changed

4 files changed

+42
-74
lines changed

cmd/engine.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/urfave/cli/v2"
1010
"github.com/w-h-a/pkg/serverv2"
11-
"github.com/w-h-a/workflow/internal/engine"
1211
"github.com/w-h-a/workflow/internal/engine/clients/broker"
1312
memorybroker "github.com/w-h-a/workflow/internal/engine/clients/broker/memory"
1413
"github.com/w-h-a/workflow/internal/engine/clients/broker/nats"
@@ -21,9 +20,12 @@ import (
2120
"github.com/w-h-a/workflow/internal/engine/clients/runner"
2221
"github.com/w-h-a/workflow/internal/engine/clients/runner/docker"
2322
"github.com/w-h-a/workflow/internal/engine/config"
23+
"github.com/w-h-a/workflow/internal/engine/handlers/http"
2424
"github.com/w-h-a/workflow/internal/engine/services/coordinator"
2525
"github.com/w-h-a/workflow/internal/engine/services/streamer"
2626
"github.com/w-h-a/workflow/internal/engine/services/worker"
27+
"github.com/w-h-a/workflow/internal/log"
28+
"github.com/w-h-a/workflow/internal/task"
2729
"go.opentelemetry.io/contrib/bridges/otelslog"
2830
"go.opentelemetry.io/contrib/instrumentation/host"
2931
"go.opentelemetry.io/contrib/instrumentation/runtime"
@@ -149,10 +151,15 @@ func StartEngine(ctx *cli.Context) error {
149151
var streamerServer serverv2.Server
150152
var s *streamer.Service
151153
if config.Mode() == "standalone" || config.Mode() == "streamer" {
152-
streamerServer, s = engine.NewStreamer(
154+
s = streamer.New(
153155
brokerClient,
156+
map[string]int{
157+
string(log.Queue): 1,
158+
},
154159
)
155160

161+
streamerServer = http.NewStreamerServer(s)
162+
156163
numServices += 2
157164
}
158165

@@ -162,11 +169,14 @@ func StartEngine(ctx *cli.Context) error {
162169
if config.Mode() == "standalone" || config.Mode() == "worker" {
163170
runnerClient := initRunner()
164171

165-
workerServer, w = engine.NewWorker(
166-
brokerClient,
172+
w = worker.New(
167173
runnerClient,
174+
brokerClient,
175+
config.WorkerQueues(),
168176
)
169177

178+
workerServer = http.NewWorkerServer(w)
179+
170180
numServices += 2
171181
}
172182

@@ -177,12 +187,19 @@ func StartEngine(ctx *cli.Context) error {
177187
readwriterClient := initReadWriter()
178188
notifierClient := initNotifier()
179189

180-
coordinatorServer, c = engine.NewCoordinator(
190+
c = coordinator.New(
181191
brokerClient,
182192
readwriterClient,
183193
notifierClient,
194+
map[string]int{
195+
string(task.Started): 1,
196+
string(task.Completed): 1,
197+
string(task.Failed): 1,
198+
},
184199
)
185200

201+
coordinatorServer = http.NewCoordinatorServer(c)
202+
186203
numServices += 2
187204
}
188205

cmd/migration.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package cmd
22

33
import (
44
"github.com/urfave/cli/v2"
5-
"github.com/w-h-a/workflow/internal/migration"
65
"github.com/w-h-a/workflow/internal/migration/clients/migrator"
76
"github.com/w-h-a/workflow/internal/migration/clients/migrator/postgres"
7+
schemamanager "github.com/w-h-a/workflow/internal/migration/services/schema_manager"
88
)
99

1010
func RunMigrations(ctx *cli.Context) error {
@@ -14,7 +14,7 @@ func RunMigrations(ctx *cli.Context) error {
1414
)
1515

1616
// get service
17-
schemaManager := migration.NewSchemaManager(m)
17+
schemaManager := schemamanager.New(m)
1818

1919
// use service
2020
return schemaManager.CreateSchema()

internal/engine/engine.go renamed to internal/engine/handlers/http/server.go

Lines changed: 18 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,22 @@
1-
package engine
1+
package http
22

33
import (
44
"net/http"
55

66
"github.com/gorilla/mux"
77
"github.com/w-h-a/pkg/serverv2"
88
httpserver "github.com/w-h-a/pkg/serverv2/http"
9-
"github.com/w-h-a/workflow/internal/engine/clients/broker"
10-
"github.com/w-h-a/workflow/internal/engine/clients/notifier"
11-
"github.com/w-h-a/workflow/internal/engine/clients/readwriter"
12-
"github.com/w-h-a/workflow/internal/engine/clients/runner"
139
"github.com/w-h-a/workflow/internal/engine/config"
14-
httphandlers "github.com/w-h-a/workflow/internal/engine/handlers/http"
1510
"github.com/w-h-a/workflow/internal/engine/services/coordinator"
1611
"github.com/w-h-a/workflow/internal/engine/services/streamer"
1712
"github.com/w-h-a/workflow/internal/engine/services/worker"
18-
"github.com/w-h-a/workflow/internal/log"
19-
"github.com/w-h-a/workflow/internal/task"
2013
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
2114
"go.opentelemetry.io/otel"
2215
)
2316

24-
func NewCoordinator(
25-
brokerClient broker.Broker,
26-
readwriterClient readwriter.ReadWriter,
27-
notifierClient notifier.Notifier,
28-
) (serverv2.Server, *coordinator.Service) {
29-
// services
30-
coordinatorService := coordinator.New(
31-
brokerClient,
32-
readwriterClient,
33-
notifierClient,
34-
map[string]int{
35-
string(task.Started): 1,
36-
string(task.Completed): 1,
37-
string(task.Failed): 1,
38-
},
39-
)
40-
17+
func NewCoordinatorServer(
18+
coordinatorService *coordinator.Service,
19+
) serverv2.Server {
4120
// base server options
4221
opts := []serverv2.ServerOption{
4322
serverv2.ServerWithNamespace(config.Env()),
@@ -48,10 +27,10 @@ func NewCoordinator(
4827
// create http router
4928
router := mux.NewRouter()
5029

51-
httpStatus := httphandlers.NewStatusHandler(coordinatorService)
30+
httpStatus := NewStatusHandler(coordinatorService)
5231
router.Methods(http.MethodGet).Path("/status").HandlerFunc(httpStatus.GetStatus)
5332

54-
httpTasks := httphandlers.NewTasksHandler(coordinatorService)
33+
httpTasks := NewTasksHandler(coordinatorService)
5534
router.Methods(http.MethodGet).Path("/tasks").HandlerFunc(httpTasks.GetTasks)
5635
router.Methods(http.MethodGet).Path("/tasks/{id}").HandlerFunc(httpTasks.GetOneTask)
5736
router.Methods(http.MethodPut).Path("/tasks/cancel/{id}").HandlerFunc(httpTasks.PutCancelTask)
@@ -78,20 +57,12 @@ func NewCoordinator(
7857

7958
httpServer.Handle(handler)
8059

81-
return httpServer, coordinatorService
60+
return httpServer
8261
}
8362

84-
func NewWorker(
85-
brokerClient broker.Broker,
86-
runnerClient runner.Runner,
87-
) (serverv2.Server, *worker.Service) {
88-
// services
89-
workerService := worker.New(
90-
runnerClient,
91-
brokerClient,
92-
config.WorkerQueues(),
93-
)
94-
63+
func NewWorkerServer(
64+
workerService *worker.Service,
65+
) serverv2.Server {
9566
// base server options
9667
opts := []serverv2.ServerOption{
9768
serverv2.ServerWithNamespace(config.Env()),
@@ -102,7 +73,7 @@ func NewWorker(
10273
// create http router
10374
router := mux.NewRouter()
10475

105-
httpStatus := httphandlers.NewStatusHandler(workerService)
76+
httpStatus := NewStatusHandler(workerService)
10677
router.Methods(http.MethodGet).Path("/status").HandlerFunc(httpStatus.GetStatus)
10778

10879
// create http server
@@ -125,20 +96,12 @@ func NewWorker(
12596

12697
httpServer.Handle(handler)
12798

128-
return httpServer, workerService
99+
return httpServer
129100
}
130101

131-
func NewStreamer(
132-
brokerClient broker.Broker,
133-
) (serverv2.Server, *streamer.Service) {
134-
// services
135-
streamerService := streamer.New(
136-
brokerClient,
137-
map[string]int{
138-
string(log.Queue): 1,
139-
},
140-
)
141-
102+
func NewStreamerServer(
103+
streamerService *streamer.Service,
104+
) serverv2.Server {
142105
// base server options
143106
opts := []serverv2.ServerOption{
144107
serverv2.ServerWithNamespace(config.Env()),
@@ -149,10 +112,10 @@ func NewStreamer(
149112
// create http router
150113
router := mux.NewRouter()
151114

152-
httpStatus := httphandlers.NewStatusHandler(streamerService)
115+
httpStatus := NewStatusHandler(streamerService)
153116
router.Methods(http.MethodGet).Path("/status").HandlerFunc(httpStatus.GetStatus)
154117

155-
httpLogs := httphandlers.NewLogsHandler(streamerService)
118+
httpLogs := NewLogsHandler(streamerService)
156119
router.Methods(http.MethodGet).Path("/logs/{id}").HandlerFunc(httpLogs.StreamLogs)
157120

158121
// create http server
@@ -175,5 +138,5 @@ func NewStreamer(
175138

176139
httpServer.Handle(handler)
177140

178-
return httpServer, streamerService
141+
return httpServer
179142
}

internal/migration/migration.go

Lines changed: 0 additions & 12 deletions
This file was deleted.

0 commit comments

Comments
 (0)