Skip to content

Commit f65cea2

Browse files
authored
session/redis: fix linter and add tests (#374)
- Introduced TestService_AsyncPersisterNum_DefaultClamp to verify default clamp behavior for async persister number. - Added TestService_AppendEvent_NoPanic_AfterClose to ensure AppendEvent does not panic after service closure. - Implemented TestService_SessionEventLimit_TrimsOldest to confirm that only the latest events are retained when exceeding the session event limit. - Updated AppendEvent method to handle event channel indexing more robustly and added context cancellation in async worker.
1 parent 6dd2031 commit f65cea2

File tree

2 files changed

+84
-2
lines changed

2 files changed

+84
-2
lines changed

session/redis/redis_session_service.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,8 @@ func (s *Service) AppendEvent(
417417

418418
// TODO: Init hash index at session creation to prevent duplicate computation.
419419
hKey := getEventKey(key)
420-
index := murmur3.Sum32([]byte(hKey)) % uint32(len(s.eventPairChans))
420+
n := len(s.eventPairChans)
421+
index := int(murmur3.Sum32([]byte(hKey))) % n
421422
select {
422423
case s.eventPairChans[index] <- &sessionEventPair{key: key, event: event}:
423424
case <-ctx.Done():
@@ -789,10 +790,11 @@ func (s *Service) startAsyncPersistWorker() {
789790
for _, eventPairChan := range s.eventPairChans {
790791
go func(eventPairChan chan *sessionEventPair) {
791792
for eventPair := range eventPairChan {
792-
ctx, _ := context.WithTimeout(context.Background(), defaultTimeout)
793+
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
793794
if err := s.addEvent(ctx, eventPair.key, eventPair.event); err != nil {
794795
log.Errorf("redis session service persistence event failed: %w", err)
795796
}
797+
cancel()
796798
}
797799
}(eventPairChan)
798800
}

session/redis/redis_session_service_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,86 @@ func TestService_UserStateTTL(t *testing.T) {
10301030
}
10311031
}
10321032

1033+
func TestService_AsyncPersisterNum_DefaultClamp(t *testing.T) {
1034+
redisURL, cleanup := setupTestRedis(t)
1035+
defer cleanup()
1036+
1037+
service, err := NewService(
1038+
WithRedisClientURL(redisURL),
1039+
WithEnableAsyncPersist(true),
1040+
WithAsyncPersisterNum(0),
1041+
)
1042+
require.NoError(t, err)
1043+
defer service.Close()
1044+
1045+
assert.Equal(t, defaultAsyncPersisterNum, len(service.eventPairChans))
1046+
}
1047+
1048+
func TestService_AppendEvent_NoPanic_AfterClose(t *testing.T) {
1049+
redisURL, cleanup := setupTestRedis(t)
1050+
defer cleanup()
1051+
1052+
service, err := NewService(
1053+
WithRedisClientURL(redisURL),
1054+
WithEnableAsyncPersist(true),
1055+
WithAsyncPersisterNum(2),
1056+
)
1057+
require.NoError(t, err)
1058+
1059+
// Create a session.
1060+
sessionKey := session.Key{
1061+
AppName: "testapp",
1062+
UserID: "user123",
1063+
SessionID: "session123",
1064+
}
1065+
sess, err := service.CreateSession(context.Background(), sessionKey, session.StateMap{})
1066+
require.NoError(t, err)
1067+
1068+
// Close service to close channels.
1069+
service.Close()
1070+
1071+
// Append after close should not panic due to recover in AppendEvent.
1072+
evt := createTestEvent("event-after-close", "agent", "msg", time.Now(), false)
1073+
assert.NotPanics(t, func() {
1074+
_ = service.AppendEvent(context.Background(), sess, evt)
1075+
})
1076+
}
1077+
1078+
func TestService_SessionEventLimit_TrimsOldest(t *testing.T) {
1079+
redisURL, cleanup := setupTestRedis(t)
1080+
defer cleanup()
1081+
1082+
limit := 3
1083+
service, err := NewService(
1084+
WithRedisClientURL(redisURL),
1085+
WithSessionEventLimit(limit),
1086+
)
1087+
require.NoError(t, err)
1088+
defer service.Close()
1089+
1090+
// Create session.
1091+
sessionKey := session.Key{AppName: "testapp", UserID: "user123", SessionID: "session123"}
1092+
sess, err := service.CreateSession(context.Background(), sessionKey, session.StateMap{})
1093+
require.NoError(t, err)
1094+
1095+
// Append 5 events with increasing timestamps.
1096+
base := time.Now().Add(-5 * time.Minute)
1097+
ids := []string{"e1", "e2", "e3", "e4", "e5"}
1098+
for i, id := range ids {
1099+
evt := createTestEvent(id, "agent", "content", base.Add(time.Duration(i)*time.Second), false)
1100+
err := service.AppendEvent(context.Background(), sess, evt, session.WithEventTime(evt.Timestamp))
1101+
require.NoError(t, err)
1102+
}
1103+
1104+
// Get session and verify only latest 'limit' events remain in chronological order.
1105+
got, err := service.GetSession(context.Background(), sessionKey, session.WithEventNum(0))
1106+
require.NoError(t, err)
1107+
require.NotNil(t, got)
1108+
require.Len(t, got.Events, limit)
1109+
// Should keep e3, e4, e5 in chronological order.
1110+
assert.Equal(t, []string{"e3", "e4", "e5"}, []string{got.Events[0].ID, got.Events[1].ID, got.Events[2].ID})
1111+
}
1112+
10331113
func TestEnsureEventStartWithUser(t *testing.T) {
10341114
tests := []struct {
10351115
name string

0 commit comments

Comments
 (0)