Coverage Report

Created: 2026-06-09 06:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/tdengine/source/common/src/msg/streamMsg.c
Line
Count
Source
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
16
#include "streamMsg.h"
17
#include "taos.h"
18
#include "tarray.h"
19
#include "tdatablock.h"
20
#include "thash.h"
21
#include "tlist.h"
22
#include "tmsg.h"
23
#include "os.h"
24
#include "tcommon.h"
25
#include "tsimplehash.h"
26
27
0
int32_t tEncodeSStreamMgmtReq(SEncoder* pEncoder, const SStreamMgmtReq* pReq) {
28
0
  int32_t code = 0;
29
0
  int32_t lino = 0;
30
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId));
31
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type));
32
0
  switch (pReq->type) {
33
0
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
34
0
      if (pReq->cont.pReqs) {
35
0
        int32_t num = taosArrayGetSize(pReq->cont.pReqs);
36
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, num));
37
0
        for (int32_t i = 0; i < num; ++i) {
38
0
          SStreamDbTableName* pName = taosArrayGet(pReq->cont.pReqs, i);
39
0
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->dbFName, strlen(pName->dbFName) + 1));
40
0
          TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pName->tbName, strlen(pName->tbName) + 1));
41
0
        }
42
0
      } else {
43
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
44
0
      }
45
0
      break;
46
0
    }
47
0
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
48
0
      if (pReq->cont.pReqs) {
49
0
        int32_t num = taosArrayGetSize(pReq->cont.pReqs);
50
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, num));
51
0
        for (int32_t i = 0; i < num; ++i) {
52
0
          SStreamOReaderDeployReq* pDeploy = taosArrayGet(pReq->cont.pReqs, i);
53
0
          int32_t vgIdNum = taosArrayGetSize(pDeploy->vgIds);
54
0
          TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pDeploy->execId));
55
0
          TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pDeploy->uid));
56
0
          TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgIdNum));
57
0
          for (int32_t n = 0; n < vgIdNum; ++n) {
58
0
            TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *(int32_t*)taosArrayGet(pDeploy->vgIds, n)));
59
0
          }
60
0
        }
61
0
      } else {
62
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
63
0
      }
64
0
      break;
65
0
    }
66
0
    default:
67
0
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
68
0
      break;
69
0
  }
70
71
0
_exit:
72
73
0
  return code;
74
0
}
75
76
0
void tFreeRunnerOReaderDeployReq(void* param) {
77
0
  SStreamOReaderDeployReq* pReq = (SStreamOReaderDeployReq*)param;
78
0
  if (pReq) {
79
0
    taosArrayDestroy(pReq->vgIds);
80
0
  }
81
0
}
82
83
0
void tFreeSStreamMgmtReq(SStreamMgmtReq* pReq) {
84
0
  if (NULL == pReq) {
85
0
    return;
86
0
  }
87
88
0
  switch (pReq->type) {
89
0
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER:
90
0
      taosArrayDestroy(pReq->cont.pReqs);
91
0
      break;
92
0
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER:
93
0
      taosArrayDestroyEx(pReq->cont.pReqs, tFreeRunnerOReaderDeployReq);
94
0
      break;
95
0
    default:
96
0
      break;
97
0
  }
98
0
}
99
100
101
0
int32_t tCloneSStreamMgmtReq(SStreamMgmtReq* pSrc, SStreamMgmtReq** ppDst) {
102
0
  *ppDst = NULL;
103
  
104
0
  if (NULL == pSrc) {
105
0
    return TSDB_CODE_SUCCESS;
106
0
  }
107
108
0
  int32_t code = 0, lino = 0;
109
0
  *ppDst = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
110
0
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
111
112
0
  memcpy(*ppDst, pSrc, sizeof(*pSrc));
113
0
  if (pSrc->cont.pReqs) {
114
0
    switch (pSrc->type) {
115
0
      case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER:
116
0
        (*ppDst)->cont.pReqs = taosArrayDup(pSrc->cont.pReqs, NULL);
117
0
        TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
118
0
        break;
119
0
      case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
120
0
        int32_t reqNum = taosArrayGetSize(pSrc->cont.pReqs);
121
0
        (*ppDst)->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), reqNum);
122
0
        TSDB_CHECK_NULL((*ppDst)->cont.pReqs, code, lino, _exit, terrno);
123
0
        for (int32_t i = 0; i < reqNum; ++i) {
124
0
          SStreamOReaderDeployReq* pNew = taosArrayGet((*ppDst)->cont.pReqs, i);
125
0
          SStreamOReaderDeployReq* pReq = taosArrayGet(pSrc->cont.pReqs, i);
126
0
          pNew->vgIds = taosArrayDup(pReq->vgIds, NULL);
127
0
          TSDB_CHECK_NULL(pNew->vgIds, code, lino, _exit, terrno);
128
0
          pNew->execId = pReq->execId;
129
0
          pNew->uid = pReq->uid;
130
0
        }
131
0
        break;
132
0
      }  
133
0
      default:
134
0
        break;
135
0
    }
136
0
  }
137
  
138
0
_exit:
139
140
0
  if (code) {
141
0
    tFreeSStreamMgmtReq(*ppDst);
142
0
    taosMemoryFreeClear(*ppDst);
143
0
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
144
0
  }
145
  
146
0
  return code;
147
0
}
148
149
150
0
int32_t tDecodeSStreamMgmtReq(SDecoder* pDecoder, SStreamMgmtReq* pReq) {
151
0
  int32_t code = 0;
152
0
  int32_t lino = 0;
153
154
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId));
155
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pReq->type));
156
0
  switch (pReq->type) {
157
0
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER: {
158
0
      int32_t num = 0;
159
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
160
0
      if (num > 0) {
161
0
        pReq->cont.pReqs = taosArrayInit(num, sizeof(SStreamDbTableName));
162
0
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
163
0
        for (int32_t i = 0; i < num; ++i) {
164
0
          SStreamDbTableName* p = taosArrayReserve(pReq->cont.pReqs, 1);
165
0
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->dbFName));
166
0
          TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, p->tbName));
167
0
        }
168
0
      }
169
0
      break;
170
0
    }
171
0
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER: {
172
0
      int32_t num = 0, vgIdNum = 0;
173
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &num));
174
0
      if (num > 0) {
175
0
        pReq->cont.pReqs = taosArrayInit_s(sizeof(SStreamOReaderDeployReq), num);
176
0
        TSDB_CHECK_NULL(pReq->cont.pReqs, code, lino, _exit, terrno);
177
0
        for (int32_t i = 0; i < num; ++i) {
178
0
          SStreamOReaderDeployReq* p = taosArrayGet(pReq->cont.pReqs, i);
179
0
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &p->execId));
180
0
          TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &p->uid));
181
0
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgIdNum));
182
0
          if (vgIdNum > 0) {
183
0
            p->vgIds = taosArrayInit_s(sizeof(int32_t), vgIdNum);
184
0
            TSDB_CHECK_NULL(p->vgIds, code, lino, _exit, terrno);
185
0
          }
186
0
          for (int32_t n = 0; n < vgIdNum; ++n) {
187
0
            int32_t* vgId = taosArrayGet(p->vgIds, n);
188
0
            TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));
189
0
          }
190
0
        }
191
0
      }
192
0
      break;
193
0
    }
194
0
    default:
195
0
      code = TSDB_CODE_STREAM_INVALID_TASK_TYPE;
196
0
      break;
197
0
  }
198
199
0
_exit:
200
201
0
  return code;  
202
0
}
203
204
0
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
205
0
  int32_t code = 0;
206
0
  int32_t lino;
207
208
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->type));
209
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamId));
210
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->taskId));
211
212
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->flags));
213
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->seriousId));
214
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->deployId));
215
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->nodeId));
216
  // SKIP SESSIONID
217
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->taskIdx));
218
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->status));
219
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->detailStatus));
220
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->errorCode));
221
0
  if (pTask->pMgmtReq) {
222
0
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 1));
223
0
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtReq(pEncoder, pTask->pMgmtReq));
224
0
  } else {
225
0
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, 0));
226
0
  }
227
228
0
_exit:
229
230
0
  return code;
231
0
}
232
233
234
0
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
235
0
  int32_t code = 0;
236
0
  int32_t lino;
237
238
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->type));
239
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamId));
240
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->taskId));
241
  
242
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->flags));
243
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->seriousId));
244
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->deployId));
245
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->nodeId));
246
  // SKIP SESSIONID
247
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->taskIdx));
248
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, (int32_t*)&pTask->status));
249
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->detailStatus));
250
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->errorCode));
251
0
  int32_t req = 0;
252
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &req));
253
0
  if (req) {
254
0
    pTask->pMgmtReq = taosMemoryCalloc(1, sizeof(SStreamMgmtReq));
255
0
    TSDB_CHECK_NULL(pTask->pMgmtReq, code, lino, _exit, terrno);
256
0
    TAOS_CHECK_EXIT(tDecodeSStreamMgmtReq(pDecoder, pTask->pMgmtReq));
257
0
  }
258
259
0
_exit:
260
261
0
  return code;
262
0
}
263
264
0
int32_t tEncodeSSTriggerRecalcProgress(SEncoder* pEncoder, const SSTriggerRecalcProgress* pProgress) {
265
0
  int32_t code = 0;
266
0
  int32_t lino;
267
268
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->recalcId));
269
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pProgress->progress));
270
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->start));
271
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pProgress->end));
272
273
0
_exit:
274
275
0
  return code;
276
0
}
277
278
0
int32_t tDecodeSSTriggerRecalcProgress(SDecoder* pDecoder, SSTriggerRecalcProgress* pProgress) {
279
0
  int32_t code = 0;
280
0
  int32_t lino;
281
282
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->recalcId));
283
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pProgress->progress));
284
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->start));
285
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pProgress->end));
286
287
0
_exit:
288
289
0
  return code;
290
0
}
291
292
293
0
int32_t tEncodeSSTriggerRuntimeStatus(SEncoder* pEncoder, const SSTriggerRuntimeStatus* pStatus) {
294
0
  int32_t code = 0;
295
0
  int32_t lino;
296
297
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->autoRecalcNum));
298
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->realtimeSessionNum));
299
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->historySessionNum));
300
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->recalcSessionNum));
301
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStatus->histroyProgress));
302
303
0
  int32_t recalcNum = (int32_t)taosArrayGetSize(pStatus->userRecalcs);
304
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
305
0
  for (int32_t i = 0; i < recalcNum; ++i) {
306
0
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
307
0
    TAOS_CHECK_EXIT(tEncodeSSTriggerRecalcProgress(pEncoder, pProgress));
308
0
  }
309
310
0
_exit:
311
312
0
  return code;
313
0
}
314
315
0
int32_t tDecodeSSTriggerRuntimeStatus(SDecoder* pDecoder, SSTriggerRuntimeStatus* pStatus) {
316
0
  int32_t code = 0;
317
0
  int32_t lino;
318
319
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->autoRecalcNum));
320
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
321
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->historySessionNum));
322
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->realtimeSessionNum));
323
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pStatus->histroyProgress));
324
325
0
  int32_t recalcNum = 0;
326
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));
327
0
  if (recalcNum > 0) {
328
0
    pStatus->userRecalcs = taosArrayInit_s(sizeof(SSTriggerRecalcProgress), recalcNum);
329
0
    if (NULL == pStatus->userRecalcs) {
330
0
      code = terrno;
331
0
      goto _exit;
332
0
    }
333
0
  }
334
335
0
  for (int32_t i = 0; i < recalcNum; ++i) {
336
0
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pStatus->userRecalcs, i);
337
0
    TAOS_CHECK_EXIT(tDecodeSSTriggerRecalcProgress(pDecoder, pProgress));
338
0
  }
339
340
0
_exit:
341
342
0
  return code;
343
0
}
344
345
346
0
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
347
0
  int32_t code = 0;
348
0
  int32_t lino;
349
350
0
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
351
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dnodeId));
352
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->streamGId));
353
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->snodeId));
354
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->runnerThreadNum));
355
356
0
  int32_t vgLeaderNum = taosArrayGetSize(pReq->pVgLeaders);
357
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgLeaderNum));
358
0
  for (int32_t i = 0; i < vgLeaderNum; ++i) {
359
0
    int32_t* vgId = taosArrayGet(pReq->pVgLeaders, i);
360
0
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
361
0
  }
362
  
363
0
  int32_t statusNum = taosArrayGetSize(pReq->pStreamStatus);
364
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, statusNum));
365
0
  for (int32_t i = 0; i < statusNum; ++i) {
366
0
    SStmTaskStatusMsg* pStatus = taosArrayGet(pReq->pStreamStatus, i);
367
0
    TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)pStatus));
368
0
  }
369
370
0
  int32_t reqNum = taosArrayGetSize(pReq->pStreamReq);
371
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, reqNum));
372
0
  for (int32_t i = 0; i < reqNum; ++i) {
373
0
    int32_t* idx = taosArrayGet(pReq->pStreamReq, i);
374
0
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *idx));
375
0
  }
376
377
0
  int32_t triggerNum = taosArrayGetSize(pReq->pTriggerStatus);
378
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, triggerNum));
379
0
  for (int32_t i = 0; i < triggerNum; ++i) {
380
0
    SSTriggerRuntimeStatus* pTrigger = taosArrayGet(pReq->pTriggerStatus, i);
381
0
    TAOS_CHECK_EXIT(tEncodeSSTriggerRuntimeStatus(pEncoder, pTrigger));
382
0
  }
383
  
384
0
  tEndEncode(pEncoder);
385
386
0
_exit:
387
0
  if (code) {
388
0
    return code;
389
0
  } else {
390
0
    return pEncoder->pos;
391
0
  }
392
0
}
393
394
0
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
395
0
  int32_t code = 0;
396
0
  int32_t lino;
397
398
0
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
399
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dnodeId));
400
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->streamGId));
401
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->snodeId));
402
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->runnerThreadNum));
403
404
0
  int32_t vgLearderNum = 0;
405
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgLearderNum));
406
0
  if (vgLearderNum > 0) {
407
0
    pReq->pVgLeaders = taosArrayInit(vgLearderNum, sizeof(int32_t));
408
0
    if (NULL == pReq->pVgLeaders) {
409
0
      code = terrno;
410
0
      goto _exit;
411
0
    }
412
0
  }
413
0
  for (int32_t i = 0; i < vgLearderNum; ++i) {
414
0
    int32_t vgId = 0;
415
0
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
416
0
    if (NULL == taosArrayPush(pReq->pVgLeaders, &vgId)) {
417
0
      code = terrno;
418
0
      goto _exit;
419
0
    }
420
0
  }
421
422
423
0
  int32_t statusNum = 0;
424
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &statusNum));
425
0
  if (statusNum > 0) {
426
0
    pReq->pStreamStatus = taosArrayInit_s(sizeof(SStmTaskStatusMsg), statusNum);
427
0
    if (NULL == pReq->pStreamStatus) {
428
0
      code = terrno;
429
0
      goto _exit;
430
0
    }
431
0
  }
432
0
  for (int32_t i = 0; i < statusNum; ++i) {
433
0
    SStmTaskStatusMsg* pTask = taosArrayGet(pReq->pStreamStatus, i);
434
0
    if (NULL == pTask) {
435
0
      code = terrno;
436
0
      goto _exit;
437
0
    }
438
0
    TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)pTask));
439
0
  }
440
441
442
0
  int32_t reqNum = 0;
443
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &reqNum));
444
0
  if (reqNum > 0) {
445
0
    pReq->pStreamReq = taosArrayInit_s(sizeof(int32_t), reqNum);
446
0
    if (NULL == pReq->pStreamReq) {
447
0
      code = terrno;
448
0
      goto _exit;
449
0
    }
450
0
  }
451
0
  for (int32_t i = 0; i < reqNum; ++i) {
452
0
    int32_t* pIdx = taosArrayGet(pReq->pStreamReq, i);
453
0
    if (NULL == pIdx) {
454
0
      code = terrno;
455
0
      goto _exit;
456
0
    }
457
0
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, pIdx));
458
0
  }
459
460
461
0
  int32_t triggerNum = 0;
462
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerNum));
463
0
  if (triggerNum > 0) {
464
0
    pReq->pTriggerStatus = taosArrayInit_s(sizeof(SSTriggerRuntimeStatus), triggerNum);
465
0
    if (NULL == pReq->pTriggerStatus) {
466
0
      code = terrno;
467
0
      goto _exit;
468
0
    }
469
0
  }
470
0
  for (int32_t i = 0; i < triggerNum; ++i) {
471
0
    SSTriggerRuntimeStatus* pStatus = taosArrayGet(pReq->pTriggerStatus, i);
472
0
    if (NULL == pStatus) {
473
0
      code = terrno;
474
0
      goto _exit;
475
0
    }
476
0
    TAOS_CHECK_EXIT(tDecodeSSTriggerRuntimeStatus(pDecoder, pStatus));
477
0
  }
478
479
  
480
0
  tEndDecode(pDecoder);
481
482
0
_exit:
483
0
  return code;
484
0
}
485
486
0
void tFreeSSTriggerRuntimeStatus(void* param) {
487
0
  SSTriggerRuntimeStatus* pStatus = (SSTriggerRuntimeStatus*)param;
488
0
  if (NULL == pStatus) {
489
0
    return;
490
0
  }
491
0
  taosArrayDestroy(pStatus->userRecalcs);
492
0
}
493
494
0
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg, bool deepClean) {
495
0
  if (pMsg == NULL) {
496
0
    return;
497
0
  }
498
499
0
  taosArrayDestroy(pMsg->pVgLeaders);
500
0
  if (deepClean) {
501
0
    int32_t reqNum = taosArrayGetSize(pMsg->pStreamReq);
502
0
    for (int32_t i = 0; i < reqNum; ++i) {
503
0
      int32_t* idx = taosArrayGet(pMsg->pStreamReq, i);
504
0
      SStmTaskStatusMsg* pTask = taosArrayGet(pMsg->pStreamStatus, *idx);
505
0
      if (NULL == pTask) {
506
0
        continue;
507
0
      }
508
509
0
      tFreeSStreamMgmtReq(pTask->pMgmtReq);
510
0
      taosMemoryFree(pTask->pMgmtReq);
511
0
    }
512
0
  }
513
0
  taosArrayDestroy(pMsg->pStreamReq);
514
0
  taosArrayDestroy(pMsg->pStreamStatus);
515
0
  taosArrayDestroyEx(pMsg->pTriggerStatus, tFreeSSTriggerRuntimeStatus);
516
0
}
517
518
0
int32_t tEncodeSStreamReaderDeployFromTrigger(SEncoder* pEncoder, const SStreamReaderDeployFromTrigger* pMsg) {
519
0
  int32_t code = 0;
520
0
  int32_t lino;
521
522
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pMsg->triggerTblName, pMsg->triggerTblName == NULL ? 0 : (int32_t)strlen(pMsg->triggerTblName) + 1));
523
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblUid));
524
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->triggerTblSuid));
525
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerTblType));
526
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
527
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteReCalc));
528
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->deleteOutTbl));
529
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, pMsg->partitionCols == NULL ? 0 : (int32_t)strlen(pMsg->partitionCols) + 1));
530
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerCols, pMsg->triggerCols == NULL ? 0 : (int32_t)strlen(pMsg->triggerCols) + 1));
531
  //TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, pMsg->triggerPrevFilter == NULL ? 0 : (int32_t)strlen(pMsg->triggerPrevFilter) + 1));
532
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, pMsg->triggerScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->triggerScanPlan) + 1));
533
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, pMsg->calcCacheScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcCacheScanPlan) + 1));
534
535
0
_exit:
536
537
0
  return code;
538
0
}
539
540
0
int32_t tEncodeSStreamReaderDeployFromCalc(SEncoder* pEncoder, const SStreamReaderDeployFromCalc* pMsg) {
541
0
  int32_t code = 0;
542
0
  int32_t lino;
543
544
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
545
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcScanPlan, pMsg->calcScanPlan == NULL ? 0 : (int32_t)strlen(pMsg->calcScanPlan) + 1));
546
547
0
_exit:
548
549
0
  return code;
550
0
}
551
552
553
0
int32_t tEncodeSStreamReaderDeployMsg(SEncoder* pEncoder, const SStreamReaderDeployMsg* pMsg) {
554
0
  int32_t code = 0;
555
0
  int32_t lino;
556
557
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerReader));
558
0
  if (pMsg->triggerReader) {
559
0
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromTrigger(pEncoder, &pMsg->msg.trigger));
560
0
  } else {
561
0
    TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployFromCalc(pEncoder, &pMsg->msg.calc));
562
0
  }
563
  
564
0
_exit:
565
566
0
  return code;
567
0
}
568
569
0
int32_t tEncodeSStreamTaskAddr(SEncoder* pEncoder, const SStreamTaskAddr* pMsg) {
570
0
  int32_t code = 0;
571
0
  int32_t lino;
572
573
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->taskId));
574
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->nodeId));
575
0
  TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pMsg->epset));
576
577
0
_exit:
578
579
0
  return code;
580
0
}
581
582
0
int32_t tEncodeSStreamRunnerTarget(SEncoder* pEncoder, const SStreamRunnerTarget* pMsg) {
583
0
  int32_t code = 0;
584
0
  int32_t lino;
585
586
0
  TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, &pMsg->addr));
587
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
588
589
0
_exit:
590
591
0
  return code;
592
0
}
593
594
595
0
int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerDeployMsg* pMsg) {
596
0
  int32_t code = 0;
597
0
  int32_t lino;
598
599
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerType));
600
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igDisorder));
601
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistory));
602
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->fillHistoryFirst));
603
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
604
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->igNoDataTrigger));
605
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->enableMultiGroupCalc));
606
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblVirt));
607
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->triggerHasPF));
608
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->isTriggerTblStb));
609
0
  int32_t partitionColsLen = pMsg->partitionCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->partitionCols) + 1;
610
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->partitionCols, partitionColsLen));
611
612
0
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
613
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
614
0
  for (int32_t i = 0; i < addrSize; ++i) {
615
0
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
616
0
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
617
0
  }
618
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->notifyEventTypes));
619
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));
620
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->notifyHistory));
621
622
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->maxDelay));
623
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->fillHistoryStartTime));
624
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->watermark));
625
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->expiredTime));
626
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->idleTimeoutMs));
627
628
0
  switch (pMsg->triggerType) {
629
0
    case WINDOW_TYPE_SESSION: {
630
      // session trigger
631
0
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.session.slotId));
632
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.session.sessionVal));
633
0
      break;
634
0
    }
635
0
    case WINDOW_TYPE_STATE: {
636
      // state trigger
637
0
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.slotId));
638
0
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->trigger.stateWin.extend));
639
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForType));
640
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.stateWin.trueForCount));
641
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.stateWin.trueForDuration));
642
0
      int32_t stateWindowZerothLen = 
643
0
          pMsg->trigger.stateWin.zeroth == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.zeroth) + 1;
644
0
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.zeroth, stateWindowZerothLen));
645
0
      int32_t stateWindowExprLen =
646
0
          pMsg->trigger.stateWin.expr == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.stateWin.expr) + 1;
647
0
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.stateWin.expr, stateWindowExprLen));
648
0
      break;
649
0
    }
650
0
    case WINDOW_TYPE_INTERVAL: {
651
      // slide trigger
652
0
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.intervalUnit));
653
0
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.slidingUnit));
654
0
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.offsetUnit));
655
0
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.soffsetUnit));
656
0
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.sliding.precision));
657
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.interval));
658
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.offset));
659
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.sliding));
660
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.sliding.soffset));
661
0
      break;
662
0
    }
663
0
    case WINDOW_TYPE_EVENT: {
664
      // event trigger
665
0
      int32_t eventWindowStartCondLen = pMsg->trigger.event.startCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.startCond) + 1;
666
0
      int32_t eventWindowEndCondLen = pMsg->trigger.event.endCond == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.event.endCond) + 1;
667
668
0
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.startCond, eventWindowStartCondLen));
669
0
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.event.endCond, eventWindowEndCondLen));
670
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForType));
671
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->trigger.event.trueForCount));
672
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.event.trueForDuration));
673
0
      break;
674
0
    }
675
0
    case WINDOW_TYPE_COUNT: {
676
      // count trigger
677
0
      int32_t countWindowCondColsLen = pMsg->trigger.count.condCols == NULL ? 0 : (int32_t)strlen((char*)pMsg->trigger.count.condCols) + 1;
678
0
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->trigger.count.condCols, countWindowCondColsLen));
679
680
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.countVal));
681
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.count.sliding));
682
0
      break;
683
0
    }
684
0
    case WINDOW_TYPE_PERIOD: {
685
      // period trigger
686
0
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.periodUnit));
687
0
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.offsetUnit));
688
0
      TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.precision));
689
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
690
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
691
0
      break;
692
0
    }
693
0
    default:
694
0
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
695
0
      break;
696
0
  }
697
698
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->eventTypes));
699
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->placeHolderBitmap));
700
0
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcTsSlotId));
701
0
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triTsSlotId));
702
0
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->calcPkSlotId));
703
0
  TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pMsg->triPkSlotId));
704
0
  int32_t triggerPrevFilterLen = (pMsg->triggerPrevFilter == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerPrevFilter) + 1);
705
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerPrevFilter, triggerPrevFilterLen));
706
0
  int32_t triggerScanPlanLen = (pMsg->triggerScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->triggerScanPlan) + 1);
