Coverage Report

Created: 2026-05-23 07:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/tdengine/include/libs/function/taosudf.h
Line
Count
Source
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
16
#ifndef TDENGINE_TAOSUDF_H
17
#define TDENGINE_TAOSUDF_H
18
19
#include <stdbool.h>
20
#include <stdint.h>
21
#include <stdlib.h>
22
#include <string.h>
23
24
#include <taos.h>
25
#include <taoserror.h>
26
27
#ifdef __cplusplus
28
extern "C" {
29
#endif
30
31
/*
 * FORCE_INLINE: on compilers that define __GNUC__ it expands to `inline` plus
 * the always_inline attribute; on any other compiler it expands to nothing, so
 * `static FORCE_INLINE` degrades to plain `static`.
 */
#if defined(__GNUC__)
32
#define FORCE_INLINE inline __attribute__((always_inline))
33
#else
34
#define FORCE_INLINE
35
#endif
36
37
/*
 * Evaluate CMD exactly once and, if its result differs from TSDB_CODE_SUCCESS,
 * return that result from the enclosing function.
 *
 * CMD must not be re-evaluated on the failure path: it is typically a call with
 * side effects (allocation, state updates), and a second evaluation could both
 * repeat those effects and return a different code than the one that failed.
 * The temporary uses a macro-specific name so that CMD may itself reference a
 * caller variable called `code`.
 */
#define TAOS_UDF_CHECK_RETURN(CMD)            \
  do {                                        \
    int32_t udfCheckCode_ = (CMD);            \
    if (udfCheckCode_ != TSDB_CODE_SUCCESS) { \
      return udfCheckCode_;                   \
    }                                         \
  } while (0)
44
45
// Type metadata of one UDF column.
typedef struct SUdfColumnMeta {
46
  int16_t type;  // type code; the helpers in this header compare it against TSDB_DATA_TYPE_* values
47
  int32_t bytes;  // per-row width used to index the fixed-length data buffer (see udfColDataGetData)
48
  uint8_t precision;
49
  uint8_t scale;
50
} SUdfColumnMeta;
51
52
// Storage of one UDF column. Which union member is active is decided by the
// column type: IS_VAR_DATA_TYPE(type) selects varLenCol, everything else fixLenCol.
typedef struct SUdfColumnData {
53
  int32_t numOfRows;  // highest row index written so far + 1 (raised by udfColDataSet / udfColDataSetNull)
54
  int32_t rowsAlloc;  // row capacity recorded by udfColEnsureCapacity
55
  union {
56
    struct {
57
      int32_t nullBitmapLen;  // bytes allocated for nullBitmap
58
      char   *nullBitmap;  // one bit per row, most significant bit first within a byte; set bit = NULL
59
      int32_t dataLen;  // bytes allocated for data
60
      char   *data;  // row i starts at data + i * colMeta.bytes
61
    } fixLenCol;
62
63
    struct {
64
      int32_t  varOffsetsLen;  // bytes allocated for varOffsets
65
      int32_t *varOffsets;  // per-row byte offset into payload; -1 marks a NULL row
66
      int32_t  payloadLen;  // bytes of payload currently in use
67
      char    *payload;  // concatenated cells, each copied with its length header (see udfColDataSet)
68
      int32_t  payloadAllocLen;  // bytes allocated for payload
69
    } varLenCol;
70
  };
71
} SUdfColumnData;
72
73
// One column: its type metadata plus its storage.
typedef struct SUdfColumn {
74
  SUdfColumnMeta colMeta;
75
  bool           hasNull;  // set to true by udfColDataSetNull; nothing in this header clears it
76
  SUdfColumnData colData;
77
} SUdfColumn;
78
79
// A block of input data handed to UDF callbacks (see TUdfScalarProcFunc / TUdfAggProcessFunc).
typedef struct SUdfDataBlock {
80
  int32_t      numOfRows;
81
  int32_t      numOfCols;
82
  SUdfColumn **udfCols;  // array of column pointers; NOTE(review): presumably numOfCols entries - confirm with the producer
83
} SUdfDataBlock;
84
85
// Buffer exchanged with the aggregate UDF callbacks (start / process / merge / finish).
typedef struct SUdfInterBuf {
86
  int32_t bufLen;  // NOTE(review): presumably the number of bytes in buf - confirm with the caller
87
  char   *buf;
88
  int8_t  numOfResult;  // zero or one
89
} SUdfInterBuf;
90
// Opaque handle type; nothing in this header dereferences it.
typedef void *UdfcFuncHandle;
91
92
// Growth factor applied when column buffers are enlarged.
#define UDF_MEMORY_EXP_GROWTH 1.5

// Generic null-bitmap helpers: 1 << NBIT (= 8) rows share one bitmap byte.
//   BitPos(n)       - position of row n inside its byte (0..7)
//   BMCharPos(bm,r) - the bitmap byte that holds row r (an lvalue)
//   BitmapLen(n)    - number of bytes needed to hold n row bits
#define NBIT                  (3u)
#define BitPos(_n)            ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_)    ((bm_)[(r_) >> NBIT])
#define BitmapLen(_n)         (((_n) + ((1 << NBIT) - 1)) >> NBIT)

// Null accessors working on an SUdfColumn pointer.
//   *_var: variable-length columns mark a NULL row with offset -1.
//   *_f  : fixed-length columns keep one bit per row, most significant bit first.
// The column argument is parenthesized so that any pointer expression (for
// example `&col`) can be passed. `row` is expanded more than once in the *_f
// macros, so it must not have side effects.
#define udfColDataIsNull_var(pColumn, row) (((pColumn)->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row)                                                 \
  ((BMCharPos((pColumn)->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == \
   (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row)                                                  \
  do {                                                                                     \
    BMCharPos((pColumn)->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
  } while (0)

#define udfColDataSetNotNull_f(pColumn, r_)                                               \
  do {                                                                                    \
    BMCharPos((pColumn)->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
  } while (0)
#define udfColDataSetNull_var(pColumn, row) (((pColumn)->colData.varLenCol.varOffsets)[row] = -1)
111
112
// Variable-length cell layout used by the macros below: a VarDataLenT length
// header immediately followed by the value bytes. `v` points at the header
// unless stated otherwise.
typedef uint16_t VarDataLenT;  // maxVarDataLen: 65535
113
0
// Size of the length header in bytes.
#define VARSTR_HEADER_SIZE     sizeof(VarDataLenT)
114
0
// Length stored in the header (value bytes only); usable as an lvalue.
#define varDataLen(v)          ((VarDataLenT *)(v))[0]
115
0
// Pointer to the first value byte after the header.
#define varDataVal(v)          ((char *)(v) + VARSTR_HEADER_SIZE)
116
0
// Total cell size: header plus value bytes.
#define varDataTLen(v)         (sizeof(VarDataLenT) + varDataLen(v))
117
// Copy the whole cell (header and value) from v to dst.
#define varDataCopy(dst, v)    (void)memcpy((dst), (void *)(v), varDataTLen(v))
118
// Here v points at the value bytes; reads the header located just before them.
#define varDataLenByData(v)    (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
119
0
// Store _len (converted to VarDataLenT) into the header.
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
120
121
typedef int32_t  VarDataOffsetT;

// Blob cell layout: a BlobDataLenT length header immediately followed by the
// value bytes. `v` points at the header unless stated otherwise.
typedef uint32_t BlobDataLenT;  // max blob length: 2^32 - 1
#define BLOBSTR_HEADER_SIZE     sizeof(BlobDataLenT)
// blobDataTLen/blobDataCopy depend on blobDataLen, which this header did not
// provide. Define it here (mirroring varDataLen) unless another header already did.
#ifndef blobDataLen
#define blobDataLen(v)          ((BlobDataLenT *)(v))[0]
#endif
// Total cell size: header plus value bytes.
#define blobDataTLen(v)         (sizeof(BlobDataLenT) + blobDataLen(v))
// Copy the whole cell (header and value) from v to dst.
#define blobDataCopy(dst, v)    (void)memcpy((dst), (void *)(v), blobDataTLen(v))
// Here v points at the value bytes; reads the header located just before them.
#define blobDataLenByData(v)    (*(BlobDataLenT *)(((char *)(v)) - BLOBSTR_HEADER_SIZE))
// Store _len (converted to BlobDataLenT) into the header.
#define blobDataSetLen(v, _len) (((BlobDataLenT *)(v))[0] = (BlobDataLenT)(_len))
128
129
// True when type code t is one of the types stored in the variable-length
// column layout (varLenCol): VARCHAR, VARBINARY, NCHAR, JSON, GEOMETRY, BLOB, MEDIUMBLOB.
#define IS_VAR_DATA_TYPE(t)                                                                                 \
130
0
  (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \
131
0
   ((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY) || ((t) == TSDB_DATA_TYPE_BLOB) ||      \
132
0
   ((t) == TSDB_DATA_TYPE_MEDIUMBLOB))
133
134
// True for the string-like subset: VARCHAR, VARBINARY, NCHAR.
#define IS_STR_DATA_TYPE(t) \
135
0
  (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
136
137
0
// True for the blob types: BLOB, MEDIUMBLOB.
#define IS_STR_DATA_BLOB(t) ((t) == TSDB_DATA_TYPE_BLOB || (t) == TSDB_DATA_TYPE_MEDIUMBLOB)
138
139
0
static FORCE_INLINE char *udfColDataGetData(const SUdfColumn *pColumn, int32_t row) {
140
0
  if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
141
0
    return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
142
0
  } else {
143
0
    return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
144
0
  }
145
0
}
Unexecuted instantiation: functionMgt.c:udfColDataGetData
Unexecuted instantiation: tudf.c:udfColDataGetData
Unexecuted instantiation: builtinsimpl.c:udfColDataGetData
Unexecuted instantiation: tavgfunction.c:udfColDataGetData
Unexecuted instantiation: tminmax.c:udfColDataGetData
Unexecuted instantiation: scalar.c:udfColDataGetData
146
147
0
/*
 * Return the length in bytes of the value stored at `row`.
 *
 * Fixed-length columns: always colMeta.bytes.
 * Variable-length columns: the 16-bit value found at the start of the cell,
 * i.e. the length header (header bytes themselves are not included).
 *
 * NOTE(review): BLOB / MEDIUMBLOB also satisfy IS_VAR_DATA_TYPE, while
 * BlobDataLenT in this header is 32 bits wide; this function still reads a
 * 16-bit header for them - confirm which layout blob cells use here.
 */
static FORCE_INLINE int32_t udfColDataGetDataLen(const SUdfColumn *pColumn, int32_t row) {
  if (!IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
    return pColumn->colMeta.bytes;
  }

  char *cell = pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
  return *(uint16_t *)cell;
}
Unexecuted instantiation: functionMgt.c:udfColDataGetDataLen
Unexecuted instantiation: tudf.c:udfColDataGetDataLen
Unexecuted instantiation: builtinsimpl.c:udfColDataGetDataLen
Unexecuted instantiation: tavgfunction.c:udfColDataGetDataLen
Unexecuted instantiation: tminmax.c:udfColDataGetDataLen
Unexecuted instantiation: scalar.c:udfColDataGetDataLen
154
155
0
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn *pColumn, int32_t row) {
156
0
  if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
157
0
    if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
158
0
      if (udfColDataIsNull_var(pColumn, row)) {
159
0
        return true;
160
0
      }
161
0
      char *data = udfColDataGetData(pColumn, row);
162
0
      return (*data == TSDB_DATA_TYPE_NULL);
163
0
    } else {
164
0
      return udfColDataIsNull_var(pColumn, row);
165
0
    }
166
0
  } else {
167
0
    return udfColDataIsNull_f(pColumn, row);
168
0
  }
169
0
}
Unexecuted instantiation: functionMgt.c:udfColDataIsNull
Unexecuted instantiation: tudf.c:udfColDataIsNull
Unexecuted instantiation: builtinsimpl.c:udfColDataIsNull
Unexecuted instantiation: tavgfunction.c:udfColDataIsNull
Unexecuted instantiation: tminmax.c:udfColDataIsNull
Unexecuted instantiation: scalar.c:udfColDataIsNull
170
171
0
/*
 * Make sure `pColumn` can hold at least `newCapacity` rows.
 *
 * Nothing happens when newCapacity is 0 or does not exceed the recorded
 * capacity (rowsAlloc). Otherwise the capacity starts at max(rowsAlloc, 8) and
 * is multiplied by UDF_MEMORY_EXP_GROWTH until it is large enough, then:
 *   - variable-length columns grow varOffsets (the payload buffer is grown by
 *     udfColDataSet when data is appended);
 *   - fixed-length columns grow the null bitmap and, unless the column type is
 *     TSDB_DATA_TYPE_NULL, the data buffer.
 * Newly added offsets / bitmap bytes are zeroed.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when realloc fails or
 * a required buffer size does not fit the int32_t length fields. On failure
 * rowsAlloc is left unchanged and previously stored pointers stay valid.
 *
 * Fix over the previous version: for TSDB_DATA_TYPE_NULL columns rowsAlloc is
 * now updated as well. Before, it stayed stale, so every later call took the
 * grow path again and zeroed the bitmap from the stale capacity onward,
 * wiping null bits set by earlier calls.
 */
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t newCapacity) {
  SUdfColumnMeta *meta = &pColumn->colMeta;
  SUdfColumnData *data = &pColumn->colData;

  if (newCapacity == 0 || newCapacity <= data->rowsAlloc) {
    return TSDB_CODE_SUCCESS;
  }

  // Grow in 64-bit so that the 1.5x steps cannot overflow an int.
  int64_t grown = (data->rowsAlloc < 8) ? 8 : data->rowsAlloc;
  while (grown < newCapacity) {
    grown = (int64_t)(grown * UDF_MEMORY_EXP_GROWTH);
  }
  if (grown > INT32_MAX) {
    grown = INT32_MAX;  // still >= newCapacity, which is an int32_t
  }
  int32_t allocCapacity = (int32_t)grown;

  int32_t existedRows = data->numOfRows;

  if (IS_VAR_DATA_TYPE(meta->type)) {
    int64_t offsetsLen = (int64_t)sizeof(int32_t) * allocCapacity;
    if (offsetsLen > INT32_MAX) {
      return TSDB_CODE_OUT_OF_MEMORY;  // would not fit varOffsetsLen
    }
    int32_t *offsets = (int32_t *)realloc(data->varLenCol.varOffsets, (size_t)offsetsLen);
    if (offsets == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    data->varLenCol.varOffsets = offsets;
    data->varLenCol.varOffsetsLen = (int32_t)offsetsLen;
    (void)memset(&offsets[existedRows], 0, sizeof(int32_t) * (allocCapacity - existedRows));
    // for payload, add data in udfColDataAppend
  } else {
    bool    hasDataBuf = (meta->type != TSDB_DATA_TYPE_NULL);
    int64_t dataLen = (int64_t)allocCapacity * meta->bytes;
    if (hasDataBuf && dataLen > INT32_MAX) {
      return TSDB_CODE_OUT_OF_MEMORY;  // would not fit fixLenCol.dataLen
    }

    int32_t prevBitmapLen = BitmapLen(data->rowsAlloc);
    int32_t newBitmapLen = (int32_t)BitmapLen((int64_t)allocCapacity);

    char *bitmap = (char *)realloc(data->fixLenCol.nullBitmap, newBitmapLen);
    if (bitmap == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    // Zero the bytes appended by this growth step.
    (void)memset(bitmap + prevBitmapLen, 0, newBitmapLen - prevBitmapLen);
    data->fixLenCol.nullBitmap = bitmap;
    data->fixLenCol.nullBitmapLen = newBitmapLen;
    // Also clear every byte beyond the ones covering the rows written so far.
    int32_t usedBitmapLen = BitmapLen(existedRows);
    (void)memset(&data->fixLenCol.nullBitmap[usedBitmapLen], 0, newBitmapLen - usedBitmapLen);

    if (hasDataBuf) {
      char *buf = (char *)realloc(data->fixLenCol.data, (size_t)dataLen);
      if (buf == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      data->fixLenCol.data = buf;
      data->fixLenCol.dataLen = (int32_t)dataLen;
    }
  }

  data->rowsAlloc = allocCapacity;

  return TSDB_CODE_SUCCESS;
}
Unexecuted instantiation: functionMgt.c:udfColEnsureCapacity
Unexecuted instantiation: tudf.c:udfColEnsureCapacity
Unexecuted instantiation: builtinsimpl.c:udfColEnsureCapacity
Unexecuted instantiation: tavgfunction.c:udfColEnsureCapacity
Unexecuted instantiation: tminmax.c:udfColEnsureCapacity
Unexecuted instantiation: scalar.c:udfColEnsureCapacity
224
225
0
/*
 * Mark `row` of `pColumn` as NULL.
 *
 * Grows the column to at least row + 1 rows first, then sets the NULL marker
 * (offset -1 for variable-length columns, bitmap bit for fixed-length ones),
 * sets hasNull, and raises colData.numOfRows to row + 1 if it was smaller.
 *
 * Returns 0 on success or the error code from udfColEnsureCapacity.
 */
static FORCE_INLINE int32_t udfColDataSetNull(SUdfColumn *pColumn, int32_t row) {
  const int32_t rowCount = row + 1;

  const int32_t rc = udfColEnsureCapacity(pColumn, rowCount);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  if (!IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
    udfColDataSetNull_f(pColumn, row);
  } else {
    udfColDataSetNull_var(pColumn, row);
  }
  pColumn->hasNull = true;

  if (rowCount > pColumn->colData.numOfRows) {
    pColumn->colData.numOfRows = rowCount;
  }
  return 0;
}
Unexecuted instantiation: functionMgt.c:udfColDataSetNull
Unexecuted instantiation: tudf.c:udfColDataSetNull
Unexecuted instantiation: builtinsimpl.c:udfColDataSetNull
Unexecuted instantiation: tavgfunction.c:udfColDataSetNull
Unexecuted instantiation: tminmax.c:udfColDataSetNull
Unexecuted instantiation: scalar.c:udfColDataSetNull
240
241
0
/*
 * Store one value (or NULL) at `currentRow` of `pColumn`.
 *
 * pColumn    - destination column; grown as needed.
 * currentRow - zero-based row index.
 * pData      - source bytes. Fixed-length columns copy colMeta.bytes bytes.
 *              Variable-length columns expect a VarDataLenT length header
 *              followed by the value and copy header + value to the end of
 *              the payload buffer. Ignored when isNull is true.
 * isNull     - when true the row is marked NULL via udfColDataSetNull.
 *
 * Returns 0 on success, otherwise the failing error code
 * (TSDB_CODE_OUT_OF_MEMORY when a buffer cannot be grown).
 *
 * Each fallible call is evaluated exactly once and its own result is returned
 * on failure. Payload size arithmetic is done in 64 bits and rejected when it
 * would not fit the int32_t length/offset fields.
 *
 * Notes carried over from the original authors: this helper exists to assist
 * UDF implementers and is exercised in testing; JSON columns are not supported
 * here (no JSON-specific length handling is performed).
 */
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) {
  SUdfColumnMeta *meta = &pColumn->colMeta;
  SUdfColumnData *data = &pColumn->colData;

  int32_t code = udfColEnsureCapacity(pColumn, currentRow + 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (isNull) {
    code = udfColDataSetNull(pColumn, currentRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } else if (!IS_VAR_DATA_TYPE(meta->type)) {
    udfColDataSetNotNull_f(pColumn, currentRow);
    (void)memcpy(data->fixLenCol.data + (size_t)meta->bytes * currentRow, pData, meta->bytes);
  } else {
    int32_t dataLen = varDataTLen(pData);

    // Bytes the payload must hold after appending this cell.
    int64_t required = (int64_t)data->varLenCol.payloadLen + dataLen;
    if (required > INT32_MAX) {
      return TSDB_CODE_OUT_OF_MEMORY;  // offsets and lengths are int32_t
    }

    if (data->varLenCol.payloadAllocLen < required) {
      int64_t newSize = data->varLenCol.payloadAllocLen;
      if (newSize <= 1) {
        newSize = 8;  // a size of 0 or 1 would never grow under truncating 1.5x steps
      }

      while (newSize < required) {
        newSize = (int64_t)(newSize * UDF_MEMORY_EXP_GROWTH);
      }
      if (newSize > INT32_MAX) {
        newSize = INT32_MAX;  // still >= required
      }

      char *buf = (char *)realloc(data->varLenCol.payload, (size_t)newSize);
      if (buf == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      data->varLenCol.payload = buf;
      data->varLenCol.payloadAllocLen = (int32_t)newSize;
    }

    // Append the cell; the row's offset points at its length header.
    int32_t offset = data->varLenCol.payloadLen;
    data->varLenCol.varOffsets[currentRow] = offset;

    (void)memcpy(data->varLenCol.payload + offset, pData, dataLen);
    data->varLenCol.payloadLen += dataLen;
  }

  data->numOfRows = ((int32_t)(currentRow + 1) > data->numOfRows) ? (int32_t)(currentRow + 1) : data->numOfRows;
  return 0;
}
Unexecuted instantiation: functionMgt.c:udfColDataSet
Unexecuted instantiation: tudf.c:udfColDataSet
Unexecuted instantiation: builtinsimpl.c:udfColDataSet
Unexecuted instantiation: tavgfunction.c:udfColDataSet
Unexecuted instantiation: tminmax.c:udfColDataSet
Unexecuted instantiation: scalar.c:udfColDataSet
298
299
// dynamic lib init and destroy for C UDF
300
typedef int32_t (*TUdfInitFunc)();
301
typedef int32_t (*TUdfDestroyFunc)();
302
303
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol);
304
305
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
306
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
307
typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf);
308
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData);
309
310
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
311
// One name/value string pair; an array of these is passed to TScriptOpenFunc.
typedef struct SScriptUdfEnvItem {
312
  const char *name;
313
  const char *value;
314
} SScriptUdfEnvItem;
315
316
// Kind of UDF: scalar (1) or aggregate (2).
typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EUdfFuncType;
317
318
// Description of one script UDF, passed to TScriptUdfInitFunc.
// NOTE(review): the field meanings are not established by this header; the
// hedged remarks below are assumptions to confirm against the producer.
typedef struct SScriptUdfInfo {
319
  const char *name;
320
  int32_t     version;
321
  int64_t     createdTime;
322
323
  EUdfFuncType funcType;  // UDF_FUNC_TYPE_SCALAR or UDF_FUNC_TYPE_AGG
324
  int8_t       scriptType;
325
  int8_t       outputType;  // presumably a TSDB_DATA_TYPE_* code - TODO confirm
326
  int32_t      outputLen;
327
  int32_t      bufSize;
328
329
  const char *path;
330
} SScriptUdfInfo;
331
332
typedef int32_t (*TScriptUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx);
333
334
typedef int32_t (*TScriptUdfAggStartFunc)(SUdfInterBuf *buf, void *udfCtx);
335
typedef int32_t (*TScriptUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf,
336
                                            void *udfCtx);
337
typedef int32_t (*TScriptUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
338
                                          void *udfCtx);
339
typedef int32_t (*TScriptUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx);
340
typedef int32_t (*TScriptUdfInitFunc)(SScriptUdfInfo *info, void **pUdfCtx);
341
typedef int32_t (*TScriptUdfDestoryFunc)(void *udfCtx);
342
343
// the following function is for open/close script plugin.
344
typedef int32_t (*TScriptOpenFunc)(SScriptUdfEnvItem *items, int numItems);
345
typedef int32_t (*TScriptCloseFunc)();
346
347
// clang-format off
348
#ifdef WINDOWS
349
  #define fnFatal(...) {}
350
  #define fnError(...) {}
351
  #define fnWarn(...)  {}
352
  #define fnInfo(...)  {}
353
  #define fnDebug(...) {}
354
  #define fnTrace(...) {}
355
#else
356
  DLL_EXPORT void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...)
357
#ifdef __GNUC__
358
    __attribute__((format(printf, 4, 5)))
359
#endif
360
    ;
361
  extern int32_t udfDebugFlag;
362
  #define udfFatal(...) { if (udfDebugFlag & 1) { taosPrintLog("UDF FATAL ", 1, 255, __VA_ARGS__); }}
363
  #define udfError(...) { if (udfDebugFlag & 1) { taosPrintLog("UDF ERROR ", 1, 255, __VA_ARGS__); }}
364
  #define udfWarn(...)  { if (udfDebugFlag & 2) { taosPrintLog("UDF WARN  ", 2, 255, __VA_ARGS__); }}
365
  #define udfInfo(...)  { if (udfDebugFlag & 2) { taosPrintLog("UDF INFO  ", 2, 255, __VA_ARGS__); }}
366
  #define udfDebug(...) { if (udfDebugFlag & 4) { taosPrintLog("UDF DEBUG ", 4, udfDebugFlag, __VA_ARGS__); }}
367
  #define udfTrace(...) { if (udfDebugFlag & 8) { taosPrintLog("UDF TRACE ", 8, udfDebugFlag, __VA_ARGS__); }}
368
#endif
369
// clang-format on
370
371
#ifdef __cplusplus
372
}
373
#endif
374
375
#endif  // TDENGINE_TAOSUDF_H