Merge pull request #19365 from alalek:parallel_api

This commit is contained in:
Alexander Alekhin
2021-01-27 18:12:15 +00:00
13 changed files with 528 additions and 31 deletions
+4
View File
@@ -97,6 +97,10 @@
@}
@defgroup core_lowlevel_api Low-level API for external libraries / plugins
@}
@defgroup core_parallel Parallel Processing
@{
@defgroup core_parallel_backend Parallel backends API
@}
@}
*/
@@ -0,0 +1,72 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#ifndef OPENCV_CORE_PARALLEL_FOR_OPENMP_HPP
#define OPENCV_CORE_PARALLEL_FOR_OPENMP_HPP
#include "opencv2/core/parallel/parallel_backend.hpp"
#if !defined(_OPENMP) && !defined(OPENCV_SKIP_OPENMP_PRESENSE_CHECK)
#error "This file must be compiled with enabled OpenMP"
#endif
#include <omp.h>
namespace cv { namespace parallel { namespace openmp {
/** OpenMP parallel_for API implementation
*
* @sa setParallelForBackend
* @ingroup core_parallel_backend
*/
class ParallelForBackend : public ParallelForAPI
{
protected:
int numThreads;
int numThreadsMax;
public:
ParallelForBackend()
{
numThreads = 0;
numThreadsMax = omp_get_max_threads();
}
virtual ~ParallelForBackend() {}
virtual void parallel_for(int tasks, FN_parallel_for_body_cb_t body_callback, void* callback_data) CV_OVERRIDE
{
#pragma omp parallel for schedule(dynamic) num_threads(numThreads > 0 ? numThreads : numThreadsMax)
for (int i = 0; i < tasks; ++i)
body_callback(i, i + 1, callback_data);
}
virtual int getThreadNum() const CV_OVERRIDE
{
return omp_get_thread_num();
}
virtual int getNumThreads() const CV_OVERRIDE
{
return numThreads > 0
? numThreads
: numThreadsMax;
}
virtual int setNumThreads(int nThreads) CV_OVERRIDE
{
int oldNumThreads = numThreads;
numThreads = nThreads;
// nothing needed as numThreads is used in #pragma omp parallel for directly
return oldNumThreads;
}
const char* getName() const CV_OVERRIDE
{
return "openmp";
}
};
}}} // namespace
#endif // OPENCV_CORE_PARALLEL_FOR_OPENMP_HPP
@@ -0,0 +1,153 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#ifndef OPENCV_CORE_PARALLEL_FOR_TBB_HPP
#define OPENCV_CORE_PARALLEL_FOR_TBB_HPP
#include "opencv2/core/parallel/parallel_backend.hpp"
#include <opencv2/core/utils/logger.hpp>
#ifndef TBB_SUPPRESS_DEPRECATED_MESSAGES // supress warning
#define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
#endif
#include "tbb/tbb.h"
#if !defined(TBB_INTERFACE_VERSION)
#error "Unknows/unsupported TBB version"
#endif
#if TBB_INTERFACE_VERSION >= 8000
#include "tbb/task_arena.h"
#endif
namespace cv { namespace parallel { namespace tbb {
using namespace ::tbb;
#if TBB_INTERFACE_VERSION >= 8000
static tbb::task_arena& getArena()
{
static tbb::task_arena tbbArena(tbb::task_arena::automatic);
return tbbArena;
}
#else
static tbb::task_scheduler_init& getScheduler()
{
static tbb::task_scheduler_init tbbScheduler(tbb::task_scheduler_init::deferred);
return tbbScheduler;
}
#endif
/** OpenMP parallel_for API implementation
*
* @sa setParallelForBackend
* @ingroup core_parallel_backend
*/
class ParallelForBackend : public ParallelForAPI
{
protected:
int numThreads;
int numThreadsMax;
public:
ParallelForBackend()
{
CV_LOG_INFO(NULL, "Initializing TBB parallel backend: TBB_INTERFACE_VERSION=" << TBB_INTERFACE_VERSION);
numThreads = 0;
#if TBB_INTERFACE_VERSION >= 8000
(void)getArena();
#else
(void)getScheduler();
#endif
}
virtual ~ParallelForBackend() {}
class CallbackProxy
{
const FN_parallel_for_body_cb_t& callback;
void* const callback_data;
const int tasks;
public:
inline CallbackProxy(int tasks_, FN_parallel_for_body_cb_t& callback_, void* callback_data_)
: callback(callback_), callback_data(callback_data_), tasks(tasks_)
{
// nothing
}
void operator()(const tbb::blocked_range<int>& range) const
{
this->callback(range.begin(), range.end(), callback_data);
}
void operator()() const
{
tbb::parallel_for(tbb::blocked_range<int>(0, tasks), *this);
}
};
virtual void parallel_for(int tasks, FN_parallel_for_body_cb_t body_callback, void* callback_data) CV_OVERRIDE
{
CallbackProxy task(tasks, body_callback, callback_data);
#if TBB_INTERFACE_VERSION >= 8000
getArena().execute(task);
#else
task();
#endif
}
virtual int getThreadNum() const CV_OVERRIDE
{
#if TBB_INTERFACE_VERSION >= 9100
return tbb::this_task_arena::current_thread_index();
#elif TBB_INTERFACE_VERSION >= 8000
return tbb::task_arena::current_thread_index();
#else
return 0;
#endif
}
virtual int getNumThreads() const CV_OVERRIDE
{
#if TBB_INTERFACE_VERSION >= 9100
return getArena().max_concurrency();
#elif TBB_INTERFACE_VERSION >= 8000
return numThreads > 0
? numThreads
: tbb::task_scheduler_init::default_num_threads();
#else
return getScheduler().is_active()
? numThreads
: tbb::task_scheduler_init::default_num_threads();
#endif
}
virtual int setNumThreads(int nThreads) CV_OVERRIDE
{
int oldNumThreads = numThreads;
numThreads = nThreads;
#if TBB_INTERFACE_VERSION >= 8000
auto& tbbArena = getArena();
if (tbbArena.is_active())
tbbArena.terminate();
if (numThreads > 0)
tbbArena.initialize(numThreads);
#else
auto& tbbScheduler = getScheduler();
if (tbbScheduler.is_active())
tbbScheduler.terminate();
if (numThreads > 0)
tbbScheduler.initialize(numThreads);
#endif
return oldNumThreads;
}
const char* getName() const CV_OVERRIDE
{
return "tbb";
}
};
}}} // namespace
#endif // OPENCV_CORE_PARALLEL_FOR_TBB_HPP
@@ -0,0 +1,73 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#ifndef OPENCV_CORE_PARALLEL_BACKEND_HPP
#define OPENCV_CORE_PARALLEL_BACKEND_HPP
#include <memory>
namespace cv { namespace parallel {
#ifndef CV_API_CALL
#define CV_API_CALL
#endif
/** @addtogroup core_parallel_backend
* @{
* API below is provided to resolve problem of CPU resource over-subscription by multiple thread pools from different multi-threading frameworks.
* This is common problem for cases when OpenCV compiled threading framework is different from the Users Applications framework.
*
* Applications can replace OpenCV `parallel_for()` backend with own implementation (to reuse Application's thread pool).
*
* @note This call is not thread-safe. Consider calling this function from the `main()` before any other OpenCV processing functions (and without any other created threads).
*
* #### Intel TBB usage example:
*
* - include header with simple implementation of TBB backend:
* @snippet parallel_backend/example-tbb.cpp tbb_include
* - execute backend replacement code:
* @snippet parallel_backend/example-tbb.cpp tbb_backend
* - configuration of compiler/linker options is responsibility of Application's scripts
*
* #### OpenMP usage example:
*
* - include header with simple implementation of OpenMP backend:
* @snippet parallel_backend/example-openmp.cpp openmp_include
* - execute backend replacement code:
* @snippet parallel_backend/example-openmp.cpp openmp_backend
* - Configuration of compiler/linker options is responsibility of Application's scripts
*/
/** Interface for parallel_for backends implementations
*
* @sa setParallelForBackend
*/
class CV_EXPORTS ParallelForAPI
{
public:
virtual ~ParallelForAPI();
typedef void (CV_API_CALL *FN_parallel_for_body_cb_t)(int start, int end, void* data);
virtual void parallel_for(int tasks, FN_parallel_for_body_cb_t body_callback, void* callback_data) = 0;
virtual int getThreadNum() const = 0;
virtual int getNumThreads() const = 0;
virtual int setNumThreads(int nThreads) = 0;
virtual const char* getName() const = 0;
};
/** @brief Replace OpenCV parallel_for backend
*
* Application can replace OpenCV `parallel_for()` backend with own implementation.
*
* @note This call is not thread-safe. Consider calling this function from the `main()` before any other OpenCV processing functions (and without any other created threads).
*/
CV_EXPORTS void setParallelForBackend(const std::shared_ptr<ParallelForAPI>& api, bool propagateNumThreads = true);
//! @}
}} // namespace
#endif // OPENCV_CORE_PARALLEL_BACKEND_HPP
+15 -4
View File
@@ -570,6 +570,8 @@ static inline size_t getElemSize(int type) { return (size_t)CV_ELEM_SIZE(type);
/////////////////////////////// Parallel Primitives //////////////////////////////////
/** @brief Base class for parallel data processors
@ingroup core_parallel
*/
class CV_EXPORTS ParallelLoopBody
{
@@ -579,17 +581,23 @@ public:
};
/** @brief Parallel data processor
@ingroup core_parallel
*/
CV_EXPORTS void parallel_for_(const Range& range, const ParallelLoopBody& body, double nstripes=-1.);
//! @ingroup core_parallel
class ParallelLoopBodyLambdaWrapper : public ParallelLoopBody
{
private:
std::function<void(const Range&)> m_functor;
public:
ParallelLoopBodyLambdaWrapper(std::function<void(const Range&)> functor) :
m_functor(functor)
{ }
inline
ParallelLoopBodyLambdaWrapper(std::function<void(const Range&)> functor)
: m_functor(functor)
{
// nothing
}
virtual void operator() (const cv::Range& range) const CV_OVERRIDE
{
@@ -597,11 +605,14 @@ public:
}
};
inline void parallel_for_(const Range& range, std::function<void(const Range&)> functor, double nstripes=-1.)
//! @ingroup core_parallel
static inline
void parallel_for_(const Range& range, std::function<void(const Range&)> functor, double nstripes=-1.)
{
parallel_for_(range, ParallelLoopBodyLambdaWrapper(functor), nstripes);
}
/////////////////////////////// forEach method of cv::Mat ////////////////////////////
template<typename _Tp, typename Functor> inline
void Mat::forEach_impl(const Functor& operation) {