Skip to content

Commit d48539e

Browse files
authored
Merge pull request #276 from microsoft/cgillum/fix178
Update terminate logic to handle scheduled orchestrations
2 parents 73c54d8 + 3b74528 commit d48539e

File tree

8 files changed

+167
-41
lines changed

8 files changed

+167
-41
lines changed

CHANGELOG.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
# Changelog
22

3+
## v1.5.2
4+
5+
### Updates
6+
7+
* Fix issue where scheduled orchestrations aren't immediately terminated ([#178](https://github.com/microsoft/durabletask-mssql/issues/178))
8+
39
## v1.5.1
410

511
### Updates
612

7-
* Use Singelton metrics provider instead of IScaleMonitor and ITargetScaler ([#285]https://github.com/microsoft/durabletask-mssql/pull/285)
13+
* Use Singleton metrics provider instead of IScaleMonitor and ITargetScaler ([#285](https://github.com/microsoft/durabletask-mssql/pull/285))
814
* Updated repo to use central package management
915
* Resolved multiple CVEs in dependencies
1016

1117
## v1.5.0
1218

1319
### Updates
1420

15-
* Updated Microsoft.Azure.WebJobs.Extensions.DurableTask dependency to 3.0.0 and DurableTask.Core to 3.*. ([#281]https://github.com/microsoft/durabletask-mssql/pull/281)
21+
* Updated Microsoft.Azure.WebJobs.Extensions.DurableTask dependency to 3.0.0 and DurableTask.Core to 3.*. ([#281](https://github.com/microsoft/durabletask-mssql/pull/281))
1622
* Removed `netstandard2.0` TFM from Microsoft.DurableTask.SqlServer.AzureFunctions
1723

1824
## v1.4.0

src/DurableTask.SqlServer/DTUtils.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ public static bool HasPayload(HistoryEvent e)
196196
return historyEvent.EventType switch
197197
{
198198
EventType.ExecutionStarted => ((ExecutionStartedEvent)historyEvent).Version,
199+
EventType.SubOrchestrationInstanceCreated => ((SubOrchestrationInstanceCreatedEvent)historyEvent).Version,
199200
EventType.TaskScheduled => ((TaskScheduledEvent)historyEvent).Version,
200201
_ => null,
201202
};

src/DurableTask.SqlServer/Scripts/logic.sql

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -465,49 +465,74 @@ BEGIN
465465
-- *** IMPORTANT ***
466466
-- To prevent deadlocks, it is important to maintain consistent table access
467467
-- order across all stored procedures that execute within a transaction.
468-
-- Table order for this sproc: Instances --> (NewEvents --> Payloads --> NewEvents)
468+
-- Table order for this sproc: Instances --> (Payloads --> Instances --> NewEvents)
469469

470470
DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()
471471

472-
DECLARE @existingStatus varchar(30) = (
473-
SELECT TOP 1 existing.[RuntimeStatus]
474-
FROM Instances existing WITH (HOLDLOCK)
475-
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
476-
)
472+
DECLARE @existingStatus varchar(30)
473+
DECLARE @existingLockExpiration datetime2(7)
474+
475+
-- Get the status of an existing orchestration
476+
SELECT TOP 1
477+
@existingStatus = existing.[RuntimeStatus],
478+
@existingLockExpiration = existing.[LockExpiration]
479+
FROM Instances existing WITH (HOLDLOCK)
480+
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
477481

478482
IF @existingStatus IS NULL
479483
BEGIN
480484
ROLLBACK TRANSACTION;
481485
THROW 50000, 'The instance does not exist.', 1;
482486
END
483-
-- If the instance is already completed, no need to terminate it.
484-
IF @existingStatus IN ('Pending', 'Running')
487+
488+
DECLARE @now datetime2(7) = SYSUTCDATETIME()
489+
490+
IF @existingStatus IN ('Running', 'Pending')
485491
BEGIN
486-
IF NOT EXISTS (
487-
SELECT TOP (1) 1 FROM NewEvents
488-
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated'
489-
)
492+
-- Create a payload to store the reason, if any
493+
DECLARE @PayloadID uniqueidentifier = NULL
494+
IF @Reason IS NOT NULL
490495
BEGIN
491-
-- Payloads are stored separately from the events
492-
DECLARE @PayloadID uniqueidentifier = NULL
493-
IF @Reason IS NOT NULL
496+
-- Note that we don't use the Reason column for the Reason with terminate events
497+
SET @PayloadID = NEWID()
498+
INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
499+
VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason)
500+
END
501+
502+
-- Check the status of the orchestration to determine which termination path to take
503+
IF @existingStatus = 'Pending' AND (@existingLockExpiration IS NULL OR @existingLockExpiration <= @now)
504+
BEGIN
505+
-- The orchestration hasn't started yet - transition it directly to the Terminated state and delete
506+
-- any pending messages
507+
UPDATE Instances SET
508+
[RuntimeStatus] = 'Terminated',
509+
[LastUpdatedTime] = @now,
510+
[CompletedTime] = @now,
511+
[OutputPayloadID] = @PayloadID,
512+
[LockExpiration] = NULL -- release the lock, if any
513+
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
514+
515+
DELETE FROM NewEvents WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
516+
END
517+
ELSE
518+
BEGIN
519+
-- The orchestration has actually started running in this case
520+
IF NOT EXISTS (
521+
SELECT TOP (1) 1 FROM NewEvents
522+
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated'
523+
)
494524
BEGIN
495-
-- Note that we don't use the Reason column for the Reason with terminate events
496-
SET @PayloadID = NEWID()
497-
INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
498-
VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason)
525+
INSERT INTO NewEvents (
526+
[TaskHub],
527+
[InstanceID],
528+
[EventType],
529+
[PayloadID]
530+
) VALUES (
531+
@TaskHub,
532+
@InstanceID,
533+
'ExecutionTerminated',
534+
@PayloadID)
499535
END
500-
501-
INSERT INTO NewEvents (
502-
[TaskHub],
503-
[InstanceID],
504-
[EventType],
505-
[PayloadID]
506-
) VALUES (
507-
@TaskHub,
508-
@InstanceID,
509-
'ExecutionTerminated',
510-
@PayloadID)
511536
END
512537
END
513538

@@ -1444,7 +1469,7 @@ BEGIN
14441469
-- Instance IDs can be overwritten only if the orchestration is in a terminal state
14451470
IF @existingStatus NOT IN ('Failed')
14461471
BEGIN
1447-
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewing instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
1472+
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewind instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
14481473
THROW 50001, @msg, 1;
14491474
END
14501475

src/DurableTask.SqlServer/SqlOrchestrationService.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,14 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
335335
IList<TaskMessage> orchestratorMessages,
336336
IList<TaskMessage> timerMessages,
337337
TaskMessage continuedAsNewMessage,
338-
OrchestrationState orchestrationState)
338+
OrchestrationState? orchestrationState)
339339
{
340+
if (orchestrationState is null || !newRuntimeState.IsValid)
341+
{
342+
// The work item was invalid. We can't do anything with it so we ignore it.
343+
return;
344+
}
345+
340346
ExtendedOrchestrationWorkItem currentWorkItem = (ExtendedOrchestrationWorkItem)workItem;
341347

342348
this.traceHelper.CheckpointStarting(orchestrationState);

src/common.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<PropertyGroup>
1818
<MajorVersion>1</MajorVersion>
1919
<MinorVersion>5</MinorVersion>
20-
<PatchVersion>1</PatchVersion>
20+
<PatchVersion>2</PatchVersion>
2121
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
2222
<VersionSuffix></VersionSuffix>
2323
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>

test/DurableTask.SqlServer.Tests/Integration/DatabaseManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName
505505
schemaName);
506506
Assert.Equal(1, currentSchemaVersion.Major);
507507
Assert.Equal(5, currentSchemaVersion.Minor);
508-
Assert.Equal(1, currentSchemaVersion.Patch);
508+
Assert.Equal(2, currentSchemaVersion.Patch);
509509
}
510510

511511
sealed class TestDatabase : IDisposable

test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,5 +863,34 @@ await Assert.ThrowsAnyAsync<OperationCanceledException>(
863863
// Now the orchestration should complete immediately
864864
await instance.WaitForCompletion(timeout: TimeSpan.FromSeconds(3), expectedOutput: EventCount);
865865
}
866+
867+
[Fact]
868+
public async Task TerminateScheduledOrchestration()
869+
{
870+
string orchestrationName = "ScheduledOrchestration";
871+
872+
// Does nothing except return the original input
873+
TestInstance<object> instance = await this.testService.RunOrchestration(
874+
input: (object)null,
875+
orchestrationName,
876+
version: null,
877+
instanceId: null,
878+
scheduledStartTime: DateTime.UtcNow.AddSeconds(30),
879+
implementation: (ctx, input) => Task.FromResult("done"));
880+
881+
// Confirm that the orchestration is pending
882+
OrchestrationState state = await instance.GetStateAsync();
883+
Assert.Equal(OrchestrationStatus.Pending, state.OrchestrationStatus);
884+
885+
// Terminate the orchestration before it starts
886+
await instance.TerminateAsync("Bye!");
887+
888+
// Confirm the orchestration was terminated
889+
await instance.WaitForCompletion(
890+
expectedStatus: OrchestrationStatus.Terminated,
891+
expectedOutput: "Bye!");
892+
893+
LogAssert.NoWarningsOrErrors(this.testService.LogProvider);
894+
}
866895
}
867896
}

test/DurableTask.SqlServer.Tests/Utils/TestService.cs

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,32 @@ public Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
132132
activities);
133133
}
134134

