Skip to content

Commit e82554e

Browse files
committed
Add new data structure IpcSharedMemoryVec which allows to add more
elements to it. This data structure is not allowed to be send over the a channel but can easily get a reader which is allowed to be send but not allowed to add. Signed-off-by: Narfinger <[email protected]>
1 parent 9513802 commit e82554e

File tree

8 files changed

+454
-96
lines changed

8 files changed

+454
-96
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ fnv = "1.0.3"
4141
futures-channel = { version = "0.3.31", optional = true }
4242
futures-core = { version = "0.3.31", optional = true }
4343
libc = "0.2.162"
44-
serde_core = "1.0"
44+
serde = { version = "1.0", features = ["derive"] }
4545
uuid = { version = "1", features = ["v4"] }
4646

4747
[target.'cfg(any(target_os = "linux", target_os = "openbsd", target_os = "freebsd", target_os = "illumos"))'.dependencies]

src/ipc.rs

Lines changed: 166 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@
77
// option. This file may not be copied, modified, or distributed
88
// except according to those terms.
99

10-
use crate::platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender};
10+
use crate::platform::{
11+
self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender, OsIpcSharedMemoryIndex,
12+
OsIpcSharedMemoryVec,
13+
};
1114
use crate::platform::{
1215
OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel,
1316
};
1417

1518
use bincode;
16-
use serde_core::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
19+
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
1720
use std::cell::RefCell;
1821
use std::cmp::min;
1922
use std::error::Error as StdError;
@@ -33,6 +36,12 @@ thread_local! {
3336
static OS_IPC_CHANNELS_FOR_SERIALIZATION: RefCell<Vec<OsIpcChannel>> = const { RefCell::new(Vec::new()) };
3437

3538
static OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION: RefCell<Vec<OsIpcSharedMemory>> =
39+
const { RefCell::new(Vec::new()) };
40+
41+
static OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_DESERIALIZATION:
42+
RefCell<Vec<Option<OsIpcSharedMemoryVec>>> = const { RefCell::new(Vec::new()) };
43+
44+
static OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_SERIALIZATION: RefCell<Vec<OsIpcSharedMemoryVec>> =
3645
const { RefCell::new(Vec::new()) }
3746
}
3847

@@ -368,15 +377,17 @@ where
368377
OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
369378
OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
370379
|os_ipc_shared_memory_regions_for_serialization| {
371-
bincode::serialize_into(&mut bytes, &data)?;
372-
let os_ipc_channels = os_ipc_channels_for_serialization.take();
373-
let os_ipc_shared_memory_regions =
374-
os_ipc_shared_memory_regions_for_serialization.take();
375-
Ok(self.os_sender.send(
376-
&bytes[..],
377-
os_ipc_channels,
378-
os_ipc_shared_memory_regions,
379-
)?)
380+
OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_SERIALIZATION.with(
381+
|os_ipc_shared_memory_vec_regions_for_serialization| {
382+
bincode::serialize_into(&mut bytes, &data)?;
383+
Ok(self.os_sender.send(
384+
&bytes[..],
385+
os_ipc_channels_for_serialization.take(),
386+
os_ipc_shared_memory_regions_for_serialization.take(),
387+
os_ipc_shared_memory_vec_regions_for_serialization.take(),
388+
)?)
389+
},
390+
)
380391
},
381392
)
382393
})
@@ -631,6 +642,117 @@ impl IpcSharedMemory {
631642
}
632643
}
633644

