Skip to content

Commit 064a24f

Browse files
authored
Merge pull request #2 from itsouvalas/slowdown
Address performance issues
2 parents 621641e + e28e3db commit 064a24f

File tree

6 files changed

+146
-56
lines changed

6 files changed

+146
-56
lines changed

cmd/scheduler/main.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,38 @@ func main() {
168168
}
169169
}
170170

171+
// Schedule cleanup tasks
172+
retentionPeriod := 5 * 30 * 24 * time.Hour // 30 days
173+
ticker := time.NewTicker(24 * time.Hour) // run cleanup every day
174+
quit := make(chan struct{})
175+
signalChan := make(chan os.Signal, 1) // New channel for OS signals
176+
177+
go func() {
178+
for {
179+
select {
180+
case <-ticker.C:
181+
log.Info(tag, "Starting cleanup of old executions")
182+
183+
deletedExecutions, err := executions.CleanupOldExecutions(retentionPeriod)
184+
if err != nil {
185+
log.Error(tag, fmt.Sprintf("Error cleaning up old executions: %s", err.Error()))
186+
} else {
187+
if len(deletedExecutions) == 0 {
188+
log.Info(tag, "No executions were deleted.")
189+
} else {
190+
for _, execution := range deletedExecutions {
191+
log.Info(tag, fmt.Sprintf("Deleted execution: %s, Last Updated: %s", execution["guid"], execution["execution_end_time"]))
192+
}
193+
}
194+
}
195+
196+
case <-quit:
197+
ticker.Stop()
198+
return
199+
}
200+
}
201+
}()
202+
171203
server := http.Server(fmt.Sprintf("0.0.0.0:%d", port), services)
172204

