YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/ql/audit/audit_logger.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//--------------------------------------------------------------------------------------------------
14
15
#include "yb/yql/cql/ql/audit/audit_logger.h"
16
17
#include <boost/algorithm/string.hpp>
18
#include <boost/optional/optional_io.hpp>
19
#include <boost/preprocessor/seq/for_each.hpp>
20
21
#include "yb/rpc/connection.h"
22
23
#include "yb/util/date_time.h"
24
#include "yb/util/flag_tags.h"
25
#include "yb/util/result.h"
26
#include "yb/util/string_util.h"
27
28
#include "yb/yql/cql/ql/ptree/pt_alter_keyspace.h"
29
#include "yb/yql/cql/ql/ptree/pt_alter_table.h"
30
#include "yb/yql/cql/ql/ptree/pt_create_index.h"
31
#include "yb/yql/cql/ql/ptree/pt_create_keyspace.h"
32
#include "yb/yql/cql/ql/ptree/pt_create_table.h"
33
#include "yb/yql/cql/ql/ptree/pt_create_type.h"
34
#include "yb/yql/cql/ql/ptree/pt_delete.h"
35
#include "yb/yql/cql/ql/ptree/pt_drop.h"
36
#include "yb/yql/cql/ql/ptree/pt_explain.h"
37
#include "yb/yql/cql/ql/ptree/pt_grant_revoke.h"
38
#include "yb/yql/cql/ql/ptree/pt_select.h"
39
#include "yb/yql/cql/ql/ptree/pt_truncate.h"
40
#include "yb/yql/cql/ql/ptree/pt_use_keyspace.h"
41
#include "yb/yql/cql/ql/util/ql_env.h"
42
#include "yb/yql/cql/ql/util/statement_result.h"
43
44
DEFINE_bool(ycql_enable_audit_log,
45
            false,
46
            "Enable YCQL audit. Use ycql_audit_* flags for fine-grained configuration");
47
TAG_FLAG(ycql_enable_audit_log, runtime);
48
49
// IMPORTANT:
50
// These flags are expected to change at runtime, but due to the nature of std::string we shouldn't
51
// access them directly, as concurrent read-write access would lead to undefined behaviour.
52
// Instead, use GFLAGS_NAMESPACE::GetCommandLineOption("flag_name", &result)
53
54
DEFINE_string(ycql_audit_log_level,
55
              "ERROR",
56
              "Severity level at which an audit will be logged. Could be INFO, WARNING, or ERROR");
57
TAG_FLAG(ycql_audit_log_level, runtime);
58
59
DEFINE_string(ycql_audit_included_keyspaces,
60
              "",
61
              "Comma separated list of keyspaces to be included in the audit log, "
62
              "if none - includes all (non-excluded) keyspaces");
63
TAG_FLAG(ycql_audit_included_keyspaces, runtime);
64
65
DEFINE_string(ycql_audit_excluded_keyspaces,
66
              "system,system_schema,system_virtual_schema,system_auth",
67
              "Comma separated list of keyspaces to be excluded from the audit log");
68
TAG_FLAG(ycql_audit_excluded_keyspaces, runtime);
69
70
DEFINE_string(ycql_audit_included_categories,
71
              "",
72
              "Comma separated list of categories to be included in the audit log, "
73
              "if none - includes all (non-excluded) categories");
74
TAG_FLAG(ycql_audit_included_categories, runtime);
75
76
DEFINE_string(ycql_audit_excluded_categories,
77
              "",
78
              "Comma separated list of categories to be excluded from the audit log");
79
TAG_FLAG(ycql_audit_excluded_categories, runtime);
80
81
DEFINE_string(ycql_audit_included_users,
82
              "",
83
              "Comma separated list of users to be included in the audit log, "
84
              "if none - includes all (non-excluded) users");
85
TAG_FLAG(ycql_audit_included_users, runtime);
86
87
DEFINE_string(ycql_audit_excluded_users,
88
              "",
89
              "Comma separated list of users to be excluded from the audit log");