645+
/// An index to access `IpcSharedMemoryVec`
646+
#[derive(Clone, Debug, Serialize, Deserialize)]
647+
pub struct IpcSharedMemoryIndex(OsIpcSharedMemoryIndex);
648+
649+
/// Shared memory vector that will can be made accessible to a receiver via the reader method
650+
/// of an IPC message that contains the discriptor.
651+
///
652+
/// # Examples
653+
/// ```
654+
/// # use ipc_channel::ipc::{self, IpcSharedMemoryVec};
655+
/// # let (tx, rx) = ipc::channel().unwrap();
656+
/// # let data = [0x76, 0x69, 0x6d, 0x00];
657+
/// let (shmem, index) = IpcSharedMemoryVec::from_bytes(&data);
658+
/// tx.send(shmem.reader()).unwrap();
659+
/// # let rx_shmem = rx.recv().unwrap();
660+
/// # assert_eq!(shmem.get(&index), rx_shmem.get(&index));
661+
/// ```
662+
#[derive(Clone)]
663+
pub struct IpcSharedMemoryVec {
664+
os_shared_memory_vec: OsIpcSharedMemoryVec,
665+
}
666+
667+
unsafe impl Send for IpcSharedMemoryVec {}
668+
unsafe impl Sync for IpcSharedMemoryVec {}
669+
670+
impl Default for IpcSharedMemoryVec {
671+
fn default() -> Self {
672+
let (memory, _index) = OsIpcSharedMemoryVec::from_bytes(&[0xab]);
673+
IpcSharedMemoryVec {
674+
os_shared_memory_vec: memory,
675+
}
676+
}
677+
}
678+
679+
impl IpcSharedMemoryVec {
680+
pub fn from_bytes(bytes: &[u8]) -> (IpcSharedMemoryVec, IpcSharedMemoryIndex) {
681+
let (memory, index) = OsIpcSharedMemoryVec::from_bytes(bytes);
682+
(
683+
IpcSharedMemoryVec {
684+
os_shared_memory_vec: memory,
685+
},
686+
IpcSharedMemoryIndex(index),
687+
)
688+
}
689+
690+
pub fn push(&mut self, bytes: &[u8]) -> IpcSharedMemoryIndex {
691+
IpcSharedMemoryIndex(self.os_shared_memory_vec.push(bytes))
692+
}
693+
694+
pub fn get(&self, index: &IpcSharedMemoryIndex) -> Option<&[u8]> {
695+
self.os_shared_memory_vec.get(&index.0)
696+
}
697+
698+
/// Gives you a reader to access the already stored memory locations.
699+
/// Notice this will not allow you to access memory added _after_ you
700+
/// got this reader.
701+
pub fn reader(&self) -> IpcSharedMemoryReader {
702+
IpcSharedMemoryReader(self.os_shared_memory_vec.clone())
703+
}
704+
}
705+
706+
#[derive(Clone)]
707+
pub struct IpcSharedMemoryReader(OsIpcSharedMemoryVec);
708+
709+
impl IpcSharedMemoryReader {
710+
pub fn get(&self, index: &IpcSharedMemoryIndex) -> Option<&[u8]> {
711+
self.0.get(&index.0)
712+
}
713+
}
714+
715+
impl<'de> Deserialize<'de> for IpcSharedMemoryReader {
716+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
717+
where
718+
D: Deserializer<'de>,
719+
{
720+
let index: usize = Deserialize::deserialize(deserializer)?;
721+
722+
let os_shared_memory_vec = OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_DESERIALIZATION.with(
723+
|os_ipc_shared_memory_regions_for_deserialization| {
724+
let mut regions = os_ipc_shared_memory_regions_for_deserialization.borrow_mut();
725+
let Some(region) = regions.get_mut(index) else {
726+
return Err(format!("Cannot consume shared memory region at index {index}, there are only {} regions available", regions.len()));
727+
};
728+
729+
region.take().ok_or_else(|| format!("Shared memory region {index} has already been consumed"))
730+
},
731+
).map_err(D::Error::custom)?;
732+
733+
Ok(IpcSharedMemoryReader(os_shared_memory_vec))
734+
}
735+
}
736+
737+
impl Serialize for IpcSharedMemoryReader {
738+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
739+
where
740+
S: Serializer,
741+
{
742+
let index = OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_SERIALIZATION.with(
743+
|os_ipc_shared_memory_vec_regions_for_serialization| {
744+
let mut os_ipc_shared_memory_vec_regions_for_serialization =
745+
os_ipc_shared_memory_vec_regions_for_serialization.borrow_mut();
746+
let index = os_ipc_shared_memory_vec_regions_for_serialization.len();
747+
os_ipc_shared_memory_vec_regions_for_serialization.push(self.0.clone());
748+
index
749+
},
750+
);
751+
debug_assert!(index < usize::MAX);
752+
index.serialize(serializer)
753+
}
754+
}
755+
634756
/// Result for readable events returned from [IpcReceiverSet::select].
635757
///
636758
/// [IpcReceiverSet::select]: struct.IpcReceiverSet.html#method.select
@@ -674,6 +796,7 @@ pub struct IpcMessage {
674796
pub(crate) data: Vec<u8>,
675797
pub(crate) os_ipc_channels: Vec<OsOpaqueIpcChannel>,
676798
pub(crate) os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
799+
pub(crate) os_ipc_shared_memory_vec: Vec<OsIpcSharedMemoryVec>,
677800
}
678801

