diff --git a/Akka.Streams.Kafka.sln b/Akka.Streams.Kafka.sln
index 369abdcd..36a51e6d 100644
--- a/Akka.Streams.Kafka.sln
+++ b/Akka.Streams.Kafka.sln
@@ -28,6 +28,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventHub.Consumer", "exampl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Kafka.Benchmark", "src\Akka.Streams.Kafka.Benchmark\Akka.Streams.Kafka.Benchmark.csproj", "{1BF52F4E-93FA-40C4-8985-C21D779C7997}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Kafka.Testkit", "src\Akka.Streams.Kafka.Testkit\Akka.Streams.Kafka.Testkit.csproj", "{49F2E094-BB98-41D8-9B3D-CF6557DCC420}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -62,6 +64,10 @@ Global
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1BF52F4E-93FA-40C4-8985-C21D779C7997}.Release|Any CPU.Build.0 = Release|Any CPU
+ {49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj b/src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj
new file mode 100644
index 00000000..27279a4f
--- /dev/null
+++ b/src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj
@@ -0,0 +1,25 @@
+
+
+
+
+ netstandard2.0
+ 8.0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Akka.Streams.Kafka.Tests/TestKit/Internal/ConsumerResultFactory.cs b/src/Akka.Streams.Kafka.Testkit/ConsumerResultFactory.cs
similarity index 96%
rename from src/Akka.Streams.Kafka.Tests/TestKit/Internal/ConsumerResultFactory.cs
rename to src/Akka.Streams.Kafka.Testkit/ConsumerResultFactory.cs
index ba31bdb7..9abfb916 100644
--- a/src/Akka.Streams.Kafka.Tests/TestKit/Internal/ConsumerResultFactory.cs
+++ b/src/Akka.Streams.Kafka.Testkit/ConsumerResultFactory.cs
@@ -3,7 +3,7 @@
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Stages.Consumers;
-namespace Akka.Streams.Kafka.Tests.TestKit.Internal
+namespace Akka.Streams.Kafka.Testkit
{
public static class ConsumerResultFactory
{
diff --git a/src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs b/src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs
new file mode 100644
index 00000000..ce9d61d4
--- /dev/null
+++ b/src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs
@@ -0,0 +1,55 @@
+using System.Threading.Tasks;
+using Akka.Streams.Dsl;
+using Akka.Streams.Kafka.Helpers;
+
+namespace Akka.Streams.Kafka.Testkit.Dsl
+{
+ public static class ConsumerControlFactory
+ {
+ public static Source AttachControl(Source source)
+ => source.ViaMaterialized(ControlFlow(), Keep.Right);
+
+ public static Flow ControlFlow()
+ => Flow.Create()
+ .ViaMaterialized(KillSwitches.Single(), Keep.Right)
+ .MapMaterializedValue(Control);
+
+ public static IControl Control(IKillSwitch killSwitch)
+ => new FakeControl(killSwitch);
+
+ public class FakeControl : IControl
+ {
+ private readonly IKillSwitch _killSwitch;
+ private readonly TaskCompletionSource _shutdownPromise;
+
+ public FakeControl(IKillSwitch killSwitch)
+ {
+ _killSwitch = killSwitch;
+ _shutdownPromise = new TaskCompletionSource();
+ }
+
+ public Task Stop()
+ {
+ _killSwitch.Shutdown();
+ _shutdownPromise.SetResult(Done.Instance);
+ return _shutdownPromise.Task;
+ }
+
+ public Task Shutdown()
+ {
+ _killSwitch.Shutdown();
+ _shutdownPromise.SetResult(Done.Instance);
+ return _shutdownPromise.Task;
+ }
+
+ public Task IsShutdown => _shutdownPromise.Task;
+
+ public Task DrainAndShutdown(Task streamCompletion)
+ {
+ _killSwitch.Shutdown();
+ _shutdownPromise.SetResult(Done.Instance);
+ return Task.FromResult(default(TResult));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs b/src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs
new file mode 100644
index 00000000..e5ebdafc
--- /dev/null
+++ b/src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs
@@ -0,0 +1,177 @@
+using System;
+using System.Collections.Generic;
+using System.Collections.Immutable;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Akka.Actor.Setup;
+using Akka.Streams.Dsl;
+using Akka.Streams.Kafka.Dsl;
+using Akka.Streams.Kafka.Helpers;
+using Akka.Streams.Kafka.Messages;
+using Akka.Streams.Kafka.Settings;
+using Akka.Streams.Kafka.Testkit.Internal;
+using Akka.Streams.TestKit;
+using Akka.Util;
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using Xunit;
+using Xunit.Abstractions;
+using Config = Akka.Configuration.Config;
+
+namespace Akka.Streams.Kafka.Testkit.Dsl
+{
+ public abstract class KafkaSpec : KafkaTestKit, IAsyncLifetime
+ {
+ protected KafkaSpec(string config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
+ {
+ }
+
+ protected KafkaSpec(Config config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
+ {
+ }
+
+ protected KafkaSpec(ActorSystemSetup config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output)
+ {
+ }
+
+ protected IProducer TestProducer { get; private set; }
+
+
+ public virtual Task InitializeAsync()
+ {
+ TestProducer = ProducerDefaults().CreateKafkaProducer();
+ SetUpAdminClient();
+ return Task.CompletedTask;
+ }
+
+ public virtual Task DisposeAsync()
+ {
+ TestProducer?.Dispose();
+ CleanUpAdminClient();
+ Shutdown();
+ return Task.CompletedTask;
+ }
+
+ protected void Sleep(TimeSpan time, string msg)
+ {
+ Log.Debug($"Sleeping {time}: {msg}");
+ Thread.Sleep(time);
+ }
+
+ protected List AwaitMultiple(TimeSpan timeout, IEnumerable> tasks)
+ {
+ var completedTasks = new List>();
+ using (var cts = new CancellationTokenSource(timeout))
+ {
+ var waitingTasks = tasks.ToList();
+ while (waitingTasks.Count > 0)
+ {
+ var anyTask = Task.WhenAny(waitingTasks);
+ try
+ {
+ anyTask.Wait(cts.Token);
+ }
+ catch (Exception e)
+ {
+ throw new Exception($"AwaitMultiple failed. Exception: {e.Message}", e);
+ }
+
+ var completedTask = anyTask.Result;
+ waitingTasks.Remove(completedTask);
+ completedTasks.Add(completedTask);
+ }
+ }
+
+ return completedTasks.Select(t => t.Result).ToList();
+ }
+
+ protected TimeSpan SleepAfterProduce => TimeSpan.FromSeconds(4);
+
+ protected void AwaitProduce(IEnumerable> tasks)
+ {
+ AwaitMultiple(TimeSpan.FromSeconds(4), tasks);
+ Sleep(SleepAfterProduce, "to be sure producing has happened");
+ }
+
+ protected readonly Partition Partition0 = new Partition(0);
+
+ // Not implemented
+ [Obsolete("Kafka DescribeCluster API isn't supported by the .NET driver")]
+ protected void WaitUntilCluster(Func