135+
public Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
136+
TInput input,
137+
string orchestrationName,
138+
string version,
139+
string instanceId,
140+
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
141+
Action<OrchestrationContext, string, string> onEvent = null,
142+
params (string name, TaskActivity activity)[] activities)
143+
{
144+
return this.RunOrchestration(
145+
input,
146+
orchestrationName,
147+
version,
148+
instanceId,
149+
scheduledStartTime: null,
150+
implementation,
151+
onEvent,
152+
activities);
153+
}
154+
135155
public async Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
136156
TInput input,
137157
string orchestrationName,
138158
string version,
139159
string instanceId,
160+
DateTime? scheduledStartTime,
140161
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
141162
Action<OrchestrationContext, string, string> onEvent = null,
142163
params (string name, TaskActivity activity)[] activities)
@@ -147,19 +168,43 @@ public async Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
147168
inputGenerator: i => input,
148169
orchestrationName: orchestrationName,
149170
version: version,
171+
scheduledStartTime: scheduledStartTime,
150172
implementation,
151173
onEvent,
152174
activities);
153175

154176
return instances[0];
155177
}
156178

179+
public Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>(
180+
int count,
181+
Func<int, string> instanceIdGenerator,
182+
Func<int, TInput> inputGenerator,
183+
string orchestrationName,
184+
string version,
185+
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
186+
Action<OrchestrationContext, string, string> onEvent = null,
187+
params (string name, TaskActivity activity)[] activities)
188+
{
189+
return this.RunOrchestrations(
190+
count,
191+
instanceIdGenerator,
192+
inputGenerator,
193+
orchestrationName,
194+
version,
195+
scheduledStartTime: null,
196+
implementation,
197+
onEvent,
198+
activities);
199+
}
200+
157201
public async Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>(
158202
int count,
159203
Func<int, string> instanceIdGenerator,
160204
Func<int, TInput> inputGenerator,
161205
string orchestrationName,
162206
string version,
207+
DateTime? scheduledStartTime,
163208
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
164209
Action<OrchestrationContext, string, string> onEvent = null,
165210
params (string name, TaskActivity activity)[] activities)
@@ -178,11 +223,25 @@ public async Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput
178223
TInput input = inputGenerator != null ? inputGenerator(i) : default;
179224

180225
DateTime utcNow = DateTime.UtcNow;
181-
OrchestrationInstance instance = await this.client.CreateOrchestrationInstanceAsync(
182-
orchestrationName,
183-
version,
184-
instanceId,
185-
input);
226+
227+
OrchestrationInstance instance;
228+
if (scheduledStartTime.HasValue)
229+
{
230+
instance = await this.client.CreateScheduledOrchestrationInstanceAsync(
231+
orchestrationName,
232+
version,
233+
instanceId,
234+
input,
235+
startAt: scheduledStartTime.Value);
236+
}
237+
else
238+
{
239+
instance = await this.client.CreateOrchestrationInstanceAsync(
240+
orchestrationName,
241+
version,
242+
instanceId,
243+
input);
244+
}
186245

187246
return new TestInstance<TInput>(
188247
this.client,

0 commit comments

Comments
 (0)