Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions .github/workflows/unbundled-lint-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ env:

on:
push:
branches: [ main ]
branches: [main]
pull_request:
paths:
- ".github/**"
Expand All @@ -35,7 +35,6 @@ concurrency:
cancel-in-progress: true

jobs:

set-env:
name: Set environment variables
runs-on: ubuntu-22.04
Expand Down Expand Up @@ -64,8 +63,8 @@ jobs:
cache_key: ${{ steps.generate_cache_key.outputs.CACHE_KEY }}

env:
ARCH_SPECIFIC_CPP_STDLIB_PATH: ${{ needs.set-env.outputs.arch_specific_cpp_stdlib_path }}
GENERIC_CPP_STDLIB_PATH: ${{ needs.set-env.outputs.generic_cpp_stdlib_path }}
ARCH_SPECIFIC_CPP_STDLIB_PATH: ${{ needs.set-env.outputs.arch_specific_cpp_stdlib_path }}
GENERIC_CPP_STDLIB_PATH: ${{ needs.set-env.outputs.generic_cpp_stdlib_path }}

steps:
- name: Checkout repository
Expand All @@ -88,7 +87,7 @@ jobs:
echo "GENERIC_CPP_STDLIB_PATH: ${{ env.GENERIC_CPP_STDLIB_PATH }}"
echo "ARCH_SPECIFIC_CPP_STDLIB_PATH: ${{ env.ARCH_SPECIFIC_CPP_STDLIB_PATH }}"
env

- name: Download tarball
run: |
wget -O vsomeip-source.tar.gz $VSOMEIP_SOURCE_TARBALL
Expand Down Expand Up @@ -130,7 +129,7 @@ jobs:

- name: Install dependencies
if: steps.cache-vsomeip.outputs.cache-hit != 'true'
run: sudo apt-get install -y build-essential cmake libboost-all-dev doxygen asciidoc
run: sudo apt-get update && sudo apt-get install -y build-essential cmake libboost-all-dev doxygen asciidoc

- name: Build vsomeip
if: steps.cache-vsomeip.outputs.cache-hit != 'true'
Expand All @@ -148,7 +147,7 @@ jobs:

lint:
name: Lint
needs:
needs:
- set-env
- obtain_and_build_vsomeip
runs-on: ubuntu-22.04
Expand Down Expand Up @@ -211,7 +210,7 @@ jobs:

test:
name: Test
needs:
needs:
- set-env
- obtain_and_build_vsomeip
runs-on: ubuntu-22.04
Expand Down Expand Up @@ -267,7 +266,7 @@ jobs:
- name: Run tests and report code coverage
run: |
# enable nightly features so that we can also include Doctests
LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:${VSOMEIP_INSTALL_PATH}/lib RUSTC_BOOTSTRAP=1 cargo tarpaulin --no-default-features -o xml -o lcov -o html --doc --tests -- --test-threads 1
LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:${VSOMEIP_INSTALL_PATH}/lib RUSTC_BOOTSTRAP=1 cargo tarpaulin --no-default-features -o xml -o lcov -o html --doc --tests -- --test-threads 1

- name: Upload coverage report (xml)
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -295,7 +294,7 @@ jobs:

build-docs:
name: Build documentation
needs:
needs:
- obtain_and_build_vsomeip
- set-env
runs-on: ubuntu-22.04
Expand Down Expand Up @@ -342,4 +341,3 @@ jobs:
- name: Create Documentation for up-linux-streamer
working-directory: ${{github.workspace}}
run: RUSTDOCFLAGS=-Dwarnings cargo doc -p up-linux-streamer --no-deps --no-default-features

40 changes: 28 additions & 12 deletions up-streamer/src/ustreamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ const FORWARDING_LISTENERS_FN_INSERT_TAG: &str = "insert:";
const FORWARDING_LISTENERS_FN_REMOVE_TAG: &str = "remove:";

type ForwardingListenersContainer =
Mutex<HashMap<(ComparableTransport, String), (usize, Arc<ForwardingListener>)>>;
Mutex<HashMap<(ComparableTransport, String, String), (usize, Arc<ForwardingListener>)>>;

// we must have only a single listener per in UTransport and out UAuthority
struct ForwardingListeners {
Expand All @@ -203,9 +203,11 @@ impl ForwardingListeners {
let in_comparable_transport = ComparableTransport::new(in_transport.clone());
let mut forwarding_listeners = self.listeners.lock().await;

if let Some((active, forwarding_listener)) = forwarding_listeners
.get_mut(&(in_comparable_transport.clone(), out_authority.to_string()))
{
if let Some((active, forwarding_listener)) = forwarding_listeners.get_mut(&(
in_comparable_transport.clone(),
in_authority.to_string(),
out_authority.to_string(),
)) {
*active += 1;
if *active > 1 {
return Ok(None);
Expand Down Expand Up @@ -323,21 +325,32 @@ impl ForwardingListeners {

// Insert the new listener and update the active count
forwarding_listeners.insert(
(in_comparable_transport, out_authority.to_string()),
(
in_comparable_transport,
in_authority.to_string(),
out_authority.to_string(),
),
(1, forwarding_listener.clone()),
);
Ok(Some(forwarding_listener))
}

pub async fn remove(&self, in_transport: Arc<dyn UTransport>, out_authority: &str) {
pub async fn remove(
&self,
in_transport: Arc<dyn UTransport>,
in_authority: &str,
out_authority: &str,
) {
let in_comparable_transport = ComparableTransport::new(in_transport.clone());

let mut forwarding_listeners = self.listeners.lock().await;

let active_num = {
let Some((active, _)) = forwarding_listeners
.get_mut(&(in_comparable_transport.clone(), out_authority.to_string()))
else {
let Some((active, _)) = forwarding_listeners.get_mut(&(
in_comparable_transport.clone(),
in_authority.to_string(),
out_authority.to_string(),
)) else {
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_REMOVE_TAG} no such out_comparable_transport, out_authority: {out_authority:?}");
return;
};
Expand All @@ -346,8 +359,11 @@ impl ForwardingListeners {
};

if active_num == 0 {
let removed =
forwarding_listeners.remove(&(in_comparable_transport, out_authority.to_string()));
let removed = forwarding_listeners.remove(&(
in_comparable_transport,
in_authority.to_string(),
out_authority.to_string(),
));
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_REMOVE_TAG} removing ForwardingListener, out_authority: {out_authority:?}");
if let Some((_, forwarding_listener)) = removed {
warn!("ForwardingListeners::remove: ForwardingListener found we can remove, out_authority: {out_authority:?}");
Expand Down Expand Up @@ -824,7 +840,7 @@ impl UStreamer {
.remove(out.transport.clone())
.await;
self.forwarding_listeners
.remove(r#in.transport.clone(), &out.authority)
.remove(r#in.transport.clone(), &r#in.authority, &out.authority)
.await;
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ use up_rust::{UListener, UTransport};
use up_streamer::{Endpoint, UStreamer};
use usubscription_static_file::USubscriptionStaticFile;

const DURATION_TO_RUN_CLIENTS: u128 = 1_000;
const DURATION_TO_RUN_CLIENTS: u128 = 1_0;
const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000;

#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn single_local_two_remote_authorities_same_remote_transport() {
integration_test_utils::init_logging();
Expand Down