Skip to content

Commit 1937bc0

Browse files
committed
Add packet ack configuration
1 parent 2eeddb5 commit 1937bc0

File tree

6 files changed

+167
-62
lines changed

6 files changed

+167
-62
lines changed

src/message_cache.rs

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ impl<T: PartialEq + MessageHash> Deref for CacheMessage<T> {
3838
}
3939
}
4040

41-
pub enum PopFront<'a> {
42-
Duration(Duration),
43-
Ack(&'a [u8]),
44-
}
45-
4641
impl<T: PartialEq + MessageHash> MessageCache<T> {
4742
pub fn new(max_messages: u16) -> Self {
4843
let waiting = VecDeque::new();
@@ -69,15 +64,18 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {
6964

7065
/// Returns the index of the first matching message in the cache or None if
7166
/// not present
72-
pub fn index_of(&self, message: &T) -> Option<usize> {
73-
self.cache.iter().position(|m| m.message == *message)
67+
pub fn index_of<P>(&self, pred: P) -> Option<usize>
68+
where
69+
P: Fn(&T) -> bool,
70+
{
71+
self.cache.iter().position(|entry| pred(&entry.message))
7472
}
7573

7674
/// Promotes the given message to the back of the queue, effectively
7775
/// recreating an LRU cache. Returns true if a cache hit was found
7876
pub fn tag(&mut self, message: T, received: Instant) -> bool {
7977
let result = self
80-
.index_of(&message)
78+
.index_of(|msg| *msg == message)
8179
.and_then(|index| self.cache.remove(index))
8280
.is_some();
8381
self.push_back(message, received);
@@ -100,30 +98,30 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {
10098
self.cache.push_front(cache_message);
10199
}
102100

103-
pub fn pop_front(&mut self, args: PopFront) -> (usize, Option<CacheMessage<T>>) {
101+
pub fn pop_front(&mut self, duration: Duration) -> (usize, Option<CacheMessage<T>>) {
104102
let mut dropped = 0;
105103
let mut front = None;
106104
while let Some(msg) = self.cache.pop_front() {
107-
match args {
108-
PopFront::Duration(duration) => {
109-
if msg.hold_time() <= duration {
110-
front = Some(msg);
111-
break;
112-
}
113-
}
114-
PopFront::Ack(ack) => {
115-
if msg.hash() == ack {
116-
front = self.cache.pop_front();
117-
break;
118-
}
119-
}
120-
};
121-
// held for too long or acked, count as dropped and move on
105+
if msg.hold_time() <= duration {
106+
front = Some(msg);
107+
break;
108+
}
122109
dropped += 1;
123110
}
124111
(dropped, front)
125112
}
126113

114+
/// Removes all items from the cache up to and including the given index.
115+
///
116+
/// The index is bounds checked and an index beyond the length of the cache
117+
/// is ignored
118+
pub fn remove_to(&mut self, index: usize) {
119+
if index >= self.len() {
120+
return;
121+
}
122+
self.cache = self.cache.split_off(index + 1);
123+
}
124+
127125
/// Returns a reference to the first (and oldest/first to be removed)
128126
/// message in the cache
129127
pub fn peek_front(&self) -> Option<&CacheMessage<T>> {
@@ -141,7 +139,8 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {
141139

142140
#[cfg(test)]
143141
mod test {
144-
use super::MessageCache;
142+
use super::{Instant, MessageCache};
143+
use sha2::{Digest, Sha256};
145144

146145
#[test]
147146
fn test_cache_tagging() {
@@ -161,8 +160,36 @@ mod test {
161160

162161
// Third tag should evict the least recently used entry (2)
163162
assert!(!cache.tag_now(vec![3]));
164-
assert_eq!(Some(0), cache.index_of(&vec![1u8]));
165-
assert_eq!(Some(1), cache.index_of(&vec![3u8]));
166-
assert!(cache.index_of(&vec![2u8]).is_none());
163+
assert_eq!(Some(0), cache.index_of(|msg| msg.as_slice() == &[1u8]));
164+
assert_eq!(Some(1), cache.index_of(|msg| msg.as_slice() == &[3u8]));
165+
assert!(cache.index_of(|msg| msg.as_slice() == &[2u8]).is_none());
166+
}
167+
168+
#[test]
169+
fn test_remove_to() {
170+
let mut cache = MessageCache::<Vec<u8>>::new(5);
171+
cache.push_back(vec![1], Instant::now());
172+
cache.push_back(vec![2], Instant::now());
173+
cache.push_back(vec![3], Instant::now());
174+
175+
let ack = Sha256::digest(vec![2]).to_vec();
176+
177+
// Find entry by hash as an example
178+
let ack_index = cache.index_of(|msg| Sha256::digest(msg).to_vec() == ack);
179+
assert_eq!(Some(1), ack_index);
180+
// Can't find non existing
181+
assert_eq!(None, cache.index_of(|_| false));
182+
183+
// remove and check inclusion of remove_to
184+
cache.remove_to(1);
185+
assert_eq!(1, cache.len());
186+
187+
// remove past last index
188+
cache.remove_to(5);
189+
assert_eq!(1, cache.len());
190+
191+
// remove last element
192+
cache.remove_to(0);
193+
assert!(cache.is_empty());
167194
}
168195
}

src/packet.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use semtech_udp::{
99
push_data::{self, CRC},
1010
CodingRate, DataRate, Modulation,
1111
};
12+
use sha2::{Digest, Sha256};
1213
use std::{
1314
convert::TryFrom,
1415
fmt,
@@ -17,7 +18,10 @@ use std::{
1718
};
1819

1920
#[derive(Debug, Clone, PartialEq)]
20-
pub struct PacketUp(PacketRouterPacketUpV1);
21+
pub struct PacketUp {
22+
packet: PacketRouterPacketUpV1,
23+
pub(crate) hash: Vec<u8>,
24+
}
2125

2226
#[derive(Debug, Clone)]
2327
pub struct PacketDown(PacketRouterPacketDownV1);
@@ -26,18 +30,19 @@ impl Deref for PacketUp {
2630
type Target = PacketRouterPacketUpV1;
2731

2832
fn deref(&self) -> &Self::Target {
29-
&self.0
33+
&self.packet
3034
}
3135
}
3236

3337
impl From<PacketUp> for PacketRouterPacketUpV1 {
3438
fn from(value: PacketUp) -> Self {
35-
value.0
39+
value.packet
3640
}
3741
}
42+
3843
impl From<&PacketUp> for PacketRouterPacketUpV1 {
3944
fn from(value: &PacketUp) -> Self {
40-
value.0.clone()
45+
value.packet.clone()
4146
}
4247
}
4348

@@ -51,12 +56,12 @@ impl fmt::Display for PacketUp {
5156
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
5257
f.write_fmt(format_args!(
5358
"@{} us, {:.2} MHz, {:?}, snr: {}, rssi: {}, len: {}",
54-
self.0.timestamp,
55-
self.0.frequency,
56-
self.0.datarate(),
57-
self.0.snr,
58-
self.0.rssi,
59-
self.0.payload.len()
59+
self.packet.timestamp,
60+
self.packet.frequency,
61+
self.packet.datarate(),
62+
self.packet.snr,
63+
self.packet.rssi,
64+
self.packet.payload.len()
6065
))
6166
}
6267
}
@@ -66,15 +71,15 @@ impl TryFrom<PacketUp> for poc_lora::LoraWitnessReportReqV1 {
6671
fn try_from(value: PacketUp) -> Result<Self> {
6772
let report = poc_lora::LoraWitnessReportReqV1 {
6873
data: vec![],
69-
tmst: value.0.timestamp as u32,
74+
tmst: value.packet.timestamp as u32,
7075
timestamp: SystemTime::now()
7176
.duration_since(UNIX_EPOCH)
7277
.map_err(Error::from)?
7378
.as_nanos() as u64,
74-
signal: value.0.rssi * 10,
75-
snr: (value.0.snr * 10.0) as i32,
76-
frequency: value.0.frequency as u64,
77-
datarate: value.0.datarate,
79+
signal: value.packet.rssi * 10,
80+
snr: (value.packet.snr * 10.0) as i32,
81+
frequency: value.packet.frequency as u64,
82+
datarate: value.packet.datarate,
7883
pub_key: vec![],
7984
signature: vec![],
8085
};
@@ -106,7 +111,10 @@ impl PacketUp {
106111
gateway: gateway.into(),
107112
signature: vec![],
108113
};
109-
Ok(Self(packet))
114+
Ok(Self {
115+
hash: Sha256::digest(&packet.payload).to_vec(),
116+
packet,
117+
})
110118
}
111119

112120
pub fn is_potential_beacon(&self) -> bool {
@@ -132,7 +140,7 @@ impl PacketUp {
132140
}
133141

134142
pub fn payload(&self) -> &[u8] {
135-
&self.0.payload
143+
&self.packet.payload
136144
}
137145

138146
pub fn parse_header(payload: &[u8]) -> Result<MHDR> {

src/packet_router/mod.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
gateway,
3-
message_cache::{CacheMessage, MessageCache, MessageHash, PopFront},
4-
service::{packet_router::PacketRouterService, Reconnect},
3+
message_cache::{CacheMessage, MessageCache, MessageHash},
4+
service::{packet_router::PacketRouterService, AckTimer, Reconnect},
55
sync, Base64, PacketUp, PublicKey, Result, Settings,
66
};
77
use futures::TryFutureExt;
@@ -13,7 +13,6 @@ use serde::Serialize;
1313
use sha2::{Digest, Sha256};
1414
use std::{ops::Deref, time::Instant as StdInstant};
1515
use tokio::time::Duration;
16-
1716
use tracing::{debug, info, warn};
1817

1918
const STORE_GC_INTERVAL: Duration = Duration::from_secs(60);
@@ -57,6 +56,7 @@ pub struct PacketRouter {
5756
transmit: gateway::MessageSender,
5857
service: PacketRouterService,
5958
reconnect: Reconnect,
59+
ack_timer: AckTimer,
6060
store: MessageCache<PacketUp>,
6161
}
6262

@@ -73,16 +73,21 @@ impl PacketRouter {
7373
transmit: gateway::MessageSender,
7474
) -> Self {
7575
let router_settings = &settings.router;
76-
let service =
77-
PacketRouterService::new(router_settings.uri.clone(), settings.keypair.clone());
76+
let service = PacketRouterService::new(
77+
router_settings.uri.clone(),
78+
router_settings.ack_timeout(),
79+
settings.keypair.clone(),
80+
);
7881
let store = MessageCache::new(router_settings.queue);
7982
let reconnect = Reconnect::default();
83+
let ack_timer = AckTimer::new(router_settings.ack_timeout());
8084
Self {
8185
service,
8286
transmit,
8387
messages,
8488
store,
8589
reconnect,
90+
ack_timer,
8691
}
8792
}
8893

@@ -102,6 +107,7 @@ impl PacketRouter {
102107
self.service.disconnect();
103108
warn!("router disconnected");
104109
self.reconnect.update_next_time(true);
110+
self.ack_timer.update_next_time(false);
105111
},
106112
Some(Message::Status(tx_resp)) => {
107113
let status = RouterStatus {
@@ -116,6 +122,13 @@ impl PacketRouter {
116122
_ = self.reconnect.wait() => {
117123
let reconnect_result = self.handle_reconnect().await;
118124
self.reconnect.update_next_time(reconnect_result.is_err());
125+
self.ack_timer.update_next_time(reconnect_result.is_ok());
126+
},
127+
_ = self.ack_timer.wait() => {
128+
warn!("no packet acks received");
129+
let reconnect_result = self.handle_reconnect().await;
130+
self.reconnect.update_next_time(reconnect_result.is_err());
131+
self.ack_timer.update_next_time(reconnect_result.is_ok());
119132
},
120133
router_message = self.service.recv() => match router_message {
121134
Ok(envelope_down_v1::Data::Packet(message)) => self.handle_downlink(message).await,
@@ -130,11 +143,16 @@ impl PacketRouter {
130143
self.service.disconnect();
131144
}
132145
self.reconnect.update_next_time(session_result.is_err());
146+
self.ack_timer.update_next_time(session_result.is_ok());
147+
},
148+
Ok(envelope_down_v1::Data::PacketAck(message)) => {
149+
self.handle_packet_ack(message).await;
150+
self.ack_timer.update_next_time(true);
133151
},
134-
Ok(envelope_down_v1::Data::PacketAck(message)) => self.handle_packet_ack(message).await,
135152
Err(err) => {
136153
warn!(?err, "router error");
137154
self.reconnect.update_next_time(true);
155+
self.ack_timer.update_next_time(false);
138156
},
139157
}
140158
}
@@ -164,7 +182,14 @@ impl PacketRouter {
164182
}
165183

166184
async fn handle_packet_ack(&mut self, message: PacketRouterPacketAckV1) {
167-
self.store.pop_front(PopFront::Ack(&message.payload_hash));
185+
if message.payload_hash.is_empty() {
186+
// Empty ack is just a heartbeat and is ignored
187+
return;
188+
}
189+
if let Some(index) = self.store.index_of(|msg| msg.hash == message.payload_hash) {
190+
self.store.remove_to(index);
191+
debug!(removed = index, "removed acked packets");
192+
}
168193
}
169194

170195
async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) -> Result {
@@ -175,9 +200,7 @@ impl PacketRouter {
175200
}
176201

177202
async fn send_waiting_packets(&mut self) -> Result {
178-
while let (removed, Some(packet)) =
179-
self.store.pop_front(PopFront::Duration(STORE_GC_INTERVAL))
180-
{
203+
while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) {
181204
if removed > 0 {
182205
info!(removed, "discarded queued packets");
183206
}

src/service/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,35 @@ impl Reconnect {
5959
self.next_time = Instant::now() + backoff;
6060
}
6161
}
62+
63+
pub struct AckTimer {
64+
next_time: Instant,
65+
timeout: Duration,
66+
}
67+
68+
impl AckTimer {
69+
pub fn new(timeout: Duration) -> Self {
70+
Self {
71+
next_time: Instant::now() + timeout,
72+
timeout,
73+
}
74+
}
75+
76+
pub async fn wait(&self) {
77+
if self.next_time >= Instant::now() {
78+
time::sleep_until(self.next_time).await
79+
} else {
80+
std::future::pending().await
81+
}
82+
}
83+
84+
pub fn update_next_time(&mut self, active: bool) {
85+
// timeout is 0 if the ack timer is not requested. Active means the
86+
// connection is open and acks are to be expected
87+
self.next_time = if self.timeout.as_secs() > 0 && active {
88+
Instant::now() + self.timeout
89+
} else {
90+
Instant::now() - self.timeout
91+
};
92+
}
93+
}

0 commit comments

Comments
 (0)