707
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->triggerScanPlan, triggerScanPlanLen));
708
0
  int32_t calcCacheScanPlanLen = (pMsg->calcCacheScanPlan == NULL) ? 0 : ((int32_t)strlen(pMsg->calcCacheScanPlan) + 1);
709
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->calcCacheScanPlan, calcCacheScanPlanLen));
710
711
0
  int32_t readerNum = taosArrayGetSize(pMsg->readerList);
712
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
713
0
  for (int32_t i = 0; i < readerNum; ++i) {
714
0
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
715
0
    TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, pAddr));
716
0
  }
717
718
0
  int32_t runnerNum = taosArrayGetSize(pMsg->runnerList);
719
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
720
0
  for (int32_t i = 0; i < runnerNum; ++i) {
721
0
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
722
0
    TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, pTarget));
723
0
  }
724
725
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
726
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
727
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->precision));
728
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->nodelayCreateSubtable));
729
730
0
_exit:
731
732
0
  return code;
733
0
}
734
735
736
0
int32_t tSerializeSFieldWithOptions(SEncoder* pEncoder, const SFieldWithOptions *pField) {
737
0
  int32_t code = 0;
738
0
  int32_t lino;
739
740
0
  TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pField->name));
741
0
  TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pField->type));
742
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pField->flags));
743
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->bytes));
744
0
  TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pField->compress));
745
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pField->typeMod));
746
747
0
_exit:
748
749
0
  return code;
750
0
}
751
752
753
0
int32_t tEncodeSStreamRunnerDeployMsg(SEncoder* pEncoder, const SStreamRunnerDeployMsg* pMsg) {
754
0
  int32_t code = 0;
755
0
  int32_t lino;
756
757
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->execReplica));
758
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
759
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->pPlan, NULL == pMsg->pPlan ? 0 : (int32_t)strlen(pMsg->pPlan) + 1));
760
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outDBFName, NULL == pMsg->outDBFName ? 0 : (int32_t)strlen(pMsg->outDBFName) + 1));
761
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->outTblName, NULL == pMsg->outTblName ? 0 : (int32_t)strlen(pMsg->outTblName) + 1));
762
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->outTblType));
763
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->calcNotifyOnly));
764
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->topPlan));
765
766
0
  int32_t addrSize = (int32_t)taosArrayGetSize(pMsg->pNotifyAddrUrls);
767
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
768
0
  for (int32_t i = 0; i < addrSize; ++i) {
769
0
    const char *url = taosArrayGetP(pMsg->pNotifyAddrUrls, i);
770
0
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, url, NULL == url ? 0 : (int32_t)strlen(url) + 1));
771
0
  }
772
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->addOptions));
773
774
0
  int32_t outColNum = (int32_t)taosArrayGetSize(pMsg->outCols);
775
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outColNum));
776
0
  for (int32_t i = 0; i < outColNum; ++i) {
777
0
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
778
0
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pCol));
779
0
  }
780
781
0
  int32_t outTagNum = (int32_t)taosArrayGetSize(pMsg->outTags);
782
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, outTagNum));
783
0
  for (int32_t i = 0; i < outTagNum; ++i) {
784
0
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
785
0
    TAOS_CHECK_EXIT(tSerializeSFieldWithOptions(pEncoder, pTag));
786
0
  }
787
788
0
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pMsg->outStbUid));
789
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->outStbSversion));
790
791
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->subTblNameExpr, NULL == pMsg->subTblNameExpr ? 0 : (int32_t)strlen(pMsg->subTblNameExpr) + 1));
792
0
  TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->tagValueExpr, NULL == pMsg->tagValueExpr ? 0 : (int32_t)strlen(pMsg->tagValueExpr) + 1));
793
794
0
  int32_t forceOutColsSize = (int32_t)taosArrayGetSize(pMsg->forceOutCols);
795
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, forceOutColsSize));
796
0
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
797
0
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
798
0
    int32_t        exprLen = pCoutCol->expr == NULL ? 0 : (int32_t)strlen((char*)pCoutCol->expr) + 1;
799
800
0
    TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pCoutCol->expr, exprLen));
801
0
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.type));
802
0
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.precision));
803
0
    TAOS_CHECK_EXIT(tEncodeU8(pEncoder, pCoutCol->type.scale));
804
0
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pCoutCol->type.bytes));
805
0
  }
806
807
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->lowLatencyCalc));
808
809
  // colCids and tagCids - always encode size (0 if NULL) for compatibility
810
0
  int32_t colCidsSize = (int32_t)taosArrayGetSize(pMsg->colCids);
811
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, colCidsSize));
812
0
  if (colCidsSize > 0) {
813
0
    for (int32_t i = 0; i < colCidsSize; ++i) {
814
0
      int16_t* pCid = (int16_t*)taosArrayGet(pMsg->colCids, i);
815
0
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
816
0
    }
817
0
  }
818
819
0
  int32_t tagCidsSize = (int32_t)taosArrayGetSize(pMsg->tagCids);
820
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, tagCidsSize));
821
0
  if (tagCidsSize > 0) {
822
0
    for (int32_t i = 0; i < tagCidsSize; ++i) {
823
0
      int16_t* pCid = (int16_t*)taosArrayGet(pMsg->tagCids, i);
824
0
      TAOS_CHECK_EXIT(tEncodeI16(pEncoder, *pCid));
825
0
    }
826
0
  }
827
828
0
_exit:
829
830
0
  return code;
831
0
}
832
833
0
int32_t tEncodeSStmTaskDeploy(SEncoder* pEncoder, const SStmTaskDeploy* pTask) {
834
0
  int32_t code = 0;
835
0
  int32_t lino;
836
837
0
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
838
0
  switch (pTask->task.type) {
839
0
    case STREAM_READER_TASK:
840
0
      TAOS_CHECK_EXIT(tEncodeSStreamReaderDeployMsg(pEncoder, &pTask->msg.reader));
841
0
      break;
842
0
    case STREAM_TRIGGER_TASK:
843
0
      TAOS_CHECK_EXIT(tEncodeSStreamTriggerDeployMsg(pEncoder, &pTask->msg.trigger));
844
0
      break;
845
0
    case STREAM_RUNNER_TASK:
846
0
      TAOS_CHECK_EXIT(tEncodeSStreamRunnerDeployMsg(pEncoder, &pTask->msg.runner));
847
0
      break;
848
0
    default:
849
0
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
850
0
      break;
851
0
  }
852
  
853
0
_exit:
854
855
0
  return code;
856
0
}
857
858
859
0
int32_t tEncodeSStmStreamDeploy(SEncoder* pEncoder, const SStmStreamDeploy* pStream) {
860
0
  int32_t code = 0;
861
0
  int32_t lino;
862
863
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pStream->streamId));
864
865
0
  int32_t readerNum = taosArrayGetSize(pStream->readerTasks);
866
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
867
0
  for (int32_t i = 0; i < readerNum; ++i) {
868
0
    SStmTaskDeploy* pDeploy = taosArrayGet(pStream->readerTasks, i);
869
0
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pDeploy));
870
0
  }
871
872
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pStream->triggerTask ? 1 : 0));
873
0
  if (pStream->triggerTask) {
874
0
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pStream->triggerTask));
875
0
  }
876
  
877
0
  int32_t runnerNum = taosArrayGetSize(pStream->runnerTasks);
878
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
879
0
  for (int32_t i = 0; i < runnerNum; ++i) {
880
0
    SStmTaskDeploy* pDeploy = taosArrayGet(pStream->runnerTasks, i);
881
0
    TAOS_CHECK_EXIT(tEncodeSStmTaskDeploy(pEncoder, pDeploy));
882
0
  }
883
884
0
_exit:
885
886
0
  return code;
887
0
}
888
889
0
int32_t tEncodeSStreamMsg(SEncoder* pEncoder, const SStreamMsg* pMsg) {
890
0
  int32_t code = 0;
891
0
  int32_t lino = 0;
892
893
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->msgType));
894
895
0
_exit:
896
0
  return code;
897
0
}
898
899
0
int32_t tDecodeSStreamMsg(SDecoder* pDecoder, SStreamMsg* pMsg) {
900
0
  int32_t code = 0;
901
0
  int32_t lino;
902
903
0
  int32_t type = 0;
904
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &type));
905
0
  pMsg->msgType = type;
906
907
0
_exit:
908
0
  return code;
909
0
}
910
911
0
int32_t tEncodeSStreamStartTaskMsg(SEncoder* pEncoder, const SStreamStartTaskMsg* pStart) {
912
0
  int32_t code = 0;
913
0
  int32_t lino;
914
915
0
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pStart->header));
916
917
0
_exit:
918
919
0
  return code;
920
0
}
921
922
0
int32_t tEncodeSStreamTaskStart(SEncoder* pEncoder, const SStreamTaskStart* pTask) {
923
0
  int32_t code = 0;
924
0
  int32_t lino;
925
926
0
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
927
0
  TAOS_CHECK_EXIT(tEncodeSStreamStartTaskMsg(pEncoder, (SStreamStartTaskMsg*)&pTask->startMsg));
928
929
0
_exit:
930
931
0
  return code;
932
0
}
933
934
0
int32_t tEncodeSStreamUndeployTaskMsg(SEncoder* pEncoder, const SStreamUndeployTaskMsg* pUndeploy) {
935
0
  int32_t code = 0;
936
0
  int32_t lino;
937
938
0
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pUndeploy->header));
939
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCheckpoint));
940
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pUndeploy->doCleanup));
941
942
0
_exit:
943
944
0
  return code;
945
0
}
946
947
0
int32_t tEncodeSStreamTaskUndeploy(SEncoder* pEncoder, const SStreamTaskUndeploy* pTask) {
948
0
  int32_t code = 0;
949
0
  int32_t lino;
950
951
0
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, (SStreamTask*)&pTask->task));
952
0
  TAOS_CHECK_EXIT(tEncodeSStreamUndeployTaskMsg(pEncoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));
953
954
0
_exit:
955
956
0
  return code;
957
0
}
958
959
960
0
int32_t tEncodeSStreamRecalcReq(SEncoder* pEncoder, const SStreamRecalcReq* recalc) {
961
0
  int32_t code = 0;
962
0
  int32_t lino;
963
964
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->recalcId));
965
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->start));
966
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, recalc->end));
967
968
0
_exit:
969
970
0
  return code;
971
0
}
972
973
0
int32_t tEncodeSStreamMgmtRspCont(SEncoder* pEncoder, SStreamMsgType msgType, const SStreamMgmtRspCont* pRsp) {
974
0
  int32_t code = 0;
975
0
  int32_t lino;
976
977
0
  switch (msgType) {
978
0
    case STREAM_MSG_ORIGTBL_READER_INFO: {
979
0
      int32_t vgNum = taosArrayGetSize(pRsp->vgIds);
980
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgNum));
981
982
0
      for (int32_t i = 0; i < vgNum; ++i) {
983
0
        int32_t* vgId = taosArrayGet(pRsp->vgIds, i);
984
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
985
0
      }
986
987
0
      int32_t readerNum = taosArrayGetSize(pRsp->readerList);
988
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, readerNum));
989
      
990
0
      for (int32_t i = 0; i < readerNum; ++i) {
991
0
        SStreamTaskAddr* addr = taosArrayGet(pRsp->readerList, i);
992
0
        TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, addr));
993
0
      }
994
0
      break;
995
0
    }
996
0
    case STREAM_MSG_UPDATE_RUNNER: {
997
0
      int32_t runnerNum = taosArrayGetSize(pRsp->runnerList);
998
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, runnerNum));
999
      
1000
0
      for (int32_t i = 0; i < runnerNum; ++i) {
1001
0
        SStreamRunnerTarget* target = taosArrayGet(pRsp->runnerList, i);
1002
0
        TAOS_CHECK_EXIT(tEncodeSStreamRunnerTarget(pEncoder, target));
1003
0
      }
1004
0
      break;
1005
0
    }
1006
0
    case STREAM_MSG_USER_RECALC: {
1007
0
      int32_t recalcNum = taosArrayGetSize(pRsp->recalcList);
1008
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, recalcNum));
1009
      
1010
0
      for (int32_t i = 0; i < recalcNum; ++i) {
1011
0
        SStreamRecalcReq* recalc = taosArrayGet(pRsp->recalcList, i);
1012
0
        TAOS_CHECK_EXIT(tEncodeSStreamRecalcReq(pEncoder, recalc));
1013
0
      }
1014
0
      break;
1015
0
    }
1016
0
    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
1017
0
      int32_t rspNum = taosArrayGetSize(pRsp->execRspList);
1018
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
1019
      
1020
0
      for (int32_t i = 0; i < rspNum; ++i) {
1021
0
        SStreamOReaderDeployRsp* pDeployRsp = taosArrayGet(pRsp->execRspList, i);
1022
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pDeployRsp->execId));
1023
0
        int32_t vgNum = taosArrayGetSize(pDeployRsp->vgList);
1024
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, vgNum));
1025
0
        for (int32_t n = 0; n < vgNum; ++n) {
1026
0
          TAOS_CHECK_EXIT(tEncodeSStreamTaskAddr(pEncoder, taosArrayGet(pDeployRsp->vgList, n)));
1027
0
        }
1028
0
      }
1029
0
      break;
1030
0
    }
1031
0
    default:
1032
0
      break;
1033
0
  }
1034
1035
0
_exit:
1036
1037
0
  return code;
1038
0
}
1039
1040
0
int32_t tEncodeSStreamMgmtRsp(SEncoder* pEncoder, const SStreamMgmtRsp* pRsp) {
1041
0
  int32_t code = 0;
1042
0
  int32_t lino;
1043
1044
0
  TAOS_CHECK_EXIT(tEncodeSStreamMsg(pEncoder, &pRsp->header));
1045
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId));
1046
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->code));
1047
0
  TAOS_CHECK_EXIT(tEncodeStreamTask(pEncoder, &pRsp->task));
1048
0
  TAOS_CHECK_EXIT(tEncodeSStreamMgmtRspCont(pEncoder, pRsp->header.msgType, (SStreamMgmtRspCont*)&pRsp->cont));
1049
1050
0
_exit:
1051
1052
0
  return code;
1053
0
}
1054
1055
1056
0
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
1057
0
  int32_t code = 0;
1058
0
  int32_t lino;
1059
1060
0
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
1061
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->streamGId));
1062
0
  int32_t deployNum = taosArrayGetSize(pRsp->deploy.streamList);
1063
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, deployNum));
1064
0
  for (int32_t i = 0; i < deployNum; ++i) {
1065
0
    SStmStreamDeploy* pStream = (SStmStreamDeploy*)taosArrayGet(pRsp->deploy.streamList, i);
1066
0
    TAOS_CHECK_EXIT(tEncodeSStmStreamDeploy(pEncoder, pStream));
1067
0
  }
1068
1069
0
  int32_t startNum = taosArrayGetSize(pRsp->start.taskList);
1070
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, startNum));
1071
0
  for (int32_t i = 0; i < startNum; ++i) {
1072
0
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
1073
0
    TAOS_CHECK_EXIT(tEncodeSStreamTaskStart(pEncoder, pTask));
1074
0
  }
1075
1076
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->undeploy.undeployAll));
1077
0
  if (!pRsp->undeploy.undeployAll) {
1078
0
    int32_t undeployNum = taosArrayGetSize(pRsp->undeploy.taskList);
1079
0
    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, undeployNum));
1080
0
    for (int32_t i = 0; i < undeployNum; ++i) {
1081
0
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
1082
0
      TAOS_CHECK_EXIT(tEncodeSStreamTaskUndeploy(pEncoder, pTask));
1083
0
    }
1084
0
  }
1085
1086
0
  int32_t rspNum = taosArrayGetSize(pRsp->rsps.rspList);
1087
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, rspNum));
1088
0
  for (int32_t i = 0; i < rspNum; ++i) {
1089
0
    SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
1090
0
    TAOS_CHECK_EXIT(tEncodeSStreamMgmtRsp(pEncoder, pMgmtRsp));
1091
0
  }
1092
  
1093
0
_exit:
1094
1095
0
  tEndEncode(pEncoder);
1096
1097
0
  return code;
1098
0
}
1099
1100
0
int32_t tDecodeSStreamReaderDeployFromTrigger(SDecoder* pDecoder, SStreamReaderDeployFromTrigger* pMsg) {
1101
0
  int32_t code = 0;
1102
0
  int32_t lino;
1103
1104
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerTblName, NULL));
1105
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblUid));
1106
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->triggerTblSuid));
1107
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerTblType));
1108
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
1109
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteReCalc));
1110
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->deleteOutTbl));
1111
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
1112
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerCols, NULL));
1113
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
1114
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
1115
1116
0
_exit:
1117
1118
0
  return code;
1119
0
}
1120
1121
1122
0
int32_t tDecodeSStreamReaderDeployFromCalc(SDecoder* pDecoder, SStreamReaderDeployFromCalc* pMsg) {
1123
0
  int32_t code = 0;
1124
0
  int32_t lino;
1125
1126
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
1127
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcScanPlan, NULL));
1128
1129
0
_exit:
1130
1131
0
  return code;
1132
0
}
1133
1134
1135
0
int32_t tDecodeSStreamReaderDeployMsg(SDecoder* pDecoder, SStreamReaderDeployMsg* pMsg) {
1136
0
  int32_t code = 0;
1137
0
  int32_t lino;
1138
1139
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerReader));
1140
0
  if (pMsg->triggerReader) {
1141
0
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromTrigger(pDecoder, &pMsg->msg.trigger));
1142
0
  } else {
1143
0
    TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployFromCalc(pDecoder, &pMsg->msg.calc));
1144
0
  }
1145
  
1146
0
_exit:
1147
1148
0
  return code;
1149
0
}
1150
1151
1152
0
int32_t tDecodeSStreamTaskAddr(SDecoder* pDecoder, SStreamTaskAddr* pMsg) {
1153
0
  int32_t code = 0;
1154
0
  int32_t lino;
1155
1156
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->taskId));
1157
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->nodeId));
1158
0
  TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pMsg->epset));
1159
1160
0
_exit:
1161
1162
0
  return code;
1163
0
}
1164
1165
1166
0
int32_t tDecodeSStreamRunnerTarget(SDecoder* pDecoder, SStreamRunnerTarget* pMsg) {
1167
0
  int32_t code = 0;
1168
0
  int32_t lino;
1169
1170
0
  TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, &pMsg->addr));
1171
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
1172
1173
0
_exit:
1174
1175
0
  return code;
1176
0
}
1177
1178
1179
0
int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployMsg* pMsg) {
1180
0
  int32_t code = 0;
1181
0
  int32_t lino;
1182
1183
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerType));
1184
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igDisorder));
1185
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistory));
1186
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->fillHistoryFirst));
1187
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
1188
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->igNoDataTrigger));
1189
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->enableMultiGroupCalc));
1190
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblVirt));
1191
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->triggerHasPF));
1192
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->isTriggerTblStb));
1193
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->partitionCols, NULL));
1194
1195
0
  int32_t addrSize = 0;
1196
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
1197
0
  if (addrSize > 0) {
1198
0
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
1199
0
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
1200
0
  }
1201
0
  for (int32_t i = 0; i < addrSize; ++i) {
1202
0
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
1203
0
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
1204
0
  }
1205
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->notifyEventTypes));
1206
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));
1207
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->notifyHistory));
1208
1209
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->maxDelay));
1210
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->fillHistoryStartTime));
1211
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->watermark));
1212
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->expiredTime));
1213
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->idleTimeoutMs));
1214
1215
0
  switch (pMsg->triggerType) {
1216
0
    case WINDOW_TYPE_SESSION:
1217
      // session trigger
1218
0
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.session.slotId));
1219
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.session.sessionVal));
1220
0
      break;
1221
0
    case WINDOW_TYPE_STATE:
1222
      // state trigger
1223
0
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.slotId));
1224
0
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->trigger.stateWin.extend));
1225
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForType));
1226
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.stateWin.trueForCount));
1227
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.stateWin.trueForDuration));
1228
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.zeroth, NULL));
1229
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.stateWin.expr, NULL));
1230
0
      break;
1231
    
1232
0
    case WINDOW_TYPE_INTERVAL:
1233
      // slide trigger
1234
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.intervalUnit));
1235
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.slidingUnit));
1236
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.offsetUnit));
1237
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.soffsetUnit));
1238
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.sliding.precision));
1239
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.interval));
1240
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.offset));
1241
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.sliding));
1242
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.sliding.soffset));
1243
0
      break;
1244
    
1245
0
    case WINDOW_TYPE_EVENT:
1246
      // event trigger
1247
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.startCond, NULL));
1248
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.event.endCond, NULL));
1249
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForType));
1250
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->trigger.event.trueForCount));
1251
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.event.trueForDuration));
1252
0
      break;
1253
    
1254
0
    case WINDOW_TYPE_COUNT:
1255
      // count trigger
1256
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->trigger.count.condCols, NULL));
1257
      
1258
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.countVal));
1259
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.count.sliding));
1260
0
      break;
1261
    
1262
0
    case WINDOW_TYPE_PERIOD:
1263
      // period trigger
1264
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.periodUnit));
1265
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.offsetUnit));
1266
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.period.precision));
1267
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
1268
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
1269
0
      break;
1270
0
    default:
1271
0
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
1272
0
      break;
1273
0
  }
1274
1275
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->eventTypes));
1276
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->placeHolderBitmap));
1277
0
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcTsSlotId));
1278
0
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triTsSlotId));
1279
0
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->calcPkSlotId));
1280
0
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pMsg->triPkSlotId));
1281
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerPrevFilter, NULL));
1282
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->triggerScanPlan, NULL));
1283
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->calcCacheScanPlan, NULL));
1284
1285
0
  int32_t readerNum = 0;
1286
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
1287
0
  if (readerNum > 0) {
1288
0
    pMsg->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
1289
0
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
1290
0
  }
1291
0
  for (int32_t i = 0; i < readerNum; ++i) {
1292
0
    SStreamTaskAddr* pAddr = (SStreamTaskAddr*)taosArrayGet(pMsg->readerList, i);
1293
0
    TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pAddr));
1294
0
  }
1295
1296
0
  int32_t runnerNum = 0;
1297
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
1298
0
  if (runnerNum > 0) {
1299
0
    pMsg->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
1300
0
    TSDB_CHECK_NULL(pMsg->runnerList, code, lino, _exit, terrno);
1301
0
  }
1302
0
  for (int32_t i = 0; i < runnerNum; ++i) {
1303
0
    SStreamRunnerTarget* pTarget = (SStreamRunnerTarget*)taosArrayGet(pMsg->runnerList, i);
1304
0
    TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, pTarget));
1305
0
  }
1306
1307
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->leaderSnodeId));
1308
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
1309
0
  if (!tDecodeIsEnd(pDecoder)) {
1310
0
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->precision));
1311
0
  }
1312
0
  if (!tDecodeIsEnd(pDecoder)) {
1313
0
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->nodelayCreateSubtable));
1314
0
  }
1315
1316
0
_exit:
1317
1318
0
  return code;
1319
0
}
1320
1321
1322
1323
0
int32_t tDeserializeSFieldWithOptions(SDecoder *pDecoder, SFieldWithOptions *pField) {
1324
0
  int32_t code = 0;
1325
0
  int32_t lino;
1326
1327
0
  TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pField->name));
1328
0
  TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pField->type));
1329
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pField->flags));
1330
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->bytes));
1331
0
  TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pField->compress));
1332
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pField->typeMod));
1333
1334
0
_exit:
1335
1336
0
  return code;
1337
0
}
1338
1339
0
void destroySStreamOutCols(void* p){
1340
0
  if (p == NULL) return;
1341
0
  SStreamOutCol* col = (SStreamOutCol*)p;
1342
0
  taosMemoryFreeClear(col->expr);
1343
0
}
1344
1345
0
int32_t tDecodeSStreamRunnerDeployMsg(SDecoder* pDecoder, SStreamRunnerDeployMsg* pMsg) {
1346
0
  int32_t code = 0;
1347
0
  int32_t lino;
1348
1349
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->execReplica));
1350
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->streamName, NULL));
1351
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->pPlan, NULL));
1352
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outDBFName, NULL));
1353
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->outTblName, NULL));
1354
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->outTblType));
1355
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->calcNotifyOnly));
1356
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->topPlan));
1357
1358
0
  int32_t addrSize = 0;
1359
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
1360
0
  if (addrSize > 0) {
1361
0
    pMsg->pNotifyAddrUrls = taosArrayInit_s(POINTER_BYTES, addrSize);
1362
0
    TSDB_CHECK_NULL(pMsg->pNotifyAddrUrls, code, lino, _exit, terrno);
1363
0
  }
1364
0
  for (int32_t i = 0; i < addrSize; ++i) {
1365
0
    const char **url = taosArrayGet(pMsg->pNotifyAddrUrls, i);
1366
0
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)url, NULL));
1367
0
  }
1368
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->addOptions));
1369
1370
0
  int32_t outColNum = 0;
1371
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColNum));
1372
0
  if (outColNum > 0) {
1373
0
    pMsg->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColNum);
1374
0
    TSDB_CHECK_NULL(pMsg->outCols, code, lino, _exit, terrno);
1375
0
  }
1376
0
  for (int32_t i = 0; i < outColNum; ++i) {
1377
0
    SFieldWithOptions *pCol = taosArrayGet(pMsg->outCols, i);
1378
0
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pCol));
1379
0
  }
1380
1381
0
  int32_t outTagNum = 0;
1382
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagNum));
1383
0
  if (outTagNum > 0) {
1384
0
    pMsg->outTags = taosArrayInit_s(sizeof(SFieldWithOptions), outTagNum);
1385
0
    TSDB_CHECK_NULL(pMsg->outTags, code, lino, _exit, terrno);
1386
0
  }