679802
impl IpcMessage {
@@ -684,6 +807,7 @@ impl IpcMessage {
684807
data,
685808
os_ipc_channels: vec![],
686809
os_ipc_shared_memory_regions: vec![],
810+
os_ipc_shared_memory_vec: vec![],
687811
}
688812
}
689813
}
@@ -702,11 +826,13 @@ impl IpcMessage {
702826
data: Vec<u8>,
703827
os_ipc_channels: Vec<OsOpaqueIpcChannel>,
704828
os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
829+
os_ipc_shared_memory_vec: Vec<OsIpcSharedMemoryVec>,
705830
) -> IpcMessage {
706831
IpcMessage {
707832
data,
708833
os_ipc_channels,
709834
os_ipc_shared_memory_regions,
835+
os_ipc_shared_memory_vec,
710836
}
711837
}
712838

@@ -718,23 +844,34 @@ impl IpcMessage {
718844
OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
719845
OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
720846
|os_ipc_shared_memory_regions_for_deserialization| {
721-
// Setup the thread local memory for deserialization to take it.
722-
*os_ipc_channels_for_deserialization.borrow_mut() = self.os_ipc_channels;
723-
*os_ipc_shared_memory_regions_for_deserialization.borrow_mut() = self
724-
.os_ipc_shared_memory_regions
725-
.into_iter()
726-
.map(Some)
727-
.collect();
728-
729-
let result = bincode::deserialize(&self.data[..]);
730-
731-
// Clear the shared memory
732-
let _ = os_ipc_shared_memory_regions_for_deserialization.take();
733-
let _ = os_ipc_channels_for_deserialization.take();
734-
735-
/* Error check comes after doing cleanup,
736-
* since we need the cleanup both in the success and the error cases. */
737-
result
847+
OS_IPC_SHARED_MEMORY_VEC_REGIONS_FOR_DESERIALIZATION.with(
848+
|os_ipc_shared_memory_vec_regions_for_deserialization| {
849+
// Setup the thread local memory for deserialization to take it.
850+
*os_ipc_channels_for_deserialization.borrow_mut() =
851+
self.os_ipc_channels;
852+
*os_ipc_shared_memory_regions_for_deserialization.borrow_mut() = self
853+
.os_ipc_shared_memory_regions
854+
.into_iter()
855+
.map(Some)
856+
.collect();
857+
*os_ipc_shared_memory_vec_regions_for_deserialization.borrow_mut() =
858+
self.os_ipc_shared_memory_vec
859+
.into_iter()
860+
.map(Some)
861+
.collect();
862+
863+
let result = bincode::deserialize(&self.data[..]);
864+
865+
// Clear the shared memory
866+
let _ = os_ipc_shared_memory_regions_for_deserialization.take();
867+
let _ = os_ipc_channels_for_deserialization.take();
868+
let _ = os_ipc_shared_memory_vec_regions_for_deserialization.take();
869+
870+
/* Error check comes after doing cleanup,
871+
* since we need the cleanup both in the success and the error cases. */
872+
result
873+
},
874+
)
738875
},
739876
)
740877
})
@@ -960,7 +1097,7 @@ impl IpcBytesSender {
9601097
#[inline]
9611098
pub fn send(&self, data: &[u8]) -> Result<(), io::Error> {
9621099
self.os_sender
963-
.send(data, vec![], vec![])
1100+
.send(data, vec![], vec![], vec![])
9641101
.map_err(io::Error::from)
9651102
}
9661103
}

src/platform/inprocess/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,15 @@ impl OsIpcSender {
153153
data: &[u8],
154154
ports: Vec<OsIpcChannel>,
155155
shared_memory_regions: Vec<OsIpcSharedMemory>,
156+
shared_memory_vecs: Vec<OsIpcSharedMemoryVec>,
156157
) -> Result<(), ChannelError> {
157158
let os_ipc_channels = ports.into_iter().map(OsOpaqueIpcChannel::new).collect();
158-
let ipc_message = IpcMessage::new(data.to_vec(), os_ipc_channels, shared_memory_regions);
159+
let ipc_message = IpcMessage::new(
160+
data.to_vec(),
161+
os_ipc_channels,
162+
shared_memory_regions,
163+
shared_memory_vecs,
164+
);
159165
self.sender
160166
.send(ChannelMessage(ipc_message))
161167
.map_err(|_| ChannelError::BrokenPipeError)

src/platform/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ mod os {
6666
pub use self::os::{channel, OsOpaqueIpcChannel};
6767
pub use self::os::{OsIpcChannel, OsIpcOneShotServer, OsIpcReceiver, OsIpcReceiverSet};
6868
pub use self::os::{OsIpcSelectionResult, OsIpcSender, OsIpcSharedMemory};
69+
pub use self::os::{OsIpcSharedMemoryIndex, OsIpcSharedMemoryVec};
6970

7071
#[cfg(test)]
7172
mod test;

0 commit comments

Comments
 (0)