Commit 38e5274

Simplify, update docs for classifySessionsBy
1 parent: b8b410d

1 file changed: +25 −15 lines

src/Streamly/Internal/Data/Stream/IsStream/Reduce.hs

Lines changed: 25 additions & 15 deletions
@@ -146,7 +146,6 @@ where
 
 #include "inline.hs"
 
-import Control.Concurrent (threadDelay)
 import Control.Exception (assert)
 import Control.Monad.Catch (MonadThrow)
 import Control.Monad.IO.Class (MonadIO(..))
@@ -165,8 +164,6 @@ import Streamly.Internal.Data.Stream.IsStream.Common
     , interjectSuffix
     , intersperseM
     , map
-    , parallelFst
-    , repeatM
     , scanlMAfter'
     , splitOnSeq
     , fromPure)
@@ -1188,7 +1185,9 @@ classifyKeepAliveChunks spanout = classifyChunksBy spanout True
 data SessionState t m k a b = SessionState
     { sessionCurTime :: !AbsTime -- ^ time since last event
     , sessionEventTime :: !AbsTime -- ^ time as per last event
-    , sessionCount :: !Int -- ^ total number sessions in progress
+    -- We can use the Map size instead of maintaining a count, but if we have
+    -- to switch to HashMap then it can be useful.
+    , sessionCount :: !Int -- ^ total number of sessions in progress
     , sessionTimerHeap :: H.Heap (H.Entry AbsTime k) -- ^ heap for timeouts
     , sessionKeyValueMap :: Map.Map k a -- ^ Stored sessions for keys
     , sessionOutputStream :: t (m :: Type -> Type) (k, b) -- ^ Completed sessions
@@ -1219,6 +1218,11 @@ data SessionEntry a b = LiveSession !a !b | ZombieSession
 -- limited to an upper bound. If the ejection @predicate@ returns 'True', the
 -- oldest session is ejected before inserting a new session.
 --
+-- When the stream ends any buffered sessions are ejected immediately.
+--
+-- If a session key is received even after a session has finished, another
+-- session is created for that key.
+--
 -- >>> :{
 -- Stream.mapM_ print
 --     $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
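
As an aside (not part of the diff): the doctest in the hunk above is cut off at the hunk boundary. A hedged usage sketch of the documented behaviour, assuming a streamly build around this commit where classifySessionsBy, timestamped and delay are all exported from the internal IsStream umbrella module, could look like the following; each session ends either at the 3 second timeout or when its fold terminates after taking 3 elements.

-- Illustration only, not the module's own doctest. Assumes the internal
-- IsStream module exports classifySessionsBy, timestamped, delay, mapM_,
-- map and fromList at this point in the repo's history.
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.IsStream as Stream

main :: IO ()
main =
    Stream.mapM_ print
        -- 1 s timer tick, no keep-alive reset, never eject early,
        -- 3 s session timeout, collect at most 3 values per session key
        $ Stream.classifySessionsBy 1 False (const (return False)) 3
              (Fold.take 3 Fold.toList)
        $ Stream.timestamped                -- tag each event with an AbsTime
        $ Stream.delay 0.1                  -- space the events out a little
        $ Stream.map (\n -> (even n, n))    -- session key = parity of n
        $ Stream.fromList [1 :: Int .. 6]
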
@@ -1243,9 +1247,11 @@ classifySessionsBy
     -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
     -> t m (k, b) -- ^ session key, fold result
 classifySessionsBy tick reset ejectPred tmout
-        (Fold step initial extract) str =
-    concatMap sessionOutputStream $
-        scanlMAfter' sstep (return szero) flush stream
+        (Fold step initial extract) input =
+    concatMap sessionOutputStream
+        $ scanlMAfter' sstep (return szero) flush
+        $ interjectSuffix tick (return Nothing)
+        $ map Just input
 
     where
 
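
The change above swaps the hand-rolled timer (a repeatM/parallelFst merge driven by threadDelay, removed in the last hunk below) for interjectSuffix, which periodically runs an action and splices its result into the stream. A minimal sketch of that shape, assuming interjectSuffix is exported from the internal IsStream module with the argument order used above (interval in seconds, action, stream):

-- Minimal sketch; the interjectSuffix export and its signature
-- (Double -> m a -> t m a -> t m a) are assumptions based on the diff.
import qualified Streamly.Internal.Data.Stream.IsStream as Stream

main :: IO ()
main =
    Stream.mapM_ print
        $ Stream.interjectSuffix 0.5 (return Nothing) -- a Nothing "tick" every 0.5 s
        $ Stream.map Just
        $ Stream.delay 0.2                            -- slow the input so ticks interleave
        $ Stream.fromList [1 :: Int .. 5]

classifySessionsBy now consumes exactly this Maybe-tagged shape: Just events carry session data, Nothing ticks drive the timeout scan.
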

@@ -1272,12 +1278,14 @@ classifySessionsBy tick reset ejectPred tmout
 
     -- Got a new stream input element
     sstep session@SessionState{..} (Just (timestamp, (key, value))) = do
-        -- XXX we should use a heap in pinned memory to scale it to a large
-        -- size
+        -- XXX instead of a heap we could use a timer wheel.
         --
         -- XXX if the key is an Int, we can also use an IntMap for slightly
         -- better performance.
         --
+        -- How it works:
+        --
+        -- Values for each key are collected in a map using the supplied fold.
         -- When we insert a key in the Map we insert an entry into the heap as
         -- well with the session expiry as the sort key. The Map entry
         -- consists of the fold result, and the expiry time of the session. If
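
The "How it works" comment describes the core bookkeeping: per-key values accumulate in a Map, while a Heap entry keyed on expiry time drives timeouts, and a Map hit means the heap entry already exists. A self-contained sketch of that idea (not the library code), with plain Ints standing in for the fold state and AbsTime, using the same containers and heaps packages the module imports:

import qualified Data.Heap as H
import qualified Data.Map.Strict as Map

type Key = String
type Expiry = Int   -- stand-in for AbsTime

type State = (Map.Map Key Int, H.Heap (H.Entry Expiry Key))

-- Accumulate a value for a session key. A heap entry (sorted by expiry) is
-- added only when the key is not already in the Map, mirroring "if we have a
-- Map entry we do not insert a heap entry".
insertEvent :: Key -> Expiry -> Int -> State -> State
insertEvent k expiry v (mp, hp) =
    ( Map.insertWith (+) k v mp
    , if Map.member k mp then hp else H.insert (H.Entry expiry k) hp
    )

-- Pop every heap entry whose expiry has passed, emitting the accumulated
-- value for its key and deleting the session from the Map.
expire :: Expiry -> State -> ([(Key, Int)], State)
expire now (mp, hp) =
    case H.uncons hp of
        Just (H.Entry t k, hp') | t <= now ->
            let emitted     = maybe [] (\v -> [(k, v)]) (Map.lookup k mp)
                (rest, st') = expire now (Map.delete k mp, hp')
            in (emitted ++ rest, st')
        _ -> ([], (mp, hp))
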
@@ -1302,6 +1310,14 @@ classifySessionsBy tick reset ejectPred tmout
         -- finished we still keep a dummy Map entry (ZombieSession) until the
         -- heap entry is removed. That way if we have a Map entry we do not
         -- insert a heap entry because we know it is already there.
+        -- XXX The ZombieSession mechanism does not work as expected as we
+        -- ignore ZombieSession when inserting a new entry. Anyway, we can
+        -- remove this mechanism as at most only two heap entries may be
+        -- created and they will be ultimately cleaned up.
+        --
+        -- Heap processing needs the map and map processing needs the heap,
+        -- therefore we cannot separate the two for modularity unless we have a
+        -- way to achieve mutual recursion.
         --
         let curTime = max sessionEventTime timestamp
             mOld = Map.lookup key sessionKeyValueMap
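
The ZombieSession note above concerns sessions that finish before their timeout fires: a placeholder stays in the Map so that, while the heap entry is still pending, a Map hit keeps meaning "a heap entry exists". A simplified, illustrative one-parameter variant of the SessionEntry type from the hunk header (the real type is SessionEntry a b; the Int field below merely stands in for the expiry time):

import qualified Data.Map.Strict as Map

data SessionEntry s = LiveSession !Int !s | ZombieSession

-- The session ended early (e.g. its fold terminated): keep a placeholder,
-- because the timeout entry for this key is still sitting in the heap.
finishEarly :: Ord k => k -> Map.Map k (SessionEntry s) -> Map.Map k (SessionEntry s)
finishEarly k = Map.insert k ZombieSession

-- The heap (timeout) entry for a key fires: a live session is flushed, a
-- zombie placeholder is simply dropped now that its heap entry is gone.
onHeapEntry :: Ord k => k -> Map.Map k (SessionEntry s)
            -> (Maybe s, Map.Map k (SessionEntry s))
onHeapEntry k mp =
    case Map.lookup k mp of
        Just (LiveSession _ s) -> (Just s, Map.delete k mp)
        Just ZombieSession     -> (Nothing, Map.delete k mp)
        Nothing                -> (Nothing, mp)
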
@@ -1476,12 +1492,6 @@ classifySessionsBy tick reset ejectPred tmout
         assert (Map.null mp) (return ())
         return (hp, mp, out, cnt)
 
-    -- merge timer events in the stream
-    stream = map Just str `parallelFst` repeatM timer
-    timer = do
-        liftIO $ threadDelay (round $ tick * 1000000)
-        return Nothing
-
 -- | Same as 'classifySessionsBy' with a timer tick of 1 second and keepalive
 -- option set to 'True'.
 --
