core: parallel backends API

- allow to replace parallel_for() backend
This commit is contained in:
Alexander Alekhin
2021-01-21 11:03:17 +00:00
parent 84676fefe3
commit b73bf03bfc
13 changed files with 528 additions and 31 deletions
+81 -26
View File
@@ -45,6 +45,8 @@
#include <opencv2/core/utils/configuration.private.hpp>
#include <opencv2/core/utils/trace.private.hpp>
#include "opencv2/core/parallel/parallel_backend.hpp"
#if defined _WIN32 || defined WINCE
#include <windows.h>
#undef small
@@ -145,9 +147,7 @@
# define CV_PARALLEL_FRAMEWORK "pthreads"
#endif
#ifdef CV_PARALLEL_FRAMEWORK
#include <atomic>
#endif
#include "parallel_impl.hpp"
@@ -159,9 +159,36 @@ namespace cv {
ParallelLoopBody::~ParallelLoopBody() {}
namespace parallel {
static int numThreads = -1;
static
std::shared_ptr<ParallelForAPI>& getCurrentParallelForAPI()
{
static std::shared_ptr<ParallelForAPI> g_currentParallelForAPI;
return g_currentParallelForAPI;
}
ParallelForAPI::~ParallelForAPI()
{
// nothing
}
void setParallelForBackend(const std::shared_ptr<ParallelForAPI>& api, bool propagateNumThreads)
{
getCurrentParallelForAPI() = api;
if (propagateNumThreads && api)
{
setNumThreads(numThreads);
}
}
} // namespace
using namespace cv::parallel;
namespace {
#ifdef CV_PARALLEL_FRAMEWORK
#ifdef ENABLE_INSTRUMENTATION
static void SyncNodes(cv::instr::InstrNode *pNode)
{
@@ -430,8 +457,6 @@ namespace {
typedef ParallelLoopBodyWrapper ProxyLoopBody;
#endif
static int numThreads = -1;
#if defined HAVE_TBB
#if TBB_INTERFACE_VERSION >= 8000
static tbb::task_arena tbbArena(tbb::task_arena::automatic);
@@ -446,7 +471,7 @@ static inline int _initMaxThreads()
int maxThreads = omp_get_max_threads();
if (!utils::getConfigurationParameterBool("OPENCV_FOR_OPENMP_DYNAMIC_DISABLE", false))
{
omp_set_dynamic(maxThreads);
omp_set_dynamic(1);
}
return maxThreads;
}
@@ -477,15 +502,11 @@ static SchedPtr pplScheduler;
#endif
#endif // CV_PARALLEL_FRAMEWORK
} // namespace anon
/* ================================ parallel_for_ ================================ */
#ifdef CV_PARALLEL_FRAMEWORK
static void parallel_for_impl(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); // forward declaration
#endif
void parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
{
@@ -500,7 +521,6 @@ void parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body, dou
if (range.empty())
return;
#ifdef CV_PARALLEL_FRAMEWORK
static std::atomic<bool> flagNestedParallelFor(false);
bool isNotNestedRegion = !flagNestedParallelFor.load();
if (isNotNestedRegion)
@@ -519,16 +539,23 @@ void parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body, dou
}
}
else // nested parallel_for_() calls are not parallelized
#endif // CV_PARALLEL_FRAMEWORK
{
CV_UNUSED(nstripes);
body(range);
}
}
#ifdef CV_PARALLEL_FRAMEWORK
static
void parallel_for_cb(int start, int end, void* data)
{
CV_DbgAssert(data);
const cv::ParallelLoopBody& body = *(const cv::ParallelLoopBody*)data;
body(Range(start, end));
}
static void parallel_for_impl(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
{
using namespace cv::parallel;
if ((numThreads < 0 || numThreads > 1) && range.end - range.start > 1)
{
ParallelLoopBodyWrapperContext ctx(body, range, nstripes);
@@ -540,6 +567,16 @@ static void parallel_for_impl(const cv::Range& range, const cv::ParallelLoopBody
return;
}
std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
if (api)
{
CV_CheckEQ(stripeRange.start, 0, "");
api->parallel_for(stripeRange.end, parallel_for_cb, (void*)&pbody);
ctx.finalize(); // propagate exceptions if exists
return;
}
#ifdef CV_PARALLEL_FRAMEWORK
#if defined HAVE_TBB
#if TBB_INTERFACE_VERSION >= 8000
@@ -590,24 +627,25 @@ static void parallel_for_impl(const cv::Range& range, const cv::ParallelLoopBody
#endif
ctx.finalize(); // propagate exceptions if exists
}
else
{
body(range);
}
}
return;
#endif // CV_PARALLEL_FRAMEWORK
}
body(range);
}
int getNumThreads(void)
{
#ifdef CV_PARALLEL_FRAMEWORK
std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
if (api)
{
return api->getNumThreads();
}
if(numThreads == 0)
if (numThreads == 0)
return 1;
#endif
#if defined HAVE_TBB
#if TBB_INTERFACE_VERSION >= 9100
@@ -682,10 +720,15 @@ unsigned defaultNumberOfThreads()
void setNumThreads( int threads_ )
{
CV_UNUSED(threads_);
#ifdef CV_PARALLEL_FRAMEWORK
int threads = (threads_ < 0) ? defaultNumberOfThreads() : (unsigned)threads_;
numThreads = threads;
#endif
std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
if (api)
{
api->setNumThreads(numThreads);
}
#ifdef HAVE_TBB
@@ -741,6 +784,12 @@ void setNumThreads( int threads_ )
int getThreadNum()
{
std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
if (api)
{
return api->getThreadNum();
}
#if defined HAVE_TBB
#if TBB_INTERFACE_VERSION >= 9100
return tbb::this_task_arena::current_thread_index();
@@ -963,7 +1012,13 @@ int getNumberOfCPUs()
return nCPUs; // cached value
}
const char* currentParallelFramework() {
const char* currentParallelFramework()
{
std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
if (api)
{
return api->getName();
}
#ifdef CV_PARALLEL_FRAMEWORK
return CV_PARALLEL_FRAMEWORK;
#else