Skip to content

Commit dd97ab9

Browse files
authored
Merge pull request #72 from fraktalio/feature/not-send-futures
Feature/not send futures
2 parents d4241bd + a1b44cf commit dd97ab9

File tree

12 files changed

+1021
-278
lines changed

12 files changed

+1021
-278
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,4 @@ jobs:
2929
run: cargo build --verbose
3030

3131
- name: Run tests
32-
run: cargo test --verbose
32+
run: cargo test --features not-send-futures --verbose

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ pretty_assertions = "1.4.1"
1313
derive_more = { version = "2", features = ["display"] }
1414

1515
tokio = { version = "1.43.1", features = ["rt", "rt-multi-thread", "macros"] }
16+
17+
[features]
18+
default = [] # default = Send futures
19+
not-send-futures = [] # opt into non-Send futures

README.md

Lines changed: 69 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -430,65 +430,31 @@ Pushing these decisions from the core domain model is very valuable. Being able
430430

431431
## Fearless Concurrency
432432

433+
### `Send` bound futures/Async (multi-threaded executors)
434+
433435
Splitting the computation in your program into multiple threads to run multiple tasks at the same time can improve performance.
434436
However, programming with threads has a reputation for being difficult. Rust’s type system and ownership model guarantee thread safety.
435437

436-
Example of the concurrent execution of the aggregate:
438+
Example of the concurrent execution of the aggregate in multi-threaded environment (**default** - `Send`-bound futures):
437439