1387
0
  for (int32_t i = 0; i < outTagNum; ++i) {
1388
0
    SFieldWithOptions *pTag = taosArrayGet(pMsg->outTags, i);
1389
0
    TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pTag));
1390
0
  }
1391
1392
0
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pMsg->outStbUid));
1393
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->outStbSversion));
1394
1395
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->subTblNameExpr, NULL));
1396
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pMsg->tagValueExpr, NULL));
1397
1398
0
  int32_t forceOutColsSize = 0;
1399
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
1400
0
  if (forceOutColsSize > 0) {
1401
0
    pMsg->forceOutCols = taosArrayInit_s(sizeof(SStreamOutCol), forceOutColsSize);
1402
0
    TSDB_CHECK_NULL(pMsg->forceOutCols, code, lino, _exit, terrno);
1403
0
  }
1404
0
  for (int32_t i = 0; i < forceOutColsSize; ++i) {
1405
0
    SStreamOutCol *pCoutCol = (SStreamOutCol*)taosArrayGet(pMsg->forceOutCols, i);
1406
1407
0
    TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pCoutCol->expr, NULL));
1408
0
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.type));
1409
0
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.precision));
1410
0
    TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pCoutCol->type.scale));
1411
0
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pCoutCol->type.bytes));
1412
0
  }
1413
1414
0
  if (!tDecodeIsEnd(pDecoder)) {
1415
0
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->lowLatencyCalc));
1416
0
  }
1417
1418
  // colCids and tagCids - always decode size, create array only if size > 0
1419
  // For backward compatibility, check if there's more data before decoding
1420
0
  if (!tDecodeIsEnd(pDecoder)) {
1421
0
    int32_t colCidsSize = 0;
1422
0
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &colCidsSize));
1423
0
    if (colCidsSize > 0 && colCidsSize <= TSDB_MAX_COLUMNS) {  // Sanity check
1424
0
      pMsg->colCids = taosArrayInit(colCidsSize, sizeof(int16_t));
1425
0
      TSDB_CHECK_NULL(pMsg->colCids, code, lino, _exit, terrno);
1426
0
      for (int32_t i = 0; i < colCidsSize; ++i) {
1427
0
        int16_t cid = 0;
1428
0
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &cid));
1429
0
        if (taosArrayPush(pMsg->colCids, &cid) == NULL) {
1430
0
          TAOS_CHECK_EXIT(terrno);
1431
0
        }
1432
0
      }
1433
0
    }
1434
0
  }
1435
  // Try to decode tagCids if there's more data
1436
0
  if (!tDecodeIsEnd(pDecoder)) {
1437
0
    int32_t tagCidsSize = 0;
1438
0
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &tagCidsSize));
1439
0
    if (tagCidsSize > 0 && tagCidsSize <= TSDB_MAX_TAGS) {  // Sanity check
1440
0
      pMsg->tagCids = taosArrayInit(tagCidsSize, sizeof(int16_t));
1441
0
      TSDB_CHECK_NULL(pMsg->tagCids, code, lino, _exit, terrno);
1442
0
      for (int32_t i = 0; i < tagCidsSize; ++i) {
1443
0
        int16_t cid = 0;
1444
0
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &cid));
1445
0
        if (taosArrayPush(pMsg->tagCids, &cid) == NULL) {
1446
0
          TAOS_CHECK_EXIT(terrno);
1447
0
        }
1448
0
      }
1449
0
    }
1450
0
  }
1451
1452
0
_exit:
1453
1454
0
  return code;
1455
0
}
1456
1457
0
int32_t tDecodeSStmTaskDeploy(SDecoder* pDecoder, SStmTaskDeploy* pTask) {
1458
0
  int32_t code = 0;
1459
0
  int32_t lino;
1460
1461
0
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
1462
0
  switch (pTask->task.type) {
1463
0
    case STREAM_READER_TASK:
1464
0
      TAOS_CHECK_EXIT(tDecodeSStreamReaderDeployMsg(pDecoder, &pTask->msg.reader));
1465
0
      break;
1466
0
    case STREAM_TRIGGER_TASK:
1467
0
      TAOS_CHECK_EXIT(tDecodeSStreamTriggerDeployMsg(pDecoder, &pTask->msg.trigger));
1468
0
      break;
1469
0
    case STREAM_RUNNER_TASK:
1470
0
      TAOS_CHECK_EXIT(tDecodeSStreamRunnerDeployMsg(pDecoder, &pTask->msg.runner));
1471
0
      break;
1472
0
    default:
1473
0
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
1474
0
      break;
1475
0
  }
1476
  
1477
0
_exit:
1478
1479
0
  return code;
1480
0
}
1481
1482
1483
0
int32_t tDecodeSStmStreamDeploy(SDecoder* pDecoder, SStmStreamDeploy* pStream) {
1484
0
  int32_t code = 0;
1485
0
  int32_t lino;
1486
1487
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pStream->streamId));
1488
1489
0
  int32_t readerNum = 0;
1490
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));
1491
0
  if (readerNum > 0) {
1492
0
    pStream->readerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), readerNum);
1493
0
    TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);
1494
0
  }
1495
0
  for (int32_t i = 0; i < readerNum; ++i) {
1496
0
    SStmTaskDeploy* pTask = taosArrayGet(pStream->readerTasks, i);
1497
0
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pTask));
1498
0
  }
1499
1500
0
  int32_t triggerTask = 0;
1501
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &triggerTask));
1502
0
  if (triggerTask) {
1503
0
    pStream->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskDeploy));
1504
0
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
1505
0
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pStream->triggerTask));
1506
0
  }
1507
  
1508
0
  int32_t runnerNum = 0;
1509
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));
1510
0
  if (runnerNum > 0) {
1511
0
    pStream->runnerTasks = taosArrayInit_s(sizeof(SStmTaskDeploy), runnerNum);
1512
0
    TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);
1513
0
  }
1514
0
  for (int32_t i = 0; i < runnerNum; ++i) {
1515
0
    SStmTaskDeploy* pTask = taosArrayGet(pStream->runnerTasks, i);
1516
0
    TAOS_CHECK_EXIT(tDecodeSStmTaskDeploy(pDecoder, pTask));
1517
0
  }
1518
1519
0
_exit:
1520
1521
0
  return code;
1522
0
}
1523
1524
1525
0
int32_t tDecodeSStreamStartTaskMsg(SDecoder* pDecoder, SStreamStartTaskMsg* pStart) {
1526
0
  int32_t code = 0;
1527
0
  int32_t lino;
1528
1529
0
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pStart->header));
1530
1531
0
_exit:
1532
1533
0
  return code;
1534
0
}
1535
1536
1537
0
int32_t tDecodeSStreamTaskStart(SDecoder* pDecoder, SStreamTaskStart* pTask) {
1538
0
  int32_t code = 0;
1539
0
  int32_t lino;
1540
1541
0
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
1542
0
  TAOS_CHECK_EXIT(tDecodeSStreamStartTaskMsg(pDecoder, (SStreamStartTaskMsg*)&pTask->startMsg));
1543
1544
0
_exit:
1545
1546
0
  return code;
1547
0
}
1548
1549
1550
0
int32_t tDecodeSStreamUndeployTaskMsg(SDecoder* pDecoder, SStreamUndeployTaskMsg* pUndeploy) {
1551
0
  int32_t code = 0;
1552
0
  int32_t lino;
1553
1554
0
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pUndeploy->header));
1555
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCheckpoint));
1556
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pUndeploy->doCleanup));
1557
1558
0
_exit:
1559
1560
0
  return code;
1561
0
}
1562
1563
1564
0
int32_t tDecodeSStreamTaskUndeploy(SDecoder* pDecoder, SStreamTaskUndeploy* pTask) {
1565
0
  int32_t code = 0;
1566
0
  int32_t lino;
1567
1568
0
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, (SStreamTask*)&pTask->task));
1569
0
  TAOS_CHECK_EXIT(tDecodeSStreamUndeployTaskMsg(pDecoder, (SStreamUndeployTaskMsg*)&pTask->undeployMsg));
1570
1571
0
_exit:
1572
1573
0
  return code;
1574
0
}
1575
1576
0
int32_t tDecodeSStreamRecalcReq(SDecoder* pDecoder, SStreamRecalcReq* recalc) {
1577
0
  int32_t code = 0;
1578
0
  int32_t lino;
1579
1580
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->recalcId));
1581
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->start));
1582
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &recalc->end));
1583
1584
0
_exit:
1585
1586
0
  return code;
1587
0
}
1588
1589
0
int32_t tDecodeSStreamMgmtRspCont(SDecoder* pDecoder, SStreamMsgType msgType, SStreamMgmtRspCont* pCont) {
1590
0
  int32_t code = 0;
1591
0
  int32_t lino;
1592
1593
0
  switch (msgType) {
1594
0
    case STREAM_MSG_ORIGTBL_READER_INFO: {
1595
0
      int32_t vgNum = 0;
1596
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgNum));  
1597
0
      if (vgNum > 0) {
1598
0
        pCont->vgIds = taosArrayInit_s(sizeof(int32_t), vgNum);
1599
0
        TSDB_CHECK_NULL(pCont->vgIds, code, lino, _exit, terrno);
1600
0
      }
1601
0
      for (int32_t i = 0; i < vgNum; ++i) {
1602
0
        int32_t *vgId = taosArrayGet(pCont->vgIds, i);
1603
0
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, vgId));  
1604
0
      }
1605
1606
0
      int32_t readerNum = 0;
1607
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &readerNum));  
1608
0
      if (readerNum > 0) {
1609
0
        pCont->readerList = taosArrayInit_s(sizeof(SStreamTaskAddr), readerNum);
1610
0
        TSDB_CHECK_NULL(pCont->readerList, code, lino, _exit, terrno);
1611
0
      }
1612
0
      for (int32_t i = 0; i < readerNum; ++i) {
1613
0
        SStreamTaskAddr *addr = taosArrayGet(pCont->readerList, i);
1614
0
        TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, addr));  
1615
0
      }
1616
0
      break;
1617
0
    }
1618
0
    case STREAM_MSG_UPDATE_RUNNER: {
1619
0
      int32_t runnerNum = 0;
1620
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &runnerNum));  
1621
0
      if (runnerNum > 0) {
1622
0
        pCont->runnerList = taosArrayInit_s(sizeof(SStreamRunnerTarget), runnerNum);
1623
0
        TSDB_CHECK_NULL(pCont->runnerList, code, lino, _exit, terrno);
1624
0
      }
1625
0
      for (int32_t i = 0; i < runnerNum; ++i) {
1626
0
        SStreamRunnerTarget *target = taosArrayGet(pCont->runnerList, i);
1627
0
        TAOS_CHECK_EXIT(tDecodeSStreamRunnerTarget(pDecoder, target));  
1628
0
      }
1629
0
      break;
1630
0
    }
1631
0
    case STREAM_MSG_USER_RECALC: {
1632
0
      int32_t recalcNum = 0;
1633
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &recalcNum));  
1634
0
      if (recalcNum > 0) {
1635
0
        pCont->recalcList = taosArrayInit_s(sizeof(SStreamRecalcReq), recalcNum);
1636
0
        TSDB_CHECK_NULL(pCont->recalcList, code, lino, _exit, terrno);
1637
0
      }
1638
0
      for (int32_t i = 0; i < recalcNum; ++i) {
1639
0
        SStreamRecalcReq *recalc = taosArrayGet(pCont->recalcList, i);
1640
0
        TAOS_CHECK_EXIT(tDecodeSStreamRecalcReq(pDecoder, recalc));  
1641
0
      }
1642
0
      break;
1643
0
    }
1644
0
    case STREAM_MSG_RUNNER_ORIGTBL_READER: {
1645
0
      int32_t rspNum = 0, vgNum = 0;
1646
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rspNum));  
1647
0
      if (rspNum > 0) {
1648
0
        pCont->execRspList = taosArrayInit_s(sizeof(SStreamOReaderDeployRsp), rspNum);
1649
0
        TSDB_CHECK_NULL(pCont->execRspList, code, lino, _exit, terrno);
1650
0
      }
1651
0
      for (int32_t i = 0; i < rspNum; ++i) {
1652
0
        SStreamOReaderDeployRsp *pDeployRsp = taosArrayGet(pCont->execRspList, i);
1653
0
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pDeployRsp->execId));  
1654
0
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgNum));
1655
0
        if (vgNum > 0) {
1656
0
          pDeployRsp->vgList = taosArrayInit_s(sizeof(SStreamTaskAddr), vgNum);
1657
0
          TSDB_CHECK_NULL(pDeployRsp->vgList, code, lino, _exit, terrno);
1658
0
        }
1659
0
        for (int32_t n = 0; n < vgNum; ++n) {
1660
0
          SStreamTaskAddr* pAddr = taosArrayGet(pDeployRsp->vgList, n);
1661
0
          TAOS_CHECK_EXIT(tDecodeSStreamTaskAddr(pDecoder, pAddr));  
1662
0
        }
1663
0
      }
1664
0
      break;
1665
0
    }
1666
0
    default:
1667
0
      break;
1668
0
  }
1669
1670
0
_exit:
1671
1672
0
  return code;
1673
0
}
1674
1675
1676
0
int32_t tDecodeSStreamMgmtRsp(SDecoder* pDecoder, SStreamMgmtRsp* pRsp) {
1677
0
  int32_t code = 0;
1678
0
  int32_t lino;
1679
1680
0
  TAOS_CHECK_EXIT(tDecodeSStreamMsg(pDecoder, &pRsp->header));
1681
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId));
1682
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->code));
1683
0
  TAOS_CHECK_EXIT(tDecodeStreamTask(pDecoder, &pRsp->task));
1684
0
  TAOS_CHECK_EXIT(tDecodeSStreamMgmtRspCont(pDecoder, pRsp->header.msgType, &pRsp->cont));
1685
1686
0
_exit:
1687
1688
0
  return code;
1689
0
}
1690
1691
0
void tFreeSStreamOReaderDeployRsp(void* param) {
1692
0
  if (NULL == param) {
1693
0
    return;
1694
0
  }
1695
1696
0
  SStreamOReaderDeployRsp* pRsp = (SStreamOReaderDeployRsp*)param;
1697
0
  taosArrayDestroy(pRsp->vgList);
1698
0
}
1699
1700
0
void tFreeSStreamMgmtRsp(void* param) {
1701
0
  if (NULL == param) {
1702
0
    return;
1703
0
  }
1704
  
1705
0
  SStreamMgmtRsp* pRsp = (SStreamMgmtRsp*)param;
1706
1707
0
  taosArrayDestroy(pRsp->cont.vgIds);
1708
0
  taosArrayDestroy(pRsp->cont.readerList);
1709
0
  taosArrayDestroy(pRsp->cont.runnerList);
1710
0
  taosArrayDestroy(pRsp->cont.recalcList);
1711
0
  taosArrayDestroyEx(pRsp->cont.execRspList, tFreeSStreamOReaderDeployRsp);
1712
0
}
1713
1714
0
void tFreeSStreamReaderDeployMsg(SStreamReaderDeployMsg* pReader) {
1715
0
  if (NULL == pReader) {
1716
0
    return;
1717
0
  }
1718
  
1719
0
  if (pReader->triggerReader) {
1720
0
    SStreamReaderDeployFromTrigger* pMsg = (SStreamReaderDeployFromTrigger*)&pReader->msg.trigger;
1721
0
    taosMemoryFree(pMsg->triggerTblName);
1722
0
    taosMemoryFree(pMsg->partitionCols);
1723
0
    taosMemoryFree(pMsg->triggerCols);
1724
0
    taosMemoryFree(pMsg->triggerScanPlan);
1725
0
    taosMemoryFree(pMsg->calcCacheScanPlan);
1726
0
  } else {
1727
0
    SStreamReaderDeployFromCalc* pMsg = (SStreamReaderDeployFromCalc*)&pReader->msg.calc;
1728
0
    taosMemoryFree(pMsg->calcScanPlan);
1729
0
  }
1730
0
}
1731
1732
0
void tFreeStreamNotifyUrl(void* param) {
1733
0
  if (NULL == param) {
1734
0
    return;
1735
0
  }
1736
1737
0
  taosMemoryFree(*(void**)param);
1738
0
}
1739
1740
0
void tFreeSStreamTriggerDeployMsg(SStreamTriggerDeployMsg* pTrigger) {
1741
0
  if (NULL == pTrigger) {
1742
0
    return;
1743
0
  }
1744
  
1745
0
  taosArrayDestroyEx(pTrigger->pNotifyAddrUrls, tFreeStreamNotifyUrl);
1746
0
  switch (pTrigger->triggerType) {
1747
0
    case WINDOW_TYPE_STATE:
1748
0
      taosMemoryFree(pTrigger->trigger.stateWin.zeroth);
1749
0
      taosMemoryFree(pTrigger->trigger.stateWin.expr);
1750
0
      break;
1751
0
    case WINDOW_TYPE_EVENT:
1752
0
      taosMemoryFree(pTrigger->trigger.event.startCond);
1753
0
      taosMemoryFree(pTrigger->trigger.event.endCond);
1754
0
      break;
1755
0
    case WINDOW_TYPE_COUNT:
1756
0
      taosMemoryFree(pTrigger->trigger.count.condCols);  
1757
0
      break;
1758
0
    default:
1759
0
      break;
1760
0
  }
1761
1762
0
  taosMemoryFree(pTrigger->partitionCols);
1763
0
  taosMemoryFree(pTrigger->triggerPrevFilter);
1764
0
  taosMemoryFree(pTrigger->triggerScanPlan);
1765
0
  taosMemoryFree(pTrigger->calcCacheScanPlan);
1766
1767
0
  taosArrayDestroy(pTrigger->readerList);
1768
0
  taosArrayDestroy(pTrigger->runnerList);
1769
0
  taosMemoryFree(pTrigger->streamName);
1770
0
}
1771
1772
0
void tFreeSStreamOutCol(void* param) {
1773
0
  if (NULL == param) {
1774
0
    return;
1775
0
  }
1776
1777
0
  SStreamOutCol* pOut = (SStreamOutCol*)param;
1778
0
  taosMemoryFree(pOut->expr);
1779
0
}
1780
1781
0
void tFreeSStreamRunnerDeployMsg(SStreamRunnerDeployMsg* pRunner) {
1782
0
  if (NULL == pRunner) {
1783
0
    return;
1784
0
  }
1785
1786
0
  taosMemoryFree(pRunner->streamName);
1787
0
  taosMemoryFree(pRunner->pPlan);
1788
0
  taosMemoryFree(pRunner->outDBFName);
1789
0
  taosMemoryFree(pRunner->outTblName);
1790
1791
0
  taosArrayDestroyEx(pRunner->pNotifyAddrUrls, tFreeStreamNotifyUrl);
1792
0
  taosArrayDestroy(pRunner->outCols);
1793
0
  taosArrayDestroy(pRunner->outTags);
1794
1795
0
  taosMemoryFree(pRunner->subTblNameExpr);
1796
0
  taosMemoryFree(pRunner->tagValueExpr);
1797
0
  taosArrayDestroyEx(pRunner->forceOutCols, tFreeSStreamOutCol);
1798
0
}
1799
1800
0
void tFreeSStmTaskDeploy(void* param) {
1801
0
  if (NULL == param) {
1802
0
    return;
1803
0
  }
1804
1805
0
  SStmTaskDeploy* pTask = (SStmTaskDeploy*)param;
1806
0
  switch (pTask->task.type)  {
1807
0
    case STREAM_READER_TASK:
1808
0
      tFreeSStreamReaderDeployMsg(&pTask->msg.reader);
1809
0
      break;
1810
0
    case STREAM_TRIGGER_TASK:
1811
0
      tFreeSStreamTriggerDeployMsg(&pTask->msg.trigger);
1812
0
      break;
1813
0
    case STREAM_RUNNER_TASK:
1814
0
      tFreeSStreamRunnerDeployMsg(&pTask->msg.runner);
1815
0
      break;
1816
0
    default:
1817
0
      break;
1818
0
  }
1819
0
}
1820
1821
1822
0
void tFreeSStmStreamDeploy(void* param) {
1823
0
  if (NULL == param) {
1824
0
    return;
1825
0
  }
1826
  
1827
0
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
1828
0
  int32_t readerNum = taosArrayGetSize(pDeploy->readerTasks);
1829
0
  for (int32_t i = 0; i < readerNum; ++i) {
1830
0
    SStmTaskDeploy* pReader = taosArrayGet(pDeploy->readerTasks, i);
1831
0
    if (!pReader->msg.reader.triggerReader && pReader->msg.reader.msg.calc.freeScanPlan) {
1832
0
      taosMemoryFreeClear(pReader->msg.reader.msg.calc.calcScanPlan);
1833
0
    }
1834
0
  }
1835
0
  taosArrayDestroy(pDeploy->readerTasks);
1836
1837
0
  if (pDeploy->triggerTask) {
1838
0
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.readerList);
1839
0
    taosArrayDestroy(pDeploy->triggerTask->msg.trigger.runnerList);
1840
0
    taosMemoryFree(pDeploy->triggerTask);
1841
0
  }
1842
1843
0
  int32_t runnerNum = taosArrayGetSize(pDeploy->runnerTasks);
1844
0
  for (int32_t i = 0; i < runnerNum; ++i) {
1845
0
    SStmTaskDeploy* pRunner = taosArrayGet(pDeploy->runnerTasks, i);
1846
0
    taosMemoryFree(pRunner->msg.runner.pPlan);
1847
0
  }
1848
0
  taosArrayDestroy(pDeploy->runnerTasks);
1849
0
}
1850
1851
0
void tDeepFreeSStmStreamDeploy(void* param) {
1852
0
  if (NULL == param) {
1853
0
    return;
1854
0
  }
1855
  
1856
0
  SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)param;
1857
0
  taosArrayDestroyEx(pDeploy->readerTasks, tFreeSStmTaskDeploy);
1858
0
  tFreeSStmTaskDeploy(pDeploy->triggerTask);
1859
0
  taosMemoryFree(pDeploy->triggerTask);
1860
0
  taosArrayDestroyEx(pDeploy->runnerTasks, tFreeSStmTaskDeploy);
1861
0
}
1862
1863
1864
0
void tFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
1865
0
  if (NULL == pRsp) {
1866
0
    return;
1867
0
  }
1868
0
  taosArrayDestroyEx(pRsp->deploy.streamList, tFreeSStmStreamDeploy);
1869
0
  taosArrayDestroy(pRsp->start.taskList);
1870
0
  taosArrayDestroy(pRsp->undeploy.taskList);
1871
0
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
1872
0
}
1873
1874
0
void tDeepFreeSMStreamHbRspMsg(SMStreamHbRspMsg* pRsp) {
1875
0
  if (NULL == pRsp) {
1876
0
    return;
1877
0
  }
1878
0
  taosArrayDestroyEx(pRsp->deploy.streamList, tDeepFreeSStmStreamDeploy);
1879
0
  taosArrayDestroy(pRsp->start.taskList);
1880
0
  taosArrayDestroy(pRsp->undeploy.taskList);
1881
0
  taosArrayDestroyEx(pRsp->rsps.rspList, tFreeSStreamMgmtRsp);
1882
0
}
1883
1884
1885
1886
0
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
1887
0
  int32_t code = 0;
1888
0
  int32_t lino;
1889
1890
0
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
1891
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->streamGId));
1892
0
  int32_t deployNum = 0;
1893
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &deployNum));
1894
0
  if (deployNum > 0) {
1895
0
    pRsp->deploy.streamList = taosArrayInit_s(sizeof(SStmStreamDeploy), deployNum);
1896
0
    TSDB_CHECK_NULL(pRsp->deploy.streamList, code, lino, _exit, terrno);
1897
0
  }
1898
0
  for (int32_t i = 0; i < deployNum; ++i) {
1899
0
    SStmStreamDeploy* pStream = taosArrayGet(pRsp->deploy.streamList, i);
1900
0
    TAOS_CHECK_EXIT(tDecodeSStmStreamDeploy(pDecoder, pStream));
1901
0
  }
1902
1903
0
  int32_t startNum = 0;
1904
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &startNum));
1905
0
  if (startNum > 0) {
1906
0
    pRsp->start.taskList = taosArrayInit_s(sizeof(SStreamTaskStart), startNum);
1907
0
    TSDB_CHECK_NULL(pRsp->start.taskList, code, lino, _exit, terrno);
1908
0
  }
1909
0
  for (int32_t i = 0; i < startNum; ++i) {
1910
0
    SStreamTaskStart* pTask = (SStreamTaskStart*)taosArrayGet(pRsp->start.taskList, i);
1911
0
    TAOS_CHECK_EXIT(tDecodeSStreamTaskStart(pDecoder, pTask));
1912
0
  }
1913
1914
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->undeploy.undeployAll));
1915
0
  if (!pRsp->undeploy.undeployAll) {
1916
0
    int32_t undeployNum = 0;
1917
0
    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &undeployNum));
1918
0
    if (undeployNum > 0) {
1919
0
      pRsp->undeploy.taskList = taosArrayInit_s(sizeof(SStreamTaskUndeploy), undeployNum);
1920
0
      TSDB_CHECK_NULL(pRsp->undeploy.taskList, code, lino, _exit, terrno);
1921
0
    }
1922
0
    for (int32_t i = 0; i < undeployNum; ++i) {
1923
0
      SStreamTaskUndeploy* pTask = (SStreamTaskUndeploy*)taosArrayGet(pRsp->undeploy.taskList, i);
1924
0
      TAOS_CHECK_EXIT(tDecodeSStreamTaskUndeploy(pDecoder, pTask));
1925
0
    }
1926
0
  }  
1927
1928
0
  int32_t rspNum = 0;
1929
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &rspNum));
1930
0
  if (rspNum > 0) {
1931
0
    pRsp->rsps.rspList = taosArrayInit_s(sizeof(SStreamMgmtRsp), rspNum);
1932
0
    TSDB_CHECK_NULL(pRsp->rsps.rspList, code, lino, _exit, terrno);
1933
0
    for (int32_t i = 0; i < rspNum; ++i) {
1934
0
      SStreamMgmtRsp* pMgmtRsp = (SStreamMgmtRsp*)taosArrayGet(pRsp->rsps.rspList, i);
1935
0
      TAOS_CHECK_EXIT(tDecodeSStreamMgmtRsp(pDecoder, pMgmtRsp));
1936
0
    }
1937
0
  }
1938
1939
0
  tEndDecode(pDecoder);
1940
1941
0
_exit:
1942
0
  return code;
1943
0
}
1944
1945
0
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
1946
0
  int32_t code = 0;
1947
0
  int32_t lino;
1948
1949
0
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
1950
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
1951
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
1952
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));
1953
0
  tEndEncode(pEncoder);
1954
1955
0
_exit:
1956
0
  return code;
1957
0
}
1958
1959
0
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
1960
0
  int32_t code = 0;
1961
0
  int32_t lino;
1962
1963
0
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
1964
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
1965
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
1966
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));
1967
0
  tEndDecode(pDecoder);
1968
1969
0
_exit:
1970
0
  return code;
1971
0
}
1972
1973
0
int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) {
1974
0
  int32_t code = 0;
1975
0
  int32_t lino;
1976
1977
0
  TAOS_CHECK_EXIT(tStartEncode(pEncoder));
1978
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
1979
0
  tEndEncode(pEncoder);
1980
1981
0
_exit:
1982
0
  return code;
1983
0
}
1984
1985
0
int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) {
1986
0
  int32_t code = 0;
1987
0
  int32_t lino;
1988
1989
0
  TAOS_CHECK_EXIT(tStartDecode(pDecoder));
1990
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
1991
0
  tEndDecode(pDecoder);
1992
1993
0
_exit:
1994
0
  return code;
1995
1996
0
}
1997
1998
1999
0
int32_t tSerializeSCMCreateStreamReqImpl(SEncoder* pEncoder, const SCMCreateStreamReq *pReq) {
2000
0
  int32_t code = TSDB_CODE_SUCCESS;
2001
0
  int32_t lino = 0;
2002
2003
0
  char*   json = NULL;
2004
0
  int32_t jsonLen = 0;
2005
0
  TAOS_CHECK_EXIT(scmCreateStreamReqToJson(pReq, false, &json, &jsonLen));
2006
0
  TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, json, jsonLen));
2007
2008
0
_exit:
2009
0
  taosMemoryFreeClear(json);
2010
0
  if (code) {
2011
0
    return code;
2012
0
  }
2013
  
2014
0
  return 0;
2015
0
}
2016
2017
0
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
2018
0
  SEncoder encoder = {0};
2019
0
  tEncoderInit(&encoder, buf, bufLen);
2020
0
  int32_t code = 0;
2021
0
  int32_t lino;
2022
2023
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2024
2025
0
  TAOS_CHECK_EXIT(tSerializeSCMCreateStreamReqImpl(&encoder, pReq));
2026
2027
0
  tEndEncode(&encoder);
2028
2029
0
_exit:
2030
0
  if (code) {
2031
0
    tEncoderClear(&encoder);
2032
0
    return code;
2033
0
  } else {
2034
0
    int32_t tlen = encoder.pos;
2035
0
    tEncoderClear(&encoder);
2036
0
    return tlen;
2037
0
  }
2038
0
  return 0;
2039
0
}
2040
2041
// Old version deserialization for backward compatibility,
2042
// especially for stream version number 7
2043
0
int32_t tDeserializeSCMCreateStreamReqImplOld(SDecoder *pDecoder, SCMCreateStreamReq *pReq, int32_t leftBytes) {
2044
0
  int32_t code = 0;
2045
0
  int32_t lino;
2046
0
  pReq->calcPkSlotId = -1;
2047
0
  pReq->triPkSlotId = -1;
2048
2049
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
2050
2051
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->name, NULL));
2052
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->sql, NULL));
2053
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outDB, NULL));
2054
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->streamDB, NULL));
2055
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerDB, NULL));
2056
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerTblName, NULL));
2057
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->outTblName, NULL));
2058
2059
0
  int32_t calcDbSize = 0;
2060
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcDbSize));
2061
0
  pReq->calcDB = taosArrayInit(calcDbSize, POINTER_BYTES);
2062
0
  if (pReq->calcDB == NULL) {
2063
0
    TAOS_CHECK_EXIT(terrno);
2064
0
  }
2065
0
  for (int32_t i = 0; i < calcDbSize; ++i) {
2066
0
    char *calcDb = NULL;
2067
0
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &calcDb));
2068
0
    calcDb = taosStrndup(calcDb, TSDB_DB_FNAME_LEN);
2069
0
    if (calcDb == NULL) {
2070
0
      TAOS_CHECK_EXIT(terrno);
2071
0
    }
2072
0
    if (taosArrayPush(pReq->calcDB, &calcDb) == NULL) {
2073
0
      taosMemoryFree(calcDb);
2074
0
      TAOS_CHECK_EXIT(terrno);
2075
0
    }
2076
0
  }
2077
2078
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igExists));
2079
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerType));
2080
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igDisorder));
2081
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteReCalc));
2082
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->deleteOutTbl));
2083
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistory));
2084
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->fillHistoryFirst));
2085
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->calcNotifyOnly));
2086
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->lowLatencyCalc));
2087
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->igNoDataTrigger));
2088
2089
0
  int32_t addrSize = 0;
2090
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
2091
0
  if (addrSize > 0) {
2092
0
    pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
2093
0
    if (pReq->pNotifyAddrUrls == NULL) {
2094
0
      TAOS_CHECK_EXIT(terrno);
2095
0
    }
2096
0
  }
2097
0
  for (int32_t i = 0; i < addrSize; ++i) {
2098
0
    char *url = NULL;
2099
0
    TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
2100
0
    url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
2101
0
    if (url == NULL) {
2102
0
      TAOS_CHECK_EXIT(terrno);
2103
0
    }
2104
0
    if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
2105
0
      taosMemoryFree(url);
2106
0
      TAOS_CHECK_EXIT(terrno);
2107
0
    }
2108
0
  }
2109
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->notifyEventTypes));
2110
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->addOptions));
2111
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->notifyHistory));
2112
2113
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerFilterCols, NULL));
2114
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerCols, NULL));
2115
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->partitionCols, NULL));
2116
2117
0
  int32_t outColSize = 0;
2118
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outColSize));
2119
0
  if (outColSize > 0) {
2120
0
    pReq->outCols = taosArrayInit_s(sizeof(SFieldWithOptions), outColSize);
2121
0
    if (pReq->outCols == NULL) {
2122
0
      TAOS_CHECK_EXIT(terrno);
2123
0
    }
2124
2125
0
    for (int32_t i = 0; i < outColSize; ++i) {
2126
0
      SFieldWithOptions* pField = taosArrayGet(pReq->outCols, i);
2127
0
      TAOS_CHECK_EXIT(tDeserializeSFieldWithOptions(pDecoder, pField));
2128
0
    }
2129
0
  }
2130
2131
0
  int32_t outTagSize = 0;
2132
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outTagSize));
2133
0
  if (outTagSize > 0) {
2134
0
    pReq->outTags = taosArrayInit(outTagSize, sizeof(SFieldWithOptions));
2135
0
    if (pReq->outTags == NULL) {
2136
0
      TAOS_CHECK_EXIT(terrno);
2137
0
    }
2138
2139
0
    for (int32_t i = 0; i < outTagSize; ++i) {
2140
0
      SFieldWithOptions field = {0};
2141
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.type));
2142
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &field.flags));
2143
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &field.bytes));
2144
0
      TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, field.name));
2145
0
      if (taosArrayPush(pReq->outTags, &field) == NULL) {
2146
0
        TAOS_CHECK_EXIT(terrno);
2147
0
      }
2148
0
    }
2149
0
  }
2150
2151
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->maxDelay));
2152
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->fillHistoryStartTime));
2153
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->watermark));
2154
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->expiredTime));
2155
2156
0
  switch (pReq->triggerType) {
2157
0
    case WINDOW_TYPE_SESSION: {
2158
      // session trigger
2159
0
      TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.session.slotId));
2160
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.session.sessionVal));
2161
0
      break;
2162
0
    }
2163
0
      case WINDOW_TYPE_STATE: {
2164
        // state trigger
2165
0
        TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.slotId));
2166
0
        pReq->trigger.stateWin.extend = 0;
2167
0
        pReq->trigger.stateWin.trueForType = 0;
2168
0
        pReq->trigger.stateWin.trueForCount = 0;
2169
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.stateWin.trueForDuration));
2170
0
        break;
2171
0
      }
2172
0
      case WINDOW_TYPE_INTERVAL: {
2173
        // slide trigger
2174
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.intervalUnit));
2175
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.slidingUnit));
2176
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.offsetUnit));
2177
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.soffsetUnit));
2178
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.precision));
2179
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.interval));
2180
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.offset));
2181
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.sliding));
2182
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.sliding.soffset));
2183
0
        break;
2184
0
      }
2185
0
      case WINDOW_TYPE_EVENT: {
2186
        // event trigger
2187
0
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.startCond, NULL));
2188
0
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.event.endCond, NULL));
2189
0
        pReq->trigger.event.trueForType = 0;
2190
0
        pReq->trigger.event.trueForCount = 0;
2191
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.event.trueForDuration));
2192
0
        break;
2193
0
      }
2194
0
      case WINDOW_TYPE_COUNT: {
2195
0
        TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.count.condCols, NULL));
2196
2197
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.countVal));
2198
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.count.sliding));
2199
0
        break;
2200
0
      }
2201
0
      case WINDOW_TYPE_PERIOD: {
2202
        // period trigger
2203
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.precision));
2204
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.periodUnit));
2205
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.period.offsetUnit));
2206
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.period));
2207
0
        TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->trigger.period.offset));
2208
0
        break;
2209
0
      }
2210
0
      default:
2211
0
        TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
2212
0
  }
2213
2214
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerTblType));
2215
0
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblUid));
2216
0
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->triggerTblSuid));
2217
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->vtableCalc));
2218
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outTblType));
2219
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->outStbExists));
2220
0
  TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pReq->outStbUid));
2221
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outStbSversion));
2222
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->eventTypes));
2223
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->flags));
2224
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->tsmaId));
2225
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->placeHolderBitmap));
2226
0
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->calcTsSlotId));
2227
0
  TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->triTsSlotId));
2228
2229
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->triggerTblVgId));
2230
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->outTblVgId));
2231
2232
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerScanPlan, NULL));
2233
2234
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->triggerHasPF));
2235
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->triggerPrevFilter, NULL));
2236
2237
0
  int32_t calcScanPlanListSize = 0;
2238
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &calcScanPlanListSize));
2239
0
  if (calcScanPlanListSize > 0) {
2240
0
    pReq->calcScanPlanList = taosArrayInit(calcScanPlanListSize, sizeof(SStreamCalcScan));
2241
0
    if (pReq->calcScanPlanList == NULL) {
2242
0
      TAOS_CHECK_EXIT(terrno);
2243
0
    }
2244
0
    for (int32_t i = 0; i < calcScanPlanListSize; ++i) {
2245
0
      SStreamCalcScan calcScan = {0};
2246
0
      int32_t         vgListSize = 0;
2247
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgListSize));
2248
0
      if (vgListSize > 0) {
2249
0
        calcScan.vgList = taosArrayInit(vgListSize, sizeof(int32_t));
2250
0
        if (calcScan.vgList == NULL) {
2251
0
          TAOS_CHECK_EXIT(terrno);
2252
0
        }
2253
0
        for (int32_t j = 0; j < vgListSize; ++j) {
2254
0
          int32_t vgId = 0;
2255
0
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
2256
0
          if (taosArrayPush(calcScan.vgList, &vgId) == NULL) {
2257
0
            TAOS_CHECK_EXIT(terrno);
2258
0
          }
2259
0
        }
2260
0
      }
2261
0
      TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &calcScan.readFromCache));
2262
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&calcScan.scanPlan, NULL));
2263
0
      if (taosArrayPush(pReq->calcScanPlanList, &calcScan) == NULL) {
2264
0
        TAOS_CHECK_EXIT(terrno);
2265
0
      }
2266
0
    }
2267
0
  }
2268
2269
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfCalcSubplan));
2270
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->calcPlan, NULL));
2271
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->subTblNameExpr, NULL));
2272
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->tagValueExpr, NULL));
2273
2274
0
  int32_t forceOutColsSize = 0;
2275
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &forceOutColsSize));
2276
0
  if (forceOutColsSize > 0) {
2277
0
    pReq->forceOutCols = taosArrayInit(forceOutColsSize, sizeof(SStreamOutCol));
2278
0
    if (pReq->forceOutCols == NULL) {
2279
0
      TAOS_CHECK_EXIT(terrno);
2280
0
    }
2281
0
    for (int32_t i = 0; i < forceOutColsSize; ++i) {
2282
0
      SStreamOutCol outCol = {0};
2283
0
      int64_t       exprLen = 0;
2284
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&outCol.expr, &exprLen));
2285
0
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.type));
2286
0
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.precision));
2287
0
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &outCol.type.scale));
2288
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &outCol.type.bytes));
2289
0
      if (taosArrayPush(pReq->forceOutCols, &outCol) == NULL) {
2290
0
        TAOS_CHECK_EXIT(terrno);
2291
0
      }
2292
0
    }
2293
0
  }
2294
2295
  // LeftBytes is the size of all fields at the tail of SStreamObj.
2296
  // If there are more data in the buffer, then it means
2297
  // the new fields are added in SStreamObj, need to decode them.
2298
0
  if (pDecoder->size - pDecoder->pos > leftBytes) {
2299
0
    switch (pReq->triggerType) {
2300
0
      case WINDOW_TYPE_STATE: {
2301
        // state trigger
2302
0
        if (!tDecodeIsEnd(pDecoder)) {
2303
0
          TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->trigger.stateWin.expr, NULL));
2304
0
        }
2305
0
        if (!tDecodeIsEnd(pDecoder)) {
2306
0
          TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pReq->trigger.stateWin.extend));
2307
0
        }
2308
0
        break;
2309
0
      }
2310
0
      case WINDOW_TYPE_INTERVAL: {
2311
0
        if (!tDecodeIsEnd(pDecoder)) {
2312
0
          TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->trigger.sliding.overlap));
2313
0
        }
2314
0
        break;
2315
0
      }
2316
0
      default:
2317
0
        break;
2318
0
    }
2319
0
  }
2320
2321
0
  if (pDecoder->size - pDecoder->pos > leftBytes) {
2322
0
    if (!tDecodeIsEnd(pDecoder)) {
2323
0
      TAOS_CHECK_EXIT(tDecodeU8(pDecoder, &pReq->triggerPrec));
2324
0
    }
2325
0
  }
2326
2327
0
_exit:
2328
2329
0
  return code;
2330
0
}
2331
2332
// New deserialization using JSON
2333
// start from taosd ver-3.3.8.6, stream version number 8
2334
0
int32_t tDeserializeSCMCreateStreamReqImpl(SDecoder *pDecoder, SCMCreateStreamReq *pReq) {
2335
0
  int32_t code = 0;
2336
0
  int32_t lino;
2337
2338
0
  char* json = NULL;
2339
0
  SJson* pJson = NULL;
2340
0
  TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &json));
2341
0
  pJson = tjsonParse(json);
2342
0
  if (pJson == NULL) {
2343
0
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INVALID_JSON);
2344
0
  }
2345
0
  TAOS_CHECK_EXIT(jsonToSCMCreateStreamReq(pJson, pReq));
2346
2347
0
_exit:
2348
0
  taosMemoryFreeClear(json);
2349
0
  if (NULL != pJson) {
2350
0
    tjsonDelete(pJson);
2351
0
  }
2352
2353
0
  return code;
2354
0
}
2355
2356
2357
0
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
2358
0
  SDecoder decoder = {0};
2359
0
  tDecoderInit(&decoder, buf, bufLen);
2360
0
  int32_t code = 0;
2361
0
  int32_t lino;
2362
2363
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
2364
  
2365
0
  code = tDeserializeSCMCreateStreamReqImpl(&decoder, pReq);
2366
0
  if (TSDB_CODE_MND_STREAM_INVALID_JSON == code) {
2367
0
    uError("invalid json for stream create request, try old deserialization");
2368
    // try old deserialization for backward compatibility
2369
0
    tDecoderClear(&decoder);
2370
0
    tDecoderInit(&decoder, buf, bufLen);
2371
0
    TAOS_CHECK_EXIT(tStartDecode(&decoder));
2372
0
    TAOS_CHECK_EXIT(tDeserializeSCMCreateStreamReqImplOld(&decoder, pReq, 0));
2373
0
  }
2374
2375
0
  tEndDecode(&decoder);
2376
2377
0
_exit:
2378
2379
0
  tDecoderClear(&decoder);
2380
0
  return code;
2381
0
}
2382
2383
2384
0
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
2385
0
  int32_t  code = 0;
2386
0
  int32_t  lino;
2387
0
  int32_t  tlen;
2388
0
  SEncoder encoder = {0};
2389
0
  tEncoderInit(&encoder, buf, bufLen);
2390
2391
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2392
2393
0
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->count));
2394
0
  for (int32_t i = 0; i < pReq->count; i++) {
2395
0
    int32_t nameLen = pReq->name[i] == NULL ? 0 : (int32_t)strlen(pReq->name[i]) + 1;
2396
0
    TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name[i], nameLen));
2397
0
  }
2398
0
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
2399
2400
0
  tEndEncode(&encoder);
2401
2402
0
_exit:
2403
0
  if (code) {
2404
0
    tlen = code;
2405
0
  } else {
2406
0
    tlen = encoder.pos;
2407
0
  }
2408
0
  tEncoderClear(&encoder);
2409
0
  return tlen;
2410
0
}
2411
2412
0
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
2413
0
  SDecoder decoder = {0};
2414
0
  int32_t  code = 0;
2415
0
  int32_t  lino;
2416
0
  tDecoderInit(&decoder, buf, bufLen);
2417
2418
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
2419
0
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->count));
2420
0
  if (pReq->count > 0) {
2421
0
    pReq->name = taosMemoryCalloc(pReq->count, sizeof(char*));
2422
0
    if (pReq->name == NULL) {
2423
0
      code = terrno;
2424
0
      goto _exit;
2425
0
    }
2426
0
    for (int32_t i = 0; i < pReq->count; i++) {
2427
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name[i], NULL));
2428
0
    }
2429
0
  }
2430
0
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
2431
2432
0
  tEndDecode(&decoder);
2433
2434
0
_exit:
2435
0
  tDecoderClear(&decoder);
2436
0
  return code;
2437
0
}
2438
2439
0
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
2440
0
  if (NULL == pReq) {
2441
0
    return;
2442
0
  }
2443
0
  if (pReq->name) {
2444
0
    for (int32_t i = 0; i < pReq->count; i++) {
2445
0
      taosMemoryFreeClear(pReq->name[i]);
2446
0
    }
2447
0
    taosMemoryFreeClear(pReq->name);
2448
0
  }
2449
0
}
2450
2451
0
static FORCE_INLINE void tFreeStreamCalcScan(void* pScan) {
2452
0
  if (pScan == NULL) {
2453
0
    return;
2454
0
  }
2455
0
  SStreamCalcScan *pCalcScan = (SStreamCalcScan *)pScan;
2456
0
  taosArrayDestroy(pCalcScan->vgList);
2457
0
  taosMemoryFreeClear(pCalcScan->scanPlan);
2458
0
}
2459
2460
0
void tFreeStreamOutCol(void* pCol) {
2461
0
  if (pCol == NULL) {
2462
0
    return;
2463
0
  }
2464
0
  SStreamOutCol *pOutCol = (SStreamOutCol *)pCol;
2465
0
  taosMemoryFreeClear(pOutCol->expr);
2466
0
}
2467
2468
2469
2470
0
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
2471
0
  if (NULL == pReq) {
2472
0
    return;
2473
0
  }
2474
0
  taosMemoryFreeClear(pReq->name);
2475
0
  taosMemoryFreeClear(pReq->sql);
2476
0
  taosMemoryFreeClear(pReq->streamDB);
2477
0
  taosMemoryFreeClear(pReq->triggerDB);
2478
0
  taosMemoryFreeClear(pReq->outDB);
2479
0
  taosMemoryFreeClear(pReq->triggerTblName);
2480
0
  taosMemoryFreeClear(pReq->outTblName);
2481
2482
0
  taosArrayDestroyP(pReq->calcDB, NULL);
2483
0
  pReq->calcDB = NULL;
2484
0
  taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
2485
0
  pReq->pNotifyAddrUrls = NULL;
2486
2487
0
  taosMemoryFreeClear(pReq->triggerFilterCols);
2488
0
  taosMemoryFreeClear(pReq->triggerCols);
2489
0
  taosMemoryFreeClear(pReq->partitionCols);
2490
2491
0
  taosArrayDestroy(pReq->outTags);
2492
0
  pReq->outTags = NULL;
2493
0
  taosArrayDestroy(pReq->outCols);
2494
0
  pReq->outCols = NULL;
2495
2496
0
  switch (pReq->triggerType) {
2497
0
    case WINDOW_TYPE_STATE:
2498
0
      taosMemoryFreeClear(pReq->trigger.stateWin.zeroth);
2499
0
      taosMemoryFreeClear(pReq->trigger.stateWin.expr);
2500
0
      break;
2501
0
    case WINDOW_TYPE_EVENT:
2502
0
      taosMemoryFreeClear(pReq->trigger.event.startCond);
2503
0
      taosMemoryFreeClear(pReq->trigger.event.endCond);
2504
0
      break;
2505
0
    default:
2506
0
      break;
2507
0
  }
2508
2509
0
  taosMemoryFreeClear(pReq->triggerScanPlan);
2510
0
  taosArrayDestroyEx(pReq->calcScanPlanList, tFreeStreamCalcScan);
2511
0
  pReq->calcScanPlanList = NULL;
2512
0
  taosMemoryFreeClear(pReq->triggerPrevFilter);
2513
2514
0
  taosMemoryFreeClear(pReq->calcPlan);
2515
0
  taosMemoryFreeClear(pReq->subTblNameExpr);
2516
0
  taosMemoryFreeClear(pReq->tagValueExpr);
2517
0
  taosArrayDestroyEx(pReq->forceOutCols, tFreeStreamOutCol);
2518
0
  pReq->forceOutCols = NULL;
2519
0
  taosArrayDestroy(pReq->colCids);
2520
0
  pReq->colCids = NULL;
2521
0
  taosArrayDestroy(pReq->tagCids);
2522
0
  pReq->tagCids = NULL;
2523
0
}
2524
2525
0
int32_t tCloneStreamCreateDeployPointers(SCMCreateStreamReq *pSrc, SCMCreateStreamReq** ppDst) {
2526
0
  int32_t code = 0, lino = 0;
2527
0
  if (NULL == pSrc) {
2528
0
    return code;
2529
0
  } 
2530
2531
0
  void* p = NULL;
2532
0
  int32_t num = 0;
2533
0
  *ppDst = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq));
2534
0
  TSDB_CHECK_NULL(*ppDst, code, lino, _exit, terrno);
2535
2536
0
  SCMCreateStreamReq* pDst = *ppDst;
2537
2538
0
  if (pSrc->outDB) {
2539
0
    pDst->outDB = COPY_STR(pSrc->outDB);
2540
0
    TSDB_CHECK_NULL(pDst->outDB, code, lino, _exit, terrno);
2541
0
  }
2542
  
2543
0
  if (pSrc->triggerTblName) {
2544
0
    pDst->triggerTblName = COPY_STR(pSrc->triggerTblName);
2545
0
    TSDB_CHECK_NULL(pDst->triggerTblName, code, lino, _exit, terrno);
2546
0
  }
2547
  
2548
0
  if (pSrc->outTblName) {
2549
0
    pDst->outTblName = COPY_STR(pSrc->outTblName);
2550
0
    TSDB_CHECK_NULL(pDst->outTblName, code, lino, _exit, terrno);
2551
0
  }
2552
  
2553
0
  if (pSrc->pNotifyAddrUrls) {
2554
0
    num = taosArrayGetSize(pSrc->pNotifyAddrUrls);
2555
0
    if (num > 0) {
2556
0
      pDst->pNotifyAddrUrls = taosArrayInit(num, POINTER_BYTES);
2557
0
      TSDB_CHECK_NULL(pDst->pNotifyAddrUrls, code, lino, _exit, terrno);
2558
0
    }
2559
0
    for (int32_t i = 0; i < num; ++i) {
2560
0
      p = taosStrdup(taosArrayGetP(pSrc->pNotifyAddrUrls, i));
2561
0
      TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
2562
0
      TSDB_CHECK_NULL(taosArrayPush(pDst->pNotifyAddrUrls, &p), code, lino, _exit, terrno);
2563
0
    }
2564
0
  }
2565
  
