diff --git a/pkgs/watcher/lib/src/directory_watcher/directory_list.dart b/pkgs/watcher/lib/src/directory_watcher/directory_list.dart index a54f2a156..43b97c381 100644 --- a/pkgs/watcher/lib/src/directory_watcher/directory_list.dart +++ b/pkgs/watcher/lib/src/directory_watcher/directory_list.dart @@ -15,7 +15,7 @@ extension DirectoryRobustRecursiveListing on Directory { /// These can arise from concurrent file-system modification. /// /// See [listRecursively] for how symlinks are handled. - Stream listRecursivelyIgnoringErrors() { + Stream listRecursivelyIgnoringErrors() { return listRecursively() .ignoring() .ignoring(); @@ -33,14 +33,22 @@ extension DirectoryRobustRecursiveListing on Directory { /// symlink-resolved paths. /// /// Skipped links to directories are not mentioned in the directory listing. - Stream listRecursively() => + Stream listRecursively() => _DirectoryTraversal(this).listRecursively(); } +class DirectoryList { + final Directory directory; + final Set files = {}; + final Set directories = {}; + final Set ignores = {}; + DirectoryList(this.directory); +} + /// A recursive directory listing algorithm that follows symlinks carefully. class _DirectoryTraversal { final Directory root; - final StreamController _result = StreamController(); + final StreamController _result = StreamController(); /// The directories currently being traversed. /// @@ -49,7 +57,7 @@ class _DirectoryTraversal { _DirectoryTraversal(this.root); - Stream listRecursively() { + Stream listRecursively() { unawaited(_listAndRecurse()); return _result.stream; } @@ -72,10 +80,12 @@ class _DirectoryTraversal { /// A subdirectory is only listed if its canonical path is not already in /// [_traversing]. Future _listAndRecurseOrThrow(_ResolvedDirectory directory) async { - final subdirectories = <_ResolvedDirectory>[]; + await Future.delayed(Duration.zero); - await for (var entity - in directory.directory.list(recursive: false, followLinks: false)) { + final subdirectories = <_ResolvedDirectory>[]; + final directoryList = DirectoryList(directory.directory); + for (var entity + in directory.directory.listSync(recursive: false, followLinks: false)) { // Handle links. if (entity is Link) { // Look up their target and target type. @@ -83,15 +93,20 @@ class _DirectoryTraversal { final targetType = FileSystemEntity.typeSync(target); if (targetType == FileSystemEntityType.directory) { + // Skip if they currently being traversed. + if (_traversing.contains(target)) continue; + // Add links to directories with their target to [subdirectories]. - subdirectories.add(_ResolvedDirectory( - directory: Directory(entity.path), canonicalPath: target)); + final resolvedDirectory = _ResolvedDirectory( + directory: Directory(entity.path), canonicalPath: target); + subdirectories.add(resolvedDirectory); + directoryList.directories.add(entity.path); } else if (targetType == FileSystemEntityType.file) { // Output files. - _result.add(File(entity.path)); + directoryList.files.add(entity.path); } else { // Anything else. Broken links get output with type `Link`. - _result.add(entity); + directoryList.files.add(entity.path); } continue; } @@ -107,22 +122,26 @@ class _DirectoryTraversal { final resolvedDirectory = directory.isCanonical ? entity.path : p.join(directory.canonicalPath, p.basename(entity.path)); + + // Skip if currently being traversed. + if (_traversing.contains(resolvedDirectory)) continue; + subdirectories.add(_ResolvedDirectory( directory: entity, canonicalPath: resolvedDirectory)); + directoryList.directories.add(entity.path); continue; } // Files and anything else. - _result.add(entity); + directoryList.files.add(entity.path); } + _result.add(directoryList); // Recurse into subdirectories that are not already being traversed. for (final directory in subdirectories) { - if (_traversing.add(directory.canonicalPath)) { - _result.add(directory.directory); - await _listAndRecurseOrThrow(directory); - _traversing.remove(directory.canonicalPath); - } + _traversing.add(directory.canonicalPath); + await _listAndRecurseOrThrow(directory); + _traversing.remove(directory.canonicalPath); } } } diff --git a/pkgs/watcher/lib/src/directory_watcher/linux.dart b/pkgs/watcher/lib/src/directory_watcher/linux.dart index 70d0cfecc..c1c59f6f8 100644 --- a/pkgs/watcher/lib/src/directory_watcher/linux.dart +++ b/pkgs/watcher/lib/src/directory_watcher/linux.dart @@ -6,6 +6,7 @@ import 'dart:async'; import 'dart:io'; import 'package:async/async.dart'; +import 'package:path/path.dart' as p; import '../directory_watcher.dart'; import '../event.dart'; @@ -66,6 +67,8 @@ class _LinuxDirectoryWatcher /// parent directory. final PathSet _directoriesWatched; + final Set<_InterruptableDirectoryListing> _listings = Set.identity(); + /// A set of all subscriptions that this watcher subscribes to. /// /// These are gathered together so that they may all be canceled when the @@ -75,7 +78,7 @@ class _LinuxDirectoryWatcher _LinuxDirectoryWatcher(String path) : _files = PathSet(path), _directoriesWatched = PathSet(path) { - _nativeEvents.add(_watch(path) + _nativeEvents.add(_watch(path, watchUntilCancelled: false) .events .transform(StreamTransformer.fromHandlers(handleDone: (sink) { // Handle the done event here rather than in the call to [_listen] because @@ -97,17 +100,22 @@ class _LinuxDirectoryWatcher _eventsController.addError(error, stackTrace); }); + final listing = _InterruptableDirectoryListing( + Directory(path).listRecursivelyIgnoringErrors()); + _listings.add(listing); _listen( - Directory(path).listRecursivelyIgnoringErrors(), - (FileSystemEntity entity) { - if (entity is Directory) { - _watchSubdir(entity.path); - } else { - _files.add(entity.path); + listing.stream, + (DirectoryList directoryList) { + for (final directory in directoryList.directories) { + _watchSubdir(directory); + } + for (final file in directoryList.files) { + _files.add(file); } }, onError: _emitError, onDone: () { + _listings.remove(listing); if (!isReady) { _readyCompleter.complete(); } @@ -130,21 +138,9 @@ class _LinuxDirectoryWatcher /// Watch a subdirectory of [directory] for changes. void _watchSubdir(String path) { - // TODO(nweiz): Right now it's possible for the watcher to emit an event for - // a file before the directory list is complete. This could lead to the user - // seeing a MODIFY or REMOVE event for a file before they see an ADD event, - // which is bad. We should handle that. - // - // One possibility is to provide a general means (e.g. - // `DirectoryWatcher.eventsAndExistingFiles`) to tell a watcher to emit - // events for all the files that already exist. This would be useful for - // top-level clients such as barback as well, and could be implemented with - // a wrapper similar to how listening/canceling works now. - - // Directory might no longer exist at the point where we try to - // start the watcher. Simply ignore this error and let the stream - // close. - var stream = _watch(path).events.ignoring(); + var stream = _watch(path, watchUntilCancelled: true) + .events + .ignoring(); _nativeEvents.add(stream); } @@ -161,7 +157,7 @@ class _LinuxDirectoryWatcher // end of the batch. Catch these cases in order to do a check on the actual // filesystem state. var deletes = {}; - var creates = {}; + var creates = PathSet(path); for (var event in batch) { // If the watched directory is deleted or moved, we'll get a deletion @@ -208,7 +204,7 @@ class _LinuxDirectoryWatcher case EventType.modifyDirectory: files.remove(event.path); - dirs.add(event.path); + if (dirs.add(event.path)) creates.add(event.path); case EventType.createFile: creates.add(event.path); @@ -216,25 +212,35 @@ class _LinuxDirectoryWatcher dirs.remove(event.path); case EventType.modifyFile: - files.add(event.path); + if (files.add(event.path)) creates.add(event.path); dirs.remove(event.path); } } // Check paths that might have been affected by out-of-order events, set // the correct state in [files] and [dirs]. - for (final path in deletes.intersection(creates)) { - final type = FileSystemEntity.typeSync(path, followLinks: false); - if (type == FileSystemEntityType.file || - type == FileSystemEntityType.link) { - files.add(path); - dirs.remove(path); - } else if (type == FileSystemEntityType.directory) { - dirs.add(path); - files.remove(path); - } else { - files.remove(path); - dirs.remove(path); + // + // If a delete is a directory, it makes all the creates in the directory + // ambiguous. `creates` is a `PathSet` so it `remove` matches files under + // removed directories. + for (final delete in deletes) { + for (final path in creates.remove(delete)) { + logForTesting?.call('ambiguous, recheck $path'); + final type = FileSystemEntity.typeSync(path, followLinks: false); + if (type == FileSystemEntityType.file || + type == FileSystemEntityType.link) { + logForTesting?.call('ambiguous, recheck $path: file'); + files.add(path); + dirs.remove(path); + } else if (type == FileSystemEntityType.directory) { + logForTesting?.call('ambiguous, recheck $path: directory'); + dirs.add(path); + files.remove(path); + } else { + logForTesting?.call('ambiguous, recheck $path: missing'); + files.remove(path); + dirs.remove(path); + } } } @@ -252,8 +258,16 @@ class _LinuxDirectoryWatcher // Unless [path] was a file and still is, emit REMOVE events for it or its // contents, if (files.contains(path) && _files.contains(path)) continue; - for (var file in _files.remove(path)) { - _emitEvent(ChangeType.REMOVE, file); + + final filesToRemove = _files.remove(path); + if (filesToRemove.isEmpty) { + for (final listing in _listings) { + listing.ignore(path); + } + } else { + for (var file in filesToRemove) { + _emitEvent(ChangeType.REMOVE, file); + } } } @@ -262,31 +276,58 @@ class _LinuxDirectoryWatcher _emitEvent(ChangeType.MODIFY, file); } else { _emitEvent(ChangeType.ADD, file); + for (final listing in _listings) { + listing.ignore(file); + } _files.add(file); } } for (var dir in dirs) { _watchSubdir(dir); + } + for (var dir in dirs) { _addSubdir(dir); } } /// Emits [ChangeType.ADD] events for the recursive contents of [path]. void _addSubdir(String path) { - _listen(Directory(path).listRecursivelyIgnoringErrors(), - (FileSystemEntity entity) { - if (entity is Directory) { - _watchSubdir(entity.path); - } else { + logForTesting?.call('_addSubdir,$path'); + final listing = _InterruptableDirectoryListing( + Directory(path).listRecursivelyIgnoringErrors()); + _listings.add(listing); + _listen(listing.stream, (DirectoryList directoryList) { + for (final directory in directoryList.directories) { + _watchSubdir(directory); + } + for (final file in directoryList.files) { // Only emit ADD if it hasn't already been emitted due to the file being // modified or added after the directory was added. - if (!_files.contains(entity.path)) { - logForTesting?.call('_addSubdir,$path,$entity'); - _files.add(entity.path); - _emitEvent(ChangeType.ADD, entity.path); + if (!_files.contains(file)) { + if (!directoryList.ignores.contains(file)) { + logForTesting?.call('_addSubdir,$path,$file'); + _files.add(file); + _emitEvent(ChangeType.ADD, file); + } + } + } + + // DO NOT SUBMIT make this fast + for (final file in _files.paths) { + if (p.dirname(file) == directoryList.directory.path) { + if (!directoryList.files.contains(file)) { + if (!directoryList.ignores.contains(file)) { + logForTesting?.call( + '_addSubdir,remove,$path,$file,${directoryList.files}}'); + _emitEvent(ChangeType.REMOVE, file); + _files.remove(file); + } + } } } + }, onDone: () { + _listings.remove(listing); }, onError: (Object error, StackTrace stackTrace) { // Ignore an exception caused by the dir not existing. It's fine if it // was added and then quickly removed. @@ -347,11 +388,16 @@ class _LinuxDirectoryWatcher /// Watches [path]. /// /// See [_Watch] class comment. - _Watch _watch(String path) { + _Watch _watch(String path, {required bool watchUntilCancelled}) { logForTesting?.call('_Watch._watch,$path'); - _watches[path]?.cancel(); - final result = _Watch(path, _cancelWatchesUnderPath); + // There can be an existing watch due to race between directory list and + // event. Add the replacement watch before closing the old one, so the + // underlying VM watch will be reused if it's actually the same directory. + final previousWatch = _watches[path]; + final result = _Watch(path, _cancelWatchesUnderPath, + watchUntilCancelled: watchUntilCancelled); + if (previousWatch != null) previousWatch.cancel(); _watches[path] = result; // If [path] is the root watch directory do nothing, that's handled when the @@ -376,6 +422,26 @@ class _LinuxDirectoryWatcher } } +class _InterruptableDirectoryListing { + late final Stream stream; + + final Set ignores = {}; + + _InterruptableDirectoryListing(Stream stream) { + this.stream = stream + .transform(StreamTransformer.fromHandlers(handleData: _handleData)); + } + + void _handleData(DirectoryList directoryList, EventSink sink) { + directoryList.ignores.addAll(ignores); + sink.add(directoryList); + } + + void ignore(String path) { + ignores.add(path); + } +} + /// Watches [path]. /// /// Workaround for issue with watches on Linux following renames @@ -392,10 +458,18 @@ class _Watch { final void Function(String) _cancelWatchesUnderPath; final StreamController _controller = StreamController(); - late final StreamSubscription _subscription; + late StreamSubscription _subscription; Stream get events => _controller.stream; + final bool _watchUntilCancelled; + bool closing = false; + + _Watch(this.path, this._cancelWatchesUnderPath, + {required bool watchUntilCancelled}) + : _watchUntilCancelled = watchUntilCancelled { + _startListening(); + } - _Watch(this.path, this._cancelWatchesUnderPath) { + void _startListening() { _subscription = _listen(path, _controller); } @@ -404,6 +478,11 @@ class _Watch { return Directory(path).watch().listen( (event) { logForTesting?.call('_Watch._listen,$path,$event'); + + if (event.path != path && !event.path.startsWith(path)) { + event = event.fixDirectory(path); + } + if (event is FileSystemDeleteEvent || (event.isDirectory && event is FileSystemMoveEvent)) { _cancelWatchesUnderPath(event.path); @@ -411,8 +490,24 @@ class _Watch { controller.add(event); }, - onError: controller.addError, - onDone: controller.close, + onError: (Object e, StackTrace s) { + logForTesting?.call('_Watch._listen,error,$path'); + + controller.addError(e, s); + closing = true; + }, + onDone: () { + logForTesting?.call('_Watch._listen,close,$path'); + // TODO(davidmorgan): link to SDK issue. + if (_watchUntilCancelled) { + _subscription.cancel(); + if (!closing) { + _startListening(); + } + } else { + controller.close(); + } + }, ); } @@ -421,3 +516,25 @@ class _Watch { _subscription.cancel(); } } + +extension _FileSystemEventExtensions on FileSystemEvent { + FileSystemEvent fixDirectory(String directory) { + final basename = p.basename(path); + final newPath = p.join(directory, basename); + + switch (type) { + case FileSystemEvent.create: + return FileSystemCreateEvent(newPath, isDirectory); + case FileSystemEvent.modify: + return FileSystemModifyEvent(newPath, isDirectory, + (this as FileSystemModifyEvent).contentChanged); + case FileSystemEvent.delete: + return FileSystemDeleteEvent(newPath, isDirectory); + case FileSystemEvent.move: + return FileSystemMoveEvent( + newPath, isDirectory, (this as FileSystemMoveEvent).destination); + default: + throw StateError('Unexpected type $type'); + } + } +} diff --git a/pkgs/watcher/lib/src/directory_watcher/mac_os.dart b/pkgs/watcher/lib/src/directory_watcher/mac_os.dart index 458ae3303..4bb239361 100644 --- a/pkgs/watcher/lib/src/directory_watcher/mac_os.dart +++ b/pkgs/watcher/lib/src/directory_watcher/mac_os.dart @@ -69,11 +69,11 @@ class _MacOSDirectoryWatcher /// The subscription to the [Directory.list] call for the initial listing of /// the directory to determine its initial state. - StreamSubscription? _initialListSubscription; + StreamSubscription? _initialListSubscription; /// The subscriptions to [Directory.list] calls for listing the contents of a /// subdirectory that was moved into the watched directory. - final _listSubscriptions = >{}; + final _listSubscriptions = >{}; /// The timer for tracking how long we wait for an initial batch of bogus /// events (see issue 14373). @@ -152,12 +152,13 @@ class _MacOSDirectoryWatcher if (_files.containsDir(path)) continue; var stream = Directory(path).listRecursivelyIgnoringErrors(); - var subscription = stream.listen((entity) { - if (entity is Directory) return; - if (_files.contains(entity.path)) return; + var subscription = stream.listen((directoryList) { + for (final file in directoryList.files) { + if (_files.contains(file)) continue; - _emitEvent(ChangeType.ADD, entity.path); - _files.add(entity.path); + _emitEvent(ChangeType.ADD, file); + _files.add(file); + } }, cancelOnError: true); subscription.onDone(() { _listSubscriptions.remove(subscription); @@ -337,8 +338,10 @@ class _MacOSDirectoryWatcher _files.clear(); var completer = Completer(); var stream = Directory(path).listRecursivelyIgnoringErrors(); - _initialListSubscription = stream.listen((entity) { - if (entity is! Directory) _files.add(entity.path); + _initialListSubscription = stream.listen((directoryList) { + for (final file in directoryList.files) { + _files.add(file); + } }, onError: _emitError, onDone: completer.complete, cancelOnError: true); return completer.future; } diff --git a/pkgs/watcher/lib/src/directory_watcher/polling.dart b/pkgs/watcher/lib/src/directory_watcher/polling.dart index a8a4d090b..e1ccd78e9 100644 --- a/pkgs/watcher/lib/src/directory_watcher/polling.dart +++ b/pkgs/watcher/lib/src/directory_watcher/polling.dart @@ -62,7 +62,7 @@ class _PollingDirectoryWatcher /// The subscription used while [directory] is being listed. /// /// Will be `null` if a list is not currently happening. - StreamSubscription? _listSubscription; + StreamSubscription? _listSubscription; /// The queue of files waiting to be processed to see if they have been /// modified. @@ -115,11 +115,12 @@ class _PollingDirectoryWatcher } var stream = Directory(path).listRecursivelyIgnoringErrors(); - _listSubscription = stream.listen((entity) { + _listSubscription = stream.listen((directoryList) { assert(!_events.isClosed); - if (entity is! File) return; - _filesToProcess.add(entity.path); + for (final file in directoryList.files) { + _filesToProcess.add(file); + } }, onError: (Object error, StackTrace stackTrace) { // Guarantee that ready always completes. if (!isReady) { diff --git a/pkgs/watcher/lib/src/directory_watcher/windows.dart b/pkgs/watcher/lib/src/directory_watcher/windows.dart index 27ab8bacb..ce25ce82b 100644 --- a/pkgs/watcher/lib/src/directory_watcher/windows.dart +++ b/pkgs/watcher/lib/src/directory_watcher/windows.dart @@ -88,12 +88,12 @@ class WindowsManuallyClosedDirectoryWatcher /// The subscription to the [Directory.list] call for the initial listing of /// the directory to determine its initial state. - StreamSubscription? _initialListSubscription; + StreamSubscription? _initialListSubscription; /// The subscriptions to the [Directory.list] calls for listing the contents /// of subdirectories that were moved into the watched directory. - final Set> _listSubscriptions = - HashSet>(); + final Set> _listSubscriptions = + HashSet>(); WindowsManuallyClosedDirectoryWatcher(this.path) : _files = PathSet(path) { // Before we're ready to emit events, wait for [_listDir] to complete. @@ -230,12 +230,13 @@ class WindowsManuallyClosedDirectoryWatcher case EventType.createDirectory: final stream = Directory(path).listRecursivelyIgnoringErrors(); - final subscription = stream.listen((entity) { - if (entity is Directory) return; - if (_files.contains(entity.path)) return; + final subscription = stream.listen((directoryList) { + for (final file in directoryList.files) { + if (_files.contains(file)) return; - _emitEvent(ChangeType.ADD, entity.path); - _files.add(entity.path); + _emitEvent(ChangeType.ADD, file); + _files.add(file); + } }, cancelOnError: true); subscription.onDone(() { _listSubscriptions.remove(subscription); @@ -376,8 +377,10 @@ class WindowsManuallyClosedDirectoryWatcher _files.clear(); var completer = Completer(); var stream = Directory(path).listRecursivelyIgnoringErrors(); - void handleEntity(FileSystemEntity entity) { - if (entity is! Directory) _files.add(entity.path); + void handleEntity(DirectoryList directoryList) { + for (final file in directoryList.files) { + _files.add(file); + } } _initialListSubscription = stream.listen( diff --git a/pkgs/watcher/test/directory_watcher/directory_list_test.dart b/pkgs/watcher/test/directory_watcher/directory_list_test.dart index faaf5d824..1e4a6b1a5 100644 --- a/pkgs/watcher/test/directory_watcher/directory_list_test.dart +++ b/pkgs/watcher/test/directory_watcher/directory_list_test.dart @@ -216,19 +216,16 @@ Future> list(String directory) async { return path.substring(directory.length + 1).replaceAll('\\', '/'); } - final fileSystemEntities = - await Directory(directory).listRecursively().toList(); + final directoryLists = await Directory(directory).listRecursively().toList(); final result = []; - for (final entity in fileSystemEntities) { - final path = normalizePath(entity.path); - if (entity is File) { + for (final directoryList in directoryLists) { + for (final file in directoryList.files) { + final path = normalizePath(file); result.add('f:$path'); - } else if (entity is Directory) { + } + for (final directory in directoryList.directories) { + final path = normalizePath(directory); result.add('d:$path'); - } else if (entity is Link) { - result.add('l:$path'); - } else { - fail('Unexpected entity type: $entity'); } } diff --git a/pkgs/watcher/test/directory_watcher/end_to_end_tests.dart b/pkgs/watcher/test/directory_watcher/end_to_end_tests.dart index 59af861a6..a6773b3a4 100644 --- a/pkgs/watcher/test/directory_watcher/end_to_end_tests.dart +++ b/pkgs/watcher/test/directory_watcher/end_to_end_tests.dart @@ -87,16 +87,25 @@ Future _runTest({ // Fail the test if still not consistent. if (!succeeded) { - if (endlessMode) print(''); + // Sort the log entries by timestamp. + final displayedLog = (log..sort()).map((m) => '$m\n').join(''); + + if (endlessMode) { + print(''); + } else { + printOnFailure(''' +=== +$displayedLog +=== +'''); + } + client.verify(printOnFailure: printOnFailure); // Write the file operations before the failure to a log, fail the test. final logTemp = Directory.systemTemp.createTempSync(); final logPath = p.join(logTemp.path, 'log.txt'); - // Sort the log entries by timestamp. - log.sort(); - - File(logPath).writeAsStringSync(log.map((m) => '$m\n').join('')); + File(logPath).writeAsStringSync(displayedLog); fail(''' Failed on run $i, seed $runSeed. Run in a loop with that seed using: