Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@

<Import Project="$(EngBuildRoot)Engineering.targets" />

<PropertyGroup>
<VSTestResultsDirectory>$(ArtifactsPath)/log/$(ArtifactsProjectName)/tests_$(ArtifactsPivots)/</VSTestResultsDirectory>
</PropertyGroup>

</Project>
2 changes: 1 addition & 1 deletion eng/ci/templates/jobs/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
displayName: Publish deps.json
path: $(Build.ArtifactStagingDirectory)
artifact: WebHost_Deps
condition: failed()
condition: and(failed(), eq(variables['System.JobAttempt'], 1)) # only publish on first attempt

steps:
- template: /eng/ci/templates/install-dotnet.yml@self
Expand Down
1 change: 1 addition & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
-->
- Adding a "web app" configuration profile (#11447)
- Add JitTrace Files for v4.1045
- Throw exception instead of timing out when worker channel exits before initializing gRPC (#10937)
- Adding empty remote message check in the SystemLogger (#11473)
33 changes: 26 additions & 7 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Diagnostics.OpenTelemetry;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Exceptions;
using Microsoft.Azure.WebJobs.Script.Extensions;
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Extensions;
Expand Down Expand Up @@ -367,19 +366,39 @@ private void DispatchMessage(InboundGrpcEvent msg)

public bool IsChannelReadyForInvocations()
{
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
return !_disposing && !_disposed
&& _state.HasFlag(
RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
}

public async Task StartWorkerProcessAsync(CancellationToken cancellationToken)
{
RegisterCallbackForNextGrpcMessage(MsgType.StartStream, _workerConfig.CountOptions.ProcessStartupTimeout, 1, SendWorkerInitRequest, HandleWorkerStartStreamError);
// note: it is important that the ^^^ StartStream is in place *before* we start process the loop, otherwise we get a race condition
RegisterCallbackForNextGrpcMessage(
MsgType.StartStream,
_workerConfig.CountOptions.ProcessStartupTimeout,
count: 1,
SendWorkerInitRequest,
HandleWorkerStartStreamError);

// note: it is important that the ^^^ StartStream is in place *before* we start process the loop,
// otherwise we get a race condition
_ = ProcessInbound();

_workerChannelLogger.LogDebug("Initiating Worker Process start up");
await _rpcWorkerProcess.StartProcessAsync();
_state = _state | RpcWorkerChannelState.Initializing;
await _workerInitTask.Task;
await _rpcWorkerProcess.StartProcessAsync(cancellationToken);
_state |= RpcWorkerChannelState.Initializing;
Task<int> exited = _rpcWorkerProcess.WaitForExitAsync(cancellationToken);
Task winner = await Task.WhenAny(_workerInitTask.Task, exited).WaitAsync(cancellationToken);
await winner;

if (winner == exited)
{
// Process exited without throwing. We need to throw to indicate process is not running.
throw new WorkerProcessExitException("Worker process exited before initializing.")
{
ExitCode = await exited,
};
}
}

public async Task<WorkerStatus> GetWorkerStatusAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Script.Workers
Expand All @@ -12,7 +13,9 @@ internal interface IWorkerProcess

Process Process { get; }

Task StartProcessAsync();
Task StartProcessAsync(CancellationToken cancellationToken = default);

Task<int> WaitForExitAsync(CancellationToken cancellationToken = default);

void WaitForProcessExitInMilliSeconds(int waitTime);
}
Expand Down
76 changes: 57 additions & 19 deletions src/WebJobs.Script.Grpc/ProcessManagement/WorkerProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Logging;
Expand All @@ -33,10 +34,11 @@ internal abstract class WorkerProcess : IWorkerProcess, IDisposable
private readonly IEnvironment _environment;
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _scriptApplicationHostOptions;

private bool _useStdErrorStreamForErrorsOnly;
private Queue<string> _processStdErrDataQueue = new Queue<string>(3);
private readonly object _syncLock = new();
private readonly bool _useStdErrorStreamForErrorsOnly;
private Queue<string> _processStdErrDataQueue = new(3);
private IHostProcessMonitor _processMonitor;
private object _syncLock = new object();
private TaskCompletionSource<int> _processExit; // used to hold custom exceptions on non-success exit.

internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry processRegistry, ILogger workerProcessLogger, IWorkerConsoleLogSource consoleLogSource, IMetricsLogger metricsLogger,
IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> scriptApplicationHostOptions, bool useStdErrStreamForErrorsOnly = false)
Expand Down Expand Up @@ -69,8 +71,9 @@ internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry proces

internal abstract Process CreateWorkerProcess();

public Task StartProcessAsync()
public Task StartProcessAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
using (_metricsLogger.LatencyEvent(MetricEventNames.ProcessStart))
{
Process = CreateWorkerProcess();
Expand All @@ -79,11 +82,12 @@ public Task StartProcessAsync()
AssignUserExecutePermissionsIfNotExists();
}

_processExit = new();
try
{
Process.ErrorDataReceived += (sender, e) => OnErrorDataReceived(sender, e);
Process.OutputDataReceived += (sender, e) => OnOutputDataReceived(sender, e);
Process.Exited += (sender, e) => OnProcessExited(sender, e);
Process.ErrorDataReceived += OnErrorDataReceived;
Process.OutputDataReceived += OnOutputDataReceived;
Process.Exited += OnProcessExited;
Process.EnableRaisingEvents = true;
string sanitizedArguments = Sanitizer.Sanitize(Process.StartInfo.Arguments);

Expand All @@ -103,12 +107,25 @@ public Task StartProcessAsync()
}
catch (Exception ex)
{
_processExit.TrySetException(ex);
_workerProcessLogger.LogError(ex, $"Failed to start Worker Channel. Process fileName: {Process.StartInfo.FileName}");
return Task.FromException(ex);
}
}
}

public Task<int> WaitForExitAsync(CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(Disposing, this);
if (_processExit is { } tcs)
{
// We use a TaskCompletionSource (and not Process.WaitForExitAsync) so we can propagate our custom exceptions.
return tcs.Task.WaitAsync(cancellationToken);
}

throw new InvalidOperationException("Process has not been started yet.");
}

private void OnErrorDataReceived(object sender, DataReceivedEventArgs e)
{
if (e.Data != null)
Expand Down Expand Up @@ -159,12 +176,15 @@ private void OnProcessExited(object sender, EventArgs e)

if (Disposing)
{
// No action needed
return;
}

int exit = 0;
try
{
ThrowIfExitError();

exit = Process.ExitCode;
if (Process.ExitCode == WorkerConstants.SuccessExitCode)
{
Process.WaitForExit();
Expand All @@ -174,27 +194,45 @@ private void OnProcessExited(object sender, EventArgs e)
{
HandleWorkerProcessRestart();
}
else
{
string exceptionMessage = string.Join(",", _processStdErrDataQueue.Where(s => !string.IsNullOrEmpty(s)));
string sanitizedExceptionMessage = Sanitizer.Sanitize(exceptionMessage);
var processExitEx = new WorkerProcessExitException($"{Process.StartInfo.FileName} exited with code {Process.ExitCode} (0x{Process.ExitCode.ToString("X")})", new Exception(sanitizedExceptionMessage));
processExitEx.ExitCode = Process.ExitCode;
processExitEx.Pid = Process.Id;
HandleWorkerProcessExitError(processExitEx);
}
}
catch (WorkerProcessExitException processExitEx)
{
_processExit.TrySetException(processExitEx);
HandleWorkerProcessExitError(processExitEx);
}
catch (Exception exc)
{
_workerProcessLogger?.LogDebug(exc, "Exception on worker process exit. Process id: {processId}", Process?.Id);
// ignore process is already disposed
_processExit.TrySetException(exc);
_workerProcessLogger?.LogDebug(exc, "Exception on worker process exit. Process id: {processId}", Process?.Id);
}
finally
{
_processExit.TrySetResult(exit);
UnregisterFromProcessMonitor();
}
}

private void ThrowIfExitError()
{
if (Process.ExitCode is WorkerConstants.SuccessExitCode or WorkerConstants.IntentionalRestartExitCode)
{
return;
}

string exceptionMessage = string.Join(",", _processStdErrDataQueue.Where(s => !string.IsNullOrEmpty(s)));
string sanitizedExceptionMessage = Sanitizer.Sanitize(exceptionMessage);
WorkerProcessExitException processExitEx = new(
$"{Process.StartInfo.FileName} exited with code {Process.ExitCode} (0x{Process.ExitCode:X})",
new Exception(sanitizedExceptionMessage))
{
ExitCode = Process.ExitCode,
Pid = Process.Id
};

throw processExitEx;
}

private void OnOutputDataReceived(object sender, DataReceivedEventArgs e)
{
if (e.Data != null)
Expand Down Expand Up @@ -343,7 +381,7 @@ private void AssignUserExecutePermissionsIfNotExists()
return;
}

UnixFileInfo fileInfo = new UnixFileInfo(filePath);
UnixFileInfo fileInfo = new(filePath);
if (!fileInfo.FileAccessPermissions.HasFlag(FileAccessPermissions.UserExecute))
{
_workerProcessLogger.LogDebug("Assigning execute permissions to file: {filePath}", filePath);
Expand Down
59 changes: 35 additions & 24 deletions src/WebJobs.Script.Grpc/Rpc/RpcWorkerProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,23 @@ internal class RpcWorkerProcess : WorkerProcess
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
private readonly IEnvironment _environment;

internal RpcWorkerProcess(string runtime,
string workerId,
string rootScriptPath,
Uri serverUri,
RpcWorkerConfig rpcWorkerConfig,
IScriptEventManager eventManager,
IWorkerProcessFactory processFactory,
IProcessRegistry processRegistry,
ILogger workerProcessLogger,
IWorkerConsoleLogSource consoleLogSource,
IMetricsLogger metricsLogger,
IServiceProvider serviceProvider,
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions,
IEnvironment environment,
IOptionsMonitor<ScriptApplicationHostOptions> scriptApplicationHostOptions,
ILoggerFactory loggerFactory)
internal RpcWorkerProcess(
string runtime,
string workerId,
string rootScriptPath,
Uri serverUri,
RpcWorkerConfig rpcWorkerConfig,
IScriptEventManager eventManager,
IWorkerProcessFactory processFactory,
IProcessRegistry processRegistry,
ILogger workerProcessLogger,
IWorkerConsoleLogSource consoleLogSource,
IMetricsLogger metricsLogger,
IServiceProvider serviceProvider,
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions,
IEnvironment environment,
IOptionsMonitor<ScriptApplicationHostOptions> scriptApplicationHostOptions,
ILoggerFactory loggerFactory)
: base(eventManager, processRegistry, workerProcessLogger, consoleLogSource, metricsLogger, serviceProvider, loggerFactory, environment,
scriptApplicationHostOptions, rpcWorkerConfig.Description.UseStdErrorStreamForErrorsOnly)
{
Expand Down Expand Up @@ -74,23 +75,33 @@ internal override Process CreateWorkerProcess()

internal override void HandleWorkerProcessExitError(WorkerProcessExitException rpcWorkerProcessExitException)
{
ArgumentNullException.ThrowIfNull(rpcWorkerProcessExitException);
if (Disposing)
{
return;
}
if (rpcWorkerProcessExitException == null)
{
throw new ArgumentNullException(nameof(rpcWorkerProcessExitException));
}

// The subscriber of WorkerErrorEvent is expected to Dispose() the errored channel
_workerProcessLogger.LogError(rpcWorkerProcessExitException, $"Language Worker Process exited. Pid={rpcWorkerProcessExitException.Pid}.", _workerProcessArguments.ExecutablePath);
_eventManager.Publish(new WorkerErrorEvent(_runtime, _workerId, rpcWorkerProcessExitException));
_workerProcessLogger.LogError(rpcWorkerProcessExitException, $"Language Worker Process exited. Pid={rpcWorkerProcessExitException.Pid}.", _workerProcessArguments?.ExecutablePath);
PublishNoThrow(new WorkerErrorEvent(_runtime, _workerId, rpcWorkerProcessExitException));
}

internal override void HandleWorkerProcessRestart()
{
_workerProcessLogger?.LogInformation("Language Worker Process exited and needs to be restarted.");
_eventManager.Publish(new WorkerRestartEvent(_runtime, _workerId));
PublishNoThrow(new WorkerRestartEvent(_runtime, _workerId));
}

private void PublishNoThrow(RpcChannelEvent @event)
{
try
{
_eventManager.Publish(@event);
}
catch (Exception ex)
{
_workerProcessLogger.LogWarning(ex, "Failed to publish RpcChannelEvent.");
}
}
}
}
}
9 changes: 7 additions & 2 deletions src/WebJobs.Script.Grpc/WorkerFunctionMetadataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
// forceRefresh will be false when bundle is not used (dotnet and dotnet-isolated).
if (!_environment.IsPlaceholderModeEnabled() && forceRefresh && !_scriptOptions.CurrentValue.IsFileSystemReadOnly)
{
_channelManager.ShutdownChannelsAsync().GetAwaiter().GetResult();
await _channelManager.ShutdownChannelsAsync();
}