2566
0
  if (pSrc->triggerFilterCols) {
2567
0
    pDst->triggerFilterCols = COPY_STR(pSrc->triggerFilterCols);
2568
0
    TSDB_CHECK_NULL(pDst->triggerFilterCols, code, lino, _exit, terrno);
2569
0
  }
2570
  
2571
0
  if (pSrc->triggerCols) {
2572
0
    pDst->triggerCols = COPY_STR(pSrc->triggerCols);
2573
0
    TSDB_CHECK_NULL(pDst->triggerCols, code, lino, _exit, terrno);
2574
0
  }
2575
  
2576
0
  if (pSrc->partitionCols) {
2577
0
    pDst->partitionCols = COPY_STR(pSrc->partitionCols);
2578
0
    TSDB_CHECK_NULL(pDst->partitionCols, code, lino, _exit, terrno);
2579
0
  }
2580
  
2581
0
  if (pSrc->outCols) {
2582
0
    pDst->outCols = taosArrayDup(pSrc->outCols, NULL);
2583
0
    TSDB_CHECK_NULL(pDst->outCols, code, lino, _exit, terrno);
2584
0
  }
2585
  
2586
0
  if (pSrc->outTags) {
2587
0
    pDst->outTags = taosArrayDup(pSrc->outTags, NULL);
2588
0
    TSDB_CHECK_NULL(pDst->outTags, code, lino, _exit, terrno);
2589
0
  }
2590
2591
0
  pDst->triggerType = pSrc->triggerType;
2592
  
2593
0
  switch (pSrc->triggerType) {
2594
0
    case WINDOW_TYPE_STATE:
2595
0
      pDst->trigger.stateWin.slotId = pSrc->trigger.stateWin.slotId;
2596
0
      pDst->trigger.stateWin.extend = pSrc->trigger.stateWin.extend;
2597
0
      pDst->trigger.stateWin.trueForType = pSrc->trigger.stateWin.trueForType;
2598
0
      pDst->trigger.stateWin.trueForCount = pSrc->trigger.stateWin.trueForCount;
2599
0
      pDst->trigger.stateWin.trueForDuration = pSrc->trigger.stateWin.trueForDuration;
2600
0
      if (pSrc->trigger.stateWin.zeroth) {
2601
0
        pDst->trigger.stateWin.zeroth = COPY_STR(pSrc->trigger.stateWin.zeroth);
2602
0
        TSDB_CHECK_NULL(pDst->trigger.stateWin.zeroth, code, lino, _exit, terrno);
2603
0
      }
2604
0
      if (pSrc->trigger.stateWin.expr) {
2605
0
        pDst->trigger.stateWin.expr = COPY_STR(pSrc->trigger.stateWin.expr);
2606
0
        TSDB_CHECK_NULL(pDst->trigger.stateWin.expr, code, lino, _exit, terrno);
2607
0
      }
2608
0
      break;
2609
0
    case WINDOW_TYPE_EVENT:
2610
0
      if (pSrc->trigger.event.startCond) {
2611
0
        pDst->trigger.event.startCond = COPY_STR(pSrc->trigger.event.startCond);
2612
0
        TSDB_CHECK_NULL(pDst->trigger.event.startCond, code, lino, _exit, terrno);
2613
0
      }
2614
      
2615
0
      if (pSrc->trigger.event.endCond) {
2616
0
        pDst->trigger.event.endCond = COPY_STR(pSrc->trigger.event.endCond);
2617
0
        TSDB_CHECK_NULL(pDst->trigger.event.endCond, code, lino, _exit, terrno);
2618
0
      }
2619
0
      pDst->trigger.event.trueForType = pSrc->trigger.event.trueForType;
2620
0
      pDst->trigger.event.trueForCount = pSrc->trigger.event.trueForCount;
2621
0
      pDst->trigger.event.trueForDuration = pSrc->trigger.event.trueForDuration;
2622
0
      break;
2623
0
    case WINDOW_TYPE_COUNT:
2624
0
      pDst->trigger.count.countVal = pSrc->trigger.count.countVal;
2625
0
      pDst->trigger.count.sliding = pSrc->trigger.count.sliding;
2626
0
      if (pSrc->trigger.count.condCols) {
2627
0
        pDst->trigger.count.condCols = COPY_STR(pSrc->trigger.count.condCols);
2628
0
        TSDB_CHECK_NULL(pDst->trigger.count.condCols, code, lino, _exit, terrno);
2629
0
      }
2630
0
      break;
2631
0
    default:
2632
0
      pDst->trigger = pSrc->trigger;
2633
0
      break;
2634
0
  }
2635
2636
2637
0
  if (pSrc->triggerScanPlan) {
2638
0
    pDst->triggerScanPlan = COPY_STR(pSrc->triggerScanPlan);
2639
0
    TSDB_CHECK_NULL(pDst->triggerScanPlan, code, lino, _exit, terrno);
2640
0
  }
2641
  
2642
0
  if (pSrc->calcScanPlanList) {
2643
0
    num = taosArrayGetSize(pSrc->calcScanPlanList);
2644
0
    if (num > 0) {
2645
0
      pDst->calcScanPlanList = taosArrayInit(num, sizeof(SStreamCalcScan));
2646
0
      TSDB_CHECK_NULL(pDst->calcScanPlanList, code, lino, _exit, terrno);
2647
0
    }
2648
0
    for (int32_t i = 0; i < num; ++i) {
2649
0
      SStreamCalcScan* sscan = taosArrayGet(pSrc->calcScanPlanList, i);
2650
0
      SStreamCalcScan  dscan = {.readFromCache = sscan->readFromCache};
2651
2652
0
      dscan.vgList = taosArrayDup(sscan->vgList, NULL);
2653
0
      TSDB_CHECK_NULL(dscan.vgList, code, lino, _exit, terrno);
2654
2655
0
      dscan.scanPlan = COPY_STR(sscan->scanPlan);
2656
0
      TSDB_CHECK_NULL(dscan.scanPlan, code, lino, _exit, terrno);
2657
      
2658
0
      TSDB_CHECK_NULL(taosArrayPush(pDst->calcScanPlanList, &dscan), code, lino, _exit, terrno);
2659
0
    }
2660
0
  }
2661
  
2662
0
  if (pSrc->triggerPrevFilter) {
2663
0
    pDst->triggerPrevFilter = COPY_STR(pSrc->triggerPrevFilter);
2664
0
    TSDB_CHECK_NULL(pDst->triggerPrevFilter, code, lino, _exit, terrno);
2665
0
  }
2666
  
2667
0
  if (pSrc->calcPlan) {
2668
0
    pDst->calcPlan = COPY_STR(pSrc->calcPlan);
2669
0
    TSDB_CHECK_NULL(pDst->calcPlan, code, lino, _exit, terrno);
2670
0
  }
2671
  
2672
0
  if (pSrc->subTblNameExpr) {
2673
0
    pDst->subTblNameExpr = COPY_STR(pSrc->subTblNameExpr);
2674
0
    TSDB_CHECK_NULL(pDst->subTblNameExpr, code, lino, _exit, terrno);
2675
0
  }
2676
  
2677
0
  if (pSrc->tagValueExpr) {
2678
0
    pDst->tagValueExpr = COPY_STR(pSrc->tagValueExpr);
2679
0
    TSDB_CHECK_NULL(pDst->tagValueExpr, code, lino, _exit, terrno);
2680
0
  }
2681
  
2682
0
  if (pSrc->forceOutCols) {
2683
0
    num = taosArrayGetSize(pSrc->forceOutCols);
2684
0
    if (num > 0) {
2685
0
      pDst->forceOutCols = taosArrayInit(num, sizeof(SStreamOutCol));
2686
0
      TSDB_CHECK_NULL(pDst->forceOutCols, code, lino, _exit, terrno);
2687
0
    }
2688
0
    for (int32_t i = 0; i < num; ++i) {
2689
0
      SStreamOutCol* scol = taosArrayGet(pSrc->forceOutCols, i);
2690
0
      SStreamOutCol  dcol = {.type = scol->type};
2691
2692
0
      dcol.expr = COPY_STR(scol->expr);
2693
0
      TSDB_CHECK_NULL(dcol.expr, code, lino, _exit, terrno);
2694
      
2695
0
      TSDB_CHECK_NULL(taosArrayPush(pDst->forceOutCols, &dcol), code, lino, _exit, terrno);
2696
0
    }
2697
0
  }
2698
2699
0
  if (pSrc->colCids) {
2700
0
    pDst->colCids = taosArrayDup(pSrc->colCids, NULL);
2701
0
    TSDB_CHECK_NULL(pDst->colCids, code, lino, _exit, terrno);
2702
0
  }
2703
2704
0
  if (pSrc->tagCids) {
2705
0
    pDst->tagCids = taosArrayDup(pSrc->tagCids, NULL);
2706
0
    TSDB_CHECK_NULL(pDst->tagCids, code, lino, _exit, terrno);
2707
0
  }
2708
2709
0
  pDst->triggerTblUid = pSrc->triggerTblUid;
2710
0
  pDst->triggerTblSuid = pSrc->triggerTblSuid;
2711
0
  pDst->triggerTblType = pSrc->triggerTblType;
2712
0
  pDst->triggerPrec = pSrc->triggerPrec;
2713
0
  pDst->deleteReCalc = pSrc->deleteReCalc;
2714
0
  pDst->deleteOutTbl = pSrc->deleteOutTbl;
2715
0
  pDst->flags = pSrc->flags;
2716
  
2717
0
_exit:
2718
2719
0
  if (code) {
2720
0
    tFreeSCMCreateStreamReq(pDst);
2721
0
    uError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
2722
0
  }
2723
2724
0
  return code;
2725
0
}
2726
2727
2728
0
int32_t tSerializeSMPauseStreamReq(void *buf, int32_t bufLen, const SMPauseStreamReq *pReq) {
2729
0
  int32_t  code = 0;
2730
0
  int32_t  lino;
2731
0
  int32_t  tlen;
2732
0
  SEncoder encoder = {0};
2733
0
  tEncoderInit(&encoder, buf, bufLen);
2734
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2735
2736
0
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
2737
0
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
2738
0
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
2739
0
  tEndEncode(&encoder);
2740
2741
0
_exit:
2742
0
  if (code) {
2743
0
    tlen = code;
2744
0
  } else {
2745
0
    tlen = encoder.pos;
2746
0
  }
2747
0
  tEncoderClear(&encoder);
2748
0
  return tlen;
2749
0
}
2750
2751
0
int32_t tDeserializeSMPauseStreamReq(void *buf, int32_t bufLen, SMPauseStreamReq *pReq) {
2752
0
  SDecoder decoder = {0};
2753
0
  int32_t  code = 0;
2754
0
  int32_t  lino;
2755
2756
0
  tDecoderInit(&decoder, buf, bufLen);
2757
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
2758
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
2759
0
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
2760
0
  tEndDecode(&decoder);
2761
2762
0
_exit:
2763
0
  tDecoderClear(&decoder);
2764
0
  return code;
2765
0
}
2766
2767
0
void tFreeMPauseStreamReq(SMPauseStreamReq *pReq) {
2768
0
  taosMemoryFreeClear(pReq->name);
2769
0
}
2770
2771
0
int32_t tSerializeSMResumeStreamReq(void *buf, int32_t bufLen, const SMResumeStreamReq *pReq) {
2772
0
  SEncoder encoder = {0};
2773
0
  int32_t  code = 0;
2774
0
  int32_t  lino;
2775
0
  int32_t  tlen;
2776
0
  tEncoderInit(&encoder, buf, bufLen);
2777
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2778
0
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
2779
0
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
2780
0
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
2781
0
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igUntreated));
2782
0
  tEndEncode(&encoder);
2783
2784
0
_exit:
2785
0
  if (code) {
2786
0
    tlen = code;
2787
0
  } else {
2788
0
    tlen = encoder.pos;
2789
0
  }
2790
0
  tEncoderClear(&encoder);
2791
0
  return tlen;
2792
0
}
2793
2794
0
int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamReq *pReq) {
2795
0
  SDecoder decoder = {0};
2796
0
  int32_t  code = 0;
2797
0
  int32_t  lino;
2798
2799
0
  tDecoderInit(&decoder, buf, bufLen);
2800
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
2801
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
2802
0
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
2803
0
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igUntreated));
2804
0
  tEndDecode(&decoder);
2805
2806
0
_exit:
2807
0
  tDecoderClear(&decoder);
2808
0
  return code;
2809
0
}
2810
2811
0
void tFreeMResumeStreamReq(SMResumeStreamReq *pReq) {
2812
0
  taosMemoryFreeClear(pReq->name);
2813
0
}
2814
2815
0
int32_t tSerializeSMRecalcStreamReq(void *buf, int32_t bufLen, const SMRecalcStreamReq *pReq) {
2816
0
  SEncoder encoder = {0};
2817
0
  int32_t  code = 0;
2818
0
  int32_t  lino;
2819
0
  int32_t  tlen;
2820
0
  tEncoderInit(&encoder, buf, bufLen);
2821
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2822
0
  int32_t nameLen = pReq->name == NULL ? 0 : (int32_t)strlen(pReq->name) + 1;
2823
0
  TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pReq->name, nameLen));
2824
0
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->calcAll));
2825
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
2826
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
2827
0
  tEndEncode(&encoder);
2828
2829
0
_exit:
2830
0
  if (code) {
2831
0
    tlen = code;
2832
0
  } else {
2833
0
    tlen = encoder.pos;
2834
0
  }
2835
0
  tEncoderClear(&encoder);
2836
0
  return tlen;
2837
0
}
2838
2839
0
int32_t tDeserializeSMRecalcStreamReq(void *buf, int32_t bufLen, SMRecalcStreamReq *pReq) {
2840
0
  SDecoder decoder = {0};
2841
0
  int32_t  code = 0;
2842
0
  int32_t  lino;
2843
2844
0
  tDecoderInit(&decoder, buf, bufLen);
2845
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
2846
2847
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void**)&pReq->name, NULL));
2848
0
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->calcAll));
2849
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.skey));
2850
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.ekey));
2851
0
  tEndDecode(&decoder);
2852
2853
0
_exit:
2854
0
  tDecoderClear(&decoder);
2855
0
  return code;
2856
0
}
2857
2858
0
void tFreeMRecalcStreamReq(SMRecalcStreamReq *pReq) {
2859
0
  taosMemoryFreeClear(pReq->name);
2860
0
}
2861
2862
0
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
2863
0
  int32_t code = 0;
2864
0
  int32_t lino;
2865
2866
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
2867
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->taskId));
2868
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->fetchIdx));
2869
2870
0
_exit:
2871
0
  return code;
2872
0
}
2873
2874
0
int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) {
2875
0
  SEncoder encoder = {0};
2876
0
  int32_t  code = 0;
2877
0
  int32_t  lino;
2878
0
  int32_t  tlen;
2879
0
  tEncoderInit(&encoder, buf, bufLen);
2880
2881
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2882
0
  TAOS_CHECK_EXIT(tEncodeStreamProgressReq(&encoder, pReq));
2883
2884
0
  tEndEncode(&encoder);
2885
2886
0
_exit:
2887
0
  if (code) {
2888
0
    tlen = code;
2889
0
  } else {
2890
0
    tlen = encoder.pos;
2891
0
  }
2892
0
  tEncoderClear(&encoder);
2893
0
  return tlen;
2894
0
}
2895
2896
0
static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) {
2897
0
  int32_t code = 0;
2898
0
  int32_t lino;
2899
2900
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
2901
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->taskId));
2902
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->fetchIdx));
2903
2904
0
_exit:
2905
0
  return code;
2906
0
}
2907
2908
0
int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) {
2909
0
  SDecoder decoder = {0};
2910
0
  int32_t  code = 0;
2911
0
  int32_t  lino;
2912
2913
0
  tDecoderInit(&decoder, (char *)buf, bufLen);
2914
2915
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
2916
0
  TAOS_CHECK_EXIT(tDecodeStreamProgressReq(&decoder, pReq));
2917
2918
0
  tEndDecode(&decoder);
2919
2920
0
_exit:
2921
0
  tDecoderClear(&decoder);
2922
0
  return code;
2923
0
}
2924
2925
0
static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) {
2926
0
  int32_t code = 0;
2927
0
  int32_t lino;
2928
2929
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
2930
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->fillHisFinished));
2931
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->progressDelay));
2932
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->fetchIdx));
2933
2934
0
_exit:
2935
0
  return code;
2936
0
}
2937
2938
0
int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) {
2939
0
  SEncoder encoder = {0};
2940
0
  int32_t  code = 0;
2941
0
  int32_t  lino;
2942
0
  int32_t  tlen;
2943
0
  tEncoderInit(&encoder, buf, bufLen);
2944
2945
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2946
0
  TAOS_CHECK_EXIT(tEncodeStreamProgressRsp(&encoder, pRsp));
2947
2948
0
  tEndEncode(&encoder);
2949
2950
0
_exit:
2951
0
  if (code) {
2952
0
    tlen = code;
2953
0
  } else {
2954
0
    tlen = encoder.pos;
2955
0
  }
2956
0
  tEncoderClear(&encoder);
2957
0
  return tlen;
2958
0
}
2959
2960
0
static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) {
2961
0
  int32_t code = 0;
2962
0
  int32_t lino;
2963
2964
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
2965
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished));
2966
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->progressDelay));
2967
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->fetchIdx));
2968
2969
0
_exit:
2970
0
  return code;
2971
0
}
2972
2973
0
int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) {
2974
0
  SDecoder decoder = {0};
2975
0
  int32_t  code = 0;
2976
0
  int32_t  lino;
2977
2978
0
  tDecoderInit(&decoder, buf, bufLen);
2979
2980
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
2981
0
  TAOS_CHECK_EXIT(tDecodeStreamProgressRsp(&decoder, pRsp));
2982
2983
0
  tEndDecode(&decoder);
2984
2985
0
_exit:
2986
0
  tDecoderClear(&decoder);
2987
0
  return code;
2988
0
}
2989
2990
0
int32_t tSerializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, const SSTriggerOrigTableInfoRsp* pRsp){
2991
0
  SEncoder encoder = {0};
2992
0
  int32_t  code = TSDB_CODE_SUCCESS;
2993
0
  int32_t  lino = 0;
2994
0
  int32_t  tlen = 0;
2995
2996
0
  tEncoderInit(&encoder, buf, bufLen);
2997
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2998
2999
0
  int32_t size = taosArrayGetSize(pRsp->cols);
3000
0
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
3001
0
  for (int32_t i = 0; i < size; ++i) {
3002
0
    OTableInfoRsp* oInfo = taosArrayGet(pRsp->cols, i);
3003
0
    if (oInfo == NULL) {
3004
0
      uError("col id is NULL at index %d", i);
3005
0
      code = TSDB_CODE_INVALID_PARA;
3006
0
      goto _exit;
3007
0
    }
3008
0
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->suid));
3009
0
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, oInfo->uid));
3010
0
    TAOS_CHECK_EXIT(tEncodeI16(&encoder, oInfo->cid));
3011
0
  }
3012
3013
0
  tEndEncode(&encoder);
3014
3015
0
_exit:
3016
0
  if (code != TSDB_CODE_SUCCESS) {
3017
0
    tlen = code;
3018
0
  } else {
3019
0
    tlen = encoder.pos;
3020
0
  }
3021
0
  tEncoderClear(&encoder);
3022
0
  return tlen;
3023
0
}
3024
3025
0
int32_t tDserializeSTriggerOrigTableInfoRsp(void* buf, int32_t bufLen, SSTriggerOrigTableInfoRsp* pRsp){
3026
0
  SDecoder decoder = {0};
3027
0
  int32_t  code = TSDB_CODE_SUCCESS;
3028
0
  int32_t  lino = 0;
3029
3030
0
  tDecoderInit(&decoder, buf, bufLen);
3031
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
3032
3033
0
  int32_t size = 0;
3034
0
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
3035
0
  pRsp->cols = taosArrayInit(size, sizeof(OTableInfoRsp));
3036
0
  if (pRsp->cols == NULL) {
3037
0
    code = terrno;
3038
0
    uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
3039
0
    goto _exit;
3040
0
  }
3041
0
  for (int32_t i = 0; i < size; ++i) {
3042
0
    OTableInfoRsp* oInfo = taosArrayReserve(pRsp->cols, 1);
3043
0
    if (oInfo == NULL) {
3044
0
      code = terrno;
3045
0
      uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
3046
0
      goto _exit;
3047
0
    }
3048
0
    TAOS_CHECK_RETURN(tDecodeI64(&decoder, &oInfo->suid));
3049
0
    TAOS_CHECK_RETURN(tDecodeI64(&decoder, &oInfo->uid));
3050
0
    TAOS_CHECK_RETURN(tDecodeI16(&decoder, &oInfo->cid));
3051
0
  }
3052
3053
0
  tEndDecode(&decoder);
3054
3055
0
_exit:
3056
0
  tDecoderClear(&decoder);
3057
0
  return code;
3058
0
}
3059
3060
0
void    tDestroySTriggerOrigTableInfoRsp(SSTriggerOrigTableInfoRsp* pRsp){
3061
0
  taosArrayDestroy(pRsp->cols);
3062
0
}
3063
3064
0
void tDestroySTriggerPullRequest(SSTriggerPullRequestUnion* pReq) {
3065
0
  if (pReq == NULL) return;
3066
0
  if (pReq->base.type == STRIGGER_PULL_WAL_DATA_NEW || pReq->base.type == STRIGGER_PULL_WAL_CALC_DATA_NEW) {
3067
0
    SSTriggerWalDataNewRequest* pRequest = (SSTriggerWalDataNewRequest*)pReq;
3068
0
    taosArrayDestroy(pRequest->versions);
3069
0
    tSimpleHashCleanup(pRequest->ranges);
3070
0
  } else if (pReq->base.type == STRIGGER_PULL_TSDB_DATA) {
3071
0
    SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
3072
0
    if (pRequest->cids != NULL) {
3073
0
      taosArrayDestroy(pRequest->cids);
3074
0
      pRequest->cids = NULL;
3075
0
    }
3076
0
  } else if (pReq->base.type == STRIGGER_PULL_VTABLE_INFO) {
3077
0
    SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
3078
0
    if (pRequest->cids != NULL) {
3079
0
      taosArrayDestroy(pRequest->cids);
3080
0
      pRequest->cids = NULL;
3081
0
    }
3082
0
    if (pRequest->uids != NULL) {
3083
0
      taosArrayDestroy(pRequest->uids);
3084
0
      pRequest->uids = NULL;
3085
0
    }
3086
0
  } else if (pReq->base.type == STRIGGER_PULL_VTABLE_PSEUDO_COL) {
3087
0
    SSTriggerVirTablePseudoColRequest *pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
3088
0
    if (pRequest->cids != NULL) {
3089
0
      taosArrayDestroy(pRequest->cids);
3090
0
      pRequest->cids = NULL;
3091
0
    }
3092
0
  } else if (pReq->base.type == STRIGGER_PULL_OTABLE_INFO) {
3093
0
    SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
3094
0
    if (pRequest->cols != NULL) {
3095
0
      taosArrayDestroy(pRequest->cols);
3096
0
      pRequest->cols = NULL;
3097
0
    }
3098
0
  } else if (pReq->base.type == STRIGGER_PULL_SET_TABLE) {
3099
0
    SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
3100
0
    tSimpleHashCleanup(pRequest->uidInfoTrigger);
3101
0
    tSimpleHashCleanup(pRequest->uidInfoCalc);
3102
0
  }
3103
0
}
3104
3105
0
int32_t encodePlainArray(SEncoder *encoder, SArray *pArr) {
3106
0
  int32_t  code = TSDB_CODE_SUCCESS;
3107
0
  int32_t  lino = 0;
3108
0
  int32_t  nEle = taosArrayGetSize(pArr);
3109
0
  uint8_t* buf = (nEle > 0) ? TARRAY_DATA(pArr) : NULL;
3110
0
  int32_t  len = (nEle > 0) ? (nEle * pArr->elemSize) : 0;
3111
0
  TAOS_CHECK_EXIT(tEncodeBinary(encoder, buf, len));
3112
3113
0
_exit:
3114
0
  return code;
3115
0
}
3116
3117
0
int32_t decodePlainArray(SDecoder* decoder, SArray** ppArr, uint32_t elemSize) {
3118
0
  int32_t  code = TSDB_CODE_SUCCESS;
3119
0
  int32_t  lino = 0;
3120
0
  void*    buf = NULL;
3121
0
  uint64_t len = 0;
3122
0
  TAOS_CHECK_EXIT(tDecodeBinaryAlloc(decoder, &buf, &len));
3123
3124
0
  if (len > 0) {
3125
0
    *ppArr = taosArrayInit(0, elemSize);
3126
0
    TSDB_CHECK_NULL(*ppArr, code, lino, _exit, terrno);
3127
0
    TSWAP((*ppArr)->pData, buf);
3128
0
    (*ppArr)->size = (*ppArr)->capacity = len / elemSize;
3129
0
  }
3130
3131
0
_exit:
3132
0
  if (buf != NULL) {
3133
0
    taosMemoryFree(buf);
3134
0
  }
3135
0
  return code;
3136
0
}
3137
3138
0
static int32_t encodeSetTableMapInfo(SEncoder* encoder, SSHashObj* pInfo) {
3139
0
  int32_t  code = TSDB_CODE_SUCCESS;
3140
0
  int32_t  lino = 0;
3141
0
  int32_t size = tSimpleHashGetSize(pInfo);
3142
0
  TAOS_CHECK_EXIT(tEncodeI32(encoder, size));
3143
0
  int32_t iter = 0;
3144
0
  void*   px = tSimpleHashIterate(pInfo, NULL, &iter);
3145
0
  while (px != NULL) {
3146
0
    int64_t* uid = tSimpleHashGetKey(px, NULL);
3147
0
    TAOS_CHECK_EXIT(tEncodeI64(encoder, *uid));
3148
0
    TAOS_CHECK_EXIT(tEncodeI64(encoder, *(uid + 1)));
3149
0
    SSHashObj* info = *(SSHashObj**)px;
3150
0
    int32_t len = tSimpleHashGetSize(info);
3151
0
    TAOS_CHECK_EXIT(tEncodeI32(encoder, len));
3152
0
    int32_t iter1 = 0;
3153
0
    void*   px1 = tSimpleHashIterate(info, NULL, &iter1);
3154
0
    while (px1 != NULL) {
3155
0
      int16_t* slot = tSimpleHashGetKey(px1, NULL);
3156
0
      int16_t* cid = (int16_t*)px1;
3157
0
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *slot));
3158
0
      TAOS_CHECK_EXIT(tEncodeI16(encoder, *cid));
3159
3160
0
      px1 = tSimpleHashIterate(info, px1, &iter1);
3161
0
    }
