44use datadog_ipc:: platform:: { FileBackedHandle , MappedMem , NamedShmHandle , ShmHandle } ;
55use std:: ffi:: { CStr , CString } ;
66use std:: io;
7- use std:: sync:: atomic:: { AtomicBool , AtomicU64 , Ordering } ;
7+ use std:: sync:: atomic:: { fence , AtomicU64 , Ordering } ;
88use std:: sync:: Mutex ;
99
1010pub struct OneWayShmWriter < T >
2828struct RawMetaData {
2929 generation : AtomicU64 ,
3030 size : usize ,
31- writing : AtomicBool ,
3231}
3332
3433#[ repr( C ) ]
@@ -141,11 +140,9 @@ where
141140 unsafe { reinterpret_u8_as_u64_slice ( handle. as_slice ( ) ) } . into ( ) ;
142141 let copied_data: & RawData = new_mem. as_slice ( ) . into ( ) ;
143142
144- // Ensure the next write hasn't started yet *and* the data is from the expected
145- // generation
146- if !source_data. meta . writing . load ( Ordering :: SeqCst )
147- && new_generation == source_data. meta . generation . load ( Ordering :: Acquire )
148- {
143+ // Ensure a new write hasn't started yet
144+ fence ( Ordering :: Acquire ) ; // prevent loads before from being reordered with gen load after
145+ if new_generation == source_data. meta . generation . load ( Ordering :: Relaxed ) {
149146 reader. current_data . replace ( new_mem) ;
150147 return Some ( ( true , skip_last_byte ( copied_data. as_slice ( ) ) ) ) ;
151148 }
@@ -154,17 +151,27 @@ where
154151
155152 if let Some ( cur_mem) = & self . current_data {
156153 let cur_data: & RawData = cur_mem. as_slice ( ) . into ( ) ;
157- // Ensure nothing is copied during a write
158- if !source_data. meta . writing . load ( Ordering :: SeqCst )
159- && new_generation > cur_data. meta . generation . load ( Ordering :: Acquire )
160- {
154+
155+ if new_generation & 1 == 1 {
156+ // mid-write
157+ return ( false , skip_last_byte ( cur_data. as_slice ( ) ) ) ;
158+ }
159+
160+ if new_generation > cur_data. meta . generation . load ( Ordering :: Relaxed ) {
161161 if let Some ( success) = fetch_data ( self ) {
162162 return success;
163163 }
164164 }
165165
166166 return ( false , skip_last_byte ( cur_data. as_slice ( ) ) ) ;
167- } else if !source_data. meta . writing . load ( Ordering :: SeqCst ) {
167+ } else {
168+ // first read
169+
170+ if new_generation & 1 == 1 {
171+ // mid-write
172+ return ( false , "" . as_bytes ( ) ) ;
173+ }
174+
168175 if let Some ( success) = fetch_data ( self ) {
169176 return success;
170177 }
@@ -193,14 +200,13 @@ impl<T: FileBackedHandle + From<MappedMem<T>>> OneWayShmWriter<T> {
193200 // Actually &mut mapped.as_slice_mut() as RawData seems safe, but unsized locals are
194201 // unstable
195202 let data = unsafe { & mut * ( mapped. as_slice_mut ( ) as * mut [ u8 ] as * mut RawData ) } ;
196- data. meta . writing . store ( true , Ordering :: SeqCst ) ;
203+ data. meta . generation . fetch_add ( 1 , Ordering :: AcqRel ) ;
197204 data. meta . size = size;
198205
199206 data. as_slice_mut ( ) [ 0 ..contents. len ( ) ] . copy_from_slice ( contents) ;
200207 data. as_slice_mut ( ) [ contents. len ( ) ] = 0 ;
201208
202- data. meta . generation . fetch_add ( 1 , Ordering :: SeqCst ) ;
203- data. meta . writing . store ( false , Ordering :: SeqCst ) ;
209+ data. meta . generation . fetch_add ( 1 , Ordering :: Release ) ;
204210 }
205211
206212 pub fn as_slice ( & self ) -> & [ u8 ] {
0 commit comments