77
88use std:: thread:: JoinHandle ;
99use std:: sync:: mpsc:: { Receiver , Sender } ;
10+ use std:: time:: { Duration , SystemTime } ;
1011use log:: { extend_and_hash, hash, Entry , Event , Sha256Hash } ;
1112
1213pub struct Historian {
@@ -20,29 +21,48 @@ pub enum ExitReason {
2021 RecvDisconnected ,
2122 SendDisconnected ,
2223}
24+ fn log_event (
25+ sender : & Sender < Entry > ,
26+ num_hashes : & mut u64 ,
27+ end_hash : & mut Sha256Hash ,
28+ event : Event ,
29+ ) -> Result < ( ) , ( Entry , ExitReason ) > {
30+ if let Event :: UserDataKey ( key) = event {
31+ * end_hash = extend_and_hash ( end_hash, & key) ;
32+ }
33+ let entry = Entry {
34+ end_hash : * end_hash,
35+ num_hashes : * num_hashes,
36+ event,
37+ } ;
38+ if let Err ( _) = sender. send ( entry. clone ( ) ) {
39+ return Err ( ( entry, ExitReason :: SendDisconnected ) ) ;
40+ }
41+ * num_hashes = 0 ;
42+ Ok ( ( ) )
43+ }
2344
2445fn log_events (
2546 receiver : & Receiver < Event > ,
2647 sender : & Sender < Entry > ,
2748 num_hashes : & mut u64 ,
2849 end_hash : & mut Sha256Hash ,
50+ epoch : SystemTime ,
51+ num_ticks : & mut u64 ,
52+ ms_per_tick : Option < u64 > ,
2953) -> Result < ( ) , ( Entry , ExitReason ) > {
3054 use std:: sync:: mpsc:: TryRecvError ;
3155 loop {
56+ if let Some ( ms) = ms_per_tick {
57+ let now = SystemTime :: now ( ) ;
58+ if now > epoch + Duration :: from_millis ( ( * num_ticks + 1 ) * ms) {
59+ log_event ( sender, num_hashes, end_hash, Event :: Tick ) ?;
60+ * num_ticks += 1 ;
61+ }
62+ }
3263 match receiver. try_recv ( ) {
3364 Ok ( event) => {
34- if let Event :: UserDataKey ( key) = event {
35- * end_hash = extend_and_hash ( end_hash, & key) ;
36- }
37- let entry = Entry {
38- end_hash : * end_hash,
39- num_hashes : * num_hashes,
40- event,
41- } ;
42- if let Err ( _) = sender. send ( entry. clone ( ) ) {
43- return Err ( ( entry, ExitReason :: SendDisconnected ) ) ;
44- }
45- * num_hashes = 0 ;
65+ log_event ( sender, num_hashes, end_hash, event) ?;
4666 }
4767 Err ( TryRecvError :: Empty ) => {
4868 return Ok ( ( ) ) ;
@@ -63,15 +83,26 @@ fn log_events(
6383/// sending back Entry messages until either the receiver or sender channel is closed.
6484pub fn create_logger (
6585 start_hash : Sha256Hash ,
86+ ms_per_tick : Option < u64 > ,
6687 receiver : Receiver < Event > ,
6788 sender : Sender < Entry > ,
6889) -> JoinHandle < ( Entry , ExitReason ) > {
6990 use std:: thread;
7091 thread:: spawn ( move || {
7192 let mut end_hash = start_hash;
7293 let mut num_hashes = 0 ;
94+ let mut num_ticks = 0 ;
95+ let epoch = SystemTime :: now ( ) ;
7396 loop {
74- if let Err ( err) = log_events ( & receiver, & sender, & mut num_hashes, & mut end_hash) {
97+ if let Err ( err) = log_events (
98+ & receiver,
99+ & sender,
100+ & mut num_hashes,
101+ & mut end_hash,
102+ epoch,
103+ & mut num_ticks,
104+ ms_per_tick,
105+ ) {
75106 return err;
76107 }
77108 end_hash = hash ( & end_hash) ;
@@ -81,11 +112,11 @@ pub fn create_logger(
81112}
82113
83114impl Historian {
84- pub fn new ( start_hash : & Sha256Hash ) -> Self {
115+ pub fn new ( start_hash : & Sha256Hash , ms_per_tick : Option < u64 > ) -> Self {
85116 use std:: sync:: mpsc:: channel;
86117 let ( sender, event_receiver) = channel ( ) ;
87118 let ( entry_sender, receiver) = channel ( ) ;
88- let thread_hdl = create_logger ( * start_hash, event_receiver, entry_sender) ;
119+ let thread_hdl = create_logger ( * start_hash, ms_per_tick , event_receiver, entry_sender) ;
89120 Historian {
90121 sender,
91122 receiver,
@@ -98,14 +129,13 @@ impl Historian {
98129mod tests {
99130 use super :: * ;
100131 use log:: * ;
132+ use std:: thread:: sleep;
133+ use std:: time:: Duration ;
101134
102135 #[ test]
103136 fn test_historian ( ) {
104- use std:: thread:: sleep;
105- use std:: time:: Duration ;
106-
107137 let zero = Sha256Hash :: default ( ) ;
108- let hist = Historian :: new ( & zero) ;
138+ let hist = Historian :: new ( & zero, None ) ;
109139
110140 hist. sender . send ( Event :: Tick ) . unwrap ( ) ;
111141 sleep ( Duration :: new ( 0 , 1_000_000 ) ) ;
@@ -129,12 +159,30 @@ mod tests {
129159 #[ test]
130160 fn test_historian_closed_sender ( ) {
131161 let zero = Sha256Hash :: default ( ) ;
132- let hist = Historian :: new ( & zero) ;
162+ let hist = Historian :: new ( & zero, None ) ;
133163 drop ( hist. receiver ) ;
134164 hist. sender . send ( Event :: Tick ) . unwrap ( ) ;
135165 assert_eq ! (
136166 hist. thread_hdl. join( ) . unwrap( ) . 1 ,
137167 ExitReason :: SendDisconnected
138168 ) ;
139169 }
170+
171+ #[ test]
172+ fn test_ticking_historian ( ) {
173+ let zero = Sha256Hash :: default ( ) ;
174+ let hist = Historian :: new ( & zero, Some ( 20 ) ) ;
175+ sleep ( Duration :: from_millis ( 30 ) ) ;
176+ hist. sender . send ( Event :: UserDataKey ( zero) ) . unwrap ( ) ;
177+ sleep ( Duration :: from_millis ( 15 ) ) ;
178+ drop ( hist. sender ) ;
179+ assert_eq ! (
180+ hist. thread_hdl. join( ) . unwrap( ) . 1 ,
181+ ExitReason :: RecvDisconnected
182+ ) ;
183+
184+ let entries: Vec < Entry > = hist. receiver . iter ( ) . collect ( ) ;
185+ assert ! ( entries. len( ) > 1 ) ;
186+ assert ! ( verify_slice( & entries, & zero) ) ;
187+ }
140188}
0 commit comments