YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/subprocess.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/util/subprocess.h"
34
35
#include <dirent.h>
36
#include <fcntl.h>
37
#include <signal.h>
38
#include <spawn.h>
39
#include <sys/wait.h>
40
41
#include <memory>
42
#include <mutex>
43
#include <string>
44
#include <vector>
45
46
#include <boost/container/small_vector.hpp>
47
#include <glog/logging.h>
48
49
#include "yb/gutil/once.h"
50
#include "yb/gutil/port.h"
51
#include "yb/gutil/strings/join.h"
52
#include "yb/gutil/strings/numbers.h"
53
#include "yb/gutil/strings/split.h"
54
55
#include "yb/util/errno.h"
56
#include "yb/util/result.h"
57
#include "yb/util/scope_exit.h"
58
#include "yb/util/status.h"
59
#include "yb/util/status_format.h"
60
#include "yb/util/status_log.h"
61
62
using std::shared_ptr;
63
using std::string;
64
using std::vector;
65
using std::mutex;
66
using std::unique_lock;
67
using strings::Split;
68
using strings::Substitute;
69
70
extern char** environ;
71
72
namespace yb {
73
74
namespace {
75
76
// Directory that is scanned to enumerate this process's file descriptors.
#if defined(__APPLE__)
static const char* kProcSelfFd = "/dev/fd";
#else
static const char* kProcSelfFd = "/proc/self/fd";
#endif  // defined(__APPLE__)

// Directory-iteration primitives: Linux builds use the explicit 64-bit variants,
// every other platform uses plain readdir()/dirent.
#if defined(__linux__)
#define READDIR readdir64
#define DIRENT dirent64
#else
#define READDIR readdir
#define DIRENT dirent
#endif
90
91
4.04k
void DisableSigPipe() {
92
4.04k
  struct sigaction act;
93
94
4.04k
  act.sa_handler = SIG_IGN;
95
4.04k
  sigemptyset(&act.sa_mask);
96
4.04k
  act.sa_flags = 0;
97
4.04k
  PCHECK(sigaction(SIGPIPE, &act, nullptr) == 0);
98
4.04k
}
99
100
5.86k
void EnsureSigPipeDisabled() {
101
5.86k
  static GoogleOnceType once = GOOGLE_ONCE_INIT;
102
5.86k
  GoogleOnceInit(&once, &DisableSigPipe);
103
5.86k
}
104
105
// Since opendir() calls malloc(), this must be called before fork().
106
// This function is not async-signal-safe.
107
5.86k
Result<DIR*> OpenProcFdDir() {
108
5.86k
  DIR* dir = opendir(kProcSelfFd);
109
5.86k
  if (PREDICT_FALSE(dir == nullptr)) {
110
0
    return STATUS(IOError, Substitute("opendir(\"$0\") failed", kProcSelfFd), Errno(errno));
111
0
  }
112
5.86k
  return dir;
113
5.86k
}
114
115
// Close the directory stream opened by OpenProcFdDir().
116
// This function is not async-signal-safe.
117
5.85k
void CloseProcFdDir(DIR* dir) {
118
5.85k
  if (PREDICT_FALSE(closedir(dir) == -1)) {
119
0
    LOG(WARNING)
120
0
        << "Unable to close fd dir: "
121
0
        << STATUS(IOError, Substitute("closedir(\"$0\") failed", kProcSelfFd), Errno(errno));
122
0
  }
123
5.85k
}
124
125
} // anonymous namespace
126
127
// Creates a not-yet-started subprocess description for `program` with arguments `argv`.
// Defaults: stdin is piped, stdout and stderr are shared.
Subprocess::Subprocess(string program, vector<string> argv)
    : program_(std::move(program)),
      argv_(std::move(argv)),
      state_(SubprocessState::kNotStarted),
      child_pid_(0),
      fd_state_(),
      child_fds_() {
  // No parent-side descriptors exist until the child has been started.
  for (const int std_fd : {STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO}) {
    child_fds_[std_fd] = -1;
  }

  fd_state_[STDERR_FILENO] = SubprocessStreamMode::kShared;
  fd_state_[STDOUT_FILENO] = SubprocessStreamMode::kShared;
  fd_state_[STDIN_FILENO] = SubprocessStreamMode::kPiped;
}
141
142
4.69k
// If the child is still running it is sent SIGKILL and waited for; afterwards any
// parent-side pipe descriptors that are still held are closed.
Subprocess::~Subprocess() {
  const bool still_running = state() == SubprocessState::kRunning;
  if (still_running) {
    LOG(WARNING) << "Child process " << child_pid_
                 << "(" << JoinStrings(argv_, " ") << ") "
                 << " was orphaned. Sending SIGKILL...";
    WARN_NOT_OK(Kill(SIGKILL), "Failed to send SIGKILL");
    int junk = 0;
    WARN_NOT_OK(Wait(&junk), "Failed to Wait()");
  }

  for (int std_fd = 0; std_fd < 3; ++std_fd) {
    if (fd_state_[std_fd] != SubprocessStreamMode::kPiped) {
      continue;
    }
    // A negative value means the descriptor was never created or was released.
    if (child_fds_[std_fd] < 0) {
      continue;
    }
    close(child_fds_[std_fd]);
  }
}
158
159
57.2k
void Subprocess::SetEnv(const std::string& key, const std::string& value) {
160
57.2k
  CHECK_EQ(state_, SubprocessState::kNotStarted);
161
57.2k
  env_[key] = value;
162
57.2k
}
163
164
0
void Subprocess::AddPIDToCGroup(const string& path, pid_t pid) {
165
0
#if defined(__APPLE__)
166
0
  LOG(WARNING) << "Writing to cgroup.procs is not supported";
167
#else
168
  const char* filename = path.c_str();
169
  FILE *fptr = fopen(const_cast<char *>(filename), "w");
170
  if (fptr == NULL) {
171
    LOG(WARNING) << "Couldn't open " << path;
172
  } else {
173
    int ret = fprintf(fptr, "%d\n", pid);
174
    if (ret < 0) {
175
      LOG(WARNING) << "Cannot write to " << path  << ". Return = " << ret;
176
    }
177
    fclose(fptr);
178
  }
179
#endif
180
0
}
181
182
3.97k
void Subprocess::SetFdShared(int stdfd, SubprocessStreamMode mode) {
183
3.97k
  CHECK_NE(mode, SubprocessStreamMode::kDisabled);
184
3.97k
  unique_lock<mutex> l(state_lock_);
185
3.97k
  CHECK_EQ(state_, SubprocessState::kNotStarted);
186
3.97k
  CHECK_NE(fd_state_[stdfd], SubprocessStreamMode::kDisabled);
187
3.97k
  fd_state_[stdfd] = mode;
188
3.97k
}
189
190
1.81k
void Subprocess::InheritNonstandardFd(int fd) {
191
1.81k
  ns_fds_inherited_.insert(fd);
192
1.81k
}
193
194
66
void Subprocess::DisableStderr() {
195
66
  unique_lock<mutex> l(state_lock_);
196
66
  CHECK_EQ(state_, SubprocessState::kNotStarted);
197
66
  fd_state_[STDERR_FILENO] = SubprocessStreamMode::kDisabled;
198
66
}
199
200
66
void Subprocess::DisableStdout() {
201
66
  unique_lock<mutex> l(state_lock_);
202
66
  CHECK_EQ(state_, SubprocessState::kNotStarted);
203
66
  fd_state_[STDOUT_FILENO] = SubprocessStreamMode::kDisabled;
204
66
}
205
206
#if defined(__APPLE__)
207
8.02k
static int pipe2(int pipefd[2], int flags) {
208
8.02k
  DCHECK_EQ(O_CLOEXEC, flags);
209
210
8.02k
  int new_fds[2] = { 0, 0 };
211
8.02k
  if (pipe(new_fds) == -1) {
212
4
    return -1;
213
4
  }
214
8.01k
  if (fcntl(new_fds[0], F_SETFD, O_CLOEXEC) == -1) {
215
0
    close(new_fds[0]);
216
0
    close(new_fds[1]);
217
0
    return -1;
218
0
  }
219
8.01k
  if (fcntl(new_fds[1], F_SETFD, O_CLOEXEC) == -1) {
220
0
    close(new_fds[0]);
221
0
    close(new_fds[1]);
222
0
    return -1;
223
0
  }
224
8.01k
  pipefd[0] = new_fds[0];
225
8.01k
  pipefd[1] = new_fds[1];
226
8.01k
  return 0;
227
8.01k
}
228
#endif
229
230
5.86k
// Launches the child process. Returns IllegalState if the process is not in the
// kNotStarted state; otherwise returns the status of the platform-specific launcher and
// moves to kRunning on success.
Status Subprocess::Start() {
  std::lock_guard<std::mutex> l(state_lock_);
  SCHECK_EQ(state_, SubprocessState::kNotStarted, IllegalState,
            "Incorrect state when starting the process");
  EnsureSigPipeDisabled();

#if defined(__APPLE__)
  // Closing file descriptors with posix_spawn has some issues on macOS so we still use fork/exec
  // there.
  Status launch_status = StartWithForkExec();
#else
  Status launch_status = StartWithPosixSpawn();
#endif

  if (!launch_status.ok()) {
    return launch_status;
  }
  state_ = SubprocessState::kRunning;
  return launch_status;
}
247
248
namespace {
249
250
132
static void RedirectToDevNull(int fd) {
251
  // We must not close stderr or stdout, because then when a new file descriptor
252
  // gets opened, it might get that fd number.  (We always allocate the lowest
253
  // available file descriptor number.)  Instead, we reopen that fd as
254
  // /dev/null.
255
132
  int dev_null = open("/dev/null", O_WRONLY);
256
132
  if (dev_null < 0) {
257
0
    PLOG(WARNING) << "failed to open /dev/null";
258
132
  } else {
259
132
    PCHECK(dup2(dev_null, fd));
260
132
  }
261
132
}
262
263
// Close all open file descriptors other than stdin, stderr, stdout.
264
// Expects a directory stream created by OpenProdFdDir() as a parameter.
265
// This function is called after fork() and must not call malloc().
266
// The rule of thumb is to only call async-signal-safe functions in such cases
267
// if at all possible.
268
5.85k
void CloseNonStandardFDs(DIR* fd_dir, const std::unordered_set<int>& excluding) {
269
  // This is implemented by iterating over the open file descriptors
270
  // rather than using sysconf(SC_OPEN_MAX) -- the latter is error prone
271
  // since it may not represent the highest open fd if the fd soft limit
272
  // has changed since the process started. This should also be faster
273
  // since iterating over all possible fds is likely to cause 64k+ syscalls
274
  // in typical configurations.
275
  //
276
  // Note also that this doesn't use any of the Env utility functions, to
277
  // make it as lean and mean as possible -- this runs in the subprocess
278
  // after a fork, so there's some possibility that various global locks
279
  // inside malloc() might be held, so allocating memory is a no-no.
280
5.85k
  PCHECK(fd_dir != nullptr);
281
5.85k
  int dir_fd = dirfd(fd_dir);
282
283
5.85k
  struct DIRENT* ent;
284
  // readdir64() is not reentrant (it uses a static buffer) and it also
285
  // locks fd_dir->lock, so it must not be called in a multi-threaded
286
  // environment and is certainly not async-signal-safe.
287
  // However, it appears to be safe to call right after fork(), since only one
288
  // thread exists in the child process at that time. It also does not call
289
  // malloc() or free(). We could use readdir64_r() instead, but all that
290
  // buys us is reentrancy, and not async-signal-safety, due to the use of
291
  // dir->lock, so seems not worth the added complexity in lifecycle & plumbing.
292
564k
  while ((ent = READDIR(fd_dir)) != nullptr) {
293
559k
    int32_t fd;
294
559k
    if (!safe_strto32(ent->d_name, &fd)) continue;
295
559k
    if (!(fd == STDIN_FILENO  ||
296
553k
          fd == STDOUT_FILENO ||
297
547k
          fd == STDERR_FILENO ||
298
541k
          fd == dir_fd ||
299
535k
          excluding.count(fd))) {
300
533k
      close(fd);
301
533k
    }
302
559k
  }
303
5.85k
}
304
305
} // anonymous namespace
306
307
5.86k
// Starts the child with fork() + execvp().
//
// In the child: the configured standard streams are wired up, every non-standard
// descriptor not listed in ns_fds_inherited_ is closed, the extra environment
// variables are applied and the program is exec'ed. If exec fails the child logs a
// warning and _exit()s with the errno of the failed exec.
//
// In the parent: records the child pid and keeps the parent's ends of the pipes.
//
// Returns a non-OK status if argv is invalid, a pipe or the fd directory cannot be
// opened, or fork() fails. On all of those paths the pipes created here are closed.
Status Subprocess::StartWithForkExec() {
  auto argv_ptrs = VERIFY_RESULT(GetArgvPtrs());
  auto child_pipes = VERIFY_RESULT(CreateChildPipes());

  // Until the parent adopts its ends of the pipes, every early return below has to
  // release them; otherwise each failed start leaks up to six descriptors.
  bool pipes_adopted = false;
  auto scope_exit_pipes = ScopeExit([this, &child_pipes, &pipes_adopted] {
    if (pipes_adopted) {
      return;
    }
    const int* const pipe_fds[] = {
        child_pipes.child_stdin, child_pipes.child_stdout, child_pipes.child_stderr};
    for (int std_fd = STDIN_FILENO; std_fd <= STDERR_FILENO; ++std_fd) {
      // Only streams in kPiped mode had a pipe created for them.
      if (fd_state_[std_fd] == SubprocessStreamMode::kPiped) {
        close(pipe_fds[std_fd][0]);
        close(pipe_fds[std_fd][1]);
      }
    }
  });

  // The directory stream must be opened before fork(): opendir() allocates memory.
  DIR* fd_dir = VERIFY_RESULT(OpenProcFdDir());
  SCHECK_NOTNULL(fd_dir);
  auto scope_exit_fd_dir = ScopeExit([fd_dir]() {
    CloseProcFdDir(fd_dir);
  });

  int ret = fork();
  if (ret == -1) {
    return STATUS(RuntimeError, "Unable to fork", Errno(errno));
  }
  if (ret == 0) { // We are the child
    // NOTE: nothing here asks the kernel to signal the child when the parent dies;
    // orphaned children are only handled by the destructor of the parent-side object.

    // stdin
    if (fd_state_[STDIN_FILENO] == SubprocessStreamMode::kPiped) {
      PCHECK(dup2(child_pipes.child_stdin[0], STDIN_FILENO) == STDIN_FILENO);
    }
    ConfigureOutputStreamAfterFork(STDOUT_FILENO, child_pipes.child_stdout[1]);
    ConfigureOutputStreamAfterFork(STDERR_FILENO, child_pipes.child_stderr[1]);
    CloseNonStandardFDs(fd_dir, ns_fds_inherited_);

    // setenv does allocate memory, which has led to the child process getting stuck on a lock
    // when attempting to allocate memory, because that lock is held by the parent process.
    // If we don't find a way to switch to posix_spawn on macOS, we will need to ensure setenv
    // does not attempt to allocate memory here.
    for (const auto& env_kv : env_) {
      setenv(env_kv.first.c_str(), env_kv.second.c_str(), /* replace */ true);
    }

    execvp(program_.c_str(), &argv_ptrs[0]);
    // execvp() only returns on failure. Capture errno right away so the exit code does
    // not depend on what the logging below does to it.
    const int exec_errno = errno;
    PLOG(WARNING) << "Couldn't exec " << program_;
    _exit(exec_errno);
  } else {
    // We are the parent
    child_pid_ = ret;
    FinalizeParentSideOfPipes(child_pipes);
    pipes_adopted = true;
  }

  return Status::OK();
}
353
354
11.7k
void Subprocess::ConfigureOutputStreamAfterFork(int out_stream_fd, int child_write_fd) {
355
  // out_stream_id is STDOUT_FILENO or STDERR_FILENO.
356
  // Not doing sanity checks because we are in the forked process and can't allocate memory.l
357
11.7k
  switch (fd_state_[out_stream_fd]) {
358
2.15k
    case SubprocessStreamMode::kPiped: {
359
2.15k
      PCHECK(dup2(child_write_fd, out_stream_fd) == out_stream_fd);
360
2.15k
      break;
361
0
    }
362
132
    case SubprocessStreamMode::kDisabled: {
363
132
      RedirectToDevNull(out_stream_fd);
364
132
      break;
365
0
    }
366
9.42k
    default: break;
367
11.7k
  }
368
11.7k
}
369
370
0
// Starts the child with posix_spawnp().
//
// Builds the file actions (stream redirection plus closing of non-inherited
// descriptors) and the combined environment, spawns the program and, on success, keeps
// the parent's ends of the pipes. On any failure the pipes created here are closed and
// a non-OK status is returned.
Status Subprocess::StartWithPosixSpawn() {
  auto argv_ptrs = VERIFY_RESULT(GetArgvPtrs());

  auto child_pipes = VERIFY_RESULT(CreateChildPipes());

  // Until the parent adopts its ends of the pipes, every early return below has to
  // release them; otherwise each failed start leaks up to six descriptors.
  bool pipes_adopted = false;
  auto scope_exit_pipes = ScopeExit([this, &child_pipes, &pipes_adopted] {
    if (pipes_adopted) {
      return;
    }
    const int* const pipe_fds[] = {
        child_pipes.child_stdin, child_pipes.child_stdout, child_pipes.child_stderr};
    for (int std_fd = STDIN_FILENO; std_fd <= STDERR_FILENO; ++std_fd) {
      // Only streams in kPiped mode had a pipe created for them.
      if (fd_state_[std_fd] == SubprocessStreamMode::kPiped) {
        close(pipe_fds[std_fd][0]);
        close(pipe_fds[std_fd][1]);
      }
    }
  });

  posix_spawn_file_actions_t file_actions;

  // posix_spawn_file_actions_... functions do not necessarily set errno, but they return an errno.
  RETURN_ON_ERRNO_RV_FN_CALL(posix_spawn_file_actions_init, &file_actions);

  auto scope_exit_destroy_file_actions = ScopeExit([&file_actions] {
    WARN_NOT_OK(
        STATUS_FROM_ERRNO_RV_FN_CALL(posix_spawn_file_actions_destroy, &file_actions),
        "posix_spawn_file_actions_destroy failed");
  });

  RETURN_NOT_OK(ConfigureFileActionsForPosixSpawn(&file_actions, child_pipes));
  std::vector<int> fds_to_be_closed = VERIFY_RESULT(
      CloseFileDescriptorsForPosixSpawn(&file_actions));

  auto combined_env = GetCombinedEnv();

  // Like the file-actions functions, posix_spawnattr_init reports failure through its
  // return value; do not continue with an uninitialized attribute object.
  posix_spawnattr_t attrs;
  RETURN_ON_ERRNO_RV_FN_CALL(posix_spawnattr_init, &attrs);
  auto scope_exit_spawnattr = ScopeExit([&attrs] {
    WARN_NOT_OK(
        STATUS_FROM_ERRNO_RV_FN_CALL(posix_spawnattr_destroy, &attrs),
        "posix_spawnattr_destroy failed");
  });

  child_pid_ = 0;

  auto posix_spawn_status = STATUS_FROM_ERRNO_RV_FN_CALL(
      posix_spawnp,
      &child_pid_,
      program_.c_str(),
      &file_actions,
      &attrs,
      argv_ptrs.data(),
      combined_env.second.data());
  RETURN_NOT_OK_PREPEND(
      posix_spawn_status,
      Format("Tried to close file descriptors as part of posix_spawn: $0", fds_to_be_closed));

  FinalizeParentSideOfPipes(child_pipes);
  pipes_adopted = true;

  return Status::OK();
}
419
420
5.59k
Status Subprocess::Wait(int* ret) {
421
5.59k
  return DoWait(ret, 0);
422
5.59k
}
423
424
904
Result<int> Subprocess::Wait() {
425
904
  int ret = 0;
426
904
  RETURN_NOT_OK(Wait(&ret));
427
904
  return ret;
428
904
}
429
430
159k
// Calls waitpid() on the child with the given `options` and stores the wait status in
// *ret.
//
// Returns:
//   - InvalidArgument if `ret` is null;
//   - OK with the cached status if the child has already been reaped;
//   - IllegalState if the process has not been started;
//   - TimedOut if WNOHANG was requested and the child has not changed state yet;
//   - RuntimeError if waitpid() fails for a reason other than EINTR;
//   - OK once the child has been reaped (state becomes kExited).
Status Subprocess::DoWait(int* ret, int options) {
  if (!ret) {
    return STATUS(InvalidArgument, "ret is NULL");
  }
  *ret = 0;

  pid_t child_pid = 0;
  {
    unique_lock<mutex> l(state_lock_);
    if (state_ == SubprocessState::kExited) {
      *ret = cached_rc_;
      return Status::OK();
    }
    if (state_ != SubprocessState::kRunning) {
      return STATUS(IllegalState, "DoWait called on a process that is not running");
    }
    child_pid = child_pid_;
  }

  CHECK_NE(child_pid, 0);

  // The lock is not held across waitpid(), which may block. A signal delivered to the
  // calling thread makes waitpid() fail with EINTR even though nothing is wrong with
  // the child, so retry in that case instead of reporting an error.
  int waitpid_ret_val;
  do {
    waitpid_ret_val = waitpid(child_pid, ret, options);
  } while (waitpid_ret_val == -1 && errno == EINTR);

  if (waitpid_ret_val == -1) {
    return STATUS(RuntimeError, "Unable to wait on child", Errno(errno));
  }
  if ((options & WNOHANG) && waitpid_ret_val == 0) {
    return STATUS(TimedOut, "");
  }

  unique_lock<mutex> l(state_lock_);
  CHECK_EQ(waitpid_ret_val, child_pid_);
  child_pid_ = 0;
  cached_rc_ = *ret;
  state_ = SubprocessState::kExited;
  return Status::OK();
}
466
467
929
// Sends `signal` to the child. CHECK-fails if the child is not running.
Status Subprocess::Kill(int signal) {
  const bool must_be_running = true;
  return KillInternal(signal, must_be_running);
}

// Sends `signal` to the child. Returns IllegalState, instead of CHECK-failing, if the
// child has not been started or has already exited.
Status Subprocess::KillNoCheckIfRunning(int signal) {
  const bool must_be_running = false;
  return KillInternal(signal, must_be_running);
}

// Shared implementation of Kill() and KillNoCheckIfRunning().
// Returns RuntimeError carrying errno if kill() fails.
Status Subprocess::KillInternal(int signal, bool must_be_running) {
  std::lock_guard<mutex> guard(state_lock_);

  if (must_be_running) {
    CHECK_EQ(state_, SubprocessState::kRunning);
  } else {
    switch (state_) {
      case SubprocessState::kNotStarted:
        return STATUS(IllegalState, "Child process has not been started, cannot send signal");
      case SubprocessState::kExited:
        return STATUS(IllegalState, "Child process has exited, cannot send signal");
      default:
        break;
    }
  }
  CHECK_NE(child_pid_, 0);

  if (kill(child_pid_, signal) == 0) {
    return Status::OK();
  }
  return STATUS(RuntimeError, "Unable to kill", Errno(errno));
}
492
493
0
std::pair<std::vector<std::string>, std::vector<char*>> Subprocess::GetCombinedEnv() {
494
0
  std::map<std::string, std::string> complete_env;
495
0
  for (char **env_var = environ; *env_var != nullptr; env_var++) {
496
0
    const char* equal_char = strchr(*env_var, '=');
497
0
    if (equal_char) {
498
0
      size_t name_length = equal_char - *env_var;
499
0
      complete_env[std::string(*env_var, name_length)] = equal_char + 1;
500
0
    }
501
0
  }
502
0
  for (const auto& env_kv : env_) {
503
0
    complete_env[env_kv.first] = env_kv.second;
504
0
  }
505
506
0
  std::vector<std::string> k_equals_v_strings;
507
0
  k_equals_v_strings.reserve(complete_env.size());
508
509
0
  std::vector<char*> envp;
510
0
  envp.reserve(complete_env.size() + 1);
511
512
0
  for (const auto& kv : complete_env) {
513
0
    k_equals_v_strings.push_back(yb::Format("$0=$1", kv.first, kv.second));
514
0
    envp.push_back(const_cast<char*>(k_equals_v_strings.back().c_str()));
515
0
  }
516
0
  envp.push_back(nullptr);
517
518
0
  return {std::move(k_equals_v_strings), std::move(envp)};
519
0
}
520
521
5.86k
// Produces the null-terminated char* array handed to exec/spawn. The pointers refer
// to the strings stored in argv_, so they are only valid while argv_ is unchanged.
// Returns InvalidArgument if argv_ is empty.
Result<std::vector<char*>> Subprocess::GetArgvPtrs() {
  if (argv_.empty()) {
    return STATUS(InvalidArgument, "argv must have at least one elem");
  }

  vector<char*> ptrs;
  ptrs.reserve(argv_.size() + 1);
  for (size_t i = 0; i < argv_.size(); ++i) {
    ptrs.push_back(const_cast<char*>(argv_[i].c_str()));
  }
  ptrs.push_back(nullptr);
  return ptrs;
}
534
535
2.89k
bool Subprocess::IsRunning() const {
536
2.89k
  unique_lock<mutex> l(state_lock_);
537
2.89k
  if (state_ == SubprocessState::kRunning) {
538
0
    CHECK_NE(child_pid_, 0);
539
0
    return kill(child_pid_, 0) == 0;
540
0
  }
541
2.89k
  return false;
542
2.89k
}
543
544
0
// Splits `arg_str` on single spaces and runs the resulting argv via
// Call(const vector<string>&).
Status Subprocess::Call(const string& arg_str) {
  VLOG(2) << "Invoking command: " << arg_str;
  const vector<string> tokens = Split(arg_str, " ");
  return Call(tokens);
}
549
550
1
// Runs argv[0] with arguments `argv` and waits for it to finish.
//
// Returns InvalidArgument if `argv` is empty, the Start()/Wait() error if either
// fails, RuntimeError if the wait status is non-zero, and OK otherwise.
Status Subprocess::Call(const vector<string>& argv) {
  // argv[0] is read below; indexing an empty vector is undefined behaviour.
  if (argv.empty()) {
    return STATUS(InvalidArgument, "argv must have at least one elem");
  }

  Subprocess proc(argv[0], argv);
  RETURN_NOT_OK(proc.Start());
  int retcode = 0;
  RETURN_NOT_OK(proc.Wait(&retcode));

  if (retcode == 0) {
    return Status::OK();
  }
  return STATUS(RuntimeError, Substitute(
      "Subprocess '$0' terminated with non-zero exit status $1",
      argv[0],
      retcode));
}
564
565
7
// Runs argv[0] with arguments `argv`, collecting what the child writes to the streams
// selected by `read_fds` into *output. Returns InvalidArgument if `argv` is empty;
// otherwise the result of the member Call(output, read_fds).
Status Subprocess::Call(const vector<string>& argv, string* output, StdFdTypes read_fds) {
  // argv[0] is read below; indexing an empty vector is undefined behaviour.
  if (argv.empty()) {
    return STATUS(InvalidArgument, "argv must have at least one elem");
  }
  Subprocess p(argv[0], argv);
  return p.Call(output, read_fds);
}
569
570
7
// Starts this subprocess, closes its stdin, reads everything the child writes to the
// streams selected by `read_fds` into *output (until end-of-file on each), then waits
// for the child.
//
// Returns InvalidArgument if `read_fds` contains stdin, IOError on close/read
// failures, the Start()/Wait() error if either fails, RuntimeError if the wait status
// is non-zero, and OK otherwise.
//
// NOTE(review): the selected descriptors are drained with plain read() calls one after
// another. If those reads are blocking, a child that fills one pipe while the parent
// waits on the other could stall; confirm against callers that capture both streams.
Status Subprocess::Call(string* output, StdFdTypes read_fds) {
  if (read_fds.Test(StdFdType::kIn)) {
    return STATUS(InvalidArgument, "Cannot read from child stdin");
  }
  for (const auto fd_type : read_fds) {
    SetFdShared(to_underlying(fd_type), SubprocessStreamMode::kPiped);
  }

  RETURN_NOT_OK_PREPEND(Start(), "Unable to fork " + argv_[0]);
  const int err = close(ReleaseChildStdinFd());
  if (PREDICT_FALSE(err != 0)) {
    return STATUS(IOError, "Unable to close child process stdin", Errno(errno));
  }

  output->clear();
  char buf[1024];
  boost::container::small_vector<int, 2> fds;
  for (const auto fd_type : read_fds) {
    fds.push_back(CheckAndOffer(to_underlying(fd_type)));
  }

  // Sweep over the remaining descriptors from back to front until all reach EOF.
  while (!fds.empty()) {
    auto it = fds.end();
    while (it != fds.begin()) {
      auto fd = *--it;
      ssize_t n = read(fd, buf, arraysize(buf));
      if (n < 0) {
        // Interrupted: move on; this fd is retried on the next sweep.
        if (errno == EINTR) continue;
        return STATUS(IOError, "IO error reading from " + argv_[0], Errno(errno));
      }
      if (n == 0) {
        // EOF on this stream. erase() invalidates `it`, so continue from the iterator
        // it returns rather than reusing the stale one.
        it = fds.erase(it);
        continue;
      }
      output->append(buf, n);
    }
  }

  int retcode = 0;
  RETURN_NOT_OK_PREPEND(Wait(&retcode), "Unable to wait() for " + argv_[0]);

  if (PREDICT_FALSE(retcode != 0)) {
    return STATUS(RuntimeError, Substitute(
        "Subprocess '$0' terminated with non-zero exit status $1",
        argv_[0],
        retcode));
  }
  return Status::OK();
}
619
620
9
int Subprocess::CheckAndOffer(int stdfd) const {
621
9
  unique_lock<mutex> l(state_lock_);
622
9
  CHECK_EQ(state_, SubprocessState::kRunning);
623
9
  CHECK_EQ(fd_state_[stdfd], SubprocessStreamMode::kPiped);
624
9
  return child_fds_[stdfd];
625
9
}
626
627
2.15k
int Subprocess::ReleaseChildFd(int stdfd) {
628
2.15k
  unique_lock<mutex> l(state_lock_);
629
2.15k
  CHECK_EQ(state_, SubprocessState::kRunning);
630
2.15k
  CHECK_GE(child_fds_[stdfd], 0);
631
2.15k
  CHECK_EQ(fd_state_[stdfd], SubprocessStreamMode::kPiped);
632
2.15k
  int ret = child_fds_[stdfd];
633
2.15k
  child_fds_[stdfd] = -1;
634
2.15k
  return ret;
635
2.15k
}
636
637
5.70k
pid_t Subprocess::pid() const {
638
5.70k
  unique_lock<mutex> l(state_lock_);
639
5.70k
  CHECK_EQ(state_, SubprocessState::kRunning);
640
5.70k
  return child_pid_;
641
5.70k
}
642
643
4.69k
SubprocessState Subprocess::state() const {
644
4.69k
  unique_lock<mutex> l(state_lock_);
645
4.69k
  return state_;
646
4.69k
}
647
648
5.86k
// Creates one close-on-exec pipe for every standard stream whose mode is kPiped.
//
// Returns the pipes on success. If any pipe2() call fails, the pipes created earlier
// in this call are closed again and the error status is returned.
Result<Subprocess::ChildPipes> Subprocess::CreateChildPipes() {
  ChildPipes pipes;

  // Closes both ends of a pipe, but only if it was created (stream in kPiped mode).
  const auto close_if_piped = [this](int std_fd, const int* pipe_fds) {
    if (fd_state_[std_fd] == SubprocessStreamMode::kPiped) {
      close(pipe_fds[0]);
      close(pipe_fds[1]);
    }
  };

  // Pipe from caller process to child's stdin
  // [0] = stdin for child, [1] = how parent writes to it
  if (fd_state_[STDIN_FILENO] == SubprocessStreamMode::kPiped) {
    RETURN_NOT_OK(STATUS_FROM_ERRNO_IF_NONZERO_RV(
        "pipe2 failed for stdin", pipe2(pipes.child_stdin, O_CLOEXEC)));
  }

  // Pipe from child's stdout back to caller process
  // [0] = how parent reads from child's stdout, [1] = how child writes to it
  if (fd_state_[STDOUT_FILENO] == SubprocessStreamMode::kPiped) {
    auto stdout_status = STATUS_FROM_ERRNO_IF_NONZERO_RV(
        "pipe2 failed for stdout", pipe2(pipes.child_stdout, O_CLOEXEC));
    if (!stdout_status.ok()) {
      close_if_piped(STDIN_FILENO, pipes.child_stdin);
      return stdout_status;
    }
  }

  // Pipe from child's stderr back to caller process
  // [0] = how parent reads from child's stderr, [1] = how child writes to it
  if (fd_state_[STDERR_FILENO] == SubprocessStreamMode::kPiped) {
    auto stderr_status = STATUS_FROM_ERRNO_IF_NONZERO_RV(
        "pipe2 failed for stderr", pipe2(pipes.child_stderr, O_CLOEXEC));
    if (!stderr_status.ok()) {
      close_if_piped(STDIN_FILENO, pipes.child_stdin);
      close_if_piped(STDOUT_FILENO, pipes.child_stdout);
      return stderr_status;
    }
  }

  return pipes;
}
672
673
// Registers the posix_spawn file actions that wire up the child's standard streams:
// stdin is dup2'ed from the read end of its pipe when piped, and stdout/stderr are
// configured through ConfigureOutputStreamActionForPosixSpawn().
Status Subprocess::ConfigureFileActionsForPosixSpawn(
    posix_spawn_file_actions_t* file_actions,
    const Subprocess::ChildPipes& child_pipes) {
  // stdin
  const bool stdin_piped = fd_state_[STDIN_FILENO] == SubprocessStreamMode::kPiped;
  if (stdin_piped) {
    RETURN_ON_ERRNO_RV_FN_CALL(
        posix_spawn_file_actions_adddup2, file_actions, child_pipes.child_stdin[0], STDIN_FILENO);
  }

  // Output streams, each paired with the write end of its pipe: stdout first, then stderr.
  const std::pair<int, int> output_streams[] = {
      {STDOUT_FILENO, child_pipes.child_stdout[1]},
      {STDERR_FILENO, child_pipes.child_stderr[1]},
  };
  for (const auto& stream : output_streams) {
    RETURN_NOT_OK(ConfigureOutputStreamActionForPosixSpawn(
        file_actions, stream.first, stream.second));
  }

  return Status::OK();
}
689
690
// Registers the file action for one output stream of the child.
// kPiped: dup2 `pipe_input_fd` onto `out_stream_fd`. kDisabled: open /dev/null
// write-only as `out_stream_fd`. Any other mode: no action is added.
// Returns IllegalState if `out_stream_fd` is neither STDOUT_FILENO nor STDERR_FILENO.
Status Subprocess::ConfigureOutputStreamActionForPosixSpawn(
    posix_spawn_file_actions_t* file_actions, int out_stream_fd, int pipe_input_fd) {
  SCHECK_FORMAT(
      out_stream_fd == STDOUT_FILENO || out_stream_fd == STDERR_FILENO,
      IllegalState,
      "out_stream_fd must be one of STDOUT_FILENO ($0) or STDERR_FILENO ($1), but is $2",
      STDOUT_FILENO, STDERR_FILENO, out_stream_fd);

  const auto mode = fd_state_[out_stream_fd];
  if (mode == SubprocessStreamMode::kPiped) {
    RETURN_ON_ERRNO_RV_FN_CALL(
        posix_spawn_file_actions_adddup2, file_actions, pipe_input_fd, out_stream_fd);
  } else if (mode == SubprocessStreamMode::kDisabled) {
    RETURN_ON_ERRNO_RV_FN_CALL(
        posix_spawn_file_actions_addopen, file_actions, out_stream_fd, "/dev/null", O_WRONLY,
        0 /* mode */);
  }
  return Status::OK();
}
715
716
// Adds a posix_spawn "close" file action for every open descriptor the child should
// not inherit, and returns the list of descriptors scheduled for closing.
//
// Skipped: stdin/stdout/stderr, the descriptor of the directory stream used for the
// scan, descriptors in ns_fds_inherited_, descriptors already marked close-on-exec,
// and descriptors for which fcntl(F_GETFD) fails.
Result<std::vector<int>> Subprocess::CloseFileDescriptorsForPosixSpawn(
    posix_spawn_file_actions_t* file_actions) {
  DIR* fd_dir = VERIFY_RESULT(OpenProcFdDir());
  SCHECK_NOTNULL(fd_dir);
  auto scope_exit_fd_dir = ScopeExit([fd_dir]() {
    CloseProcFdDir(fd_dir);
  });

  int dir_fd = dirfd(fd_dir);
  struct DIRENT* ent;
  std::vector<int> fds_to_close;
  while ((ent = READDIR(fd_dir)) != nullptr) {
    int32_t fd;
    if (!safe_strto32(ent->d_name, &fd)) continue;
    if (fd == STDIN_FILENO  ||
        fd == STDOUT_FILENO ||
        fd == STDERR_FILENO ||
        fd == dir_fd ||
        ns_fds_inherited_.count(fd)) {
      continue;
    }

    const int fcntl_result = fcntl(fd, F_GETFD);
    if (fcntl_result == -1) {
      const int fcntl_errno = errno;
      if (fcntl_errno == EBADF) {
        // This is an expected combination, only print a VLOG.
        VLOG(1) << "fcntl(fd=" << fd << ", F_GETFD) is -1 and errno is EBADF ("
                << EBADF << "): skipping posix_spawn_file_actions_addclose (already closed?)";
        continue;
      }
      LOG(WARNING) << "fcntl(fd=" << fd << ", F_GETFD) is " << fcntl_result << ", errno="
                   << fcntl_errno << " (" << strerror(fcntl_errno) << "): an unexpected "
                   << "combination. Skipping posix_spawn_file_actions_addclose.";
      continue;
    }

    // F_GETFD returns a set of descriptor flags, so test the FD_CLOEXEC bit instead of
    // comparing the whole value. Such descriptors are closed by exec itself.
    if (fcntl_result & FD_CLOEXEC) {
      VLOG(1) << "fcntl(fd=" << fd << ", F_GETFD) is FD_CLOEXEC (" << FD_CLOEXEC << ") "
              << ", skipping posix_spawn_file_actions_addclose";
      continue;
    }

    VLOG(1) << "Calling posix_spawn_file_actions_addclose for fd=" << fd;
    RETURN_ON_ERRNO_RV_FN_CALL(posix_spawn_file_actions_addclose, file_actions, fd);
    fds_to_close.push_back(fd);
  }
  return fds_to_close;
}
765
766
5.85k
// Called in the parent after the child has been created: closes the child's ends of
// the pipes that were created and records the parent's ends in child_fds_.
void Subprocess::FinalizeParentSideOfPipes(const Subprocess::ChildPipes& child_pipes) {
  // Child's side of each pipe: the read end for stdin, the write end for stdout/stderr.
  const std::pair<int, int> child_ends[] = {
      {STDIN_FILENO, child_pipes.child_stdin[0]},
      {STDOUT_FILENO, child_pipes.child_stdout[1]},
      {STDERR_FILENO, child_pipes.child_stderr[1]},
  };
  for (const auto& end : child_ends) {
    if (fd_state_[end.first] == SubprocessStreamMode::kPiped) {
      close(end.second);
    }
  }

  // Parent's side of each pipe is stored regardless of the stream mode.
  child_fds_[STDERR_FILENO] = child_pipes.child_stderr[0];
  child_fds_[STDOUT_FILENO] = child_pipes.child_stdout[0];
  child_fds_[STDIN_FILENO] = child_pipes.child_stdin[1];
}
783
784
153k
// Non-blocking variant of Wait(): calls DoWait() with WNOHANG, which yields a
// TimedOut status when the child has not changed state yet.
Status Subprocess::WaitNoBlock(int* ret) {
  const int options = WNOHANG;
  return DoWait(ret, options);
}
787
788
} // namespace yb