3162
3163
0
    px = tSimpleHashIterate(pInfo, px, &iter);
3164
0
  }
3165
  
3166
0
_exit:
3167
0
  return code;
3168
0
}
3169
3170
0
int32_t tSerializeSTriggerPullRequest(void* buf, int32_t bufLen, const SSTriggerPullRequest* pReq) {
3171
0
  SEncoder encoder = {0};
3172
0
  int32_t  code = TSDB_CODE_SUCCESS;
3173
0
  int32_t  lino = 0;
3174
0
  int32_t  tlen = 0;
3175
3176
0
  tEncoderInit(&encoder, buf, bufLen);
3177
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
3178
3179
0
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
3180
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
3181
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->readerTaskId));
3182
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
3183
3184
0
  switch (pReq->type) {
3185
0
    case STRIGGER_PULL_SET_TABLE: {
3186
0
      SSTriggerSetTableRequest* pRequest = (SSTriggerSetTableRequest*)pReq;
3187
0
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoTrigger));
3188
0
      TAOS_CHECK_EXIT(encodeSetTableMapInfo(&encoder, pRequest->uidInfoCalc));
3189
0
      break;
3190
0
    }
3191
0
    case STRIGGER_PULL_LAST_TS: {
3192
0
      break;
3193
0
    }
3194
0
    case STRIGGER_PULL_FIRST_TS: {
3195
0
      SSTriggerFirstTsRequest* pRequest = (SSTriggerFirstTsRequest*)pReq;
3196
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
3197
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
3198
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3199
0
      break;
3200
0
    }
3201
0
    case STRIGGER_PULL_TSDB_META: {
3202
0
      SSTriggerTsdbMetaRequest* pRequest = (SSTriggerTsdbMetaRequest*)pReq;
3203
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
3204
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->endTime));
3205
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
3206
0
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
3207
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3208
0
      break;
3209
0
    }
3210
0
    case STRIGGER_PULL_TSDB_META_NEXT: {
3211
0
      break;
3212
0
    }
3213
0
    case STRIGGER_PULL_TSDB_TS_DATA: {
3214
0
      SSTriggerTsdbTsDataRequest* pRequest = (SSTriggerTsdbTsDataRequest*)pReq;
3215
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
3216
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
3217
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
3218
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
3219
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3220
0
      break;
3221
0
    }
3222
0
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
3223
0
      SSTriggerTsdbTriggerDataRequest* pRequest = (SSTriggerTsdbTriggerDataRequest*)pReq;
3224
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->startTime));
3225
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
3226
0
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
3227
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3228
0
      break;
3229
0
    }
3230
0
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
3231
0
      break;
3232
0
    }
3233
0
    case STRIGGER_PULL_TSDB_CALC_DATA: {
3234
0
      SSTriggerTsdbCalcDataRequest* pRequest = (SSTriggerTsdbCalcDataRequest*)pReq;
3235
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
3236
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
3237
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
3238
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3239
0
      break;
3240
0
    }
3241
0
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
3242
0
      break;
3243
0
    }
3244
0
    case STRIGGER_PULL_TSDB_DATA: {
3245
0
      SSTriggerTsdbDataRequest* pRequest = (SSTriggerTsdbDataRequest*)pReq;
3246
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->suid));
3247
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
3248
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->skey));
3249
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ekey));
3250
0
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
3251
0
      TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRequest->order));
3252
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3253
0
      break;
3254
0
    }
3255
0
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
3256
0
      break;
3257
0
    }
3258
0
    case STRIGGER_PULL_WAL_META_NEW: {
3259
0
      SSTriggerWalMetaNewRequest* pRequest = (SSTriggerWalMetaNewRequest*)pReq;
3260
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
3261
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ctime));
3262
0
      break;
3263
0
    }
3264
0
    case STRIGGER_PULL_WAL_DATA_NEW:
3265
0
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3266
0
      SSTriggerWalDataNewRequest* pRequest = (SSTriggerWalDataNewRequest*)pReq;
3267
0
      int32_t                     nVersion = taosArrayGetSize(pRequest->versions);
3268
0
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nVersion));
3269
0
      for (int32_t i = 0; i < nVersion; i++) {
3270
0
        int64_t ver = *(int64_t*)TARRAY_GET_ELEM(pRequest->versions, i);
3271
0
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, ver));
3272
0
      }
3273
0
      int32_t nRanges = tSimpleHashGetSize(pRequest->ranges);
3274
0
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nRanges));
3275
0
      int32_t iter = 0;
3276
0
      void*   px = tSimpleHashIterate(pRequest->ranges, NULL, &iter);
3277
0
      while (px != NULL) {
3278
0
        uint64_t* gid = tSimpleHashGetKey(px, NULL);
3279
0
        TAOS_CHECK_EXIT(tEncodeU64(&encoder, *gid));
3280
0
        int64_t* key = (int64_t*)px;
3281
0
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[0]));
3282
0
        TAOS_CHECK_EXIT(tEncodeI64(&encoder, key[1]));
3283
3284
0
        px = tSimpleHashIterate(pRequest->ranges, px, &iter);
3285
0
      }
3286
0
      break;
3287
0
    }
3288
0
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
3289
0
      SSTriggerWalMetaDataNewRequest* pRequest = (SSTriggerWalMetaDataNewRequest*)pReq;
3290
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->lastVer));
3291
0
      break;
3292
0
    }
3293
0
    case STRIGGER_PULL_GROUP_COL_VALUE: {
3294
0
      SSTriggerGroupColValueRequest* pRequest = (SSTriggerGroupColValueRequest*)pReq;
3295
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->gid));
3296
0
      break;
3297
0
    }
3298
0
    case STRIGGER_PULL_VTABLE_INFO: {
3299
0
      SSTriggerVirTableInfoRequest* pRequest = (SSTriggerVirTableInfoRequest*)pReq;
3300
0
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
3301
0
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->uids));
3302
0
      TAOS_CHECK_EXIT(tEncodeBool(&encoder, pRequest->fetchAllTable));
3303
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3304
0
      break;
3305
0
    }
3306
0
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
3307
0
      SSTriggerVirTablePseudoColRequest* pRequest = (SSTriggerVirTablePseudoColRequest*)pReq;
3308
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->uid));
3309
0
      TAOS_CHECK_EXIT(encodePlainArray(&encoder, pRequest->cids));
3310
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3311
0
      break;
3312
0
    }
3313
0
    case STRIGGER_PULL_OTABLE_INFO: {
3314
0
      SSTriggerOrigTableInfoRequest* pRequest = (SSTriggerOrigTableInfoRequest*)pReq;
3315
0
      int32_t size = taosArrayGetSize(pRequest->cols);
3316
0
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
3317
0
      for (int32_t i = 0; i < size; ++i) {
3318
0
        OTableInfo* oInfo = taosArrayGet(pRequest->cols, i);
3319
0
        if (oInfo == NULL) {
3320
0
          uError("col id is NULL at index %d", i);
3321
0
          code = TSDB_CODE_INVALID_PARA;
3322
0
          goto _exit;
3323
0
        }
3324
0
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refTableName));
3325
0
        TAOS_CHECK_EXIT(tEncodeCStr(&encoder, oInfo->refColName));
3326
0
      }
3327
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRequest->ver));
3328
0
      break; 
3329
0
    }
3330
0
    default: {
3331
0
      uError("unknown pull type %d", pReq->type);
3332
0
      code = TSDB_CODE_INVALID_PARA;
3333
0
      break;
3334
0
    }
3335
0
  }
3336
3337
0
  tEndEncode(&encoder);
3338
3339
0
_exit:
3340
0
  if (code != TSDB_CODE_SUCCESS) {
3341
0
    tlen = code;
3342
0
  } else {
3343
0
    tlen = encoder.pos;
3344
0
  }
3345
0
  tEncoderClear(&encoder);
3346
0
  return tlen;
3347
0
}
3348
3349
0
static void destroyHash(void* data){
3350
0
  if (data){
3351
0
    SSHashObj* tmp = *(SSHashObj**)data;
3352
0
    tSimpleHashCleanup(tmp);
3353
0
  }
3354
0
}
3355
3356
0
static int32_t decodeSetTableMapInfo(SDecoder* decoder, SSHashObj** ppInfo) {
3357
0
  int32_t  code = TSDB_CODE_SUCCESS;
3358
0
  int32_t  lino = 0;
3359
0
  int32_t size = 0;
3360
0
  TAOS_CHECK_EXIT(tDecodeI32(decoder, &size));
3361
0
  *ppInfo = tSimpleHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
3362
0
  if (*ppInfo == NULL) {
3363
0
    TAOS_CHECK_EXIT(terrno);
3364
0
  }
3365
0
  tSimpleHashSetFreeFp(*ppInfo, destroyHash);
3366
  
3367
0
  for (int32_t i = 0; i < size; ++i) {
3368
0
    int64_t id[2] = {0};
3369
0
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id));
3370
0
    TAOS_CHECK_EXIT(tDecodeI64(decoder, id+1));
3371
0
    int32_t len = 0;
3372
0
    TAOS_CHECK_EXIT(tDecodeI32(decoder, &len));
3373
0
    SSHashObj* tmp = tSimpleHashInit(len, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT));
3374
0
    if (tmp == NULL) {
3375
0
      TAOS_CHECK_EXIT(terrno);
3376
0
    }
3377
0
    TAOS_CHECK_EXIT(tSimpleHashPut(*ppInfo, id, sizeof(id), &tmp, POINTER_BYTES));
3378
3379
0
    for (int32_t j = 0; j < len; ++j) {
3380
0
      int16_t slotId = 0;
3381
0
      int16_t cid = 0;
3382
0
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &slotId));
3383
0
      TAOS_CHECK_EXIT(tDecodeI16(decoder, &cid));
3384
0
      TAOS_CHECK_EXIT(tSimpleHashPut(tmp, &slotId, sizeof(slotId), &cid, sizeof(cid)));
3385
0
    }
3386
0
  }
3387
0
_exit:
3388
0
  if (code != TSDB_CODE_SUCCESS) {
3389
0
    tSimpleHashCleanup(*ppInfo);
3390
0
    *ppInfo = NULL;
3391
0
  }
3392
0
  return code;
3393
0
}
3394
3395
0
int32_t tDeserializeSTriggerPullRequest(void* buf, int32_t bufLen, SSTriggerPullRequestUnion* pReq) {
3396
0
  SDecoder decoder = {0};
3397
0
  int32_t  code = TSDB_CODE_SUCCESS;
3398
0
  int32_t  lino = 0;
3399
3400
0
  tDecoderInit(&decoder, buf, bufLen);
3401
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
3402
3403
0
  int32_t type = 0;
3404
0
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
3405
0
  SSTriggerPullRequest* pBase = &(pReq->base);
3406
0
  pBase->type = type;
3407
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->streamId));
3408
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->readerTaskId));
3409
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBase->sessionId));
3410
3411
0
  switch (type) {
3412
0
    case STRIGGER_PULL_SET_TABLE: {
3413
0
      SSTriggerSetTableRequest* pRequest = &(pReq->setTableReq);
3414
0
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoTrigger));
3415
0
      TAOS_CHECK_EXIT(decodeSetTableMapInfo(&decoder, &pRequest->uidInfoCalc));
3416
0
      break;
3417
0
    }
3418
0
    case STRIGGER_PULL_LAST_TS: {
3419
0
      break;
3420
0
    }
3421
0
    case STRIGGER_PULL_FIRST_TS: {
3422
0
      SSTriggerFirstTsRequest* pRequest = &(pReq->firstTsReq);
3423
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
3424
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
3425
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3426
0
      break;
3427
0
    }
3428
0
    case STRIGGER_PULL_TSDB_META: {
3429
0
      SSTriggerTsdbMetaRequest* pRequest = &(pReq->tsdbMetaReq);
3430
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
3431
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->endTime));
3432
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
3433
0
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
3434
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3435
0
      break;
3436
0
    }
3437
0
    case STRIGGER_PULL_TSDB_META_NEXT: {
3438
0
      break;
3439
0
    }
3440
0
    case STRIGGER_PULL_TSDB_TS_DATA: {
3441
0
      SSTriggerTsdbTsDataRequest* pRequest = &(pReq->tsdbTsDataReq);
3442
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
3443
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
3444
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
3445
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
3446
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3447
0
      break;
3448
0
    }
3449
0
    case STRIGGER_PULL_TSDB_TRIGGER_DATA: {
3450
0
      SSTriggerTsdbTriggerDataRequest* pRequest = &(pReq->tsdbTriggerDataReq);
3451
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->startTime));
3452
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
3453
0
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
3454
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3455
0
      break;
3456
0
    }
3457
0
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT: {
3458
0
      break;
3459
0
    }
3460
0
    case STRIGGER_PULL_TSDB_CALC_DATA: {
3461
0
      SSTriggerTsdbCalcDataRequest* pRequest = &(pReq->tsdbCalcDataReq);
3462
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
3463
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
3464
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
3465
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3466
0
      break;
3467
0
    }
3468
0
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT: {
3469
0
      break;
3470
0
    }
3471
0
    case STRIGGER_PULL_TSDB_DATA: {
3472
0
      SSTriggerTsdbDataRequest* pRequest = &(pReq->tsdbDataReq);
3473
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->suid));
3474
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
3475
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->skey));
3476
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ekey));
3477
0
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
3478
0
      TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRequest->order));
3479
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3480
0
      break;
3481
0
    }
3482
0
    case STRIGGER_PULL_TSDB_DATA_NEXT: {
3483
0
      break;
3484
0
    }
3485
0
    case STRIGGER_PULL_WAL_META_NEW: {
3486
0
      SSTriggerWalMetaNewRequest* pRequest = &(pReq->walMetaNewReq);
3487
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
3488
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ctime));
3489
0
      break;
3490
0
    }
3491
0
    case STRIGGER_PULL_WAL_DATA_NEW:
3492
0
    case STRIGGER_PULL_WAL_CALC_DATA_NEW: {
3493
0
      SSTriggerWalDataNewRequest* pRequest = &(pReq->walDataNewReq);
3494
0
      int32_t                     nVersion = 0;
3495
0
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nVersion));
3496
0
      pRequest->versions = taosArrayInit_s(sizeof(int64_t), nVersion);
3497
0
      for (int32_t i = 0; i < nVersion; i++) {
3498
0
        int64_t* pVer = TARRAY_GET_ELEM(pRequest->versions, i);
3499
0
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, pVer));
3500
0
      }
3501
0
      int32_t nRanges = 0;
3502
0
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nRanges));
3503
0
      pRequest->ranges = tSimpleHashInit(nRanges, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
3504
0
      if (pRequest->ranges == NULL) {
3505
0
        TAOS_CHECK_EXIT(terrno);
3506
0
      }
3507
0
      for (int32_t i = 0; i < nRanges; i++) {
3508
0
        uint64_t gid = 0;
3509
0
        int64_t pRange[2] = {0};
3510
0
        TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
3511
0
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[0]));
3512
0
        TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRange[1]));
3513
0
        TAOS_CHECK_EXIT(tSimpleHashPut(pRequest->ranges, &gid, sizeof(gid), pRange, sizeof(pRange)));
3514
0
      }
3515
0
      break;
3516
0
    }
3517
0
    case STRIGGER_PULL_WAL_META_DATA_NEW: {
3518
0
      SSTriggerWalMetaDataNewRequest* pRequest = &(pReq->walMetaDataNewReq);
3519
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->lastVer));
3520
0
      break;
3521
0
    }
3522
0
    case STRIGGER_PULL_GROUP_COL_VALUE: {
3523
0
      SSTriggerGroupColValueRequest* pRequest = &(pReq->groupColValueReq);
3524
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->gid));
3525
0
      break;
3526
0
    }
3527
0
    case STRIGGER_PULL_VTABLE_INFO: {
3528
0
      SSTriggerVirTableInfoRequest* pRequest = &(pReq->virTableInfoReq);
3529
0
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
3530
0
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->uids, sizeof(int64_t)));
3531
0
      TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pRequest->fetchAllTable));
3532
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3533
0
      break;
3534
0
    }
3535
0
    case STRIGGER_PULL_VTABLE_PSEUDO_COL: {
3536
0
      SSTriggerVirTablePseudoColRequest* pRequest = &(pReq->virTablePseudoColReq);
3537
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->uid));
3538
0
      TAOS_CHECK_EXIT(decodePlainArray(&decoder, &pRequest->cids, sizeof(col_id_t)));
3539
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3540
0
      break;
3541
0
    }
3542
0
    case STRIGGER_PULL_OTABLE_INFO: {
3543
0
      SSTriggerOrigTableInfoRequest* pRequest = &(pReq->origTableInfoReq);
3544
0
      int32_t size = 0;
3545
0
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
3546
0
      pRequest->cols = taosArrayInit(size, sizeof(OTableInfo));
3547
0
      if (pRequest->cols == NULL) {
3548
0
        code = terrno;
3549
0
        uError("failed to allocate memory for cids, size: %d, errno: %d", size, code);
3550
0
        goto _exit;
3551
0
      }
3552
0
      for (int32_t i = 0; i < size; ++i) {
3553
0
        OTableInfo* oInfo = taosArrayReserve(pRequest->cols, 1);
3554
0
        if (oInfo == NULL) {
3555
0
          code = terrno;
3556
0
          uError("failed to reserve memory for OTableInfo, size: %d, errno: %d", size, code);
3557
0
          goto _exit;
3558
0
        }
3559
0
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refTableName));
3560
0
        TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, oInfo->refColName));
3561
0
      }
3562
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRequest->ver));
3563
3564
0
      break;
3565
0
    }
3566
0
    default: {
3567
0
      uError("unknown pull type %d", type);
3568
0
      code = TSDB_CODE_INVALID_PARA;
3569
0
      break;
3570
0
    }
3571
0
  }
3572
3573
0
  tEndDecode(&decoder);
3574
3575
0
_exit:
3576
0
  tDecoderClear(&decoder);
3577
0
  return code;
3578
0
}
3579
3580
0
static int32_t tSerializeSTriggerCalcParam(SEncoder* pEncoder, SArray* pParams, bool ignoreNotificationInfo, bool full) {
3581
0
  int32_t size = full ? taosArrayGetSize(pParams) : 0;
3582
0
  int32_t code = 0;
3583
0
  int32_t lino = 0;
3584
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
3585
0
  for (int32_t i = 0; i < size; ++i) {
3586
0
    SSTriggerCalcParam* param = taosArrayGet(pParams, i);
3587
0
    if (param == NULL) {
3588
0
      TAOS_CHECK_EXIT(terrno);
3589
0
    }
3590
0
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
3591
0
    if (pEncoder->data) {
3592
0
      TAOS_MEMCPY(pEncoder->data + pEncoder->pos, param, plainFieldSize);
3593
0
    }
3594
0
    pEncoder->pos += plainFieldSize;
3595
3596
0
    if (!ignoreNotificationInfo) {
3597
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, param->notifyType));
3598
0
      uint64_t len = (param->extraNotifyContent != NULL) ? strlen(param->extraNotifyContent) + 1 : 0;
3599
0
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (uint8_t*)param->extraNotifyContent, len));
3600
0
    }
3601
0
  }
3602
0
_exit:
3603
0
  return code;
3604
0
}
3605
3606
0
void tDestroySSTriggerCalcParam(void* ptr) {
3607
0
  SSTriggerCalcParam* pParam = ptr;
3608
0
  if (pParam && pParam->extraNotifyContent != NULL) {
3609
0
    taosMemoryFreeClear(pParam->extraNotifyContent);
3610
0
  }
3611
0
  if (pParam && pParam->resultNotifyContent != NULL) {
3612
0
    taosMemoryFreeClear(pParam->resultNotifyContent);
3613
0
  }
3614
0
  if (pParam && pParam->pExternalWindowData != NULL) {
3615
0
    taosArrayDestroyEx(pParam->pExternalWindowData, tDestroySStreamGroupValue);
3616
0
    pParam->pExternalWindowData = NULL;
3617
0
  }
3618
0
}
3619
3620
0
void tDestroySSTriggerGroupCalcInfo(void* ptr) {
3621
0
  SSTriggerGroupCalcInfo* pCalcInfo = ptr;
3622
0
  if (pCalcInfo && pCalcInfo->pParams != NULL) {
3623
0
    taosArrayDestroyEx(pCalcInfo->pParams, tDestroySSTriggerCalcParam);
3624
0
    pCalcInfo->pParams = NULL;
3625
0
  }
3626
0
  if (pCalcInfo && pCalcInfo->pGroupColVals != NULL) {
3627
0
    taosArrayDestroyEx(pCalcInfo->pGroupColVals, tDestroySStreamGroupValue);
3628
0
    pCalcInfo->pGroupColVals = NULL;
3629
0
  }
3630
0
}
3631
3632
0
void tDestroySSTriggerGroupReadInfo(void* ptr) {
3633
0
  SSTriggerGroupReadInfo* pReadInfo = ptr;
3634
0
  if (pReadInfo && pReadInfo->pTables) {
3635
0
    taosArrayDestroy(pReadInfo->pTables);
3636
0
    pReadInfo->pTables = NULL;
3637
0
  }
3638
0
}
3639
3640
0
void tDestroySSTriggerGroupReadInfoArray(void* ptr) {
3641
0
  if (ptr != NULL && *(SArray**)ptr != NULL) {
3642
0
    SArray* pArray = *(SArray**)ptr;
3643
0
    taosArrayDestroyEx(pArray, tDestroySSTriggerGroupReadInfo);
3644
0
    *(SArray**)ptr = NULL;
3645
0
  }
3646
0
}
3647
3648
0
void tDestroySStreamGroupValue(void* ptr) {
3649
0
  SStreamGroupValue* pValue = ptr;
3650
0
  if ((pValue != NULL) && (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL)) {
3651
0
    taosMemoryFreeClear(pValue->data.pData);
3652
0
    pValue->data.nData = 0;
3653
0
  }
3654
0
}
3655
3656
0
static int32_t tDeserializeSTriggerCalcParam(SDecoder* pDecoder, SArray**ppParams, bool ignoreNotificationInfo) {
3657
0
  int32_t size = 0, code = 0, lino = 0;
3658
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
3659
0
  if (size <= 0) {
3660
0
    return code;
3661
0
  }
3662
  
3663
0
  *ppParams = taosArrayInit(size, sizeof(SSTriggerCalcParam));
3664
0
  if (*ppParams == NULL) {
3665
0
    TAOS_CHECK_EXIT(terrno);
3666
0
  }
3667
0
  for (int32_t i = 0; i < size; ++i) {
3668
0
    SSTriggerCalcParam* param = taosArrayReserve(*ppParams, 1);
3669
0
    if (param == NULL) {
3670
0
      TAOS_CHECK_EXIT(terrno);
3671
0
    }
3672
0
    int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
3673
0
    TAOS_MEMCPY(param, pDecoder->data + pDecoder->pos, plainFieldSize);
3674
0
    pDecoder->pos += plainFieldSize;
3675
3676
0
    if (!ignoreNotificationInfo) {
3677
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &param->notifyType));
3678
0
      uint64_t len = 0;
3679
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&param->extraNotifyContent, &len));
3680
0
    }
3681
0
  }
3682
3683
0
_exit:
3684
0
  return code;
3685
0
}
3686
3687
0
static int32_t tSerializeStriggerGroupColVals(SEncoder* pEncoder, SArray* pGroupColVals, int32_t vgId) {
3688
0
  int32_t code = TSDB_CODE_SUCCESS;
3689
0
  int32_t lino = 0;
3690
3691
0
  int32_t size = taosArrayGetSize(pGroupColVals);
3692
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size));
3693
0
  for (int32_t i = 0; i < size; ++i) {
3694
0
    SStreamGroupValue* pValue = taosArrayGet(pGroupColVals, i);
3695
0
    if (pValue == NULL) {
3696
0
      TAOS_CHECK_EXIT(terrno);
3697
0
    }
3698
0
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isNull));
3699
0
    if (pValue->isNull) {
3700
0
      continue;
3701
0
    }
3702
0
    TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pValue->isTbname));
3703
0
    if (pValue->isTbname) {
3704
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->uid));
3705
0
      if (vgId != -1) { pValue->vgId = vgId; }
3706
0
      TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pValue->vgId));
3707
0
    }
3708
0
    TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pValue->data.type));
