Core/Common: Output stdout/stderr from child process without waiting for it to finish

This commit is contained in:
Shauren
2024-09-14 14:45:40 +02:00
parent dbe8d1f11e
commit f270686201
3 changed files with 119 additions and 154 deletions

View File

@@ -29,16 +29,43 @@
#include <boost/process/search_path.hpp>
#include <fmt/ranges.h>
using namespace boost::process;
namespace bp = boost::process;
namespace Trinity
{
template<typename T>
static int CreateChildProcess(T waiter, std::string const& executable,
std::vector<std::string> const& argsVector,
std::string const& logger, std::string const& input,
bool secure)
class AsyncProcessResultImplementation
: public AsyncProcessResult
{
std::string const executable;
std::vector<std::string> const args;
std::string const logger;
std::string const input_file;
bool const is_secure;
std::atomic<bool> was_terminated;
Optional<std::future<int>> futureResult;
Optional<bp::child> my_child;
public:
explicit AsyncProcessResultImplementation(std::string executable_, std::vector<std::string> args_,
std::string logger_, std::string input_file_,
bool secure)
: executable(std::move(executable_)), args(std::move(args_)),
logger(std::move(logger_)), input_file(std::move(input_file_)),
is_secure(secure), was_terminated(false) { }
AsyncProcessResultImplementation(AsyncProcessResultImplementation const&) = delete;
AsyncProcessResultImplementation& operator= (AsyncProcessResultImplementation const&) = delete;
AsyncProcessResultImplementation(AsyncProcessResultImplementation&&) = delete;
AsyncProcessResultImplementation& operator= (AsyncProcessResultImplementation&&) = delete;
~AsyncProcessResultImplementation() = default;
int StartProcess()
{
ASSERT(!my_child, "Process started already!");
#if TRINITY_COMPILER == TRINITY_COMPILER_MICROSOFT
#pragma warning(push)
#pragma warning(disable:4297)
@@ -51,159 +78,95 @@ static int CreateChildProcess(T waiter, std::string const& executable,
boost/process/pipe.hpp(304,42): message : see reference to class template instantiation 'boost::process::basic_pipebuf<char,std::char_traits<char>>' being compiled
*/
#endif
ipstream outStream;
ipstream errStream;
bp::ipstream outStream;
bp::ipstream errStream;
#if TRINITY_COMPILER == TRINITY_COMPILER_MICROSOFT
#pragma warning(pop)
#endif
if (!secure)
{
TC_LOG_TRACE(logger, "Starting process \"{}\" with arguments: \"{}\".",
executable, fmt::join(argsVector, " "));
}
if (!is_secure)
{
TC_LOG_TRACE(logger, "Starting process \"{}\" with arguments: \"{}\".",
executable, fmt::join(args, " "));
}
// prepare file with only read permission (boost process opens with read_write)
auto inputFile = Trinity::make_unique_ptr_with_deleter(!input.empty() ? fopen(input.c_str(), "rb") : nullptr, &::fclose);
// prepare file with only read permission (boost process opens with read_write)
auto inputFile = Trinity::make_unique_ptr_with_deleter(!input_file.empty() ? fopen(input_file.c_str(), "rb") : nullptr, &::fclose);
// Start the child process
child c = [&]()
{
// Start the child process
if (inputFile)
{
// With binding stdin
return child{
exe = boost::filesystem::absolute(executable).string(),
args = argsVector,
env = environment(boost::this_process::environment()),
std_in = inputFile.get(),
std_out = outStream,
std_err = errStream
};
my_child.emplace(
bp::exe = boost::filesystem::absolute(executable).string(),
bp::args = args,
bp::env = bp::environment(boost::this_process::environment()),
bp::std_in = inputFile.get(),
bp::std_out = outStream,
bp::std_err = errStream
);
}
else
{
// Without binding stdin
return child{
exe = boost::filesystem::absolute(executable).string(),
args = argsVector,
env = environment(boost::this_process::environment()),
std_in = boost::process::close,
std_out = outStream,
std_err = errStream
};
my_child.emplace(
bp::exe = boost::filesystem::absolute(executable).string(),
bp::args = args,
bp::env = bp::environment(boost::this_process::environment()),
bp::std_in = boost::process::close,
bp::std_out = outStream,
bp::std_err = errStream
);
}
}();
std::string line;
while (std::getline(outStream, line, '\n'))
{
std::erase(line, '\r');
if (!line.empty())
TC_LOG_INFO(logger, "{}", line);
}
std::future<void> stdOutReader = std::async(std::launch::async, [&]
{
std::string line;
while (std::getline(outStream, line, '\n'))
{
std::erase(line, '\r');
if (!line.empty())
TC_LOG_INFO(logger, "{}", line);
}
});
while (std::getline(errStream, line, '\n'))
{
std::erase(line, '\r');
if (!line.empty())
TC_LOG_ERROR(logger, "{}", line);
}
std::future<void> stdErrReader = std::async(std::launch::async, [&]
{
std::string line;
while (std::getline(errStream, line, '\n'))
{
std::erase(line, '\r');
if (!line.empty())
TC_LOG_ERROR(logger, "{}", line);
}
});
// Call the waiter in the current scope to prevent
// the streams from closing too early on leaving the scope.
int const result = waiter(c);
std::error_code ec;
my_child->wait(ec);
int32 const result = !ec && !was_terminated ? my_child->exit_code() : EXIT_FAILURE;
my_child.reset();
if (!secure)
{
TC_LOG_TRACE(logger, ">> Process \"{}\" finished with return value {}.",
stdOutReader.wait();
stdErrReader.wait();
if (!is_secure)
{
TC_LOG_TRACE(logger, ">> Process \"{}\" finished with return value {}.",
executable, result);
}
return result;
}
return result;
}
int StartProcess(std::string const& executable, std::vector<std::string> const& args,
std::string const& logger, std::string input_file, bool secure)
{
return CreateChildProcess([](child& c) -> int
void SetFuture(std::future<int32> result_)
{
try
{
c.wait();
return c.exit_code();
}
catch (...)
{
return EXIT_FAILURE;
}
}, executable, args, logger, input_file, secure);
}
class AsyncProcessResultImplementation
: public AsyncProcessResult
{
std::string const executable;
std::vector<std::string> const args;
std::string const logger;
std::string const input_file;
bool const is_secure;
std::atomic<bool> was_terminated;
// Workaround for missing move support in boost < 1.57
Optional<std::shared_ptr<std::future<int>>> result;
Optional<std::reference_wrapper<child>> my_child;
public:
explicit AsyncProcessResultImplementation(std::string executable_, std::vector<std::string> args_,
std::string logger_, std::string input_file_,
bool secure)
: executable(std::move(executable_)), args(std::move(args_)),
logger(std::move(logger_)), input_file(input_file_),
is_secure(secure), was_terminated(false) { }
AsyncProcessResultImplementation(AsyncProcessResultImplementation const&) = delete;
AsyncProcessResultImplementation& operator= (AsyncProcessResultImplementation const&) = delete;
AsyncProcessResultImplementation(AsyncProcessResultImplementation&&) = delete;
AsyncProcessResultImplementation& operator= (AsyncProcessResultImplementation&&) = delete;
int StartProcess()
{
ASSERT(!my_child, "Process started already!");
return CreateChildProcess([&](child& c) -> int
{
int result;
my_child = std::reference_wrapper<child>(c);
try
{
c.wait();
result = c.exit_code();
}
catch (...)
{
result = EXIT_FAILURE;
}
my_child.reset();
return was_terminated ? EXIT_FAILURE : result;
}, executable, args, logger, input_file, is_secure);
}
void SetFuture(std::future<int> result_)
{
result = std::make_shared<std::future<int>>(std::move(result_));
futureResult.emplace(std::move(result_));
}
/// Returns the future which contains the result of the process
/// as soon it is finished.
std::future<int>& GetFutureResult() override
std::future<int32>& GetFutureResult() override
{
ASSERT(*result, "The process wasn't started!");
return **result;
ASSERT(futureResult.has_value(), "The process wasn't started!");
return *futureResult;
}
/// Tries to terminate the process
@@ -212,23 +175,25 @@ public:
if (my_child)
{
was_terminated = true;
try
{
my_child->get().terminate();
}
catch(...)
{
// Do nothing
}
std::error_code ec;
my_child->terminate(ec);
}
}
};
std::shared_ptr<AsyncProcessResult>
StartAsyncProcess(std::string executable, std::vector<std::string> args,
std::string logger, std::string input_file, bool secure)
int StartProcess(std::string executable, std::vector<std::string> args,
std::string logger, std::string input_file, bool secure)
{
auto handle = std::make_shared<AsyncProcessResultImplementation>(
AsyncProcessResultImplementation handle(
std::move(executable), std::move(args), std::move(logger), std::move(input_file), secure);
return handle.StartProcess();
}
std::shared_ptr<AsyncProcessResult> StartAsyncProcess(std::string executable, std::vector<std::string> args,
std::string logger, std::string input_file, bool secure)
{
std::shared_ptr<AsyncProcessResultImplementation> handle = std::make_shared<AsyncProcessResultImplementation>(
std::move(executable), std::move(args), std::move(logger), std::move(input_file), secure);
handle->SetFuture(std::async(std::launch::async, [handle] { return handle->StartProcess(); }));
@@ -239,7 +204,7 @@ std::string SearchExecutableInPath(std::string const& filename)
{
try
{
return search_path(filename).string();
return bp::search_path(filename).string();
}
catch (...)
{