Skip to content

Commit

Permalink
Fix system on Windows 32 bits
Browse files Browse the repository at this point in the history
  • Loading branch information
Nelson-numerical-software committed Oct 15, 2024
1 parent b3b3fcd commit 5a6ee2b
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 97 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ccpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,6 @@ jobs:
- name: Tests
run: |
set OMP_NUM_THREADS=1
set NELSON_TERM_IS_UNICODE_SUPPORTED=TRUE
set PATH=C:\Program Files\Microsoft MPI\Bin;%PATH%
set TESTS_RESULT_DIR=%GITHUB_WORKSPACE%/artifacts
Expand Down
10 changes: 6 additions & 4 deletions modules/ipc/src/cpp/NelsonInterprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
//=============================================================================
namespace Nelson {
//=============================================================================
constexpr std::size_t KILOBYTE = 1024;
constexpr std::size_t MEGABYTE = 1024 * KILOBYTE;
#if (defined(_LP64) || defined(_WIN64))
constexpr auto OFF_MSG_SIZE = sizeof(double) * 16 * 1024;
constexpr auto MAX_MSG_SIZE = sizeof(double) * (4096 * 4096) + OFF_MSG_SIZE;
constexpr auto MAX_NB_MSG = 4;
constexpr std::size_t OFF_MSG_SIZE = sizeof(double) * 16 * KILOBYTE;
constexpr std::size_t MAX_MSG_SIZE = sizeof(double) * (4 * MEGABYTE) + OFF_MSG_SIZE;
constexpr std::size_t MAX_NB_MSG = 4;
#else
constexpr auto MAX_MSG_SIZE = sizeof(double) * (1024 * 1024);
constexpr auto MAX_MSG_SIZE = sizeof(double) * MEGABYTE;
constexpr auto MAX_NB_MSG = 2;
#endif
constexpr auto TIMEOUT_COUNT = 20;
Expand Down
46 changes: 39 additions & 7 deletions modules/os_functions/src/cpp/SystemCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <fcntl.h>
#include <csignal>
#endif
#include <BS_thread_pool.hpp>
#include <ctime>
#include <thread>
#include <chrono>
Expand All @@ -33,6 +32,7 @@
#include "i18n.hpp"
#include "PredefinedErrorMessages.hpp"
#include "SystemCommandTask.hpp"
#include "nlsBuildConfig.h"
//=============================================================================
namespace Nelson {
//=============================================================================
Expand Down Expand Up @@ -68,20 +68,37 @@ ParallelSystemCommand(const wstringVector& commands, const std::vector<uint64>&
std::vector<std::tuple<int, std::wstring, uint64>> results;
size_t nbCommands = commands.size();
results.resize(nbCommands);
std::vector<SystemCommandTask*> taskList;

#if defined(__APPLE__) or (defined(_WIN32) && not defined(_WIN64))
for (ompIndexType k = 0; k < (ompIndexType)nbCommands; k++) {
SystemCommandTask* task = new SystemCommandTask();
taskList.push_back(task);
}
#if not defined(__APPLE__)
#if WITH_OPENMP
#pragma omp parallel for
#endif
#endif
for (ompIndexType k = 0; k < (ompIndexType)nbCommands; k++) {
taskList[k]->evaluateCommand(commands[k], timeouts[k]);
}
#else
size_t nbThreadsMax = (size_t)NelsonConfiguration::getInstance()->getMaxNumCompThreads();
size_t nbThreads = std::min(nbCommands, nbThreadsMax);

std::vector<SystemCommandTask*> taskList;
BS::thread_pool pool((BS::concurrency_t)nbThreads);
std::vector<std::thread> threadList;
for (size_t k = 0; k < nbCommands; k++) {
try {
SystemCommandTask* task = new SystemCommandTask();
taskList.push_back(task);
pool.push_task(&SystemCommandTask::evaluateCommand, task, commands[k], timeouts[k]);
threadList.emplace_back([task, commands, timeouts, k]() {
task->evaluateCommand(commands[k], timeouts[k]);
});
} catch (std::bad_alloc&) {
Error(ERROR_MEMORY_ALLOCATION);
}
}
bool allTasksFinished = false;
do {
if (NelsonConfiguration::getInstance()->getInterruptPending(evaluatorID)) {
for (size_t k = 0; k < nbCommands; k++) {
Expand All @@ -93,8 +110,23 @@ ParallelSystemCommand(const wstringVector& commands, const std::vector<uint64>&
ProcessEventsDynamicFunction();
}
std::this_thread::sleep_for(std::chrono::milliseconds(uint64(1)));
} while (pool.get_tasks_total());

allTasksFinished = std::all_of(threadList.begin(), threadList.end(),
[](const auto& thread) { return thread.joinable(); });

allTasksFinished = allTasksFinished
&& std::all_of(taskList.begin(), taskList.end(),
[](const auto& task) { return !task->isRunning(); });

if (allTasksFinished) {
for (auto& thread : threadList) {
thread.join();
}
break;
}
} while (!allTasksFinished);

#endif
for (size_t k = 0; k < nbCommands; k++) {
if (taskList[k]) {
results[k] = taskList[k]->getResult();
Expand All @@ -107,7 +139,7 @@ ParallelSystemCommand(const wstringVector& commands, const std::vector<uint64>&
}
}
taskList.clear();
pool.reset((BS::concurrency_t)nbThreads);

return results;
}
//=============================================================================
Expand Down
202 changes: 125 additions & 77 deletions modules/os_functions/src/cpp/SystemCommandTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,49 @@
#define _CRT_SECURE_NO_WARNINGS
#define WIN32_LEAN_AND_MEAN
#endif
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/shell.hpp>
#include <thread>
#include <fstream>
#include "SystemCommandTask.hpp"
#include "StringHelpers.hpp"
#include "characters_encoding.hpp"
//=============================================================================
namespace Nelson {
//=============================================================================
static const int SLEEP_DURATION_MS = 10;
//=============================================================================
void
SystemCommandTask::evaluateCommand(const std::wstring& command, uint64 timeout)
{
_beginTimePoint = std::chrono::steady_clock::now();
_terminate = false;
_running = true;

auto tempOutputFile
= std::make_unique<FileSystemWrapper::Path>(FileSystemWrapper::Path::unique_path());
auto tempErrorFile
= std::make_unique<FileSystemWrapper::Path>(FileSystemWrapper::Path::unique_path());

try {
bool mustDetach = false;
std::wstring _command = detectDetachProcess(command, mustDetach);
std::wstring cmd = buildCommandString(_command);

if (mustDetach) {
executeDetachedProcess(cmd);
} else {
executeAttachedProcess(cmd, *tempOutputFile, *tempErrorFile, timeout);
}
} catch (const std::exception& e) {
_message = utf8_to_wstring(e.what());
_exitCode = -1;
}

cleanupTempFiles(*tempOutputFile, *tempErrorFile);

_running = false;
_duration = this->getDuration();
}
//=============================================================================
void
SystemCommandTask::terminate()
{
Expand All @@ -39,6 +72,16 @@ SystemCommandTask::isRunning()
return _running;
}
//=============================================================================
int
SystemCommandTask::exitCodeAbort()
{
#ifdef _MSC_VER
return int(258); // WAIT_TIMEOUT
#else
return int(128 + SIGABRT);
#endif
}
//=============================================================================
std::tuple<int, std::wstring, uint64>
SystemCommandTask::getResult()
{
Expand All @@ -57,85 +100,94 @@ SystemCommandTask::getDuration()
.count();
}
//=============================================================================
std::wstring
SystemCommandTask::buildCommandString(const std::wstring& _command)
{
std::wstring argsShell = getPlatformSpecificShellArgs();
return L"\"" + boost::process::shell().wstring() + L"\" " + argsShell + L"\"" + _command
+ L"\"";
}
//=============================================================================
void
SystemCommandTask::evaluateCommand(const std::wstring& command, uint64 timeout)
SystemCommandTask::executeDetachedProcess(const std::wstring& cmd)
{
_beginTimePoint = std::chrono::steady_clock::now();
boost::process::child childProcess(cmd);
childProcess.detach();
_exitCode = 0;
}
//=============================================================================
void
SystemCommandTask::executeAttachedProcess(const std::wstring& cmd,
const FileSystemWrapper::Path& tempOutputFile, const FileSystemWrapper::Path& tempErrorFile,
uint64 timeout)
{
boost::process::child childProcess(cmd,
boost::process::std_out > tempOutputFile.generic_string().c_str(),
boost::process::std_err > tempErrorFile.generic_string().c_str(),
boost::process::std_in < boost::process::null);

_terminate = false;
_running = true;
FileSystemWrapper::Path tempOutputFile(FileSystemWrapper::Path::unique_path());
FileSystemWrapper::Path tempErrorFile(FileSystemWrapper::Path::unique_path());
bool mustDetach = false;
std::wstring _command = detectDetachProcess(command, mustDetach);
std::wstring argsShell;
#ifdef _MSC_VER
argsShell = L" /a /c ";
#else
argsShell = L" -c ";
#endif
std::wstring cmd
= L"\"" + boost::process::shell().wstring() + L"\" " + argsShell + L"\"" + _command + L"\"";
if (mustDetach) {
boost::process::child childProcess(cmd);
childProcess.detach();
_exitCode = 0;
} else {
boost::process::child childProcess(cmd,
boost::process::std_out > tempOutputFile.generic_string().c_str(),
boost::process::std_err > tempErrorFile.generic_string().c_str(),
boost::process::std_in < boost::process::null);
monitorChildProcess(childProcess, timeout);

while (childProcess.running() && !_terminate) {
std::chrono::steady_clock::time_point _currentTimePoint
= std::chrono::steady_clock::now();
if ((timeout != 0)
&& (std::chrono::duration_cast<std::chrono::seconds>(
_currentTimePoint - _beginTimePoint)
>= std::chrono::seconds(timeout))) {
_terminate = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (!_terminate) {
_exitCode = (int)childProcess.exit_code();
_message = readProcessOutput(tempOutputFile, tempErrorFile);
}
}
//=============================================================================
void
SystemCommandTask::monitorChildProcess(boost::process::child& childProcess, uint64 timeout)
{
while (childProcess.running() && !_terminate) {
auto _currentTimePoint = std::chrono::steady_clock::now();
if (timeout != 0
&& (std::chrono::duration_cast<std::chrono::seconds>(
_currentTimePoint - _beginTimePoint)
>= std::chrono::seconds(timeout))) {
_terminate = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_DURATION_MS));
}

if (_terminate) {
this->_duration = this->getDuration();
this->_exitCode = exitCodeAbort();
FileSystemWrapper::Path::remove(tempOutputFile);
FileSystemWrapper::Path::remove(tempErrorFile);
childProcess.terminate();
_running = false;
return;
} else {
this->_exitCode = (int)childProcess.exit_code();
}
if (_terminate) {
_duration = this->getDuration();
_exitCode = exitCodeAbort();
childProcess.terminate();
}
}
//=============================================================================
std::wstring
SystemCommandTask::readProcessOutput(
const FileSystemWrapper::Path& tempOutputFile, const FileSystemWrapper::Path& tempErrorFile)
{
std::wstring outputResult;
if (_exitCode) {

std::wstring outputResult;
if (this->_exitCode) {
outputResult = readFile(tempErrorFile);
if (outputResult.empty()) {
outputResult = readFile(tempOutputFile);
}
} else {
outputResult = readFile(tempErrorFile);
if (outputResult.empty()) {
outputResult = readFile(tempOutputFile);
}
_message = outputResult;
} else {
outputResult = readFile(tempOutputFile);
}
return outputResult;
}
//=============================================================================
void
SystemCommandTask::cleanupTempFiles(
const FileSystemWrapper::Path& tempOutputFile, const FileSystemWrapper::Path& tempErrorFile)
{
FileSystemWrapper::Path::remove(tempOutputFile);
FileSystemWrapper::Path::remove(tempErrorFile);

_running = false;
_duration = this->getDuration();
}
//=============================================================================
int
SystemCommandTask::exitCodeAbort()
std::wstring
SystemCommandTask::getPlatformSpecificShellArgs()
{
#ifdef _MSC_VER
return int(258); // WAIT_TIMEOUT
return L" /a /c ";
#else
return int(128 + SIGABRT);
return L" -c ";
#endif
}
//=============================================================================
Expand Down Expand Up @@ -174,31 +226,27 @@ std::wstring
SystemCommandTask::readFile(const FileSystemWrapper::Path& filePath)
{
std::string result;
FILE* pFile;
#ifdef _MSC_VER
pFile = _wfopen(filePath.wstring().c_str(), L"r");
FILE* pFile = _wfopen(filePath.wstring().c_str(), L"r");
#else
pFile = fopen(filePath.string().c_str(), "r");
FILE* pFile = fopen(filePath.string().c_str(), "r");
#endif
if (pFile != nullptr) {
#define bufferSize 4096
#define bufferSizeMax 4096 * 2
constexpr std::streamsize bufferSize = 16384;
char buffer[bufferSize];
result.reserve(bufferSizeMax);
while (fgets(buffer, sizeof(buffer), pFile)) {

while (fgets(buffer, bufferSize * sizeof(char), pFile)) {
#ifdef _MSC_VER
std::string str = std::string(buffer);
boost::replace_all(str, "\r\n", "\n");
StringHelpers::replace_all(str, "\r\n", "\n");
OemToCharBuffA(str.c_str(), const_cast<char*>(str.c_str()), (DWORD)str.size());
result.append(str);
#else
result.append(buffer);
#endif
}
if (result.size() > 0) {
if (*result.rbegin() != '\n') {
result.append("\n");
}
if (!result.empty() && result.back() != '\n') {
result.push_back('\n');
}
fclose(pFile);
}
Expand Down
Loading

0 comments on commit 5a6ee2b

Please sign in to comment.