Skip to content

Commit 72eef0d

Browse files
Add unit tests for Job classes, ThreadPool, and ScopedBoolInverter (#1)
* Add unit tests for ThreadPool * Add unit tests for ScopedBoolInverter * Fix coverage report generator config * Add tests for and improve Job classes * Remove unnecessary includes of ThreadPool.h * Rename ThreadPool::execute to ::enqueue * Update NOTICE-3RD-PARTY-CONTENT.md Co-authored-by: Dominic Sudy <[email protected]>
1 parent 2ce3e04 commit 72eef0d

File tree

13 files changed

+630
-65
lines changed

13 files changed

+630
-65
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ repos:
2222
"--std=c++17",
2323
"--error-exitcode=1",
2424
"--enable=all",
25+
"--inline-suppr",
2526
"--suppress=noExplicitConstructor",
2627
"--suppress=missingInclude",
2728
"--suppress=unusedFunction",

examples/vehicle_model/Cabin/SeatService/SeatService.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#include "SeatService.h"
1818
#include "sdk/Exceptions.h"
1919
#include "sdk/Logger.h"
20-
#include "sdk/ThreadPool.h"
2120

2221
#include "sdk/dapr/DaprSupport.h"
2322
#include "vehicle_model/Cabin/SeatService/SeatServiceAsyncGrpcFacade.h"

gcovr.cfg

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
filter = src
1+
filter = app
22
filter = examples
3-
filter = include
3+
filter = sdk
4+
exclude = .*/vehicle_model/
5+
exclude = .*/tests/
46

57
cobertura = build/coverage.cobertura.xml
68
html = build/coverage.html

sdk/include/sdk/Job.h

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,15 @@
1717
#ifndef VEHICLE_APP_SDK_JOB_H
1818
#define VEHICLE_APP_SDK_JOB_H
1919

20+
#include <atomic>
2021
#include <functional>
2122
#include <memory>
2223
#include <mutex>
2324

2425
namespace velocitas {
2526

26-
class ThreadPool;
27-
class IJob;
28-
using JobPtr_t = std::shared_ptr<IJob>;
29-
3027
/**
3128
* @brief Interface for jobs which can be executed by a worker in the thread pool.
32-
*
3329
*/
3430
class IJob {
3531
public:
@@ -38,59 +34,65 @@ class IJob {
3834

3935
/**
4036
* @brief Execute the job.
37+
*/
38+
virtual void execute() = 0;
39+
40+
/**
41+
* @brief Indicates if this job shall recur after its execution.
4142
*
42-
* @param thisJobPtr A smart pointer to this job. Allows sub classes to i.e. re-trigger the
43-
* very same job instance.
44-
* @param pool The pool that is executing the job. Allows sub classes to i.e.
45-
* re-trigger the same job.
43+
* @return true - recur this job
44+
* @return false - don't recure
4645
*/
47-
virtual void execute(JobPtr_t& thisJobPtr, ThreadPool& pool) = 0; // NOLINT
46+
[[nodiscard]] virtual bool shallRecur() const { return false; }
4847

4948
IJob(const IJob&) = delete;
5049
IJob(IJob&&) = delete;
5150
IJob& operator=(const IJob&) = delete;
5251
IJob& operator=(IJob&&) = delete;
5352
};
5453

54+
using JobPtr_t = std::shared_ptr<IJob>;
55+
5556
/**
5657
* @brief A nonrecurring job.
57-
*
5858
*/
5959
class Job : public IJob {
6060
public:
6161
static JobPtr_t create(std::function<void()> fun) { return std::make_shared<Job>(fun); }
6262

6363
explicit Job(std::function<void()> fun);
6464

65-
void execute(JobPtr_t& /*thisJobPtr*/, ThreadPool& /*pool*/) override;
66-
67-
std::function<void()> getFunction() { return m_fun; }
65+
void execute() override;
6866

69-
void waitForTermination();
67+
void waitForTermination() const;
7068

7169
private:
7270
std::function<void()> m_fun;
73-
std::mutex m_terminationMutex;
71+
mutable std::mutex m_terminationMutex;
7472
};
7573

7674
/**
7775
* @brief A recurring job which can be cancelled manually.
78-
*
7976
*/
8077
class RecurringJob : public Job {
8178
public:
8279
static JobPtr_t create(std::function<void()> fun) {
8380
return std::make_shared<RecurringJob>(fun);
8481
}
8582

86-
void execute(JobPtr_t& thisJobPtr, ThreadPool& pool) override;
83+
using Job::Job;
84+
85+
void execute() override;
8786

87+
/**
88+
* @brief Prevents execution of the function once called.
89+
*/
8890
void cancel() { m_isCancelled = true; }
8991

90-
using Job::Job;
92+
[[nodiscard]] bool shallRecur() const override { return !m_isCancelled; }
9193

9294
private:
93-
bool m_isCancelled{false};
95+
std::atomic_bool m_isCancelled{false};
9496
};
9597

9698
} // namespace velocitas

sdk/include/sdk/ThreadPool.h

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
#include "sdk/Job.h"
2121

22+
#include <atomic>
2223
#include <condition_variable>
23-
#include <functional>
2424
#include <memory>
2525
#include <mutex>
2626
#include <queue>
@@ -35,9 +35,8 @@ namespace velocitas {
3535
*/
3636
class ThreadPool final {
3737
public:
38-
using Job_t = std::function<void()>;
39-
4038
ThreadPool();
39+
explicit ThreadPool(size_t numWorkerThreads);
4140

4241
~ThreadPool();
4342

@@ -48,12 +47,14 @@ class ThreadPool final {
4847
*/
4948
static std::shared_ptr<ThreadPool> getInstance();
5049

50+
[[nodiscard]] size_t getNumWorkerThreads() const;
51+
5152
/**
52-
* @brief Execute the given job asynchronously in one of the worker threads.
53+
* @brief Enqueue the given job to be executed asynchronously by one of the worker threads.
5354
*
5455
* @param job The job to execute.
5556
*/
56-
void execute(JobPtr_t job);
57+
void enqueue(JobPtr_t job);
5758

5859
ThreadPool(const ThreadPool&) = delete;
5960
ThreadPool(ThreadPool&&) = delete;
@@ -63,11 +64,11 @@ class ThreadPool final {
6364
private:
6465
void threadLoop();
6566

66-
std::mutex m_queueMutex;
67-
std::condition_variable m_cv;
68-
std::queue<JobPtr_t> m_jobs;
69-
std::vector<std::unique_ptr<std::thread>> m_workerThreads;
70-
bool m_isRunning{true};
67+
std::mutex m_queueMutex;
68+
std::condition_variable m_cv;
69+
std::queue<JobPtr_t> m_jobs;
70+
std::vector<std::thread> m_workerThreads;
71+
std::atomic_bool m_isRunning{true};
7172
};
7273

7374
} // namespace velocitas

sdk/src/sdk/Job.cpp

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,22 @@
1515
*/
1616

1717
#include "sdk/Job.h"
18-
#include "sdk/ThreadPool.h"
1918

2019
namespace velocitas {
2120

2221
Job::Job(std::function<void()> fun)
2322
: m_fun(std::move(fun)) {}
2423

25-
void Job::waitForTermination() { std::scoped_lock lock(m_terminationMutex); }
24+
void Job::waitForTermination() const { std::lock_guard lock(m_terminationMutex); }
2625

27-
void Job::execute(JobPtr_t& /*thisJobPtr*/, ThreadPool& /*pool*/) {
28-
std::scoped_lock lock(m_terminationMutex);
26+
void Job::execute() {
27+
std::lock_guard lock(m_terminationMutex);
2928
m_fun();
3029
}
3130

32-
void RecurringJob::execute(JobPtr_t& thisJobPtr, ThreadPool& pool) {
31+
void RecurringJob::execute() {
3332
if (!m_isCancelled) {
34-
Job::execute(thisJobPtr, pool);
35-
}
36-
37-
if (!m_isCancelled) {
38-
pool.execute(thisJobPtr);
33+
Job::execute();
3934
}
4035
}
4136

sdk/src/sdk/ThreadPool.cpp

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,42 +18,44 @@
1818

1919
namespace velocitas {
2020

21-
ThreadPool::ThreadPool() {
22-
constexpr size_t numThreads{2};
23-
24-
for (size_t i = 0; i < numThreads; ++i) {
25-
auto workerThread = std::make_unique<std::thread>([this]() { threadLoop(); });
26-
m_workerThreads.push_back(std::move(workerThread));
21+
ThreadPool::ThreadPool(size_t numWorkerThreads)
22+
: m_workerThreads{numWorkerThreads} {
23+
for (size_t i = 0; i < numWorkerThreads; ++i) {
24+
m_workerThreads[i] = std::thread([this]() { threadLoop(); });
2725
}
2826
}
2927

28+
ThreadPool::ThreadPool()
29+
: ThreadPool(2) {}
30+
3031
ThreadPool::~ThreadPool() {
31-
m_isRunning = false;
32+
{
33+
std::lock_guard lock{m_queueMutex};
34+
m_isRunning = false;
35+
// empty the job queue (std::queue does not offer a clear function)
36+
std::queue<JobPtr_t>().swap(m_jobs);
37+
}
3238
m_cv.notify_all();
3339

3440
for (auto& thread : m_workerThreads) {
35-
thread->join();
41+
thread.join();
3642
}
3743
}
3844

3945
std::shared_ptr<ThreadPool> ThreadPool::getInstance() {
40-
static std::shared_ptr<ThreadPool> instance{nullptr};
41-
42-
if (!instance) {
43-
instance = std::make_shared<ThreadPool>();
44-
}
45-
46+
static std::shared_ptr<ThreadPool> instance{std::make_shared<ThreadPool>()};
4647
return instance;
4748
}
4849

49-
void ThreadPool::execute(JobPtr_t job) {
50+
size_t ThreadPool::getNumWorkerThreads() const { return m_workerThreads.size(); }
51+
52+
void ThreadPool::enqueue(JobPtr_t job) {
5053
std::lock_guard<std::mutex> lock(m_queueMutex);
5154
m_jobs.emplace(std::move(job));
52-
m_cv.notify_all();
55+
m_cv.notify_one();
5356
}
5457

5558
void ThreadPool::threadLoop() {
56-
std::mutex waitMutex;
5759
while (m_isRunning) {
5860
JobPtr_t job;
5961
{
@@ -65,10 +67,13 @@ void ThreadPool::threadLoop() {
6567
}
6668

6769
if (job) {
68-
job->execute(job, *this);
70+
job->execute();
71+
if (job->shallRecur()) {
72+
enqueue(job);
73+
}
6974
} else {
70-
std::unique_lock<std::mutex> lock(waitMutex);
71-
m_cv.wait(lock);
75+
std::unique_lock<std::mutex> lock(m_queueMutex);
76+
m_cv.wait(lock, [this] { return !m_jobs.empty() || !m_isRunning; });
7277
}
7378
}
7479
}

sdk/src/sdk/grpc/GrpcClient.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ namespace velocitas {
2222

2323
GrpcClient::GrpcClient() {
2424
m_recurringJob = std::make_shared<RecurringJob>([this]() { pruneCompletedRequests(); });
25-
ThreadPool::getInstance()->execute(m_recurringJob);
25+
ThreadPool::getInstance()->enqueue(m_recurringJob);
2626
}
2727

2828
GrpcClient::~GrpcClient() {

sdk/src/sdk/grpc/VehicleDataBrokerClient.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include "sdk/Exceptions.h"
2525
#include "sdk/Job.h"
2626
#include "sdk/Logger.h"
27-
#include "sdk/ThreadPool.h"
2827

2928
#include <fmt/core.h>
3029
#include <grpcpp/channel.h>

sdk/tests/utests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ set(TARGET_NAME "sdk_utests")
33
add_executable(${TARGET_NAME}
44
AsyncResult_tests.cpp
55
AsyncSubscription_tests.cpp
6+
Job_tests.cpp
67
Node_tests.cpp
8+
ScopedBoolInverter_tests.cpp
9+
ThreadPool_tests.cpp
710
Utils_tests.cpp
811
VehicleDataBrokerClient_tests.cpp
912
#PubSub_tests.cpp

0 commit comments

Comments
 (0)