YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/exec/eval_aggr.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/common/ql_protocol_util.h"
17
#include "yb/common/ql_rowblock.h"
18
#include "yb/common/ql_value.h"
19
20
#include "yb/util/decimal.h"
21
#include "yb/util/result.h"
22
23
#include "yb/yql/cql/ql/exec/exec_context.h"
24
#include "yb/yql/cql/ql/exec/executor.h"
25
#include "yb/yql/cql/ql/ptree/pt_expr.h"
26
#include "yb/yql/cql/ql/ptree/pt_select.h"
27
28
namespace yb {
29
namespace ql {
30
31
using std::shared_ptr;
32
using yb::bfql::TSOpcode;
33
using yb::util::Decimal;
34
35
//--------------------------------------------------------------------------------------------------
36
37
3.89M
Status Executor::AggregateResultSets(const PTSelectStmt* pt_select, TnodeContext* tnode_context) {
38
3.89M
  if (!pt_select->is_aggregate()) {
39
3.89M
    return Status::OK();
40
3.89M
  }
41
42
2.27k
  shared_ptr<RowsResult> rows_result = tnode_context->rows_result();
43
2.27k
  DCHECK(rows_result->client() == QLClient::YQL_CLIENT_CQL);
44
2.27k
  shared_ptr<QLRowBlock> row_block = rows_result->GetRowBlock();
45
2.27k
  int column_index = 0;
46
2.27k
  faststring buffer;
47
48
2.27k
  CQLEncodeLength(1, &buffer);
49
149
  for (auto expr_node : pt_select->selected_exprs()) {
50
149
    QLValue ql_value;
51
52
149
    switch (expr_node->aggregate_opcode()) {
53
0
      case TSOpcode::kNoOp:
54
0
        break;
55
20
      case TSOpcode::kAvg:
56
20
        RETURN_NOT_OK(EvalAvg(row_block, column_index, expr_node->ql_type()->main(),
57
20
                              &ql_value));
58
        // Change type back from MAP to basic type for result of Avg
59
20
        rows_result->set_column_schema(column_index, expr_node->ql_type());
60
20
        break;
61
52
      case TSOpcode::kCount:
62
52
        RETURN_NOT_OK(EvalCount(row_block, column_index, &ql_value));
63
52
        break;
64
26
      case TSOpcode::kMax:
65
26
        RETURN_NOT_OK(EvalMax(row_block, column_index, &ql_value));
66
26
        break;
67
25
      case TSOpcode::kMin:
68
25
        RETURN_NOT_OK(EvalMin(row_block, column_index, &ql_value));
69
25
        break;
70
26
      case TSOpcode::kSum:
71
26
        RETURN_NOT_OK(EvalSum(row_block, column_index, expr_node->ql_type()->main(), &ql_value));
72
26
        break;
73
0
      default:
74
0
        return STATUS(RuntimeError, "Unexpected operator while evaluating aggregate expressions");
75
149
    }
76
77
    // Serialize the return value.
78
149
    ql_value.Serialize(expr_node->ql_type(), rows_result->client(), &buffer);
79
149
    column_index++;
80
149
  }
81
82
  // Change the result set to the aggregate result.
83
2.27k
  rows_result->set_rows_data(buffer.c_str(), buffer.size());
84
2.27k
  return Status::OK();
85
2.27k
}
86
87
Status Executor::EvalCount(const shared_ptr<QLRowBlock>& row_block,
88
                           int column_index,
89
52
                           QLValue *ql_value) {
90
52
  int64_t total_count = 0;
91
146
  for (auto row : row_block->rows()) {
92
146
    if (!row.column(column_index).IsNull()) {
93
      // Summing up the sub-counts from individual partitions.
94
      // For details see DocExprExecutor::EvalTSCall() and DocExprExecutor::EvalCount().
95
146
      total_count += row.column(column_index).int64_value();
96
146
    }
97
146
  }
98
52
  ql_value->set_int64_value(total_count);
99
52
  return Status::OK();
100
52
}
101
102
Status Executor::EvalMax(const shared_ptr<QLRowBlock>& row_block,
103
                         int column_index,
104
26
                         QLValue *ql_value) {
105
65
  for (auto row : row_block->rows()) {
106
65
    if (ql_value->IsNull() ||
107
44
        (!row.column(column_index).IsNull() && *ql_value < row.column(column_index))) {
108
41
      *ql_value = row.column(column_index);
109
41
    }
110
65
  }
111
26
  return Status::OK();
112
26
}
113
114
Status Executor::EvalMin(const shared_ptr<QLRowBlock>& row_block,
115
                         int column_index,
116
25
                         QLValue *ql_value) {
117
62
  for (auto row : row_block->rows()) {
118
62
    if (ql_value->IsNull() ||
119
42
        (!row.column(column_index).IsNull() && *ql_value > row.column(column_index))) {
120
20
      *ql_value = row.column(column_index);
121
20
    }
122
62
  }
123
25
  return Status::OK();
124
25
}
125
126
Status Executor::EvalSum(const shared_ptr<QLRowBlock>& row_block,
127
                         int column_index,
128
                         DataType data_type,
129
26
                         QLValue *ql_value) {
130
  // CQL doesn't return overflow for sum.
131
63
  for (auto row : row_block->rows()) {
132
63
    if (row.column(column_index).IsNull()) {
133
0
      continue;
134
0
    }
135
63
    if (ql_value->IsNull()) {
136
21
      *ql_value = row.column(column_index);
137
21
      continue;
138
21
    }
139
42
    switch (data_type) {
140
0
      case DataType::INT8:
141
0
        ql_value->set_int8_value(ql_value->int8_value() + row.column(column_index).int8_value());
142
0
        break;
143
0
      case DataType::INT16:
144
0
        ql_value->set_int16_value(ql_value->int16_value() + row.column(column_index).int16_value());
145
0
        break;
146
10
      case DataType::INT32:
147
10
        ql_value->set_int32_value(ql_value->int32_value() + row.column(column_index).int32_value());
148
10
        break;
149
8
      case DataType::INT64:
150
8
        ql_value->set_int64_value(ql_value->int64_value() + row.column(column_index).int64_value());
151
8
        break;
152
0
      case DataType::VARINT:
153
0
        ql_value->set_varint_value(ql_value->varint_value() +
154
0
                                   row.column(column_index).varint_value());
155
0
        break;
156
8
      case DataType::FLOAT:
157
8
        ql_value->set_float_value(ql_value->float_value() + row.column(column_index).float_value());
158
8
        break;
159
16
      case DataType::DOUBLE:
160
16
        ql_value->set_double_value(ql_value->double_value() +
161
16
                                   row.column(column_index).double_value());
162
16
        break;
163
0
      case DataType::DECIMAL: {
164
0
        Decimal sum, value;
165
0
        RETURN_NOT_OK(sum.DecodeFromComparable(ql_value->decimal_value()));
166
0
        RETURN_NOT_OK(value.DecodeFromComparable(row.column(column_index).decimal_value()));
167
0
        sum = sum + value;
168
0
        ql_value->set_decimal_value(sum.EncodeToComparable());
169
0
        break;
170
0
      }
171
0
      default:
172
0
        return STATUS(RuntimeError, "Unexpected datatype for argument of SUM()");
173
42
    }
174
42
  }
175
26
  if (ql_value->IsNull()) {
176
5
    switch (data_type) {
177
0
      case DataType::INT8:
178
0
        ql_value->set_int8_value(0);
179
0
        break;
180
0
      case DataType::INT16:
181
0
        ql_value->set_int16_value(0);
182
0
        break;
183
1
      case DataType::INT32:
184
1
        ql_value->set_int32_value(0);
185
1
        break;
186
1
      case DataType::INT64:
187
1
        ql_value->set_int64_value(0);
188
1
        break;
189
0
      case DataType::VARINT: {
190
0
        int64_t tsum;
191
0
        tsum = 0;
192
0
        util::VarInt varint(tsum);
193
0
        ql_value->set_varint_value(varint);
194
0
      }
195
0
        break;
196
1
      case DataType::FLOAT:
197
1
        ql_value->set_float_value(0);
198
1
        break;
199
2
      case DataType::DOUBLE:
200
2
        ql_value->set_double_value(0);
201
2
        break;
202
0
      case DataType::DECIMAL: {
203
0
        Decimal sum;
204
0
        ql_value->set_decimal_value(sum.EncodeToComparable());
205
0
        break;
206
0
      }
207
0
      default:
208
0
        return STATUS(RuntimeError, "Unexpected datatype for argument of SUM()");
209
26
    }
210
26
  }
211
26
  return Status::OK();
212
26
}
213
214
Status Executor::EvalAvg(const shared_ptr<QLRowBlock>& row_block,
215
                         int column_index,
216
                         DataType data_type,
217
20
                         QLValue *ql_value) {
218
20
  QLValue sum, count;
219
220
55
  for (auto row : row_block->rows()) {
221
55
    if (row.column(column_index).IsNull()) {
222
0
      continue;
223
0
    }
224
55
    QLMapValuePB map = row.column(column_index).map_value();
225
55
    if (count.IsNull()) {
226
15
      count = QLValue(map.keys(0));
227
15
      sum = QLValue(map.values(0));
228
15
      continue;
229
15
    }
230
231
40
    count.set_int64_value(count.int64_value() + map.keys(0).int64_value());
232
40
    switch (data_type) {
233
0
      case DataType::INT8:
234
0
        sum.set_int8_value(sum.int8_value() + map.values(0).int8_value());
235
0
        break;
236
0
      case DataType::INT16:
237
0
        sum.set_int16_value(sum.int16_value() + map.values(0).int16_value());
238
0
        break;
239
8
      case DataType::INT32:
240
8
        sum.set_int32_value(sum.int32_value() + map.values(0).int32_value());
241
8
        break;
242
16
      case DataType::INT64:
243
16
        sum.set_int64_value(sum.int64_value() + map.values(0).int64_value());
244
16
        break;
245
0
      case DataType::VARINT:
246
0
        sum.set_varint_value(sum.varint_value() + QLValue(map.values(0)).varint_value());
247
0
        break;
248
8
      case DataType::FLOAT:
249
8
        sum.set_float_value(sum.float_value() + map.values(0).float_value());
250
8
        break;
251
8
      case DataType::DOUBLE:
252
8
        sum.set_double_value(sum.double_value() + map.values(0).double_value());
253
8
        break;
254
0
      default:
255
0
        return STATUS(RuntimeError, "Unexpected datatype for argument of AVG()");
256
40
    }
257
40
  }
258
259
20
  switch (data_type) {
260
0
    case DataType::INT8:
261
0
      ql_value->set_int8_value(sum.IsNull() ? 0 : sum.int8_value() / count.int64_value());
262
0
      break;
263
0
    case DataType::INT16:
264
0
      ql_value->set_int16_value(sum.IsNull() ? 0 : sum.int16_value() / count.int64_value());
265
0
      break;
266
4
    case DataType::INT32:
267
3
      ql_value->set_int32_value(sum.IsNull() ? 0 : sum.int32_value() / count.int64_value());
268
4
      break;
269
8
    case DataType::INT64:
270
6
      ql_value->set_int64_value(sum.IsNull() ? 0 : sum.int64_value() / count.int64_value());
271
8
      break;
272
0
    case DataType::VARINT:
273
0
      if (sum.IsNull()) {
274
0
        util::VarInt varint(0);
275
0
        ql_value->set_varint_value(varint);
276
0
      } else {
277
0
        int64_t tsum = VERIFY_RESULT(sum.varint_value().ToInt64());
278
0
        int64_t tcount = VERIFY_RESULT(count.varint_value().ToInt64());
279
0
        util::VarInt average(tsum / tcount);
280
0
        ql_value->set_varint_value(average);
281
0
      }
282
0
      break;
283
4
    case DataType::FLOAT:
284
3
      ql_value->set_float_value(sum.IsNull() ? 0 :sum.float_value() / count.int64_value());
285
4
      break;
286
4
    case DataType::DOUBLE:
287
3
      ql_value->set_double_value(sum.IsNull() ? 0 :sum.double_value() / count.int64_value());
288
4
      break;
289
0
    default:
290
0
      return STATUS(RuntimeError, "Unexpected datatype for argument of AVG()");
291
20
  }
292
20
  return Status::OK();
293
20
}
294
295
}  // namespace ql
296
}  // namespace yb