90
TAG_FLAG(ycql_audit_excluded_users, runtime);
91
92
93
namespace yb {
94
namespace ql {
95
namespace audit {
96
97
using boost::optional;
98
99
//
100
// Entities
101
//
102
103
namespace {
104
105
// Audit categories are modelled after Apache Cassandra 4.0-beta2.
106
YB_DEFINE_ENUM(Category, (QUERY)(DML)(DDL)(DCL)(AUTH)(PREPARE)(ERROR)(OTHER))
107
108
} // anonymous namespace
109
110
// Audit types are modelled after Apache Cassandra 4.0-beta2.
111
#define YCQL_AUDIT_TYPES \
112
    ((SELECT, QUERY)) \
113
    ((UPDATE, DML)) /* Includes INSERT. */ \
114
    ((DELETE, DML)) \
115
    ((BATCH, DML)) \
116
    \
117
    /* DDL */ \
118
    \
119
    ((TRUNCATE, DDL)) \
120
    ((CREATE_KEYSPACE, DDL)) \
121
    ((ALTER_KEYSPACE, DDL)) \
122
    ((DROP_KEYSPACE, DDL)) \
123
    ((CREATE_TABLE, DDL)) \
124
    ((DROP_TABLE, DDL)) \
125
    ((CREATE_INDEX, DDL)) \
126
    ((DROP_INDEX, DDL)) \
127
    ((CREATE_TYPE, DDL)) \
128
    ((ALTER_TABLE, DDL)) \
129
    ((DROP_TYPE, DDL)) \
130
    ((ALTER_TYPE, DDL)) \
131
    \
132
    /* DCL */ \
133
    \
134
    ((GRANT, DCL)) \
135
    ((REVOKE, DCL)) \
136
    ((CREATE_ROLE, DCL)) \
137
    ((DROP_ROLE, DCL)) \
138
    ((ALTER_ROLE, DCL)) \
139
    \
140
    /* AUTH */ \
141
    \
142
    ((LOGIN_ERROR, AUTH)) \
143
    ((UNAUTHORIZED_ATTEMPT, AUTH)) \
144
    ((LOGIN_SUCCESS, AUTH)) \
145
    \
146
    /* Misc */ \
147
    \
148
    ((PREPARE_STATEMENT, PREPARE)) \
149
    ((REQUEST_FAILURE, ERROR)) \
150
    ((USE_KEYSPACE, OTHER)) \
151
    ((DESCRIBE, OTHER)) \
152
    ((EXPLAIN, OTHER)) \
153
    /**/
154
155
#define DECLARE_YCQL_AUDIT_TYPE(type_name, category_name) \
156
    static const Type type_name; \
157
    /**/
158
159
#define YCQL_FORWARD_MACRO(r, data, tuple) data tuple
160
161
// Audit type with category, an enum-like class.
// One static const instance is declared per YCQL_AUDIT_TYPES entry (e.g. Type::SELECT), and the
// constructor is private, so code outside of the class cannot create further instances.
161
162
class Type {
163
 public:
164
  BOOST_PP_SEQ_FOR_EACH(YCQL_FORWARD_MACRO, DECLARE_YCQL_AUDIT_TYPE, YCQL_AUDIT_TYPES)
165
166
  // Type name: the stringized identifier from the YCQL_AUDIT_TYPES entry.
  const std::string name_;
167
  // Category this type belongs to: the second element of the YCQL_AUDIT_TYPES entry.
  const Category    category_;
168
169
 private:
170
  explicit Type(std::string name, Category category)
171
177k
      : name_(std::move(name)), category_(category) {
172
177k
  }
173
};
174
175
#define DEFINE_YCQL_AUDIT_TYPE(type_name, category_name) \
176
    const Type Type::type_name = Type(BOOST_PP_STRINGIZE(type_name), Category::category_name); \
177
    /**/
178
179
BOOST_PP_SEQ_FOR_EACH(YCQL_FORWARD_MACRO, DEFINE_YCQL_AUDIT_TYPE, YCQL_AUDIT_TYPES)
180
181
// A single audit record: who performed which operation, from where, and with what outcome.
struct LogEntry {
182
  // Username of the currently active user, special case is "anonymous" user. None if not logged in.
183
  // Note that we can't distinguish between anonymous user and user named "anonymous",
184
  // but that's expected.
185
  optional<std::string> user;
186
187
  // Host (current node) endpoint. Held by reference, so it must outlive this entry.
188
  const Endpoint& host;
189
190
  // Source (CQL client) endpoint. Held by reference, so it must outlive this entry.
191
  const Endpoint& source;
192
193
  // Node local timestamp, unrelated to transaction read point.
194
  const Timestamp timestamp;
195
196
  // Audit log entry type (and category). Cannot be null.
197
  const Type* type;
198
199
  // Batch ID, for driver-level batch requests only.
200
  const std::string batch_id;
201
202
  // Keyspace, if applicable, empty string if none.
203
  const std::string keyspace;
204
205
  // Scope, if applicable, empty string if none. Exact meaning of "scope" depends on an operation.
206
  const std::string scope;
207
208
  // CQL being executed, or login status.
209
  std::string operation;
210
211
  // Operation error message, empty string if none.
212
  std::string error_message;
213
214
0
  // Human-readable dump of the entry. Note that batch_id is not part of the output.
  std::string ToString() const {
215
0
    return Format("{ user: $0, host: $1, source: $2, timestamp: $3, type: $4, category: $5,"
216
0
                  " keyspace: $6, scope: $7, operation: $8, error_message: $9 }",
217
0
                  user, host, source, timestamp, type->name_, type->category_,
218
0
                  keyspace, scope, operation, error_message);
219
0
  }
220
};
221
222
223
//
224
// Local functions
225
//
226
227
namespace {
228
229
// * Splits a comma-separated gflag list
230
// * Trims each string
231
// * Uppercases them (using the fact that CQL's keyspaces and users are case-insensitive)
232
// * Filters out empty strings
233
397
std::unordered_set<std::string> SplitGflagList(const std::string& gflag) {
234
397
  std::vector<std::string> split;
235
397
  boost::split(split, gflag, boost::is_any_of(","), boost::token_compress_on);
236
1.50k
  for (auto& s : split) {
237
1.50k
    boost::algorithm::trim(s);
238
1.50k
    boost::algorithm::to_upper(s);
239
1.50k
  }
240
1.49k
  split.erase(std::remove_if(split.begin(), split.end(), [](auto& s){ return s.empty(); }),
241
397
              split.end());
242
397
  return std::unordered_set<std::string>(split.begin(), split.end());
243
397
}
244
245
// Returns true if the given value is an element of the set.
template<typename T>
bool Contains(const std::unordered_set<T>& set, const T& value) {
  return set.find(value) != set.end();
}
249
250
// Return an audit log type for a tree node, or nullptr if a node can't be audited.
251
// Also sets an operation keyspace and scope, if applicable, following Cassandra's behaviour.
252
const Type* GetAuditLogTypeOption(const TreeNode& tnode,
253
                                  std::string* keyspace,
254
12.7k
                                  std::string* scope) {
255
  // FIXME: DESCRIBE and LIST <...> are client-only operations and are not audited!
256
12.7k
  switch (tnode.opcode()) {
257
5.16k
    case TreeNodeOpcode::kPTSelectStmt: {
258
5.16k
      const auto& cast_node = static_cast<const PTSelectStmt&>(tnode);
259
5.16k
      *keyspace = cast_node.table_name().namespace_name();
260
5.16k
      *scope    = cast_node.table_name().table_name();
261
5.16k
      return &Type::SELECT;
262
0
    }
263
1.73k
    case TreeNodeOpcode::kPTInsertStmt:
264
3.24k
    case TreeNodeOpcode::kPTUpdateStmt: {
265
3.24k
      const auto& cast_node = static_cast<const PTDmlStmt&>(tnode);
266
3.24k
      *keyspace = cast_node.table_name().namespace_name();
267
3.24k
      *scope    = cast_node.table_name().table_name();
268
3.24k
      return &Type::UPDATE;
269
1.73k
    }
270
107
    case TreeNodeOpcode::kPTDeleteStmt: {
271
107
      const auto& cast_node = static_cast<const PTDeleteStmt&>(tnode);
272
107
      *keyspace = cast_node.table_name().namespace_name();
273
107
      *scope    = cast_node.table_name().table_name();
274
107
      return &Type::DELETE;
275
1.73k
    }
276
2.91k
    case TreeNodeOpcode::kPTTruncateStmt: {
277
2.91k
      const auto& cast_node = static_cast<const PTTruncateStmt&>(tnode);
278
2.91k
      *keyspace = cast_node.yb_table_name().namespace_name();
279
2.91k
      *scope    = cast_node.yb_table_name().table_name();
280
2.91k
      return &Type::TRUNCATE;
281
1.73k
    }
282
26
    case TreeNodeOpcode::kPTCreateKeyspace: {
283
26
      const auto& cast_node = static_cast<const PTCreateKeyspace&>(tnode);
284
26
      *keyspace = cast_node.name();
285
26
      return &Type::CREATE_KEYSPACE;
286
1.73k
    }
287
0
    case TreeNodeOpcode::kPTAlterKeyspace: {
288
0
      const auto& cast_node = static_cast<const PTAlterKeyspace&>(tnode);
289
0
      *keyspace = cast_node.name();
290
0
      return &Type::ALTER_KEYSPACE;
291
1.73k
    }
292
89
    case TreeNodeOpcode::kPTUseKeyspace: {
293
89
      const auto& cast_node = static_cast<const PTUseKeyspace&>(tnode);
294
89
      *keyspace = cast_node.name();
295
89
      return &Type::USE_KEYSPACE;
296
1.73k
    }
297
199
    case TreeNodeOpcode::kPTDropStmt: {
298
199
      const auto& cast_node = static_cast<const PTDropStmt&>(tnode);
299
      // We only expect a handful of types here, same as Executor::ExecPTNode(const PTDropStmt*)
300
199
      switch (cast_node.drop_type()) {
301
33
        case ObjectType::SCHEMA:
302
33
          *keyspace = cast_node.name()->last_name().data();
303
33
          return &Type::DROP_KEYSPACE;
304
89
        case ObjectType::INDEX:
305
89
          *keyspace = cast_node.yb_table_name().namespace_name();
306
89
          *scope    = cast_node.yb_table_name().table_name();
307
89
          return &Type::DROP_INDEX;
308
15
        case ObjectType::ROLE:
309
15
          return &Type::DROP_ROLE;
310
62
        case ObjectType::TABLE:
311
62
          *keyspace = cast_node.yb_table_name().namespace_name();
312
62
          *scope    = cast_node.yb_table_name().table_name();
313
62
          return &Type::DROP_TABLE;
314
0
        case ObjectType::TYPE:
315
0
          *keyspace = cast_node.name()->first_name().data();
316
0
          *scope    = cast_node.name()->last_name().data();
317
0
          return &Type::DROP_TYPE;
318
0
        default:
319
0
          return nullptr;
320
0
      }
321
0
    }
322
54
    case TreeNodeOpcode::kPTCreateTable: {
323
54
      const auto& cast_node = static_cast<const PTCreateTable&>(tnode);
324
54
      *keyspace = cast_node.yb_table_name().namespace_name();
325
54
      *scope    = cast_node.yb_table_name().table_name();
326
54
      return &Type::CREATE_TABLE;
327
0
    }
328
130
    case TreeNodeOpcode::kPTCreateIndex: {
329
130
      const auto& cast_node = static_cast<const PTCreateIndex&>(tnode);
330
130
      *keyspace = cast_node.yb_table_name().namespace_name();
331
130
      *scope    = cast_node.yb_table_name().table_name();
332
130
      return &Type::CREATE_INDEX;
333
0
    }
334
18
    case TreeNodeOpcode::kPTAlterTable: {
335
18
      const auto& cast_node = static_cast<const PTAlterTable&>(tnode);
336
18
      *keyspace = cast_node.yb_table_name().namespace_name();
337
18
      *scope    = cast_node.yb_table_name().table_name();
338
18
      return &Type::ALTER_TABLE;
339
0
    }
340
4
    case TreeNodeOpcode::kPTCreateRole: {
341
      // Scope is not used.
342
4
      return &Type::CREATE_ROLE;
343
0
    }
344
1
    case TreeNodeOpcode::kPTAlterRole: {
345
      // Scope is not used.
346
1
      return &Type::ALTER_ROLE;
347
0
    }
348
0
    case TreeNodeOpcode::kPTCreateType: {
349
0
      const auto& cast_node = static_cast<const PTCreateType&>(tnode);
350
0
      *keyspace = cast_node.yb_type_name().namespace_name();
351
0
      *scope    = cast_node.yb_type_name().table_name();
352
0
      return &Type::CREATE_TYPE;
353
0
    }
354
3
    case TreeNodeOpcode::kPTGrantRevokePermission: {
355
3
      const auto& cast_node = static_cast<const PTGrantRevokePermission&>(tnode);
356
      // Scope hierarchy for GRANT/REVOKE is a bit unusual, see DataResource and RoleResource
357
      // classes in Cassandra.
358
3
      switch (cast_node.resource_type()) {
359
0
        case ALL_KEYSPACES:
360
0
          *keyspace = "data";
361
0
          *scope    = "data";
362
0
          break;
363
3
        case KEYSPACE:
364
3
          *keyspace = "data";
365
3
          *scope    = Format("data/$0", cast_node.namespace_name());
366
3
          break;
367
0
        case TABLE:
368
0
          *keyspace = Format("data/$0",    cast_node.namespace_name());
369
0
          *scope    = Format("data/$0/$1", cast_node.namespace_name(), cast_node.resource_name());
370
0
          break;
371
0
        case ALL_ROLES:
372
0
          *keyspace = "roles";
373
0
          *scope    = "roles";
374
0
          break;
375
0
        case ROLE:
376
0
          *keyspace = "roles";
377
0
          *scope    = Format("roles/$0", cast_node.resource_name());
378
0
          break;
379
3
      }
380
3
      switch (cast_node.statement_type()) {
381
3
        case client::GrantRevokeStatementType::GRANT:
382
3
          return &Type::GRANT;
383
0
        case client::GrantRevokeStatementType::REVOKE:
384
0
          return &Type::REVOKE;
385
0
      }
386
0
      FATAL_INVALID_ENUM_VALUE(client::GrantRevokeStatementType, cast_node.statement_type());
387
0
    }
388
0
    case TreeNodeOpcode::kPTGrantRevokeRole: {
389
0
      const auto& cast_node = static_cast<const PTGrantRevokeRole&>(tnode);
390
      // Scope is not used.
391
0
      switch (cast_node.statement_type()) {
392
0
        case client::GrantRevokeStatementType::GRANT:
393
0
          return &Type::GRANT;
394
0
        case client::GrantRevokeStatementType::REVOKE:
395
0
          return &Type::REVOKE;
396
0
      }
397
0
      FATAL_INVALID_ENUM_VALUE(client::GrantRevokeStatementType, cast_node.statement_type());
398
0
    }
399
400
681
    case TreeNodeOpcode::kPTListNode: {
401
      // As per YCQL grammar, list node can only be a DML batch.
402
      // Everything from PTStartTransaction to PTCommit (inclusive) is packed to PTListNode.
403
681
      return &Type::BATCH;
404
0
    }
405
406
1
    case TreeNodeOpcode::kPTStartTransaction: {
407
      // This will usually be inside PTListNode, but we can explicitly issue START TRANSACTION
408
      // from YCQL driver.
409
1
      return &Type::BATCH;
410
0
    }
411
412
1
    case TreeNodeOpcode::kPTCommit: {
413
      // This will usually be inside PTListNode, but we can explicitly issue COMMIT
414
      // from YCQL driver.
415
      // Also, standalone COMMIT is permitted even from ysqlsh (does nothing).
416
1
      return &Type::BATCH;
417
0
    }
418
419
147
    case TreeNodeOpcode::kPTExplainStmt: {
420
147
      const auto& cast_node = static_cast<const PTExplainStmt&>(tnode);
421
147
      const auto& sub_stmt  = static_cast<const PTDmlStmt&>(*cast_node.stmt());
422
147
      *keyspace = sub_stmt.table_name().namespace_name();
423
147
      *scope    = sub_stmt.table_name().table_name();
424
147
      return &Type::EXPLAIN;
425
0
    }
426
427
0
    case TreeNodeOpcode::kPTName:
428
0
    case TreeNodeOpcode::kPTProperty:
429
0
    case TreeNodeOpcode::kPTStatic:
430
0
    case TreeNodeOpcode::kPTConstraint:
431
0
    case TreeNodeOpcode::kPTCollection:
432
0
    case TreeNodeOpcode::kPTPrimitiveType:
433
0
    case TreeNodeOpcode::kPTColumnDefinition:
434
0
    case TreeNodeOpcode::kPTAlterColumnDefinition:
435
0
    case TreeNodeOpcode::kPTDmlUsingClauseElement:
436
0
    case TreeNodeOpcode::kPTTableRef:
437
0
    case TreeNodeOpcode::kPTOrderBy:
438
0
    case TreeNodeOpcode::kPTRoleOption:
439
0
    case TreeNodeOpcode::kPTInsertValuesClause:
440
0
    case TreeNodeOpcode::kPTInsertJsonClause:
441
0
    case TreeNodeOpcode::kPTExpr:
442
0
    case TreeNodeOpcode::kPTRef:
443
0
    case TreeNodeOpcode::kPTSubscript:
444
0
    case TreeNodeOpcode::kPTAllColumns:
445
0
    case TreeNodeOpcode::kPTAssign:
446
0
    case TreeNodeOpcode::kPTBindVar:
447
0
    case TreeNodeOpcode::kPTJsonOp:
448
0
    case TreeNodeOpcode::kPTTypeField:
449
0
    case TreeNodeOpcode::kNoOp:
450
0
      return nullptr;
451
452
0
  }
453
0
  FATAL_INVALID_ENUM_VALUE(TreeNodeOpcode, tnode.opcode());
454
0
}
455
456
// Replace sensitive information in a CQL command string with <REDACTED> placeholders.
457
// We only do this for CREATE/ALTER ROLE.
458
13.3k
std::string ObfuscateOperation(const TreeNode& tnode, const std::string& operation) {
459
13.3k
  if (tnode.opcode() != TreeNodeOpcode::kPTCreateRole &&
460
13.3k
      tnode.opcode() != TreeNodeOpcode::kPTAlterRole) {
461
13.3k
    return operation;
462
13.3k
  }
463
464
13
  static const auto replacement = "<REDACTED>";
465
  // Using somewhat tricky code to account for escaped quotes ('') in a password.
466
  // We replace an entire string, including quotes.
467
13
  static const std::regex pwd_start_regex("password[\\s]*=[\\s]*'", std::regex_constants::icase);
468
13
  std::smatch m;
469
13
  if (!regex_search(operation, m, pwd_start_regex)) {
470
1
    return operation;
471
1
  }
472
12
  size_t pwd_start_idx = m.position() + m.length() - 1;
473
12
  ssize_t pwd_length = -1;
474
38
  for (auto i = pwd_start_idx + 1; i < operation.length(); ++i) {
475
30
    if (operation[i] == '\'') {
476
      // If the next character is a quote too - this is an escaped quote.
477
4
      if (i < operation.length() - 1 && operation[i + 1] == '\'') {
478
0
        ++i; // Skip both quotes.
479
4
      } else {
480
4
        pwd_length = i - pwd_start_idx + 1;
481
4
        break;
482
4
      }
483
4
    }
484
30
  }
485
12
  if (pwd_length == -1) {
486
0
    return operation;
487
0
  }
488
12
  std::string copy(operation);
489
12
  copy.replace(pwd_start_idx, pwd_length, replacement);
490
12
  return copy;
491
12
}
492
493
// Follows Cassandra's view format for prettified binary log.
494
10.7k
CHECKED_STATUS AddLogEntry(const LogEntry& e) {
495
10.7k
  std::string str;
496
10.7k
  str.reserve(512); // Some reasonable default that's expected to fit most of the audit records.
497
10.7k
  str.append("AUDIT: ");
498
10.7k
  str.append("user:");
499
10.7k
  str.append(e.user ? *e.user : "null");
500
10.7k
  str.append("|host:");
501
10.7k
  str.append(AsString(e.host));
502
10.7k
  str.append("|source:");
503
10.7k
  str.append(AsString(e.source.address()));
504
10.7k
  str.append("|port:");
505
10.7k
  str.append(AsString(e.source.port()));
506
10.7k
  str.append("|timestamp:");
507
10.7k
  str.append(std::to_string(e.timestamp.ToInt64() / 1000));
508
10.7k
  str.append("|type:");
509
10.7k
  str.append(e.type->name_);
510
10.7k
  str.append("|category:");
511
10.7k
  str.append(AsString(e.type->category_));
512
10.7k
  if (!e.batch_id.empty()) {
513
10
    str.append("|batch:");
514
10
    str.append(e.batch_id);
515
10
  }
516
10.7k
  if (!e.keyspace.empty()) {
517
9.37k
    str.append("|ks:");
518
9.37k
    str.append(e.keyspace);
519
9.37k
  }
520
10.7k
  if (!e.scope.empty()) {
521
9.24k
    str.append("|scope:");
522
9.24k
    str.append(e.scope);
523
9.24k
  }
524
10.7k
  str.append("|operation:");
525
10.7k
  str.append(e.operation);
526
10.7k
  if (!e.error_message.empty()) {
527
644
    str.append("; ");
528
644
    str.append(e.error_message);
529
644
  }
530
  // Since glog uses macros, it's not too convenient to extract a log level.
531
10.7k
  auto severity = boost::algorithm::to_lower_copy(FLAGS_ycql_audit_log_level);
532
10.7k
  if (severity == "info") {
533
4
    LOG(INFO) << str;
534
4
    return Status::OK();
535
10.7k
  } else if (severity == "warning") {
536
1
    LOG(WARNING) << str;
537
1
    return Status::OK();
538
10.7k
  } else if (severity == "error") {
539
10.7k
    LOG(ERROR) << str;
540
10.7k
    return Status::OK();
541
0
  } else {
542
0
    return STATUS_FORMAT(InvalidArgument,
543
0
                         "$0 is not a valid severity level",
544
0
                         FLAGS_ycql_audit_log_level);
545
0
  }
546
10.7k
}
547
548
} // anonymous namespace
549
550
//
551
// AuditLogger class definitions
552
//
553
554
17.1k
// Initializes ql_env_ from the supplied QL environment; the constructor does nothing else.
AuditLogger::AuditLogger(const QLEnv& ql_env) : ql_env_(ql_env) {
555
17.1k
}
556
557
template<class Pred>
558
bool AuditLogger::SatisfiesGFlag(const LogEntry& e,
559
                                 const std::string& gflag_name,
560
67.0k
                                 const Pred& predicate) {
561
67.0k
  std::string gflag_value;
562
67.0k
  bool found = GFLAGS_NAMESPACE::GetCommandLineOption(gflag_name.c_str(), &gflag_value);
563
67.0k
  if (!found) {
564
    // This should never happen as we use a compile-time check in a macro.
565
0
    LOG(DFATAL) << "Gflag " << gflag_name << " does not exist!";
566
0
    return false;
567
0
  }
568
569
67.0k
  auto& cached = gflags_cache_[gflag_name];
570
67.0k
  if (cached.first != gflag_value) {
571
0
    VLOG(2) << "Audit flag " << gflag_name << " = " << gflag_value
572
0
            << " cache was invalid, old value = " << cached.first;
573
397
    cached.first  = gflag_value;
574
397
    cached.second = SplitGflagList(gflag_value);
575
397
  }
576
67.0k
  const auto& split = cached.second;
577
578
67.0k
  if (!split.empty() && !predicate(split, e)) {
579
85
    VLOG(1) << "Filtered out audit record: " << e.ToString()
580
85
            << ", flag: " << gflag_name << " = " << gflag_value;
581
2.80k
    return false;
582
2.80k
  }
583
64.2k
  return true;
584
64.2k
}
audit_logger.cc:_ZN2yb2ql5audit11AuditLogger14SatisfiesGFlagIZNS2_14ShouldBeLoggedERKNS1_8LogEntryEE3$_0EEbS6_RKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEERKT_
Line
Count
Source
560
11.9k
                                 const Pred& predicate) {
561
11.9k
  std::string gflag_value;
562
11.9k
  bool found = GFLAGS_NAMESPACE::GetCommandLineOption(gflag_name.c_str(), &gflag_value);
563
11.9k
  if (!found) {
564
    // This should never happen as we use a compile-time check in a macro.
565
0
    LOG(DFATAL) << "Gflag " << gflag_name << " does not exist!";
566
0
    return false;
567
0
  }
568
569
11.9k
  auto& cached = gflags_cache_[gflag_name];
570
11.9k
  if (cached.first != gflag_value) {
571
0
    VLOG(2) << "Audit flag " << gflag_name << " = " << gflag_value
572
0
            << " cache was invalid, old value = " << cached.first;
573
12
    cached.first  = gflag_value;
574
12
    cached.second = SplitGflagList(gflag_value);
575
12
  }
576
11.9k
  const auto& split = cached.second;
577
578
11.9k
  if (!split.empty() && !predicate(split, e)) {
579
2
    VLOG(1) << "Filtered out audit record: " << e.ToString()
580
2
            << ", flag: " << gflag_name << " = " << gflag_value;
581
78
    return false;
582
78
  }
583
11.8k
  return true;
584
11.8k
}
audit_logger.cc:_ZN2yb2ql5audit11AuditLogger14SatisfiesGFlagIZNS2_14ShouldBeLoggedERKNS1_8LogEntryEE3$_1EEbS6_RKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEERKT_
Line
Count
Source
560
12.0k
                                 const Pred& predicate) {
561
12.0k
  std::string gflag_value;
562
12.0k
  bool found = GFLAGS_NAMESPACE::GetCommandLineOption(gflag_name.c_str(), &gflag_value);
563
12.0k
  if (!found) {
564
    // This should never happen as we use a compile-time check in a macro.
565
0
    LOG(DFATAL) << "Gflag " << gflag_name << " does not exist!";
566
0
    return false;
567
0
  }
568
569
12.0k
  auto& cached = gflags_cache_[gflag_name];
570
12.0k
  if (cached.first != gflag_value) {
571
0
    VLOG(2) << "Audit flag " << gflag_name << " = " << gflag_value
572
0
            << " cache was invalid, old value = " << cached.first;
573
363
    cached.first  = gflag_value;
574
363
    cached.second = SplitGflagList(gflag_value);
575
363
  }
576
12.0k
  const auto& split = cached.second;
577
578
12.0k
  if (!split.empty() && !predicate(split, e)) {
579
83
    VLOG(1) << "Filtered out audit record: " << e.ToString()
580
83
            << ", flag: " << gflag_name << " = " << gflag_value;
581
2.69k
    return false;
582
2.69k
  }
583
9.39k
  return true;
584
9.39k
}
audit_logger.cc:_ZN2yb2ql5audit11AuditLogger14SatisfiesGFlagIZNS2_14ShouldBeLoggedERKNS1_8LogEntryEE3$_2EEbS6_RKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEERKT_
Line
Count
Source
560
10.7k
                                 const Pred& predicate) {
561
10.7k
  std::string gflag_value;
562
10.7k
  bool found = GFLAGS_NAMESPACE::GetCommandLineOption(gflag_name.c_str(), &gflag_value);
563
10.7k
  if (!found) {
564
    // This should never happen as we use a compile-time check in a macro.
565
0
    LOG(DFATAL) << "Gflag " << gflag_name << " does not exist!";
566
0
    return false;
567
0
  }
568
569
10.7k
  auto& cached = gflags_cache_[gflag_name];
570
10.7k
  if (cached.first != gflag_value) {
571
0
    VLOG(2) << "Audit flag " << gflag_name << " = " << gflag_value
572
0
            << " cache was invalid, old value = " << cached.first;
573
5
    cached.first  = gflag_value;
574
5
    cached.second = SplitGflagList(gflag_value);
575
5
  }
576
10.7k
  const auto& split = cached.second;
577
578
10.7k
  if (!split.empty() && !predicate(split, e)) {
579
0
    VLOG(1) << "Filtered out audit record: " << e.ToString()
580
0
            << ", flag: " << gflag_name << " = " << gflag_value;
581
5
    return false;
582
5
  }
583
10.7k
  return true;
584
10.7k
}
audit_logger.cc:_ZN2yb2ql5audit11AuditLogger14SatisfiesGFlagIZNS2_14ShouldBeLoggedERKNS1_8LogEntryEE3$_3EEbS6_RKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEERKT_
Line
Count
Source
560
10.7k
                                 const Pred& predicate) {
561
10.7k
  std::string gflag_value;
562
10.7k
  bool found = GFLAGS_NAMESPACE::GetCommandLineOption(gflag_name.c_str(), &gflag_value);
563
10.7k
  if (!found) {
564
    // This should never happen as we use a compile-time check in a macro.
565
0
    LOG(DFATAL) << "Gflag " << gflag_name << " does not exist!";
566
0
    return false;
567
0
  }
568
569
10.7k
  auto& cached = gflags_cache_[gflag_name];
570
10.7k
  if (cached.first != gflag_value) {
571
0
    VLOG(2) << "Audit flag " << gflag_name << " = " << gflag_value
572
0
            << " cache was invalid, old value = " << cached.first;
573
2
    cached.first  = gflag_value;
574
2
    cached.second = SplitGflagList(gflag_value);
575
2
  }
576
10.7k
  const auto& split = cached.second;
577
578
10.7k
  if (!split.empty() && !predicate(split, e)) {
579
0
    VLOG(1) << "Filtered out audit record: " << e.ToString()
580
0
            << ", flag: " << gflag_name << " = " << gflag_value;
581
1
    return false;
582
1
  }
583
10.7k
  return true;
584
10.7k
}
audit_logger.cc:_ZN2yb2ql5audit11AuditLogger14SatisfiesGFlagIZNS2_14ShouldBeLoggedERKNS1_8LogEntryEE3$_4EEbS6_RKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEERKT_
Line
Count
Source
560
10.7k
                                 const Pred& predicate) {
561
10.7k
  std::string gflag_value;
562
10.7k
  bool found = GFLAGS_NAMESPACE::GetCommandLineOption(gflag_name.c_str(), &gflag_value);
563
10.7k
  if (!found) {
564
    // This should never happen as we use a compile-time check in a macro.
565
0
    LOG(DFATAL) << "Gflag " << gflag_name << " does not exist!";
566
0
    return false;
567
0
  }
568
569
10.7k
  auto& cached = gflags_cache_[gflag_name];
570
10.7k
  if (cached.first != gflag_value) {
571
0
    VLOG(2) << "Audit flag " << gflag_name << " = " << gflag_value
572
0
            << " cache was invalid, old value = " << cached.first;
573
9
    cached.first  = gflag_value;
574
9
    cached.second = SplitGflagList(gflag_value);
575
9
  }
576
10.7k
  const auto& split = cached.second;
577
578
10.7k
  if (!split.empty() && !predicate(split, e)) {
579
0
    VLOG(1) << "Filtered out audit record: " << e.ToString()
580
0
            << ", flag: " << gflag_name << " = " << gflag_value;
581
18
    return false;
582
18
  }
583
10.7k
  return true;
584
10.7k
}
audit_logger.cc:_ZN2yb2ql5audit11AuditLogger14SatisfiesGFlagIZNS2_14ShouldBeLoggedERKNS1_8LogEntryEE3$_5EEbS6_RKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEERKT_
Line
Count
Source
560
10.7k
                                 const Pred& predicate) {
561
10.7k
  std::string gflag_value;
562
10.7k
  bool found = GFLAGS_NAMESPACE::GetCommandLineOption(gflag_name.c_str(), &gflag_value);
563
10.7k
  if (!found) {
564
    // This should never happen as we use a compile-time check in a macro.
565
0
    LOG(DFATAL) << "Gflag " << gflag_name << " does not exist!";
566
0
    return false;
567
0
  }
568
569
10.7k
  auto& cached = gflags_cache_[gflag_name];
570
10.7k
  if (cached.first != gflag_value) {
571
0
    VLOG(2) << "Audit flag " << gflag_name << " = " << gflag_value
572
0
            << " cache was invalid, old value = " << cached.first;
573
6
    cached.first  = gflag_value;
574
6
    cached.second = SplitGflagList(gflag_value);
575
6
  }
576
10.7k
  const auto& split = cached.second;
577
578
10.7k
  if (!split.empty() && !predicate(split, e)) {
579
0
    VLOG(1) << "Filtered out audit record: " << e.ToString()
580
0
            << ", flag: " << gflag_name << " = " << gflag_value;
581
6
    return false;
582
6
  }
583
10.7k
  return true;
584
10.7k
}
585
586
// Helper macro to avoid boilerplate when using SatisfiesGFlag function.
//
// Statically checks that FLAGS_<gflag> is a string flag, then calls SatisfiesGFlag for `entry`
// with a predicate built from `predicate_on_split_and_e`. If SatisfiesGFlag reports false, the
// enclosing function returns false.
//
// Arguments:
//   gflag                    - flag name without the FLAGS_ prefix.
//   entry                    - log entry forwarded to SatisfiesGFlag.
//   predicate_on_split_and_e - boolean expression used as the predicate body. Inside it, `split`
//                              names the GflagListValue and `e` names the LogEntry that are passed
//                              to the predicate.
//
// The expansion is several statements long and is intended to be invoked without a trailing
// semicolon; the empty comment at the end only terminates the chain of line continuations.
#define RETURN_IF_NOT_SATISFIES_GFLAG(gflag, entry, predicate_on_split_and_e) \
    static_assert(std::is_same<decltype(BOOST_PP_CAT(FLAGS_, gflag)), std::string&>::value, \
                  "Flag " BOOST_PP_STRINGIZE(gflag) " must be string"); \
    \
    if (!SatisfiesGFlag(entry, BOOST_PP_STRINGIZE(gflag), \
                        [](const GflagListValue& split, const LogEntry& e) { \
                          return predicate_on_split_and_e; \
                        })) { \
      return false; \
    } \
    /**/