3709
0
    if (IS_VAR_DATA_TYPE(pValue->data.type)) {
3710
0
      TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pValue->data.pData, pValue->data.nData));
3711
0
    } else {
3712
0
      TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pValue->data.val));
3713
0
    }
3714
0
  }
3715
3716
0
_exit:
3717
0
  return code;
3718
0
}
3719
3720
0
static int32_t tDeserializeStriggerGroupColVals(SDecoder* pDecoder, SArray** ppGroupColVals) {
3721
0
  int32_t code = TSDB_CODE_SUCCESS;
3722
0
  int32_t lino = 0;
3723
0
  int32_t size = 0;
3724
3725
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
3726
0
  taosArrayClearEx(*ppGroupColVals, tDestroySStreamGroupValue);
3727
0
  if (size > 0) {
3728
0
    if (*ppGroupColVals == NULL) {
3729
0
      *ppGroupColVals = taosArrayInit(size, sizeof(SStreamGroupValue));
3730
0
      if (*ppGroupColVals == NULL) {
3731
0
        TAOS_CHECK_EXIT(terrno);
3732
0
      }
3733
0
    } else {
3734
0
      TAOS_CHECK_EXIT(taosArrayEnsureCap(*ppGroupColVals, size));
3735
0
    }
3736
0
  }
3737
0
  for (int32_t i = 0; i < size; ++i) {
3738
0
    SStreamGroupValue* pValue = taosArrayReserve(*ppGroupColVals, 1);
3739
0
    if (pValue == NULL) {
3740
0
      TAOS_CHECK_EXIT(terrno);
3741
0
    }
3742
0
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isNull));
3743
0
    if (pValue->isNull) {
3744
0
      continue;
3745
0
    }
3746
0
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pValue->isTbname));
3747
0
    if (pValue->isTbname) {
3748
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->uid));
3749
0
      TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pValue->vgId));
3750
0
    }
3751
0
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pValue->data.type));
3752
0
    if (IS_VAR_DATA_TYPE(pValue->data.type)) {
3753
0
      uint64_t len = 0;
3754
0
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pValue->data.pData, &len));
3755
0
      pValue->data.nData = len;
3756
0
    } else {
3757
0
      TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pValue->data.val));
3758
0
    }
3759
0
  }
3760
0
_exit:
3761
0
  return code;
3762
0
}
3763
3764
0
int32_t tSerializeSStreamGroupInfo(void* buf, int32_t bufLen, const SStreamGroupInfo* gInfo, int32_t vgId) {
3765
0
  SEncoder encoder = {0};
3766
0
  int32_t  code = TSDB_CODE_SUCCESS;
3767
0
  int32_t  lino = 0;
3768
0
  int32_t  tlen = 0;
3769
3770
0
  tEncoderInit(&encoder, buf, bufLen);
3771
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
3772
3773
0
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, gInfo->gInfo, vgId));
3774
3775
0
  tEndEncode(&encoder);
3776
3777
0
_exit:
3778
0
  if (code != TSDB_CODE_SUCCESS) {
3779
0
    tlen = code;
3780
0
  } else {
3781
0
    tlen = encoder.pos;
3782
0
  }
3783
0
  tEncoderClear(&encoder);
3784
0
  return tlen;
3785
0
}
3786
3787
0
int32_t tDeserializeSStreamGroupInfo(void* buf, int32_t bufLen, SStreamGroupInfo* gInfo) {
3788
0
  SDecoder decoder = {0};
3789
0
  int32_t  code = TSDB_CODE_SUCCESS;
3790
0
  int32_t  lino = 0;
3791
0
  int32_t  size = 0;
3792
3793
0
  tDecoderInit(&decoder, buf, bufLen);
3794
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
3795
3796
0
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &gInfo->gInfo));
3797
3798
0
  tEndDecode(&decoder);
3799
3800
0
_exit:
3801
0
  tDecoderClear(&decoder);
3802
0
  return code;
3803
0
}
3804
3805
0
static int32_t tSerializeSSTriggerGroupCalcInfo(SEncoder* pEncoder, SSTriggerGroupCalcInfo* pInfo) {
3806
0
  int32_t code = 0;
3807
0
  int32_t lino = 0;
3808
3809
0
  TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pParams, false, true));
3810
0
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pGroupColVals, -1));
3811
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pInfo->createTable));
3812
3813
0
_exit:
3814
0
  return code;
3815
0
}
3816
3817
0
static int32_t tSerializeSSTriggerGroupReadInfo(SEncoder* pEncoder, SSTriggerGroupReadInfo* pInfo) {
3818
0
  int32_t code = 0;
3819
0
  int32_t lino = 0;
3820
3821
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->gid));
3822
0
  int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
3823
0
  if (pEncoder->data) {
3824
0
    TAOS_MEMCPY(pEncoder->data + pEncoder->pos, &pInfo->firstParam, plainFieldSize);
3825
0
  }
3826
0
  pEncoder->pos += plainFieldSize;
3827
0
  if (pEncoder->data) {
3828
0
    TAOS_MEMCPY(pEncoder->data + pEncoder->pos, &pInfo->lastParam, plainFieldSize);
3829
0
  }
3830
0
  pEncoder->pos += plainFieldSize;
3831
3832
0
  int32_t nTables = taosArrayGetSize(pInfo->pTables);
3833
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nTables));
3834
0
  if (pEncoder->data && nTables > 0) {
3835
0
    TAOS_MEMCPY(pEncoder->data + pEncoder->pos, pInfo->pTables->pData, nTables * sizeof(int64_t));
3836
0
  }
3837
0
  pEncoder->pos += nTables * sizeof(int64_t);
3838
3839
0
_exit:
3840
0
  return code;
3841
0
}
3842
3843
0
int32_t tSerializeSTriggerCalcRequest(void* buf, int32_t bufLen, const SSTriggerCalcRequest* pReq) {
3844
0
  SEncoder encoder = {0};
3845
0
  int32_t  code = TSDB_CODE_SUCCESS;
3846
0
  int32_t  lino = 0;
3847
0
  int32_t  tlen = 0;
3848
3849
0
  tEncoderInit(&encoder, buf, bufLen);
3850
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
3851
3852
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
3853
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
3854
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
3855
0
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->triggerType));
3856
0
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->isMultiGroupCalc));
3857
0
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->stbPartByTbname));
3858
3859
0
  if (!pReq->isMultiGroupCalc) {
3860
0
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
3861
0
    TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(&encoder, pReq->params, false, true));
3862
0
    TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
3863
0
    TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->createTable));
3864
0
  } else {
3865
0
    int32_t nGroups = tSimpleHashGetSize(pReq->pGroupCalcInfos);
3866
0
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, nGroups));
3867
0
    int32_t                 iter1 = 0;
3868
0
    SSTriggerGroupCalcInfo* pCalcInfo = tSimpleHashIterate(pReq->pGroupCalcInfos, NULL, &iter1);
3869
0
    while (pCalcInfo != NULL) {
3870
0
      int64_t* gid = tSimpleHashGetKey(pCalcInfo, NULL);
3871
0
      TAOS_CHECK_EXIT(tEncodeI64(&encoder, *gid));
3872
0
      TAOS_CHECK_EXIT(tSerializeSSTriggerGroupCalcInfo(&encoder, pCalcInfo));
3873
0
      pCalcInfo = tSimpleHashIterate(pReq->pGroupCalcInfos, pCalcInfo, &iter1);
3874
0
    }
3875
3876
0
    int32_t nVnodes = tSimpleHashGetSize(pReq->pGroupReadInfos);
3877
0
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, nVnodes));
3878
0
    int32_t iter2 = 0;
3879
0
    void*   px = tSimpleHashIterate(pReq->pGroupReadInfos, NULL, &iter2);
3880
0
    while (px != NULL) {
3881
0
      int32_t* vgId = tSimpleHashGetKey(px, NULL);
3882
0
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, *vgId));
3883
0
      SArray* pInfos = *(SArray**)px;
3884
0
      int32_t nGroups = taosArrayGetSize(pInfos);
3885
0
      TAOS_CHECK_EXIT(tEncodeI32(&encoder, nGroups));
3886
0
      for (int32_t i = 0; i < nGroups; ++i) {
3887
0
        SSTriggerGroupReadInfo* pReadInfo = TARRAY_GET_ELEM(pInfos, i);
3888
0
        TAOS_CHECK_EXIT(tSerializeSSTriggerGroupReadInfo(&encoder, pReadInfo));
3889
0
      }
3890
0
      px = tSimpleHashIterate(pReq->pGroupReadInfos, px, &iter2);
3891
0
    }
3892
0
  }
3893
3894
0
  TAOS_CHECK_EXIT(tEncodeBool(&encoder, pReq->isWindowTrigger));
3895
0
  TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->precision));
3896
3897
0
  tEndEncode(&encoder);
3898
3899
0
_exit:
3900
0
  if (code != TSDB_CODE_SUCCESS) {
3901
0
    tlen = code;
3902
0
  } else {
3903
0
    tlen = encoder.pos;
3904
0
  }
3905
0
  tEncoderClear(&encoder);
3906
0
  return tlen;
3907
0
}
3908
3909
0
static int32_t tDeserializeSSTriggerGroupCalcInfo(SDecoder* pDecoder, SSTriggerGroupCalcInfo* pInfo) {
3910
0
  int32_t code = 0;
3911
0
  int32_t lino = 0;
3912
3913
0
  TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pParams, false));
3914
0
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pGroupColVals));
3915
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pInfo->createTable));
3916
3917
0
_exit:
3918
0
  return code;
3919
0
}
3920
3921
0
static int32_t tDeserializeSSTriggerGroupReadInfo(SDecoder* pDecoder, SSTriggerGroupReadInfo* pInfo) {
3922
0
  int32_t code = 0;
3923
0
  int32_t lino = 0;
3924
3925
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->gid));
3926
0
  int64_t plainFieldSize = offsetof(SSTriggerCalcParam, notifyType);
3927
0
  TAOS_MEMCPY(&pInfo->firstParam, pDecoder->data + pDecoder->pos, plainFieldSize);
3928
0
  pDecoder->pos += plainFieldSize;
3929
0
  TAOS_MEMCPY(&pInfo->lastParam, pDecoder->data + pDecoder->pos, plainFieldSize);
3930
0
  pDecoder->pos += plainFieldSize;
3931
3932
0
  int32_t nTables = 0;
3933
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nTables));
3934
0
  if (nTables > 0) {
3935
0
    pInfo->pTables = taosArrayInit_s(sizeof(int64_t), nTables);
3936
0
    QUERY_CHECK_NULL(pInfo->pTables, code, lino, _exit, terrno);
3937
0
    TAOS_MEMCPY(pInfo->pTables->pData, pDecoder->data + pDecoder->pos, nTables * sizeof(int64_t));
3938
0
  }
3939
0
  pDecoder->pos += nTables * sizeof(int64_t);
3940
3941
0
_exit:
3942
0
  return code;
3943
0
}
3944
3945
0
int32_t tDeserializeSTriggerCalcRequest(void* buf, int32_t bufLen, SSTriggerCalcRequest* pReq) {
3946
0
  SDecoder decoder = {0};
3947
0
  int32_t  code = TSDB_CODE_SUCCESS;
3948
0
  int32_t  lino = 0;
3949
3950
0
  tDecoderInit(&decoder, buf, bufLen);
3951
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
3952
3953
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
3954
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
3955
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
3956
0
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->triggerType));
3957
0
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->isMultiGroupCalc));
3958
0
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->stbPartByTbname));
3959
3960
0
  if (!pReq->isMultiGroupCalc) {
3961
0
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));
3962
0
    TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(&decoder, &pReq->params, false));
3963
0
    TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));
3964
0
    TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->createTable));
3965
0
  } else {
3966
0
    pReq->pGroupCalcInfos = tSimpleHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
3967
0
    QUERY_CHECK_NULL(pReq->pGroupCalcInfos, code, lino, _exit, terrno);
3968
0
    tSimpleHashSetFreeFp(pReq->pGroupCalcInfos, tDestroySSTriggerGroupCalcInfo);
3969
0
    int32_t nGroups = 0;
3970
0
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nGroups));
3971
0
    for (int32_t i = 0; i < nGroups; i++) {
3972
0
      int64_t gid = 0;
3973
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &gid));
3974
0
      SSTriggerGroupCalcInfo info = {0};
3975
0
      TAOS_CHECK_EXIT(tSimpleHashPut(pReq->pGroupCalcInfos, &gid, sizeof(int64_t), &info, sizeof(info)));
3976
0
      SSTriggerGroupCalcInfo* pCalcInfo = tSimpleHashGet(pReq->pGroupCalcInfos, &gid, sizeof(int64_t));
3977
0
      QUERY_CHECK_NULL(pCalcInfo, code, lino, _exit, TSDB_CODE_INTERNAL_ERROR);
3978
0
      TAOS_CHECK_EXIT(tDeserializeSSTriggerGroupCalcInfo(&decoder, pCalcInfo));
3979
0
    }
3980
3981
0
    int32_t nVnodes = 0;
3982
0
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nVnodes));
3983
0
    pReq->pGroupReadInfos = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
3984
0
    QUERY_CHECK_NULL(pReq->pGroupReadInfos, code, lino, _exit, terrno);
3985
0
    tSimpleHashSetFreeFp(pReq->pGroupReadInfos, tDestroySSTriggerGroupReadInfoArray);
3986
0
    for (int32_t i = 0; i < nVnodes; i++) {
3987
0
      int32_t vgId = 0;
3988
0
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgId));
3989
0
      int32_t nGroups = 0;
3990
0
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nGroups));
3991
0
      SArray* pInfos = taosArrayInit_s(sizeof(SSTriggerGroupReadInfo), nGroups);
3992
0
      QUERY_CHECK_NULL(pInfos, code, lino, _exit, terrno);
3993
0
      code = tSimpleHashPut(pReq->pGroupReadInfos, &vgId, sizeof(int32_t), &pInfos, POINTER_BYTES);
3994
0
      if (code != TSDB_CODE_SUCCESS) {
3995
0
        taosArrayDestroy(pInfos);
3996
0
        TAOS_CHECK_EXIT(code);
3997
0
      }
3998
0
      for (int32_t j = 0; j < nGroups; ++j) {
3999
0
        SSTriggerGroupReadInfo* pReadInfo = TARRAY_GET_ELEM(pInfos, j);
4000
0
        TAOS_CHECK_EXIT(tDeserializeSSTriggerGroupReadInfo(&decoder, pReadInfo));
4001
0
      }
4002
0
    }
4003
0
  }
4004
4005
0
  if (!tDecodeIsEnd(&decoder)) {
4006
0
    TAOS_CHECK_EXIT(tDecodeBool(&decoder, &pReq->isWindowTrigger));
4007
0
    TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->precision));
4008
0
  }
4009
4010
0
  tEndDecode(&decoder);
4011
4012
0
_exit:
4013
0
  tDecoderClear(&decoder);
4014
0
  return code;
4015
0
}
4016
4017
0
void tDestroySTriggerCalcRequest(SSTriggerCalcRequest* pReq) {
4018
0
  if (pReq != NULL) {
4019
0
    if (pReq->params != NULL) {
4020
0
      taosArrayDestroyEx(pReq->params, tDestroySSTriggerCalcParam);
4021
0
      pReq->params = NULL;
4022
0
    }
4023
0
    if (pReq->groupColVals != NULL) {
4024
0
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
4025
0
      pReq->groupColVals = NULL;
4026
0
    }
4027
0
    if (pReq->pGroupCalcInfos != NULL) {
4028
0
      tSimpleHashCleanup(pReq->pGroupCalcInfos);
4029
0
      pReq->pGroupCalcInfos = NULL;
4030
0
    }
4031
0
    if (pReq->pGroupReadInfos != NULL) {
4032
0
      tSimpleHashCleanup(pReq->pGroupReadInfos);
4033
0
      pReq->pGroupReadInfos = NULL;
4034
0
    }
4035
0
    blockDataDestroy(pReq->pOutBlock);
4036
0
    pReq->pOutBlock = NULL;
4037
0
  }
4038
0
}
4039
4040
0
int32_t tSerializeSTriggerDropTableRequest(void* buf, int32_t bufLen, const SSTriggerDropRequest* pReq) {
4041
0
  SEncoder encoder = {0};
4042
0
  int32_t  code = TSDB_CODE_SUCCESS;
4043
0
  int32_t  lino = 0;
4044
0
  int32_t  tlen = 0;
4045
4046
0
  tEncoderInit(&encoder, buf, bufLen);
4047
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
4048
4049
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
4050
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->runnerTaskId));
4051
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
4052
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->gid));
4053
4054
0
  TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(&encoder, pReq->groupColVals, -1));
4055
4056
0
  tEndEncode(&encoder);
4057
4058
0
_exit:
4059
0
  if (code != TSDB_CODE_SUCCESS) {
4060
0
    tlen = code;
4061
0
  } else {
4062
0
    tlen = encoder.pos;
4063
0
  }
4064
0
  tEncoderClear(&encoder);
4065
0
  return tlen;
4066
0
}
4067
4068
0
int32_t tDeserializeSTriggerDropTableRequest(void* buf, int32_t bufLen, SSTriggerDropRequest* pReq) {
4069
0
  SDecoder decoder = {0};
4070
0
  int32_t  code = TSDB_CODE_SUCCESS;
4071
0
  int32_t  lino = 0;
4072
4073
0
  tDecoderInit(&decoder, buf, bufLen);
4074
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
4075
4076
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
4077
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->runnerTaskId));
4078
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
4079
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->gid));
4080
4081
0
  TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(&decoder, &pReq->groupColVals));
4082
4083
0
  tEndDecode(&decoder);
4084
4085
0
_exit:
4086
0
  tDecoderClear(&decoder);
4087
0
  return code;
4088
0
}
4089
4090
0
void tDestroySSTriggerDropRequest(SSTriggerDropRequest* pReq) {
4091
0
  if (pReq != NULL) {
4092
0
    if (pReq->groupColVals != NULL) {
4093
0
      taosArrayDestroyEx(pReq->groupColVals, tDestroySStreamGroupValue);
4094
0
      pReq->groupColVals = NULL;
4095
0
    }
4096
0
  }
4097
0
}
4098
4099
0
int32_t tSerializeSTriggerCtrlRequest(void* buf, int32_t bufLen, const SSTriggerCtrlRequest* pReq) {
4100
0
  SEncoder encoder = {0};
4101
0
  int32_t  code = TSDB_CODE_SUCCESS;
4102
0
  int32_t  lino = 0;
4103
0
  int32_t  tlen = 0;
4104
4105
0
  tEncoderInit(&encoder, buf, bufLen);
4106
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
4107
4108
0
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->type));
4109
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->streamId));
4110
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->taskId));
4111
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->sessionId));
4112
4113
0
  tEndEncode(&encoder);
4114
4115
0
_exit:
4116
0
  if (code != TSDB_CODE_SUCCESS) {
4117
0
    tlen = code;
4118
0
  } else {
4119
0
    tlen = encoder.pos;
4120
0
  }
4121
0
  tEncoderClear(&encoder);
4122
0
  return tlen;
4123
0
}
4124
4125
0
int32_t tDeserializeSTriggerCtrlRequest(void* buf, int32_t bufLen, SSTriggerCtrlRequest* pReq) {
4126
0
  SDecoder decoder = {0};
4127
0
  int32_t  code = TSDB_CODE_SUCCESS;
4128
0
  int32_t  lino = 0;
4129
4130
0
  tDecoderInit(&decoder, buf, bufLen);
4131
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
4132
4133
0
  int32_t type = 0;
4134
0
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
4135
0
  pReq->type = type;
4136
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->streamId));
4137
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->taskId));
4138
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->sessionId));
4139
4140
0
  tEndDecode(&decoder);
4141
4142
0
_exit:
4143
0
  tDecoderClear(&decoder);
4144
0
  return code;
4145
0
}
4146
4147
0
int32_t tSerializeStRtFuncInfo(SEncoder* pEncoder, const SStreamRuntimeFuncInfo* pInfo, bool needStreamRtInfo, bool needStreamGrpInfo) {
4148
0
  int32_t code = 0, lino = 0;
4149
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pInfo->isMultiGroupCalc));
4150
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pInfo->stbPartByTbname));
4151
0
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, needStreamRtInfo));
4152
0
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, needStreamGrpInfo));
4153
0
  if (pInfo->isMultiGroupCalc) {
4154
0
    if (needStreamRtInfo) {
4155
0
      if (needStreamGrpInfo) {
4156
0
        int32_t nGroups = taosArrayGetSize(pInfo->curGrpRead);
4157
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nGroups));
4158
0
        int8_t withGrpCalcInfo = 1;
4159
0
        for (int32_t i = 0; i < nGroups; ++i) {
4160
0
          SSTriggerGroupReadInfo* pReadInfo = TARRAY_GET_ELEM(pInfo->curGrpRead, i);
4161
0
          TAOS_CHECK_EXIT(tSerializeSSTriggerGroupReadInfo(pEncoder, pReadInfo));
4162
0
          if (0 == i) {
4163
0
            withGrpCalcInfo = (taosArrayGetSize(pReadInfo->pTables) <= 0);
4164
0
          }
4165
0
        }
4166
        
4167
0
        TAOS_CHECK_EXIT(tEncodeI8(pEncoder, withGrpCalcInfo));
4168
0
        if (withGrpCalcInfo) {
4169
0
          int32_t nGroups = tSimpleHashGetSize(pInfo->pGroupCalcInfos);
4170
0
          TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nGroups));
4171
0
          int32_t                 iter1 = 0;
4172
0
          SSTriggerGroupCalcInfo* pCalcInfo = tSimpleHashIterate(pInfo->pGroupCalcInfos, NULL, &iter1);
4173
0
          while (pCalcInfo != NULL) {
4174
0
            int64_t* gid = tSimpleHashGetKey(pCalcInfo, NULL);
4175
0
            TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *gid));
4176
0
            TAOS_CHECK_EXIT(tSerializeSSTriggerGroupCalcInfo(pEncoder, pCalcInfo));
4177
0
            pCalcInfo = tSimpleHashIterate(pInfo->pGroupCalcInfos, pCalcInfo, &iter1);
4178
0
          }
4179
0
        }
4180
0
      } else {
4181
0
        int32_t nGroups = tSimpleHashGetSize(pInfo->pGroupCalcInfos);
4182
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nGroups));
4183
0
        int32_t                 iter1 = 0;
4184
0
        SSTriggerGroupCalcInfo* pCalcInfo = tSimpleHashIterate(pInfo->pGroupCalcInfos, NULL, &iter1);
4185
0
        while (pCalcInfo != NULL) {
4186
0
          int64_t* gid = tSimpleHashGetKey(pCalcInfo, NULL);
4187
0
          TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *gid));
4188
0
          TAOS_CHECK_EXIT(tSerializeSSTriggerGroupCalcInfo(pEncoder, pCalcInfo));
4189
0
          pCalcInfo = tSimpleHashIterate(pInfo->pGroupCalcInfos, pCalcInfo, &iter1);
4190
0
        }
4191
4192
0
        int32_t nVnodes = tSimpleHashGetSize(pInfo->pGroupReadInfos);
4193
0
        TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nVnodes));
4194
0
        int32_t iter2 = 0;
4195
0
        void*   px = tSimpleHashIterate(pInfo->pGroupReadInfos, NULL, &iter2);
4196
0
        while (px != NULL) {
4197
0
          int32_t* vgId = tSimpleHashGetKey(px, NULL);
4198
0
          TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *vgId));
4199
0
          SArray* pInfos = *(SArray**)px;
4200
0
          int32_t nGroups = taosArrayGetSize(pInfos);
4201
0
          TAOS_CHECK_EXIT(tEncodeI32(pEncoder, nGroups));
4202
0
          for (int32_t i = 0; i < nGroups; ++i) {
4203
0
            SSTriggerGroupReadInfo* pReadInfo = TARRAY_GET_ELEM(pInfos, i);
4204
0
            TAOS_CHECK_EXIT(tSerializeSSTriggerGroupReadInfo(pEncoder, pReadInfo));
4205
0
          }
4206
0
          px = tSimpleHashIterate(pInfo->pGroupReadInfos, px, &iter2);
4207
0
        }
4208
0
      }
4209
0
    }
4210
0
  } else {
4211
0
    TAOS_CHECK_EXIT(tSerializeSTriggerCalcParam(pEncoder, pInfo->pStreamPesudoFuncVals, true, needStreamRtInfo));
4212
0
    TAOS_CHECK_EXIT(tSerializeStriggerGroupColVals(pEncoder, pInfo->pStreamPartColVals, -1));
4213
0
    TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->groupId));
4214
0
  }
4215
  
4216
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.skey));
4217
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->curWindow.ekey));
4218
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->curIdx));
4219
0
  TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pInfo->sessionId));
4220
0
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->withExternalWindow));
4221
0
  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->triggerType));
4222
0
  TAOS_CHECK_EXIT(tEncodeBool(pEncoder, pInfo->isWindowTrigger));
4223
0
  TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pInfo->precision));
4224
0
  TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pInfo->streamGen));
4225
0
_exit:
4226
0
  return code;
4227
0
}
4228
4229
0
int32_t tDeserializeStRtFuncInfo(SDecoder* pDecoder, SStreamRuntimeFuncInfo* pInfo) {
4230
0
  int32_t code = 0, lino = 0;
4231
0
  int32_t size = 0;
4232
0
  bool needStreamRtInfo = false;
4233
0
  bool needStreamGrpInfo = false;
4234
  
4235
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pInfo->isMultiGroupCalc));
4236
0
  TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pInfo->stbPartByTbname));
