diff --git a/modules/gapi/CMakeLists.txt b/modules/gapi/CMakeLists.txt index af50af3f8c..04e1906c75 100644 --- a/modules/gapi/CMakeLists.txt +++ b/modules/gapi/CMakeLists.txt @@ -203,6 +203,7 @@ set(gapi_srcs src/streaming/onevpl/engine/transcode/transcode_session.cpp src/streaming/onevpl/engine/preproc/preproc_engine.cpp src/streaming/onevpl/engine/preproc/preproc_session.cpp + src/streaming/onevpl/engine/preproc/preproc_dispatcher.cpp src/streaming/onevpl/demux/async_mfp_demux_data_provider.cpp src/streaming/onevpl/data_provider_dispatcher.cpp diff --git a/modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.cpp b/modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.cpp index 82859e474c..76da3dbe50 100644 --- a/modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.cpp +++ b/modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.cpp @@ -15,8 +15,11 @@ namespace cv { namespace gapi { namespace wip { namespace onevpl { -BaseFrameAdapter::BaseFrameAdapter(std::shared_ptr surface, SessionHandle assoc_handle): - parent_surface_ptr(surface), parent_handle(assoc_handle) { +BaseFrameAdapter::BaseFrameAdapter(std::shared_ptr surface, + SessionHandle assoc_handle, + AccelType accel): + parent_surface_ptr(surface), parent_handle(assoc_handle), + acceleration_type(accel) { GAPI_Assert(parent_surface_ptr && "Surface is nullptr"); GAPI_Assert(parent_handle && "mfxSession is nullptr"); @@ -63,6 +66,10 @@ const BaseFrameAdapter::SessionHandle BaseFrameAdapter::get_session_handle() con cv::GFrameDesc BaseFrameAdapter::meta() const { return frame_desc; } +AccelType BaseFrameAdapter::accel_type() const { + return acceleration_type; +} + } // namespace onevpl } // namespace wip } // namespace gapi diff --git a/modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.hpp b/modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.hpp index 3d8d951535..a3dfcf542f 100644 --- a/modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.hpp +++ b/modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.hpp @@ -9,6 +9,7 @@ #include #include +#include #include "streaming/onevpl/accelerators/surface/surface.hpp" #ifdef HAVE_ONEVPL @@ -25,14 +26,17 @@ public: const SessionHandle get_session_handle() const; cv::GFrameDesc meta() const override; + AccelType accel_type() const; protected: - BaseFrameAdapter(std::shared_ptr assoc_surface, SessionHandle assoc_handle); + BaseFrameAdapter(std::shared_ptr assoc_surface, SessionHandle assoc_handle, + AccelType accel); ~BaseFrameAdapter(); std::shared_ptr surface(); std::shared_ptr parent_surface_ptr; SessionHandle parent_handle; GFrameDesc frame_desc; + AccelType acceleration_type; }; } // namespace onevpl } // namespace wip diff --git a/modules/gapi/src/streaming/onevpl/accelerators/surface/cpu_frame_adapter.cpp b/modules/gapi/src/streaming/onevpl/accelerators/surface/cpu_frame_adapter.cpp index 58be29f628..751ed7abbd 100644 --- a/modules/gapi/src/streaming/onevpl/accelerators/surface/cpu_frame_adapter.cpp +++ b/modules/gapi/src/streaming/onevpl/accelerators/surface/cpu_frame_adapter.cpp @@ -18,7 +18,7 @@ namespace onevpl { VPLMediaFrameCPUAdapter::VPLMediaFrameCPUAdapter(std::shared_ptr surface, SessionHandle assoc_handle): - BaseFrameAdapter(surface, assoc_handle) { + BaseFrameAdapter(surface, assoc_handle, AccelType::HOST) { } VPLMediaFrameCPUAdapter::~VPLMediaFrameCPUAdapter() = default; diff --git a/modules/gapi/src/streaming/onevpl/accelerators/surface/dx11_frame_adapter.cpp b/modules/gapi/src/streaming/onevpl/accelerators/surface/dx11_frame_adapter.cpp index db23a3c69f..885fa1589a 100644 --- a/modules/gapi/src/streaming/onevpl/accelerators/surface/dx11_frame_adapter.cpp +++ b/modules/gapi/src/streaming/onevpl/accelerators/surface/dx11_frame_adapter.cpp @@ -42,7 +42,7 @@ void unlock_mid(mfxMemId mid, mfxFrameData &data, MediaFrame::Access mode) { VPLMediaFrameDX11Adapter::VPLMediaFrameDX11Adapter(std::shared_ptr assoc_surface, SessionHandle assoc_handle): - BaseFrameAdapter(assoc_surface, assoc_handle) { + BaseFrameAdapter(assoc_surface, assoc_handle, AccelType::DX11) { Surface::data_t& data = assoc_surface->get_data(); LockAdapter* alloc_data = reinterpret_cast(data.MemId); diff --git a/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_dispatcher.cpp b/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_dispatcher.cpp new file mode 100644 index 0000000000..23ad385b51 --- /dev/null +++ b/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_dispatcher.cpp @@ -0,0 +1,85 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2022 Intel Corporation + +#ifdef HAVE_ONEVPL + +#include +#include + +#include + +#include "streaming/onevpl/engine/preproc/preproc_engine.hpp" +#include "streaming/onevpl/engine/preproc/preproc_session.hpp" +#include "streaming/onevpl/engine/preproc/preproc_dispatcher.hpp" + +#include "streaming/onevpl/accelerators/accel_policy_interface.hpp" +#include "streaming/onevpl/accelerators/surface/surface.hpp" +#include "streaming/onevpl/cfg_params_parser.hpp" +#include "logger.hpp" + + +namespace cv { +namespace gapi { +namespace wip { +namespace onevpl { +cv::util::optional VPPPreprocDispatcher::is_applicable(const cv::MediaFrame& in_frame) { + cv::util::optional param; + GAPI_LOG_DEBUG(nullptr, "workers: " << workers.size()); + for (const auto &w : workers) { + param = w->is_applicable(in_frame); + if (param.has_value()) { + auto &vpp_param = param.value().get(); + BaseFrameAdapter* adapter = reinterpret_cast(vpp_param.reserved); + const IDeviceSelector::DeviceScoreTable &devs = + (std::static_pointer_cast(w))->get_accel()->get_device_selector()->select_devices(); + GAPI_DbgAssert(devs.size() >= 1 && "Invalid device selector"); + auto worker_accel_type = std::get<1>(*devs.begin()).get_type(); + GAPI_LOG_DEBUG(nullptr, "acceleration types for frame: " << to_cstring(adapter->accel_type()) << + ", for worker: " << to_cstring(worker_accel_type)); + if (worker_accel_type == adapter->accel_type()){ + vpp_param.reserved = reinterpret_cast(w.get()); + GAPI_LOG_DEBUG(nullptr, "selected worker: " << vpp_param.reserved); + break; + } + } + } + return param; +} + +pp_session VPPPreprocDispatcher::initialize_preproc(const pp_params& initial_frame_param, + const GFrameDesc& required_frame_descr) { + const auto &vpp_param = initial_frame_param.get(); + GAPI_LOG_DEBUG(nullptr, "workers: " << workers.size()); + for (auto &w : workers) { + if (reinterpret_cast(w.get()) == vpp_param.reserved) { + pp_session sess = w->initialize_preproc(initial_frame_param, required_frame_descr); + vpp_pp_session &vpp_sess = sess.get(); + vpp_sess.reserved = reinterpret_cast(w.get()); + GAPI_LOG_DEBUG(nullptr, "initialized session preproc for worker: " << vpp_sess.reserved); + return sess; + } + } + GAPI_Assert(false && "Cannot initialize VPP preproc in dispatcher, no suitable worker"); +} + +cv::MediaFrame VPPPreprocDispatcher::run_sync(const pp_session &session_handle, + const cv::MediaFrame& in_frame, + const cv::util::optional &opt_roi) { + const auto &vpp_sess = session_handle.get(); + GAPI_LOG_DEBUG(nullptr, "workers: " << workers.size()); + for (auto &w : workers) { + if (reinterpret_cast(w.get()) == vpp_sess.reserved) { + GAPI_LOG_DEBUG(nullptr, "trigger execution on worker: " << vpp_sess.reserved); + return w->run_sync(session_handle, in_frame, opt_roi); + } + } + GAPI_Assert(false && "Cannot invoke VPP preproc in dispatcher, no suitable worker"); +} +} // namespace onevpl +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_ONEVPL diff --git a/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_dispatcher.hpp b/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_dispatcher.hpp new file mode 100644 index 0000000000..6e2ebc81f9 --- /dev/null +++ b/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_dispatcher.hpp @@ -0,0 +1,53 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2022 Intel Corporation + +#ifndef GAPI_STREAMING_ONEVPL_PREPROC_DISPATCHER_HPP +#define GAPI_STREAMING_ONEVPL_PREPROC_DISPATCHER_HPP + +#include +#include + +#include "streaming/onevpl/engine/preproc_engine_interface.hpp" +#include "streaming/onevpl/engine/preproc_defines.hpp" + +#ifdef HAVE_ONEVPL +#include "streaming/onevpl/onevpl_export.hpp" + +namespace cv { +namespace gapi { +namespace wip { +namespace onevpl { + +// GAPI_EXPORTS for tests +class GAPI_EXPORTS VPPPreprocDispatcher final : public cv::gapi::wip::IPreprocEngine { +public: + + cv::util::optional is_applicable(const cv::MediaFrame& in_frame) override; + + pp_session initialize_preproc(const pp_params& initial_frame_param, + const GFrameDesc& required_frame_descr) override; + + cv::MediaFrame run_sync(const pp_session &session_handle, + const cv::MediaFrame& in_frame, + const cv::util::optional &opt_roi) override; + + template + void insert_worker(Args&& ...args) { + workers.emplace_back(std::make_shared(std::forward(args)...)); + } + + size_t size() const { + return workers.size(); + } +private: + std::vector> workers; +}; +} // namespace onevpl +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_ONEVPL +#endif // GAPI_STREAMING_ONEVPL_PREPROC_DISPATCHER_HPP diff --git a/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_engine.cpp b/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_engine.cpp index 7de363fad5..d205211903 100644 --- a/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_engine.cpp +++ b/modules/gapi/src/streaming/onevpl/engine/preproc/preproc_engine.cpp @@ -163,7 +163,8 @@ cv::util::optional VPPPreprocEngine::is_applicable(const cv::MediaFra if (vpl_adapter) { ret = cv::util::make_optional( pp_params::create(vpl_adapter->get_session_handle(), - vpl_adapter->get_surface()->get_info())); + vpl_adapter->get_surface()->get_info(), + vpl_adapter)); GAPI_LOG_DEBUG(nullptr, "VPP preprocessing applicable, session [" << vpl_adapter->get_session_handle() << "]"); } @@ -203,7 +204,7 @@ pp_session VPPPreprocEngine::initialize_preproc(const pp_params& initial_frame_p // check In & Out equally to bypass preproc if (mfxVPPParams.vpp.Out == mfxVPPParams.vpp.In) { GAPI_LOG_DEBUG(nullptr, "no preproc required"); - return pp_session::create(nullptr); + return pp_session::create(nullptr); } // recalculate size param according to VPP alignment @@ -221,7 +222,7 @@ pp_session VPPPreprocEngine::initialize_preproc(const pp_params& initial_frame_p auto it = preproc_session_map.find(mfxVPPParams.vpp.In); if (it != preproc_session_map.end()) { GAPI_LOG_DEBUG(nullptr, "[" << it->second->session << "] found"); - return pp_session::create(std::static_pointer_cast(it->second)); + return pp_session::create(std::static_pointer_cast(it->second)); } // NB: make some sanity checks @@ -311,7 +312,7 @@ pp_session VPPPreprocEngine::initialize_preproc(const pp_params& initial_frame_p bool inserted = preproc_session_map.emplace(mfxVPPParams.vpp.In, sess_ptr).second; GAPI_Assert(inserted && "preproc session is exist"); GAPI_LOG_INFO(nullptr, "VPPPreprocSession created, total sessions: " << preproc_session_map.size()); - return pp_session::create(std::static_pointer_cast(sess_ptr)); + return pp_session::create(std::static_pointer_cast(sess_ptr)); } void VPPPreprocEngine::on_frame_ready(session_type& sess, @@ -339,12 +340,12 @@ VPPPreprocEngine::initialize_session(mfxSession, cv::MediaFrame VPPPreprocEngine::run_sync(const pp_session& sess, const cv::MediaFrame& in_frame, const cv::util::optional &roi) { - std::shared_ptr pp_sess_impl = sess.get(); - if (!pp_sess_impl) { + vpp_pp_session pp_sess_impl = sess.get(); + if (!pp_sess_impl.handle) { // bypass case return in_frame; } - session_ptr_type s = std::static_pointer_cast(pp_sess_impl); + session_ptr_type s = std::static_pointer_cast(pp_sess_impl.handle); GAPI_DbgAssert(s && "Session is nullptr"); GAPI_DbgAssert(is_applicable(in_frame) && "VPP preproc is not applicable for the given frame"); diff --git a/modules/gapi/src/streaming/onevpl/engine/preproc/vpp_preproc_defines.hpp b/modules/gapi/src/streaming/onevpl/engine/preproc/vpp_preproc_defines.hpp index 820510a55d..780c9cf5d7 100644 --- a/modules/gapi/src/streaming/onevpl/engine/preproc/vpp_preproc_defines.hpp +++ b/modules/gapi/src/streaming/onevpl/engine/preproc/vpp_preproc_defines.hpp @@ -18,9 +18,13 @@ namespace onevpl { struct vpp_pp_params { mfxSession handle; mfxFrameInfo info; + void *reserved = nullptr; }; -using vpp_pp_session_ptr = std::shared_ptr; +struct vpp_pp_session { + std::shared_ptr handle; + void *reserved = nullptr; +}; } // namespace onevpl } // namespace wip } // namespace gapi diff --git a/modules/gapi/src/streaming/onevpl/engine/preproc_defines.hpp b/modules/gapi/src/streaming/onevpl/engine/preproc_defines.hpp index 2c72d7c547..5f68d9c4f7 100644 --- a/modules/gapi/src/streaming/onevpl/engine/preproc_defines.hpp +++ b/modules/gapi/src/streaming/onevpl/engine/preproc_defines.hpp @@ -19,12 +19,12 @@ namespace wip { #ifdef VPP_PREPROC_ENGINE #define GAPI_BACKEND_PP_PARAMS cv::gapi::wip::onevpl::vpp_pp_params -#define GAPI_BACKEND_PP_SESSIONS cv::gapi::wip::onevpl::vpp_pp_session_ptr +#define GAPI_BACKEND_PP_SESSIONS cv::gapi::wip::onevpl::vpp_pp_session #else // VPP_PREPROC_ENGINE struct empty_pp_params {}; struct empty_pp_session {}; #define GAPI_BACKEND_PP_PARAMS cv::gapi::wip::empty_pp_params; -#define GAPI_BACKEND_PP_SESSIONS std::shared_ptr; +#define GAPI_BACKEND_PP_SESSIONS cv::gapi::wip::empty_pp_session; #endif // VPP_PREPROC_ENGINE struct pp_params { @@ -57,26 +57,25 @@ private: struct pp_session { using value_type = cv::util::variant; - template - static pp_session create(std::shared_ptr session) { - static_assert(cv::detail::contains, + template + static pp_session create(Args&& ...args) { + static_assert(cv::detail::contains::value, "Invalid BackendSpecificSesionType requested"); pp_session ret; - ret.value = session; + ret.value = BackendSpecificSesionType{std::forward(args)...};; return ret; } template - std::shared_ptr get() { - using ptr_type = std::shared_ptr; - static_assert(cv::detail::contains::value, + BackendSpecificSesionType &get() { + static_assert(cv::detail::contains::value, "Invalid BackendSpecificSesionType requested"); - return cv::util::get(value); + return cv::util::get(value); } template - std::shared_ptr get() const { + const BackendSpecificSesionType &get() const { return const_cast(this)->get(); } private: diff --git a/modules/gapi/test/streaming/gapi_streaming_vpp_preproc_test.cpp b/modules/gapi/test/streaming/gapi_streaming_vpp_preproc_test.cpp index a0a66c7b93..9c0cc9ca4a 100644 --- a/modules/gapi/test/streaming/gapi_streaming_vpp_preproc_test.cpp +++ b/modules/gapi/test/streaming/gapi_streaming_vpp_preproc_test.cpp @@ -49,6 +49,7 @@ #include "streaming/onevpl/engine/preproc/preproc_engine.hpp" #include "streaming/onevpl/engine/preproc/preproc_session.hpp" +#include "streaming/onevpl/engine/preproc/preproc_dispatcher.hpp" #include "streaming/onevpl/engine/transcode/transcode_engine_legacy.hpp" #include "streaming/onevpl/engine/transcode/transcode_session.hpp" @@ -279,8 +280,8 @@ TEST(OneVPL_Source_PreprocEngine, functional_single_thread) pp_session pp_sess = preproc_engine.initialize_preproc(params.value(), required_frame_param); - ASSERT_EQ(pp_sess.get().get(), - first_pp_sess.get().get()); + ASSERT_EQ(pp_sess.get().handle.get(), + first_pp_sess.get().handle.get()); cv::MediaFrame pp_frame = preproc_engine.run_sync(pp_sess, decoded_frame, @@ -319,7 +320,7 @@ void decode_function(cv::gapi::wip::onevpl::VPLLegacyDecodeEngine &decode_engine queue.push_stop(); } -void preproc_function(cv::gapi::wip::onevpl::VPPPreprocEngine &preproc_engine, SafeQueue&queue, +void preproc_function(cv::gapi::wip::IPreprocEngine &preproc_engine, SafeQueue&queue, size_t &preproc_number, const out_frame_info_t &required_frame_param, const cv::util::optional &roi_rect = {}) { using namespace cv::gapi::wip; @@ -361,12 +362,15 @@ void preproc_function(cv::gapi::wip::onevpl::VPPPreprocEngine &preproc_engine, S cv::util::optional params = preproc_engine.is_applicable(decoded_frame); ASSERT_TRUE(params.has_value()); - ASSERT_TRUE(0 == memcmp(¶ms.value(), &first_pp_params.value(), sizeof(pp_params))); + const auto &vpp_params = params.value().get(); + const auto &first_vpp_params = first_pp_params.value().get(); + ASSERT_EQ(vpp_params.handle, first_vpp_params.handle); + ASSERT_TRUE(0 == memcmp(&vpp_params.info, &first_vpp_params.info, sizeof(mfxFrameInfo))); pp_session pp_sess = preproc_engine.initialize_preproc(params.value(), required_frame_param); - ASSERT_EQ(pp_sess.get().get(), - first_pp_sess.get().get()); + ASSERT_EQ(pp_sess.get().handle.get(), + first_pp_sess.get().handle.get()); cv::MediaFrame pp_frame = preproc_engine.run_sync(pp_sess, decoded_frame, empty_roi); cv::GFrameDesc pp_desc = pp_frame.desc(); @@ -381,6 +385,71 @@ void preproc_function(cv::gapi::wip::onevpl::VPPPreprocEngine &preproc_engine, S ASSERT_NE(preproc_number, 1); } +void multi_source_preproc_function(size_t source_num, + cv::gapi::wip::IPreprocEngine &preproc_engine, SafeQueue&queue, + size_t &preproc_number, const out_frame_info_t &required_frame_param, + const cv::util::optional &roi_rect = {}) { + using namespace cv::gapi::wip; + using namespace cv::gapi::wip::onevpl; + // create preproc session based on frame description & network info + cv::MediaFrame first_decoded_frame = queue.pop(); + cv::util::optional first_pp_params = preproc_engine.is_applicable(first_decoded_frame); + ASSERT_TRUE(first_pp_params.has_value()); + pp_session first_pp_sess = + preproc_engine.initialize_preproc(first_pp_params.value(), + required_frame_param); + + // make preproc using incoming decoded frame & preproc session + cv::MediaFrame first_pp_frame = preproc_engine.run_sync(first_pp_sess, + first_decoded_frame, + roi_rect); + cv::GFrameDesc first_outcome_pp_desc = first_pp_frame.desc(); + + // do not hold media frames because they share limited DX11 surface pool resources + first_decoded_frame = cv::MediaFrame(); + first_pp_frame = cv::MediaFrame(); + + // launch pipeline + bool in_progress = false; + preproc_number = 1; + size_t received_stop_count = 0; + try { + while(received_stop_count != source_num) { + cv::MediaFrame decoded_frame = queue.pop(); + if (SafeQueue::is_stop(decoded_frame)) { + ++received_stop_count; + continue; + } + in_progress = true; + + cv::util::optional params = preproc_engine.is_applicable(decoded_frame); + ASSERT_TRUE(params.has_value()); + + pp_session pp_sess = preproc_engine.initialize_preproc(params.value(), + required_frame_param); + cv::MediaFrame pp_frame = preproc_engine.run_sync(pp_sess, decoded_frame, empty_roi); + cv::GFrameDesc pp_desc = pp_frame.desc(); + ASSERT_TRUE(pp_desc == first_outcome_pp_desc); + in_progress = false; + decoded_frame = cv::MediaFrame(); + preproc_number++; + } + } catch (const std::exception& ex) { + GAPI_LOG_WARNING(nullptr, "Caught exception in preproc worker: " << ex.what()); + } + + // test if interruption has happened + if (in_progress) { + while (true) { + cv::MediaFrame decoded_frame = queue.pop(); + if (SafeQueue::is_stop(decoded_frame)) { + break; + } + } + } + ASSERT_FALSE(in_progress); + ASSERT_NE(preproc_number, 1); +} using roi_t = cv::util::optional; using preproc_roi_args_t = decltype(std::tuple_cat(std::declval(), std::declval>())); @@ -548,6 +617,93 @@ TEST_P(VPPInnerPreprocParams, functional_inner_preproc_size) INSTANTIATE_TEST_CASE_P(OneVPL_Source_PreprocInner, VPPInnerPreprocParams, testing::ValuesIn(files)); + +// Dispatcher test suite +class VPPPreprocDispatcherROIParams : public ::testing::TestWithParam {}; +TEST_P(VPPPreprocDispatcherROIParams, functional_roi_different_threads) +{ + using namespace cv::gapi::wip; + using namespace cv::gapi::wip::onevpl; + source_t file_path; + decoder_t decoder_id; + acceleration_t accel = MFX_ACCEL_MODE_VIA_D3D11; + out_frame_info_t required_frame_param; + roi_t opt_roi; + std::tie(file_path, decoder_id, std::ignore, required_frame_param, opt_roi) = GetParam(); + + file_path = findDataFile(file_path); + + std::vector cfg_params_w_dx11; + cfg_params_w_dx11.push_back(CfgParam::create_acceleration_mode(accel)); + std::unique_ptr decode_accel_policy ( + new VPLDX11AccelerationPolicy(std::make_shared(cfg_params_w_dx11))); + + // create file data provider + std::shared_ptr data_provider(new FileDataProvider(file_path, + {CfgParam::create_decoder_id(decoder_id)})); + std::shared_ptr cpu_data_provider(new FileDataProvider(file_path, + {CfgParam::create_decoder_id(decoder_id)})); + + mfxLoader mfx{}; + mfxConfig mfx_cfg{}; + std::tie(mfx, mfx_cfg) = prepare_mfx(decoder_id, accel); + + // create decode session + mfxSession mfx_decode_session{}; + mfxStatus sts = MFXCreateSession(mfx, 0, &mfx_decode_session); + EXPECT_EQ(MFX_ERR_NONE, sts); + + mfxSession mfx_cpu_decode_session{}; + sts = MFXCreateSession(mfx, 0, &mfx_cpu_decode_session); + EXPECT_EQ(MFX_ERR_NONE, sts); + + // create decode engines + auto device_selector = decode_accel_policy->get_device_selector(); + VPLLegacyDecodeEngine decode_engine(std::move(decode_accel_policy)); + auto sess_ptr = decode_engine.initialize_session(mfx_decode_session, + cfg_params_w_dx11, + data_provider); + std::vector cfg_params_cpu; + auto cpu_device_selector = std::make_shared(cfg_params_cpu); + VPLLegacyDecodeEngine cpu_decode_engine(std::unique_ptr{ + new VPLCPUAccelerationPolicy(cpu_device_selector)}); + auto cpu_sess_ptr = cpu_decode_engine.initialize_session(mfx_cpu_decode_session, + cfg_params_cpu, + cpu_data_provider); + + // create VPP preproc engines + VPPPreprocDispatcher preproc_dispatcher; + preproc_dispatcher.insert_worker(std::unique_ptr{ + new VPLDX11AccelerationPolicy(device_selector)}); + preproc_dispatcher.insert_worker(std::unique_ptr{ + new VPLCPUAccelerationPolicy(cpu_device_selector)}); + + // launch threads + SafeQueue queue; + size_t decoded_number = 1; + size_t cpu_decoded_number = 1; + size_t preproc_number = 0; + + std::thread decode_thread(decode_function, std::ref(decode_engine), sess_ptr, + std::ref(queue), std::ref(decoded_number)); + std::thread cpu_decode_thread(decode_function, std::ref(cpu_decode_engine), cpu_sess_ptr, + std::ref(queue), std::ref(cpu_decoded_number)); + std::thread preproc_thread(multi_source_preproc_function, + preproc_dispatcher.size(), + std::ref(preproc_dispatcher), + std::ref(queue), std::ref(preproc_number), + std::cref(required_frame_param), + std::cref(opt_roi)); + + decode_thread.join(); + cpu_decode_thread.join(); + preproc_thread.join(); + ASSERT_EQ(preproc_number, decoded_number + cpu_decoded_number); +} + +INSTANTIATE_TEST_CASE_P(OneVPL_Source_PreprocDispatcherROI, VPPPreprocDispatcherROIParams, + testing::ValuesIn(files_w_roi)); + #endif // HAVE_DIRECTX #endif // HAVE_D3D11 } // namespace opencv_test