599
600
13.4k
bool AuditLogger::ShouldBeLogged(const LogEntry& e) {
601
  // If a keyspace isn't present, it's not used for filtering.
602
13.4k
  if (!e.keyspace.empty()) {
603
12.0k
    RETURN_IF_NOT_SATISFIES_GFLAG(
604
12.0k
        ycql_audit_included_keyspaces, e,
605
12.0k
        Contains(split, boost::algorithm::to_upper_copy(e.keyspace)))
606
11.9k
    RETURN_IF_NOT_SATISFIES_GFLAG(
607
11.9k
        ycql_audit_excluded_keyspaces, e,
608
11.9k
        !Contains(split, boost::algorithm::to_upper_copy(e.keyspace)))
609
11.9k
  }
610
611
10.6k
  RETURN_IF_NOT_SATISFIES_GFLAG(
612
10.6k
      ycql_audit_included_categories, e,
613
10.6k
      Contains(split, AsString(e.type->category_)))
614
10.6k
  RETURN_IF_NOT_SATISFIES_GFLAG(
615
10.6k
      ycql_audit_excluded_categories, e,
616
10.6k
      !Contains(split, AsString(e.type->category_)))
617
618
  // Include filter always removes userless entries.
619
10.6k
  RETURN_IF_NOT_SATISFIES_GFLAG(
620
10.6k
      ycql_audit_included_users, e,
621
10.6k
      e.user && Contains(split, boost::algorithm::to_upper_copy(*e.user)))
622
10.6k
  RETURN_IF_NOT_SATISFIES_GFLAG(
623
10.6k
      ycql_audit_excluded_users, e,
624
10.6k
      !e.user || !Contains(split, boost::algorithm::to_upper_copy(*e.user)))
625
626
10.6k
  return true;
627
10.6k
}
628
629
// Assembles a LogEntry from the given type, keyspace, scope, operation and error message,
// filling in the user, host, source, timestamp and batch ID from the logger's own state.
// Returns InternalError if the connection is not set.
Result<LogEntry> AuditLogger::CreateLogEntry(const Type& type,
                                             std::string keyspace,
                                             std::string scope,
                                             std::string operation,
                                             std::string error_message) {
  SCHECK(conn_ != nullptr, InternalError, "Connection is not initialized");

  // An empty role name is recorded as "anonymous".
  auto role_name = ql_env_.CurrentRoleName();
  if (role_name.empty()) {
    role_name = "anonymous";
  }

  LogEntry result {
    .user = std::move(role_name),
    .host = conn_->local(),
    .source = conn_->remote(),
    .timestamp = DateTime::TimestampNow(),
    .type = &type,
    .batch_id = batch_id_,
    .keyspace = std::move(keyspace),
    .scope = std::move(scope),
    .operation = std::move(operation),
    .error_message = std::move(error_message)
  };
  return result;
}
653
654
// Enters batch request mode: generates a new batch ID and logs a BATCH entry describing the
// number of statements. Does nothing when audit logging is disabled or the connection is unset.
// The batch ID is reset if the entry cannot be created or added to the log.
Status AuditLogger::StartBatchRequest(size_t statements_count,
                                      IsRescheduled is_rescheduled) {
  if (!(FLAGS_ycql_enable_audit_log && conn_)) {
    return Status::OK();
  }

  if (!batch_id_.empty()) {
    if (is_rescheduled) {
      // Keep an existing batch ID in case of reschedule.
      return Status::OK();
    }
  }

  // We cannot have sub-batches as only DMLs are allowed within a batch.
  SCHECK(batch_id_.empty(), InternalError, "Batch request mode is already active!");

  batch_id_ = AsString(batch_id_gen_());

  // Cassandra uses the currently active keyspace here, for whatever reason.
  // We don't as it's cumbersome to track.
  auto batch_entry = CreateLogEntry(
      Type::BATCH,
      "" /* keyspace */,
      "" /* scope */,
      Format("BatchId:[$0] - BATCH of [$1] statements", batch_id_, statements_count),
      "" /* error_message */);
  if (PREDICT_FALSE(!batch_entry.ok())) {
    batch_id_.clear();
    return batch_entry.status();
  }

  // A filtered-out batch entry still leaves the batch ID in place.
  if (!ShouldBeLogged(*batch_entry)) {
    return Status::OK();
  }

  auto add_status = AddLogEntry(*batch_entry);
  if (PREDICT_FALSE(!add_status.ok())) {
    batch_id_.clear();
  }
  return add_status;
}
694
695
6.53k
// Leaves batch request mode by dropping the current batch ID. Always returns OK.
Status AuditLogger::EndBatchRequest() {
  batch_id_.clear();
  return Status::OK();
}
699
700
6.28k
// Logs the outcome of an authentication exchange.
// AUTH_SUCCESS produces a LOGIN_SUCCESS entry, ERROR produces a userless LOGIN_ERROR entry
// carrying the error message, and any other opcode yields InternalError.
// Does nothing when audit logging is disabled or the connection is unset.
Status AuditLogger::LogAuthResponse(const CQLResponse& response) {
  if (!(FLAGS_ycql_enable_audit_log && conn_)) {
    return Status::OK();
  }

  // Start from the successful login entry and adjust it below if the response is an error.
  auto entry = VERIFY_RESULT(CreateLogEntry(Type::LOGIN_SUCCESS,
                                            "" /* keyspace */,
                                            "" /* scope */,
                                            "LOGIN SUCCESSFUL",
                                            "" /* error_message */));

  const auto opcode = response.opcode();
  if (opcode == CQLMessage::Opcode::ERROR) {
    const auto& error_response = static_cast<const ErrorResponse&>(response);
    entry.user = boost::none;
    entry.type = &Type::LOGIN_ERROR;
    entry.operation = "LOGIN FAILURE";
    entry.error_message = error_response.message();
  } else if (opcode != CQLMessage::Opcode::AUTH_SUCCESS) {
    return STATUS_FORMAT(InternalError,
                         "$0 is not a valid opcode for an auth response",
                         static_cast<uint8_t>(opcode));
  }

  if (ShouldBeLogged(entry)) {
    return AddLogEntry(entry);
  }
  return Status::OK();
}
734
735
// Logs a statement given its parse tree node and text.
// Returns OK without logging when audit logging is disabled, the node or the connection is
// missing, no audit type is obtained for the node, or the entry is filtered out.
// When is_prepare is set, the entry is recorded with the PREPARE_STATEMENT type.
Status AuditLogger::LogStatement(const TreeNode* tnode,
                                 const std::string& statement,
                                 IsPrepare is_prepare) {
  // FIXME(alex): Rescheduled statements should not be logged (this requires additional testing).
  if (!(FLAGS_ycql_enable_audit_log && tnode && conn_)) {
    return Status::OK();
  }

  std::string keyspace;
  std::string scope;
  auto audit_type = GetAuditLogTypeOption(*tnode, &keyspace, &scope);
  if (!audit_type) {
    return Status::OK();
  }

  auto entry = VERIFY_RESULT(CreateLogEntry(*audit_type,
                                            std::move(keyspace),
                                            std::move(scope),
                                            ObfuscateOperation(*tnode, statement),
                                            "" /* error_message */));
  if (is_prepare) {
    entry.type = &Type::PREPARE_STATEMENT;
  }

  if (ShouldBeLogged(entry)) {
    return AddLogEntry(entry);
  }
  return Status::OK();
}
764
765
// Logs a failed statement given its parse tree node: the statement text is passed through
// ObfuscateOperation and then handed to the string-based LogStatementError overload.
// Does nothing when audit logging is disabled or the node or the connection is missing.
Status AuditLogger::LogStatementError(const TreeNode* tnode,
                                      const std::string& statement,
                                      const Status& error_status,
                                      ErrorIsFormatted error_is_formatted) {
  if (!(FLAGS_ycql_enable_audit_log && tnode && conn_)) {
    return Status::OK();
  }

  const auto obfuscated = ObfuscateOperation(*tnode, statement);
  return LogStatementError(obfuscated, error_status, error_is_formatted);
}
775
776
// Logs a failed statement given its text and the error status.
// The entry type is UNAUTHORIZED_ATTEMPT when the error code is UNAUTHORIZED and REQUEST_FAILURE
// otherwise. Returns InvalidArgument if error_status is OK, or if error_is_formatted is set but
// the message has three lines or fewer.
// Does nothing when audit logging is disabled or the connection is unset.
Status AuditLogger::LogStatementError(const std::string& statement,
                                      const Status& error_status,
                                      ErrorIsFormatted error_is_formatted) {
  if (!(FLAGS_ycql_enable_audit_log && conn_)) {
    return Status::OK();
  }

  SCHECK(!error_status.ok(), InvalidArgument, "Operation hasn't failed");

  const bool unauthorized = GetErrorCode(error_status) == ErrorCode::UNAUTHORIZED;
  const Type& type = unauthorized ? Type::UNAUTHORIZED_ATTEMPT : Type::REQUEST_FAILURE;

  std::string error_message = error_status.ToUserMessage();
  if (error_is_formatted) {
    // We've already concatenated message with CQL error location.
    // Here we use dirty hack, removing three trailing lines to get rid of it.
    auto split = StringSplit(error_message, '\n');
    SCHECK(split.size() > 3, InvalidArgument, "Unexpected error message format");
    const auto kept_lines = split.size() - 3;
    split.resize(kept_lines);
    error_message = boost::algorithm::join(split, "\n");
  }

  // For failed requests, we do not log keyspace and scope even if we have them.
  auto entry = VERIFY_RESULT(CreateLogEntry(type,
                                            "" /* keyspace */,
                                            "" /* scope */,
                                            statement,
                                            std::move(error_message)));

  if (ShouldBeLogged(entry)) {
    return AddLogEntry(entry);
  }
  return Status::OK();
}
811
812
} // namespace audit
813
} // namespace ql
814
} // namespace yb