4237
0
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &needStreamRtInfo));
4238
0
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &needStreamGrpInfo));
4239
  
4240
0
  if (pInfo->isMultiGroupCalc) {
4241
0
    if (needStreamRtInfo) {
4242
0
      if (needStreamGrpInfo) {
4243
0
        int32_t nGroups = 0;
4244
0
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nGroups));
4245
0
        if (nGroups > 0) {
4246
0
          pInfo->curGrpRead = taosArrayInit_s(sizeof(SSTriggerGroupReadInfo), nGroups);
4247
0
          QUERY_CHECK_NULL(pInfo->curGrpRead, code, lino, _exit, terrno);
4248
0
        }
4249
0
        for (int32_t j = 0; j < nGroups; ++j) {
4250
0
          SSTriggerGroupReadInfo* pReadInfo = TARRAY_GET_ELEM(pInfo->curGrpRead, j);
4251
0
          TAOS_CHECK_EXIT(tDeserializeSSTriggerGroupReadInfo(pDecoder, pReadInfo));
4252
0
        }
4253
0
        int8_t withGrpCalcInfo = 0;
4254
0
        TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &withGrpCalcInfo));
4255
0
        if (withGrpCalcInfo) {
4256
0
          int32_t nGroups = 0;
4257
0
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nGroups));
4258
0
          pInfo->pGroupCalcInfos = tSimpleHashInit(nGroups, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
4259
0
          QUERY_CHECK_NULL(pInfo->pGroupCalcInfos, code, lino, _exit, terrno);
4260
0
          tSimpleHashSetFreeFp(pInfo->pGroupCalcInfos, tDestroySSTriggerGroupCalcInfo);
4261
4262
0
          for (int32_t i = 0; i < nGroups; i++) {
4263
0
            int64_t gid = 0;
4264
0
            TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &gid));
4265
0
            SSTriggerGroupCalcInfo info = {0};
4266
0
            TAOS_CHECK_EXIT(tSimpleHashPut(pInfo->pGroupCalcInfos, &gid, sizeof(int64_t), &info, sizeof(info)));
4267
0
            SSTriggerGroupCalcInfo* pCalcInfo = tSimpleHashGet(pInfo->pGroupCalcInfos, &gid, sizeof(int64_t));
4268
0
            QUERY_CHECK_NULL(pCalcInfo, code, lino, _exit, TSDB_CODE_INTERNAL_ERROR);
4269
0
            TAOS_CHECK_EXIT(tDeserializeSSTriggerGroupCalcInfo(pDecoder, pCalcInfo));
4270
0
          }
4271
0
        }
4272
0
      } else {
4273
0
        int32_t nGroups = 0;
4274
0
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nGroups));
4275
0
        pInfo->pGroupCalcInfos = tSimpleHashInit(nGroups, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
4276
0
        QUERY_CHECK_NULL(pInfo->pGroupCalcInfos, code, lino, _exit, terrno);
4277
0
        tSimpleHashSetFreeFp(pInfo->pGroupCalcInfos, tDestroySSTriggerGroupCalcInfo);
4278
4279
0
        for (int32_t i = 0; i < nGroups; i++) {
4280
0
          int64_t gid = 0;
4281
0
          TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &gid));
4282
0
          SSTriggerGroupCalcInfo info = {0};
4283
0
          TAOS_CHECK_EXIT(tSimpleHashPut(pInfo->pGroupCalcInfos, &gid, sizeof(int64_t), &info, sizeof(info)));
4284
0
          SSTriggerGroupCalcInfo* pCalcInfo = tSimpleHashGet(pInfo->pGroupCalcInfos, &gid, sizeof(int64_t));
4285
0
          QUERY_CHECK_NULL(pCalcInfo, code, lino, _exit, TSDB_CODE_INTERNAL_ERROR);
4286
0
          TAOS_CHECK_EXIT(tDeserializeSSTriggerGroupCalcInfo(pDecoder, pCalcInfo));
4287
0
        }
4288
4289
0
        int32_t nVnodes = 0;
4290
0
        TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nVnodes));
4291
0
        pInfo->pGroupReadInfos = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
4292
0
        QUERY_CHECK_NULL(pInfo->pGroupReadInfos, code, lino, _exit, terrno);
4293
0
        tSimpleHashSetFreeFp(pInfo->pGroupReadInfos, tDestroySSTriggerGroupReadInfoArray);
4294
0
        for (int32_t i = 0; i < nVnodes; i++) {
4295
0
          int32_t vgId = 0;
4296
0
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId));
4297
0
          int32_t nGroups = 0;
4298
0
          TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &nGroups));
4299
0
          SArray* pInfos = taosArrayInit_s(sizeof(SSTriggerGroupReadInfo), nGroups);
4300
0
          QUERY_CHECK_NULL(pInfos, code, lino, _exit, terrno);
4301
0
          code = tSimpleHashPut(pInfo->pGroupReadInfos, &vgId, sizeof(int32_t), &pInfos, POINTER_BYTES);
4302
0
          if (code != TSDB_CODE_SUCCESS) {
4303
0
            taosArrayDestroy(pInfos);
4304
0
            TAOS_CHECK_EXIT(code);
4305
0
          }
4306
0
          for (int32_t j = 0; j < nGroups; ++j) {
4307
0
            SSTriggerGroupReadInfo* pReadInfo = TARRAY_GET_ELEM(pInfos, j);
4308
0
            TAOS_CHECK_EXIT(tDeserializeSSTriggerGroupReadInfo(pDecoder, pReadInfo));
4309
0
          }
4310
0
        }
4311
0
      }
4312
0
    }
4313
0
  } else {
4314
0
    TAOS_CHECK_EXIT(tDeserializeSTriggerCalcParam(pDecoder, &pInfo->pStreamPesudoFuncVals, true));
4315
0
    TAOS_CHECK_EXIT(tDeserializeStriggerGroupColVals(pDecoder, &pInfo->pStreamPartColVals));
4316
0
    TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->groupId));
4317
0
  }
4318
  
4319
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.skey));
4320
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->curWindow.ekey));
4321
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->curIdx));
4322
0
  TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pInfo->sessionId));
4323
0
  TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->withExternalWindow));
4324
0
  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pInfo->triggerType));
4325
0
  if (!tDecodeIsEnd(pDecoder)) {
4326
0
    TAOS_CHECK_EXIT(tDecodeBool(pDecoder, &pInfo->isWindowTrigger));
4327
0
    TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pInfo->precision));
4328
0
  }
4329
0
  if (!tDecodeIsEnd(pDecoder)) {
4330
0
    TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pInfo->streamGen));
4331
0
  }
4332
0
_exit:
4333
0
  return code;
4334
0
}
4335
4336
0
void tDestroyStRtFuncInfo(SStreamRuntimeFuncInfo* pInfo){
4337
0
  if (pInfo == NULL) return;
4338
0
  if (pInfo->pStreamPesudoFuncVals != NULL) {
4339
0
    taosArrayDestroyEx(pInfo->pStreamPesudoFuncVals, tDestroySSTriggerCalcParam);
4340
0
    pInfo->pStreamPesudoFuncVals = NULL;
4341
0
  }
4342
0
  if (pInfo->pStreamPartColVals != NULL) {
4343
0
    taosArrayDestroyEx(pInfo->pStreamPartColVals, tDestroySStreamGroupValue);
4344
0
    pInfo->pStreamPartColVals = NULL;
4345
0
  }
4346
0
  if (pInfo->pGroupCalcInfos != NULL) {
4347
0
    tSimpleHashCleanup(pInfo->pGroupCalcInfos);
4348
0
    pInfo->pGroupCalcInfos = NULL;
4349
0
  }
4350
0
  if (pInfo->pGroupReadInfos != NULL) {
4351
0
    tSimpleHashCleanup(pInfo->pGroupReadInfos);
4352
0
    pInfo->pGroupReadInfos = NULL;
4353
0
  }  
4354
0
  if (pInfo->outNormalTable != NULL) {
4355
0
    taosMemoryFreeClear(pInfo->outNormalTable);
4356
0
  }
4357
0
}
4358
4359
0
int32_t tSerializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, const SStreamMsgVTableInfo* pRsp){
4360
0
  SEncoder encoder = {0};
4361
0
  int32_t  code = TSDB_CODE_SUCCESS;
4362
0
  int32_t  lino = 0;
4363
0
  int32_t  tlen = 0;
4364
4365
0
  tEncoderInit(&encoder, buf, bufLen);
4366
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
4367
4368
0
  int32_t size = taosArrayGetSize(pRsp->infos);
4369
0
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
4370
0
  for (int32_t i = 0; i < size; ++i) {
4371
0
    VTableInfo* info = taosArrayGet(pRsp->infos, i);
4372
0
    if (info == NULL) {
4373
0
      TAOS_CHECK_EXIT(terrno);
4374
0
    }
4375
0
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->gId));
4376
0
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, info->uid));
4377
0
    TAOS_CHECK_EXIT(tEncodeSColRefWrapper(&encoder, &info->cols));
4378
0
  }
4379
4380
0
  tEndEncode(&encoder);
4381
4382
0
_exit:
4383
0
  if (code != TSDB_CODE_SUCCESS) {
4384
0
    tlen = code;
4385
0
  } else {
4386
0
    tlen = encoder.pos;
4387
0
  }
4388
0
  tEncoderClear(&encoder);
4389
0
  return tlen;
4390
0
}
4391
4392
0
int32_t tDeserializeSStreamMsgVTableInfo(void* buf, int32_t bufLen, SStreamMsgVTableInfo *vTableInfo){
4393
0
  SDecoder decoder = {0};
4394
0
  int32_t  code = TSDB_CODE_SUCCESS;
4395
0
  int32_t  lino = 0;
4396
0
  int32_t  size = 0;
4397
4398
0
  tDecoderInit(&decoder, buf, bufLen);
4399
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
4400
4401
0
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &size));
4402
0
  vTableInfo->infos = taosArrayInit(size, sizeof(VTableInfo));
4403
0
  if (vTableInfo->infos == NULL) {
4404
0
    TAOS_CHECK_EXIT(terrno);
4405
0
  }
4406
0
  for (int32_t i = 0; i < size; ++i) {
4407
0
    VTableInfo* info = taosArrayReserve(vTableInfo->infos, 1);
4408
0
    if (info == NULL) {
4409
0
      TAOS_CHECK_EXIT(terrno);
4410
0
    }
4411
0
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->gId));
4412
0
    TAOS_CHECK_EXIT(tDecodeI64(&decoder, &info->uid));
4413
0
    TAOS_CHECK_EXIT(tDecodeSColRefWrapperEx(&decoder, &info->cols, false));
4414
0
  }
4415
4416
0
  tEndDecode(&decoder);
4417
4418
0
_exit:
4419
0
  tDecoderClear(&decoder);
4420
0
  return code;
4421
0
}
4422
4423
4424
0
void tDestroyVTableInfo(void *ptr) {
4425
0
  if (NULL == ptr) {
4426
0
    return;
4427
0
  }
4428
0
  VTableInfo* pTable = (VTableInfo*)ptr;
4429
0
  taosMemoryFree(pTable->cols.pColRef);
4430
0
}
4431
4432
0
void tDestroySStreamMsgVTableInfo(SStreamMsgVTableInfo *ptr) {
4433
0
  if (ptr == NULL) return;
4434
0
  taosArrayDestroyEx(ptr->infos, tDestroyVTableInfo);
4435
0
  ptr->infos = NULL;
4436
0
}
4437
4438
0
int32_t tSerializeSStreamTsResponse(void* buf, int32_t bufLen, const SStreamTsResponse* pRsp) {
4439
0
  SEncoder encoder = {0};
4440
0
  int32_t  code = TSDB_CODE_SUCCESS;
4441
0
  int32_t  lino = 0;
4442
0
  int32_t  tlen = 0;
4443
4444
0
  tEncoderInit(&encoder, buf, bufLen);
4445
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
4446
4447
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
4448
0
  int32_t size = taosArrayGetSize(pRsp->tsInfo);
4449
0
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, size));
4450
0
  for (int32_t i = 0; i < size; ++i) {
4451
0
    STsInfo* tsInfo = taosArrayGet(pRsp->tsInfo, i);
4452
0
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->gId));
4453
0
    TAOS_CHECK_EXIT(tEncodeI64(&encoder, tsInfo->ts));
4454
0
  }
4455
4456
0
  tEndEncode(&encoder);
4457
4458
0
_exit:
4459
0
  if (code != TSDB_CODE_SUCCESS) {
4460
0
    tlen = code;
4461
0
  } else {
4462
0
    tlen = encoder.pos;
4463
0
  }
4464
0
  tEncoderClear(&encoder);
4465
0
  return tlen;
4466
0
}
4467
4468
0
int32_t tDeserializeSStreamTsResponse(void* buf, int32_t bufLen, void *pBlock) {
4469
0
  SDecoder decoder = {0};
4470
0
  int32_t  code = TSDB_CODE_SUCCESS;
4471
0
  int32_t  lino = 0;
4472
0
  SSDataBlock *pResBlock = pBlock;
4473
4474
0
  tDecoderInit(&decoder, buf, bufLen);
4475
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
4476
4477
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, (int64_t*)&pResBlock->info.id.groupId));
4478
0
  int32_t numOfCols = 2;
4479
0
  if (pResBlock->pDataBlock == NULL) {
4480
0
    pResBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
4481
0
    if (pResBlock->pDataBlock == NULL) {
4482
0
      TAOS_CHECK_EXIT(terrno);
4483
0
    }
4484
0
    for (int32_t i = 0; i< numOfCols; ++i) {
4485
0
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, i);
4486
0
      if (pColInfoData == NULL) {
4487
0
        TAOS_CHECK_EXIT(terrno);
4488
0
      }
4489
0
      pColInfoData->info.type = TSDB_DATA_TYPE_BIGINT;
4490
0
      pColInfoData->info.bytes = sizeof(int64_t);
4491
0
    }
4492
0
  }
4493
0
  int32_t numOfRows = 0;
4494
0
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfRows));
4495
0
  TAOS_CHECK_EXIT(blockDataEnsureCapacity(pResBlock, numOfRows));
4496
0
  for (int32_t i = 0; i < numOfRows; ++i) {
4497
0
    for (int32_t j = 0; j < numOfCols; ++j) {
4498
0
      SColumnInfoData *pColInfoData = taosArrayGet(pResBlock->pDataBlock, j);
4499
0
      if (pColInfoData == NULL) {
4500
0
        TAOS_CHECK_EXIT(terrno);
4501
0
      }
4502
0
      int64_t value = 0;
4503
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &value));
4504
0
      colDataSetInt64(pColInfoData, i, &value);
4505
0
    }
4506
0
  }
4507
4508
0
  pResBlock->info.dataLoad = 1;
4509
0
  pResBlock->info.rows = numOfRows;
4510
4511
0
  tEndDecode(&decoder);
4512
4513
0
_exit:
4514
0
  tDecoderClear(&decoder);
4515
0
  return code;
4516
0
}
4517
4518
0
static int32_t encodeData(SEncoder* encoder, void* pBlock, SSHashObj* indexHash) {
4519
0
  int32_t code = TSDB_CODE_SUCCESS;
4520
0
  int32_t lino = 0;
4521
0
  int32_t len = 0;
4522
0
  if (encoder->data == NULL){
4523
0
    len = blockGetEncodeSize(pBlock);
4524
0
  } else {
4525
0
    len = blockEncode(pBlock, (char*)(encoder->data + encoder->pos), encoder->size - encoder->pos, blockDataGetNumOfCols(pBlock));
4526
0
    if (len < 0) {
4527
0
      TAOS_CHECK_EXIT(terrno);
4528
0
    }
4529
0
  }
4530
0
  encoder->pos += len;
4531
4532
0
  if (indexHash == NULL) {
4533
0
    goto _exit;
4534
0
  } 
4535
  
4536
0
  uint32_t pos = encoder->pos;
4537
0
  encoder->pos += sizeof(uint32_t); // reserve space for tables
4538
0
  int32_t tables = 0;
4539
  
4540
0
  void*   pe = NULL;
4541
0
  int32_t iter = 0;
4542
0
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
4543
0
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
4544
0
    if (pInfo->gId == -1){
4545
0
      continue;
4546
0
    }
4547
0
    int64_t uid = *(int64_t*)(tSimpleHashGetKey(pe, NULL));
4548
0
    TAOS_CHECK_EXIT(tEncodeI64(encoder, uid));
4549
0
    TAOS_CHECK_EXIT(tEncodeU64(encoder, pInfo->gId));
4550
0
    TAOS_CHECK_EXIT(tEncodeI32(encoder, pInfo->startRowIdx));
4551
0
    TAOS_CHECK_EXIT(tEncodeI32(encoder, pInfo->numRows));
4552
0
    tables++;
4553
0
  }
4554
0
  uint32_t tmpPos = encoder->pos;
4555
0
  encoder->pos = pos;
4556
0
  TAOS_CHECK_EXIT(tEncodeI32(encoder, tables));
4557
0
  encoder->pos = tmpPos;
4558
0
_exit:
4559
0
  return code;
4560
0
}
4561
 
4562
0
static int32_t encodeBlock(SEncoder* encoder, void* block, SSHashObj* indexHash) {
4563
0
  int32_t  code = TSDB_CODE_SUCCESS;
4564
0
  int32_t  lino = 0;
4565
0
  if (block != NULL && ((SSDataBlock*)block)->info.rows > 0) {
4566
0
    TAOS_CHECK_EXIT(tEncodeI8(encoder, 1));
4567
0
    TAOS_CHECK_EXIT(encodeData(encoder, block, indexHash));
4568
0
  } else {
4569
0
    TAOS_CHECK_EXIT(tEncodeI8(encoder, 0));
4570
0
  }
4571
4572
0
_exit:
4573
0
  return code;
4574
0
}
4575
4576
0
int32_t tSerializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* rsp) {
4577
0
  SEncoder encoder = {0};
4578
0
  int32_t  code = TSDB_CODE_SUCCESS;
4579
0
  int32_t  lino = 0;
4580
0
  int32_t  tlen = 0;
4581
4582
0
  tEncoderInit(&encoder, buf, bufLen);
4583
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
4584
4585
0
  TAOS_CHECK_EXIT(encodeBlock(&encoder, rsp->dataBlock, rsp->indexHash));
4586
0
  TAOS_CHECK_EXIT(encodeBlock(&encoder, rsp->metaBlock, NULL));
4587
0
  TAOS_CHECK_EXIT(encodeBlock(&encoder, rsp->deleteBlock, NULL));
4588
0
  TAOS_CHECK_EXIT(encodeBlock(&encoder, rsp->tableBlock, NULL));
4589
4590
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->ver));
4591
0
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, rsp->verTime));
4592
0
  tEndEncode(&encoder);
4593
4594
0
_exit:
4595
0
  if (code != TSDB_CODE_SUCCESS) {
4596
0
    tlen = code;
4597
0
  } else {
4598
0
    tlen = encoder.pos;
4599
0
  }
4600
0
  tEncoderClear(&encoder);
4601
0
  return tlen;
4602
0
}
4603
4604
0
static int32_t decodeBlock(SDecoder* decoder, void* pBlock) {
4605
0
  int32_t  code = TSDB_CODE_SUCCESS;
4606
0
  int32_t  lino = 0;
4607
  
4608
0
  int8_t hasData = false;
4609
0
  TAOS_CHECK_EXIT(tDecodeI8(decoder, &hasData));
4610
0
  if (hasData) {
4611
0
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
4612
0
    const char* pEndPos = NULL;
4613
0
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder->data + decoder->pos, &pEndPos));
4614
0
    decoder->pos = (uint8_t*)pEndPos - decoder->data;
4615
0
  } else if (pBlock != NULL) {
4616
0
    blockDataEmpty(pBlock);
4617
0
  }
4618
4619
0
_exit:
4620
0
  return code;
4621
0
}
4622
4623
0
int32_t tDeserializeSStreamWalDataResponse(void* buf, int32_t bufLen, SSTriggerWalNewRsp* pRsp, SArray* pSlices){
4624
0
  SDecoder     decoder = {0};
4625
0
  int32_t      code = TSDB_CODE_SUCCESS;
4626
0
  int32_t      lino = 0;
4627
0
  SSDataBlock* pBlock = NULL;
4628
4629
0
  tDecoderInit(&decoder, buf, bufLen);
4630
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
4631
4632
  // decode data block
4633
0
  int8_t hasData = false;
4634
0
  TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasData));
4635
0
  pBlock = pRsp->dataBlock;
4636
0
  if (hasData) {
4637
0
    TAOS_CHECK_EXIT(pBlock != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_PARA);
4638
0
    const char* pEndPos = NULL;
4639
0
    TAOS_CHECK_EXIT(blockDecode(pBlock, (char*)decoder.data + decoder.pos, &pEndPos));
4640
0
    decoder.pos = (uint8_t*)pEndPos - decoder.data;
4641
4642
0
    int32_t nSlices = 0;
4643
0
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nSlices));
4644
0
    TAOS_CHECK_EXIT(taosArrayEnsureCap(pSlices, nSlices));
4645
0
    taosArrayClear(pSlices);
4646
0
    int64_t  uid = 0;
4647
0
    uint64_t gid = 0;
4648
0
    int32_t  startIdx = 0;
4649
0
    int32_t  numRows = 0;
4650
0
    for (int32_t i = 0; i < nSlices; i++) {
4651
0
      TAOS_CHECK_EXIT(tDecodeI64(&decoder, &uid));
4652
0
      TAOS_CHECK_EXIT(tDecodeU64(&decoder, &gid));
4653
0
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &startIdx));
4654
0
      TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numRows));
4655
0
      int32_t endIdx = startIdx + numRows;
4656
0
      int64_t value[3] = {gid, uid, (int64_t)startIdx << 32 | endIdx};
4657
0
      void*   px = taosArrayPush(pSlices, value);
4658
0
      if (px == NULL) {
4659
0
        code = terrno;
4660
0
        goto _exit;
4661
0
      }
4662
0
    }
4663
0
  } else if (pBlock != NULL) {
4664
0
    blockDataEmpty(pBlock);
4665
0
    taosArrayClear(pSlices);
4666
0
  }
4667
4668
0
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->metaBlock));
4669
0
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->deleteBlock));
4670
0
  TAOS_CHECK_EXIT(decodeBlock(&decoder, pRsp->tableBlock));
4671
  
4672
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
4673
0
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->verTime));
4674
4675
0
  tEndDecode(&decoder);
4676
4677
0
_exit:
4678
0
  if (code != TSDB_CODE_SUCCESS) {
4679
0
    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
4680
0
  }
4681
0
  tDecoderClear(&decoder);
4682
0
  return code;
4683
0
}
4684
4685
0
int32_t tSerializeGetStreamCreateSqlReq(void* buf, int32_t bufLen, const SGetStreamCreateSqlReq* pReq) {
4686
0
  SEncoder encoder = {0};
4687
0
  tEncoderInit(&encoder, buf, bufLen);
4688
0
  int32_t code = 0, lino = 0;
4689
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
4690
0
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name));
4691
0
  tEndEncode(&encoder);
4692
0
_exit:
4693
0
  if (code) uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
4694
0
  int32_t len = encoder.pos;
4695
0
  tEncoderClear(&encoder);
4696
0
  return (code != 0) ? code : len;
4697
0
}
4698
4699
0
int32_t tDeserializeGetStreamCreateSqlReq(void* buf, int32_t bufLen, SGetStreamCreateSqlReq* pReq) {
4700
0
  SDecoder decoder = {0};
4701
0
  tDecoderInit(&decoder, buf, bufLen);
4702
0
  int32_t code = 0, lino = 0;
4703
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
4704
0
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name));
4705
0
  tEndDecode(&decoder);
4706
0
_exit:
4707
0
  if (code) uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
4708
0
  tDecoderClear(&decoder);
4709
0
  return code;
4710
0
}
4711
4712
0
int32_t tSerializeGetStreamCreateSqlRsp(void* buf, int32_t bufLen, const SGetStreamCreateSqlRsp* pRsp) {
4713
0
  SEncoder encoder = {0};
4714
0
  tEncoderInit(&encoder, buf, bufLen);
4715
0
  int32_t code = 0, lino = 0;
4716
0
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
4717
0
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pRsp->sql ? pRsp->sql : ""));
4718
0
  tEndEncode(&encoder);
4719
0
_exit:
4720
0
  if (code) uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
4721
0
  int32_t len = encoder.pos;
4722
0
  tEncoderClear(&encoder);
4723
0
  return (code != 0) ? code : len;
4724
0
}
4725
4726
0
int32_t tDeserializeGetStreamCreateSqlRsp(void* buf, int32_t bufLen, SGetStreamCreateSqlRsp* pRsp) {
4727
0
  SDecoder decoder = {0};
4728
0
  tDecoderInit(&decoder, buf, bufLen);
4729
0
  int32_t code = 0, lino = 0;
4730
0
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
4731
0
  TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pRsp->sql));
4732
0
  tEndDecode(&decoder);
4733
0
_exit:
4734
0
  if (code) uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
4735
0
  tDecoderClear(&decoder);
4736
0
  return code;
4737
0
}
4738
4739
0
void tFreeGetStreamCreateSqlRsp(SGetStreamCreateSqlRsp* pRsp) {
4740
0
  if (pRsp) {
4741
    taosMemoryFreeClear(pRsp->sql);
4742
0
  }
4743
0
}