var channels = _channelManager.GetChannels(_workerRuntime);
Expand Down Expand Up @@ -107,6 +107,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
throw new InvalidOperationException($"No initialized language worker channel found for runtime: {_workerRuntime}.");
}

List<Exception> errors = null;
foreach (string workerId in channels.Keys.ToList())
{
if (channels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> languageWorkerChannelTask))
Expand All @@ -129,7 +130,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
}

_functions = functions.ToImmutableArray();
_logger.FunctionsReturnedByProvider(_functions.IsDefault ? 0 : _functions.Count(), _metadataProviderName);
_logger.FunctionsReturnedByProvider(_functions.Length, _metadataProviderName);

// Validate if the app has functions in legacy format and add in logs to inform about the mixed app
_ = Task.Delay(TimeSpan.FromMinutes(1)).ContinueWith(t => ValidateFunctionAppFormat(_scriptOptions.CurrentValue.ScriptPath, _logger, _environment));
Expand All @@ -140,9 +141,13 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
{
_logger.LogWarning(ex, "Removing errored webhost language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
await _channelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId, ex);
errors ??= [];
errors.Add(ex);
}
}
}

ExceptionExtensions.ThrowIfErrorsPresent(errors, "Errors getting function metadata from workers.");
}

return new FunctionMetadataResult(useDefaultMetadataIndexing: false, _functions);
Expand Down
Loading
Loading