438440
```rust
439441
async fn es_test() {
440442
let repository = InMemoryOrderEventRepository::new();
441-
let aggregate = Arc::new(EventSourcedAggregate::new(repository, decider()));
442-
// Makes a clone of the Arc pointer. This creates another pointer to the same allocation, increasing the strong reference count.
443+
let aggregate = Arc::new(EventSourcedAggregate::new(
444+
repository,
445+
decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())),
446+
));
447+
let aggregate1 = Arc::clone(&aggregate);
443448
let aggregate2 = Arc::clone(&aggregate);
444449

445-
// Lets spawn two threads to simulate two concurrent requests
446450
let handle1 = thread::spawn(|| async move {
447451
let command = OrderCommand::Create(CreateOrderCommand {
448452
order_id: 1,
449453
customer_name: "John Doe".to_string(),
450454
items: vec!["Item 1".to_string(), "Item 2".to_string()],
451455
});
452-
453-
let result = aggregate.handle(&command).await;
454-
assert!(result.is_ok());
455-
assert_eq!(
456-
result.unwrap(),
457-
[(
458-
OrderEvent::Created(OrderCreatedEvent {
459-
order_id: 1,
460-
customer_name: "John Doe".to_string(),
461-
items: vec!["Item 1".to_string(), "Item 2".to_string()],
462-
}),
463-
0
464-
)]
465-
);
466-
let command = OrderCommand::Update(UpdateOrderCommand {
467-
order_id: 1,
468-
new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
469-
});
470-
let result = aggregate.handle(&command).await;
471-
assert!(result.is_ok());
472-
assert_eq!(
473-
result.unwrap(),
474-
[(
475-
OrderEvent::Updated(OrderUpdatedEvent {
476-
order_id: 1,
477-
updated_items: vec!["Item 3".to_string(), "Item 4".to_string()],
478-
}),
479-
1
480-
)]
481-
);
482-
let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 1 });
483-
let result = aggregate.handle(&command).await;
456+
let result = aggregate1.handle(&command).await;
484457
assert!(result.is_ok());
485-
assert_eq!(
486-
result.unwrap(),
487-
[(
488-
OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 }),
489-
2
490-
)]
491-
);
492458
});
493459

494460
let handle2 = thread::spawn(|| async move {
@@ -499,47 +465,71 @@ async fn es_test() {
499465
});
500466
let result = aggregate2.handle(&command).await;
501467
assert!(result.is_ok());
502-
assert_eq!(
503-
result.unwrap(),
504-
[(
505-
OrderEvent::Created(OrderCreatedEvent {
506-
order_id: 2,
507-
customer_name: "John Doe".to_string(),
508-
items: vec!["Item 1".to_string(), "Item 2".to_string()],
509-
}),
510-
0
511-
)]
512-
);
513-
let command = OrderCommand::Update(UpdateOrderCommand {
514-
order_id: 2,
515-
new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
468+
});
469+
470+
handle1.join().unwrap().await;
471+
handle2.join().unwrap().await;
472+
}
473+
```
474+
475+
### `Send` free futures/Async (single-threaded executors)
476+
477+
Concurrency and async programming do not require a multi-threaded environment. You can run async tasks on a single-threaded executor, which allows you to write async code without the Send bound.
478+
479+
This approach has several benefits:
480+
481+
- Simpler code: No need for Arc, Mutex(RwLock), or other thread synchronization primitives for shared state.
482+
483+
- Ergonomic references: You can freely use references within your async code without worrying about moving data across threads. 🤯
484+
485+
- Efficient design: This model aligns with the “Thread-per-Core” pattern, letting you safely run multiple async tasks concurrently on a single thread.
486+
487+
In short: you get all the power of async/await without the complexity of multi-threaded synchronization all the time.
488+
489+
Just switching to a [LocalExecutor](https://docs.rs/async-executor/latest/async_executor/struct.LocalExecutor.html) or something like Tokio [LocalSet](https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html) should be enough.
490+
491+
If you want to enable single-threaded, Send-free async support, you can enable the optional feature `not-send-futures` when adding fmodel-rust to your project:
492+
493+
```toml
494+
[dependencies]
495+
fmodel-rust = { version = "0.8.2", features = ["not-send-futures"] }
496+
```
497+
498+
Example of the concurrent execution of the aggregate in single-threaded environment (**behind feature** - `Send` free `Futures`):
499+
500+
```rust
501+
async fn es_test_not_send() {
502+
let repository = InMemoryOrderEventRepository::new();
503+
504+
let aggregate = Rc::new(EventSourcedAggregate::new(
505+
repository,
506+
decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())),
507+
));
508+
let aggregate2 = Rc::clone(&aggregate);
509+
510+
// Notice how we `move` here, which requires Rc (not ARc). If you do not move, Rc is not needed.
511+
let task1 = async move {
512+
let command = OrderCommand::Create(CreateOrderCommand {
513+
order_id: 1,
514+
customer_name: "Alice".to_string(),
515+
items: vec!["Item1".to_string()],
516516
});
517-
let result = aggregate2.handle(&command).await;
517+
let result = aggregate.handle(&command).await;
518518
assert!(result.is_ok());
519-
assert_eq!(
520-
result.unwrap(),
521-
[(
522-
OrderEvent::Updated(OrderUpdatedEvent {
523-
order_id: 2,
524-
updated_items: vec!["Item 3".to_string(), "Item 4".to_string()],
525-
}),
526-
1
527-
)]
528-
);
529-
let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 2 });
519+
};
520+
521+
let task2 = async move {
522+
let command = OrderCommand::Create(CreateOrderCommand {
523+
order_id: 1,
524+
customer_name: "John Doe".to_string(),
525+
items: vec!["Item 1".to_string(), "Item 2".to_string()],
526+
});
530527
let result = aggregate2.handle(&command).await;
531528
assert!(result.is_ok());
532-
assert_eq!(
533-
result.unwrap(),
534-
[(
535-
OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 }),
536-
2
537-
)]
538-
);
539-
});
529+
};
540530

541-
handle1.join().unwrap().await;
542-
handle2.join().unwrap().await;
531+
// Run both tasks concurrently on the same thread.
532+
tokio::join!(task1, task2);
543533
}
544534
```
545535

0 commit comments

Comments
 (0)