173205
go func() {
@@ -178,10 +210,12 @@ func main() {
178210

179211
log.Info(tag, fmt.Sprintf("listening for connections on %s", server.Addr))
180212

181-
quit := make(chan os.Signal)
182-
signal.Notify(quit, os.Interrupt)
213+
signal.Notify(signalChan, os.Interrupt) // Use signalChan for signal notification
214+
215+
<-signalChan // Wait for signal
183216

184-
<-quit
217+
// Stop the cleanup ticker
218+
close(quit)
185219

186220
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
187221
defer cancel()

core/jobs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type JobService interface {
3737
Delete(*Job) error
3838
Named(string) (*Job, error)
3939
Persist(*Job) (*Job, error)
40-
InSpace(string) []*Job
40+
InSpace(string) ([]*Job, error)
4141
Success(*Job) (*Job, error)
4242
Fail(*Job) (*Job, error)
4343
}

cron/cron_service.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ type CronService struct {
1616

1717
func NewCronService(log core.LogService) *CronService {
1818
return &CronService{
19-
cron.New(),
20-
log,
21-
make(map[string]cron.EntryID)}
19+
Cron: cron.New(),
20+
log: log,
21+
mapping: make(map[string]cron.EntryID),
22+
}
2223
}
2324

2425
func (service *CronService) Add(runnable core.Runnable) error {
@@ -60,6 +61,7 @@ func (service *CronService) Add(runnable core.Runnable) error {
6061
}
6162

6263
service.mapping[schedule.GUID] = id
64+
service.logMappingSize("Added job to cron service")
6365

6466
return nil
6567
}
@@ -71,6 +73,8 @@ func (service *CronService) Delete(runnable core.Runnable) error {
7173
}
7274

7375
service.Remove(id)
76+
delete(service.mapping, runnable.Schedule().GUID)
77+
service.logMappingSize("Deleted job from cron service")
7478

7579
return nil
7680
}
@@ -84,3 +88,12 @@ func (service *CronService) Validate(expression string) error {
8488

8589
return err
8690
}
91+
92+
func (service *CronService) MappingSize() int {
93+
return len(service.mapping)
94+
}
95+
96+
func (service *CronService) logMappingSize(action string) {
97+
size := service.MappingSize()
98+
service.log.Info("cron-service", fmt.Sprintf("%s: current mapping size is %d", action, size))
99+
}

http/routes/all_jobs.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package routes
22

33
import (
4+
"fmt"
45
"net/http"
56

67
"github.com/labstack/echo/v4"
@@ -42,7 +43,11 @@ func AllJobs(e *echo.Echo, services *core.Services) {
4243

4344
spaceGUID := c.QueryParam("space_guid")
4445

45-
jobs := services.Jobs.InSpace(spaceGUID)
46+
jobs, err := services.Jobs.InSpace(spaceGUID)
47+
if err != nil {
48+
services.Logger.Error(tag, fmt.Sprintf("error retrieving jobs: %v", err))
49+
return c.JSON(http.StatusInternalServerError, "error retrieving jobs")
50+
}
4651

4752
output := &jobCollection{
4853
Resources: jobs,

postgres/execution_service.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,42 @@ func (service *ExecutionService) finish(execution *core.Execution, state string)
207207

208208
return service.update(execution)
209209
}
210+
211+
func (service *ExecutionService) CleanupOldExecutions(retentionPeriod time.Duration) ([]map[string]string, error) {
212+
cutoffTime := time.Now().Add(-retentionPeriod).UTC()
213+
var deletedExecutions []map[string]string
214+
215+
err := WithTransaction(service.db, func(tx Transaction) error {
216+
rows, err := tx.Query(
217+
"SELECT guid, execution_end_time FROM executions WHERE execution_end_time < $1 AND state IN ('SUCCEEDED', 'FAILED')",
218+
cutoffTime,
219+
)
220+
if err != nil {
221+
return err
222+
}
223+
defer rows.Close()
224+
225+
for rows.Next() {
226+
var guid string
227+
var execution_end_time time.Time
228+
if err := rows.Scan(&guid, &execution_end_time); err != nil {
229+
return err
230+
}
231+
deletedExecutions = append(deletedExecutions, map[string]string{
232+
"guid": guid,
233+
"execution_end_time": execution_end_time.Format(time.RFC3339),
234+
})
235+
}
236+
237+
_, err = tx.Exec(
238+
"DELETE FROM executions WHERE execution_end_time < $1 AND state IN ('SUCCEEDED', 'FAILED')",
239+
cutoffTime,
240+
)
241+
return err
242+
})
243+
if err != nil {
244+
return nil, err
245+
}
246+
247+
return deletedExecutions, nil
248+
}

postgres/job_service.go

Lines changed: 47 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,24 @@ type JobService struct {
1212
db *sql.DB
1313
}
1414

15+
const (
16+
JobStatePending = "PENDING"
17+
JobStateSucceeded = "SUCCEEDED"
18+
JobStateFailed = "FAILED"
19+
)
20+
1521
func NewJobService(db *sql.DB) *JobService {
1622
return &JobService{db}
1723
}
1824

1925
func (service *JobService) Get(guid string) (*core.Job, error) {
20-
candidates := service.getCollection(
21-
"select * from jobs where guid = $1",
26+
candidates, err := service.getCollection(
27+
"SELECT * FROM jobs WHERE guid = $1",
2228
guid,
2329
)
30+
if err != nil {
31+
return nil, err
32+
}
2433

2534
if err := expectingOne(len(candidates)); err != nil {
2635
return nil, err
@@ -32,7 +41,7 @@ func (service *JobService) Get(guid string) (*core.Job, error) {
3241
func (service *JobService) Delete(job *core.Job) error {
3342
// Let's not try to delete something that isn't in the db
3443
if _, err := service.Get(job.GUID); err != nil {
35-
return nil
44+
return fmt.Errorf("job with GUID %s not found: %v", job.GUID, err)
3645
}
3746

3847
err := WithTransaction(service.db, func(tx Transaction) error {
@@ -48,10 +57,13 @@ func (service *JobService) Delete(job *core.Job) error {
4857
}
4958

5059
func (service *JobService) Named(name string) (*core.Job, error) {
51-
candidates := service.getCollection(
52-
"select * from jobs where name = $1",
60+
candidates, err := service.getCollection(
61+
"SELECT * FROM jobs WHERE name = $1",
5362
name,
5463
)
64+
if err != nil {
65+
return nil, err
66+
}
5567

5668
if err := expectingOne(len(candidates)); err != nil {
5769
return nil, err
@@ -65,13 +77,13 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) {
6577

6678
guid, err := core.GenGUID()
6779
if err != nil {
68-
return nil, fmt.Errorf("coult not generate a job id")
80+
return nil, fmt.Errorf("could not generate a job id: %v", err)
6981
}
7082

7183
candidate.GUID = guid
7284
candidate.CreatedAt = now
7385
candidate.UpdatedAt = now
74-
candidate.State = "PENDING"
86+
candidate.State = JobStatePending
7587

7688
if candidate.DiskInMb == 0 {
7789
candidate.DiskInMb = 1024
@@ -83,7 +95,7 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) {
8395

8496
err = WithTransaction(service.db, func(tx Transaction) error {
8597
_, aErr := tx.Exec(
86-
"INSERT INTO jobs VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
98+
"INSERT INTO jobs (guid, name, command, disk_in_mb, memory_in_mb, state, app_guid, space_guid, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
8799
candidate.GUID,
88100
candidate.Name,
89101
candidate.Command,
@@ -107,14 +119,12 @@ func (service *JobService) Persist(candidate *core.Job) (*core.Job, error) {
107119
}
108120

109121
func (service *JobService) Success(candidate *core.Job) (*core.Job, error) {
110-
candidate.State = "SUCCEEDED"
111-
122+
candidate.State = JobStateSucceeded
112123
return service.update(candidate)
113124
}
114125

115126
func (service *JobService) Fail(candidate *core.Job) (*core.Job, error) {
116-
candidate.State = "FAILED"
117-
127+
candidate.State = JobStateFailed
118128
return service.update(candidate)
119129
}
120130

@@ -125,7 +135,7 @@ func (service *JobService) update(candidate *core.Job) (*core.Job, error) {
125135

126136
err := WithTransaction(service.db, func(tx Transaction) error {
127137
_, aErr := tx.Exec(
128-
"update jobs set updated_at = $3, state = $2 where guid = $1",
138+
"UPDATE jobs SET updated_at = $3, state = $2 WHERE guid = $1",
129139
candidate.GUID,
130140
candidate.State,
131141
candidate.UpdatedAt,
@@ -141,53 +151,42 @@ func (service *JobService) update(candidate *core.Job) (*core.Job, error) {
141151
return candidate, nil
142152
}
143153

144-
func (service *JobService) InSpace(guid string) []*core.Job {
145-
return service.getCollection(
146-
"select * from jobs where space_guid = $1 ORDER BY name ASC",
154+
func (service *JobService) InSpace(guid string) ([]*core.Job, error) {
155+
candidates, err := service.getCollection(
156+
"SELECT * FROM jobs WHERE space_guid = $1 ORDER BY name ASC",
147157
guid,
148158
)
159+
if err != nil {
160+
return nil, err
161+
}
162+
return candidates, nil
149163
}
150164

151-
func (service *JobService) getCollection(query string, args ...interface{}) []*core.Job {
152-
collection := make([]*core.Job, 0)
165+
func (service *JobService) scanJob(rows *sql.Rows) (*core.Job, error) {
166+
var job core.Job
167+
err := rows.Scan(&job.GUID, &job.Name, &job.Command, &job.DiskInMb, &job.MemoryInMb, &job.State, &job.AppGUID, &job.SpaceGUID, &job.CreatedAt, &job.UpdatedAt)
168+
if err != nil {
169+
return nil, err
170+
}
171+
return &job, nil
172+
}
173+
174+
func (service *JobService) getCollection(query string, args ...interface{}) ([]*core.Job, error) {
175+
var collection []*core.Job
153176

154177
rows, err := service.db.Query(query, args...)
155178
if err != nil {
156-
return collection
179+
return nil, err
157180
}
181+
defer rows.Close()
158182

159183
for rows.Next() {
160-
var guid string
161-
var name string
162-
var command string
163-
var diskInMb int
164-
var memoryInMb int
165-
var state string
166-
var spaceGUID string
167-
var appGUID string
168-
var createdAt time.Time
169-
var updatedAt time.Time
170-
171-
err := rows.Scan(&guid, &name, &command, &diskInMb, &memoryInMb, &state, &appGUID, &spaceGUID, &createdAt, &updatedAt)
184+
job, err := service.scanJob(rows)
172185
if err != nil {
173-
continue
186+
return nil, err
174187
}
175-
176-
candidate := &core.Job{
177-
GUID: guid,
178-
Name: name,
179-
Command: command,
180-
DiskInMb: diskInMb,
181-
MemoryInMb: memoryInMb,
182-
State: state,
183-
SpaceGUID: spaceGUID,
184-
AppGUID: appGUID,
185-
CreatedAt: createdAt,
186-
UpdatedAt: updatedAt,
187-
}
188-
189-
collection = append(collection, candidate)
188+
collection = append(collection, job)
190189
}
191190

192-
return collection
191+
return collection, rows.Err()
193192
}

0 commit comments

Comments
 (0)