Skip to content

Commit f8b4658

Browse files
authored
Merge pull request #28180 from taosdata/fix/TD-32391
fix(stream):fix issue for stream count window state buff
2 parents ef18811 + 2c0445f commit f8b4658

File tree

8 files changed

+33
-19
lines changed

8 files changed

+33
-19
lines changed

include/libs/executor/storageapi.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ typedef struct SStateStore {
382382

383383
int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount,
384384
void** ppVal, int32_t* pVLen, int32_t* pWinCode);
385-
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
385+
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
386386

387387
int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType,
388388
int32_t pkLen, SUpdateInfo** ppInfo);

include/libs/stream/streamState.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ void streamStateFreeVal(void* val);
8787
// count window
8888
int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal,
8989
int32_t* pVLen, int32_t* pWinCode);
90-
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
90+
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
9191

9292
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
9393
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);

include/libs/stream/tstreamFileState.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
110110
// count window
111111
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
112112
int32_t* pVLen, int32_t* pWinCode);
113-
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
113+
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
114114

115115
// function
116116
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,

source/libs/executor/inc/streamexecutorInt.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
2626
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
2727
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
2828

29+
void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
30+
2931
#ifdef __cplusplus
3032
}
3133
#endif

source/libs/executor/src/streamcountwindowoperator.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
9090

9191
if (isSlidingCountWindow(pAggSup)) {
9292
if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
93-
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
93+
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
9494
(void**)&pCurWin->winInfo.pStatePos, &size);
9595
QUERY_CHECK_CODE(code, lino, _end);
9696

@@ -101,19 +101,23 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
101101
winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
102102
(void**)&pCurWin->winInfo.pStatePos, &size);
103103
if (winCode == TSDB_CODE_FAILED) {
104-
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
104+
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
105105
(void**)&pCurWin->winInfo.pStatePos, &size);
106106
QUERY_CHECK_CODE(code, lino, _end);
107+
} else {
108+
reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
107109
}
108110
} else {
109111
pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
110112
pAggSup->windowCount);
111113
winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
112114
(void**)&pCurWin->winInfo.pStatePos, &size);
113115
if (winCode == TSDB_CODE_FAILED) {
114-
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
116+
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount,
115117
(void**)&pCurWin->winInfo.pStatePos, &size);
116118
QUERY_CHECK_CODE(code, lino, _end);
119+
} else {
120+
reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
117121
}
118122
}
119123
if (ts < pCurWin->winInfo.sessionWin.win.ekey) {

source/libs/stream/src/streamSessionState.c

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
10601060
return code;
10611061
}
10621062

1063-
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
1063+
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
10641064
SSessionKey* pWinKey = pKey;
10651065
const TSKEY gap = 0;
10661066
int32_t code = TSDB_CODE_SUCCESS;
@@ -1082,21 +1082,27 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
10821082
int32_t size = taosArrayGetSize(pWinStates);
10831083
if (size == 0) {
10841084
void* pFileStore = getStateFileStore(pFileState);
1085-
void* p = NULL;
1085+
void* pRockVal = NULL;
10861086

1087-
int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &p, pVLen);
1087+
int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
10881088
if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
1089-
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
1090-
if (!(*pVal)) {
1091-
code = TSDB_CODE_OUT_OF_MEMORY;
1089+
int32_t valSize = *pVLen;
1090+
COUNT_TYPE* pWinStateCount = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
1091+
if ((*pWinStateCount) == winCount) {
1092+
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
10921093
QUERY_CHECK_CODE(code, lino, _end);
1093-
}
1094-
1095-
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
1094+
} else {
1095+
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
1096+
if (!(*pVal)) {
1097+
code = TSDB_CODE_OUT_OF_MEMORY;
1098+
QUERY_CHECK_CODE(code, lino, _end);
1099+
}
1100+
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
10961101
pWinKey->win.ekey, code_file);
1102+
}
10971103
} else {
10981104
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
1099-
taosMemoryFree(p);
1105+
taosMemoryFree(pRockVal);
11001106
QUERY_CHECK_CODE(code, lino, _end);
11011107
}
11021108
} else {

source/libs/stream/src/streamState.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,6 @@ int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey
545545
return getCountWinResultBuff(pState->pFileState, pKey, winCount, ppVal, pVLen, pWinCode);
546546
}
547547

548-
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
549-
return createCountWinResultBuff(pState->pFileState, pKey, pVal, pVLen);
548+
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
549+
return createCountWinResultBuff(pState->pFileState, pKey, winCount, pVal, pVLen);
550550
}

source/libs/stream/src/tstreamFileState.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,9 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
445445
}
446446

447447
int32_t clearRowBuff(SStreamFileState* pFileState) {
448-
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
448+
if (pFileState->deleteMark != INT64_MAX) {
449+
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
450+
}
449451
if (isListEmpty(pFileState->freeBuffs)) {
450452
return flushRowBuff(pFileState);
451453
}

0 commit comments

Comments
 (0)