Skip to content

Commit 9ecd5fe

Browse files
Merge pull request #392 from depot/fsnotify-fallback
Add polling replacement for fsnotify
2 parents 20b529f + 9f3251d commit 9ecd5fe

File tree

3 files changed

+206
-11
lines changed

3 files changed

+206
-11
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ require (
2626
github.com/docker/go-connections v0.4.0
2727
github.com/docker/go-units v0.5.0
2828
github.com/erikgeiser/promptkit v0.9.0
29-
github.com/fsnotify/fsnotify v1.6.0
3029
github.com/getsentry/sentry-go v0.13.0
3130
github.com/gogo/protobuf v1.3.2
3231
github.com/hashicorp/go-cty-funcs v0.0.0-20250210171435-dda779884a9f
@@ -101,6 +100,7 @@ require (
101100
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
102101
github.com/fatih/color v1.13.0 // indirect
103102
github.com/felixge/httpsnoop v1.0.4 // indirect
103+
github.com/fsnotify/fsnotify v1.6.0 // indirect
104104
github.com/fvbommel/sortorder v1.0.1 // indirect
105105
github.com/go-logr/logr v1.3.0 // indirect
106106
github.com/go-logr/stdr v1.2.2 // indirect

pkg/cmd/claude/claude.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ import (
1717
"connectrpc.com/connect"
1818
"github.com/depot/cli/pkg/api"
1919
"github.com/depot/cli/pkg/config"
20+
"github.com/depot/cli/pkg/fswatch"
2021
"github.com/depot/cli/pkg/helpers"
2122
agentv1 "github.com/depot/cli/pkg/proto/depot/agent/v1"
2223
"github.com/depot/cli/pkg/proto/depot/agent/v1/agentv1connect"
23-
"github.com/fsnotify/fsnotify"
2424
"github.com/spf13/cobra"
2525
)
2626

@@ -429,15 +429,10 @@ func continuouslySaveSessionFile(ctx context.Context, projectDir string, client
429429
return fmt.Errorf("failed to create project directory: %w", err)
430430
}
431431

432-
watcher, err := fsnotify.NewWatcher()
432+
events, errors, err := fswatch.WatchContext(ctx, projectDir, 5*time.Second)
433433
if err != nil {
434434
return fmt.Errorf("failed to create file watcher: %w", err)
435435
}
436-
defer watcher.Close()
437-
438-
if err := watcher.Add(projectDir); err != nil {
439-
return fmt.Errorf("failed to watch directory: %w", err)
440-
}
441436

442437
var sessionFilePath string
443438
var claudeSessionID string
@@ -447,12 +442,12 @@ func continuouslySaveSessionFile(ctx context.Context, projectDir string, client
447442
case <-ctx.Done():
448443
return nil
449444

450-
case event, ok := <-watcher.Events:
445+
case event, ok := <-events:
451446
if !ok {
452447
return nil
453448
}
454449

455-
if event.Op&(fsnotify.Write|fsnotify.Create) == 0 {
450+
if event.Op != fswatch.Create && event.Op != fswatch.Write {
456451
continue
457452
}
458453

@@ -478,7 +473,7 @@ func continuouslySaveSessionFile(ctx context.Context, projectDir string, client
478473
// if the continuous save fails, it doesn't matter much. this is really only for the live view of the conversation
479474
_ = saveSession(ctx, client, token, sessionID, sessionFilePath, 3, 2*time.Second, orgID)
480475

481-
case err, ok := <-watcher.Errors:
476+
case err, ok := <-errors:
482477
if !ok {
483478
return nil
484479
}

pkg/fswatch/fswatch.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package fswatch
2+
3+
import (
4+
"context"
5+
"io/fs"
6+
"os"
7+
"path/filepath"
8+
"sort"
9+
"sync"
10+
"time"
11+
)
12+
13+
// Event represents a file system event
14+
type Event struct {
15+
Name string // file path
16+
Op Op // operation that triggered the event
17+
Time time.Time // when the event occurred
18+
}
19+
20+
// Op describes file system operations
21+
type Op uint32
22+
23+
const (
24+
Create Op = 1 << iota
25+
Write
26+
Remove
27+
Rename
28+
Chmod
29+
)
30+
31+
// Watcher watches a directory for file changes using both fsnotify and polling fallback
32+
type Watcher struct {
33+
events chan Event
34+
errors chan error
35+
done chan struct{}
36+
pollInterval time.Duration
37+
fileSystem fs.FS
38+
watchDir string
39+
lastScan map[string]time.Time // file path -> last modified time
40+
wg sync.WaitGroup
41+
}
42+
43+
// New creates a new hybrid file watcher
44+
func New(pollInterval time.Duration, dir string) (*Watcher, error) {
45+
return &Watcher{
46+
events: make(chan Event, 100),
47+
errors: make(chan error, 10),
48+
done: make(chan struct{}),
49+
pollInterval: pollInterval,
50+
fileSystem: os.DirFS(dir),
51+
watchDir: dir,
52+
lastScan: make(map[string]time.Time),
53+
}, nil
54+
}
55+
56+
// Watch starts watching the directory for file changes.
57+
func (w *Watcher) Watch() error {
58+
w.wg.Add(1)
59+
go func() {
60+
defer w.wg.Done()
61+
w.runPolling()
62+
}()
63+
64+
return nil
65+
}
66+
67+
func (w *Watcher) Events() <-chan Event { return w.events }
68+
func (w *Watcher) Errors() <-chan error { return w.errors }
69+
70+
// Close stops the watcher and cleans up resources.
71+
func (w *Watcher) Close() error {
72+
close(w.done)
73+
w.wg.Wait()
74+
close(w.events)
75+
return nil
76+
}
77+
78+
// runPolling periodically scans the directory for changes
79+
func (w *Watcher) runPolling() {
80+
ticker := time.NewTicker(w.pollInterval)
81+
defer ticker.Stop()
82+
83+
// Initial scan
84+
w.scanDirectory(true)
85+
86+
for {
87+
select {
88+
case <-w.done:
89+
return
90+
case <-ticker.C:
91+
w.scanDirectory(false)
92+
}
93+
}
94+
}
95+
96+
// scanDirectory scans the watch directory and detects changes
97+
func (w *Watcher) scanDirectory(initial bool) {
98+
entries, err := fs.ReadDir(w.fileSystem, ".")
99+
if err != nil {
100+
w.errors <- err
101+
return
102+
}
103+
104+
currentFiles := make(map[string]time.Time)
105+
var events []Event
106+
107+
// Check all current files
108+
for _, entry := range entries {
109+
if entry.IsDir() {
110+
continue
111+
}
112+
113+
filePath := filepath.Join(w.watchDir, entry.Name())
114+
info, err := entry.Info()
115+
if err != nil {
116+
continue
117+
}
118+
119+
modTime := info.ModTime()
120+
currentFiles[filePath] = modTime
121+
122+
if initial {
123+
w.lastScan[filePath] = modTime
124+
continue
125+
}
126+
127+
if lastModTime, exists := w.lastScan[filePath]; exists {
128+
// File existed before, check if modified
129+
if modTime.After(lastModTime) {
130+
events = append(events, Event{
131+
Name: filePath,
132+
Op: Write,
133+
Time: modTime,
134+
})
135+
}
136+
} else {
137+
// New file
138+
events = append(events, Event{
139+
Name: filePath,
140+
Op: Create,
141+
Time: modTime,
142+
})
143+
}
144+
}
145+
146+
// Check for removed files
147+
for filePath := range w.lastScan {
148+
if _, exists := currentFiles[filePath]; !exists {
149+
events = append(events, Event{
150+
Name: filePath,
151+
Op: Remove,
152+
Time: time.Now(),
153+
})
154+
}
155+
}
156+
157+
// Update last scan state
158+
w.lastScan = currentFiles
159+
160+
// Sort events by timestamp and send them
161+
sort.Slice(events, func(i, j int) bool {
162+
return events[i].Time.Before(events[j].Time)
163+
})
164+
165+
for _, event := range events {
166+
select {
167+
case w.events <- event:
168+
case <-w.done:
169+
return
170+
}
171+
}
172+
}
173+
174+
// WatchContext watches a directory with context cancellation
175+
func WatchContext(ctx context.Context, dir string, pollInterval time.Duration) (<-chan Event, <-chan error, error) {
176+
watcher, err := New(pollInterval, dir)
177+
if err != nil {
178+
return nil, nil, err
179+
}
180+
return watchContext(ctx, watcher)
181+
}
182+
183+
// watchContext is split from WatchContext for testing watchers.
184+
func watchContext(ctx context.Context, watcher *Watcher) (<-chan Event, <-chan error, error) {
185+
if err := watcher.Watch(); err != nil {
186+
watcher.Close()
187+
return nil, nil, err
188+
}
189+
190+
// Close watcher when context is done
191+
go func() {
192+
select {
193+
case <-ctx.Done():
194+
case <-watcher.done:
195+
}
196+
watcher.Close()
197+
}()
198+
199+
return watcher.Events(), watcher.Errors(), nil
200+
}

0 commit comments

Comments
 (0)