diff --git a/modules/gapi/CMakeLists.txt b/modules/gapi/CMakeLists.txt index ae4d20c024..613c87f6f7 100644 --- a/modules/gapi/CMakeLists.txt +++ b/modules/gapi/CMakeLists.txt @@ -60,6 +60,7 @@ set(gapi_srcs # Executor src/executor/gexecutor.cpp + src/executor/gasync.cpp # CPU Backend (currently built-in) src/backends/cpu/gcpubackend.cpp diff --git a/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp b/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp new file mode 100644 index 0000000000..0472c474c1 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp @@ -0,0 +1,28 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + + +#ifndef OPENCV_GAPI_GCOMPILED_ASYNC_HPP +#define OPENCV_GAPI_GCOMPILED_ASYNC_HPP + +#include +#include //for std::exception_ptr +#include //for std::function +#include "opencv2/gapi/garg.hpp" + +namespace cv { + //fwd declaration + class GCompiled; + +namespace gapi{ +namespace wip { + GAPI_EXPORTS void async(GCompiled& gcmpld, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs); + GAPI_EXPORTS std::future async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs); +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_GCOMPILED_ASYNC_HPP diff --git a/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp b/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp new file mode 100644 index 0000000000..2daaf35cf8 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp @@ -0,0 +1,29 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. 
+// +// Copyright (C) 2019 Intel Corporation + +#ifndef OPENCV_GAPI_GCOMPUTATION_ASYNC_HPP +#define OPENCV_GAPI_GCOMPUTATION_ASYNC_HPP + + +#include +#include //for std::exception_ptr +#include //for std::function +#include "opencv2/gapi/garg.hpp" //for GRunArgs, GRunArgsP +#include "opencv2/gapi/gcommon.hpp" //for GCompileArgs + +namespace cv { + //fwd declaration + class GComputation; +namespace gapi { +namespace wip { + GAPI_EXPORTS void async_apply(GComputation& gcomp, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args = {}); + GAPI_EXPORTS std::future async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args = {}); +} // namespace wip +} // namespace gapi +} // namespace cv + + +#endif //OPENCV_GAPI_GCOMPUTATION_ASYNC_HPP diff --git a/modules/gapi/src/executor/gasync.cpp b/modules/gapi/src/executor/gasync.cpp new file mode 100644 index 0000000000..8cc83108ae --- /dev/null +++ b/modules/gapi/src/executor/gasync.cpp @@ -0,0 +1,198 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. 
+// +// Copyright (C) 2019 Intel Corporation + +#include "opencv2/gapi/gcomputation_async.hpp" +#include "opencv2/gapi/gcomputation.hpp" +#include "opencv2/gapi/gcompiled_async.hpp" +#include "opencv2/gapi/gcompiled.hpp" + +#include + +#include +#include +//#include +#include +#include + +namespace { + //This is a tool to move initialize captures of a lambda in C++11 + template + struct move_through_copy{ + T value; + move_through_copy(T&& g) : value(std::move(g)) {} + move_through_copy(move_through_copy&&) = default; + move_through_copy(move_through_copy const& lhs) : move_through_copy(std::move(const_cast(lhs))) {} + }; +} + +namespace cv { +namespace gapi { +namespace wip { + +namespace impl{ + +class async_service { + + std::mutex mtx; + std::condition_variable cv; + std::queue> q; + std::atomic exiting = {false}; + std::atomic thread_started = {false}; + + std::thread thrd; + +public: + async_service() = default ; + + void add_task(std::function&& t){ + if (!thread_started) + { + //thread has not been started yet, so start it + //try to Compare And Swap the flag, false -> true + //If there are multiple threads - only single one will succeed in changing the value. 
+ bool expected = false; + if (thread_started.compare_exchange_strong(expected, true)) + { + //have won (probable) race - so actually start the thread + thrd = std::thread {[this](){ + //move the whole queue into local instance in order to minimize time the protecting lock is held + decltype(q) second_q; + while (!exiting){ + std::unique_lock lck{mtx}; + if (q.empty()) + { + //block current thread until arrival of exit request or new elements + cv.wait(lck, [&](){ return exiting || !q.empty();}); + } + //usually swap for std::queue is plain pointers exchange, so relatively cheap + q.swap(second_q); + lck.unlock(); + + while (!second_q.empty()) + { + auto& f = second_q.front(); + f(); + second_q.pop(); + } + } + }}; + } + } + std::unique_lock lck{mtx}; + bool first_task = q.empty(); + q.push(std::move(t)); + lck.unlock(); + + if (first_task) + { + //as the queue was empty before adding the task, + //the thread might be sleeping, so wake it up + cv.notify_one(); + } + } + ~async_service(){ + if (thread_started && thrd.joinable()) + { + exiting = true; + mtx.lock(); + mtx.unlock(); + cv.notify_one(); + thrd.join(); + } + } +}; + +async_service the_ctx; +} + +namespace { +template +std::exception_ptr call_and_catch(f_t&& f){ + std::exception_ptr eptr; + try { + std::forward(f)(); + } catch(...) { + eptr = std::current_exception(); + } + + return eptr; +} + +template +void call_with_callback(f_t&& f, callback_t&& cb){ + auto eptr = call_and_catch(std::forward(f)); + std::forward(cb)(eptr); +} + +template +void call_with_futute(f_t&& f, std::promise& p){ + auto eptr = call_and_catch(std::forward(f)); + if (eptr){ + p.set_exception(eptr); + } + else { + p.set_value(); + } +} +}//namespace + +//For now these async functions are simply wrapping serial version of apply/operator() into a functor. +//These functors are then serialized into single queue, which is then processed by a dedicated background thread. 
+void async_apply(GComputation& gcomp, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args){ + //TODO: use move_through_copy for all args except gcomp + auto l = [=]() mutable { + auto apply_l = [&](){ + gcomp.apply(std::move(ins), std::move(outs), std::move(args)); + }; + + call_with_callback(apply_l,std::move(callback)); + }; + impl::the_ctx.add_task(l); +} + +std::future async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args){ + move_through_copy> prms{{}}; + auto f = prms.value.get_future(); + auto l = [=]() mutable { + auto apply_l = [&](){ + gcomp.apply(std::move(ins), std::move(outs), std::move(args)); + }; + + call_with_futute(apply_l, prms.value); + }; + + impl::the_ctx.add_task(l); + return f; +} + +void async(GCompiled& gcmpld, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs){ + auto l = [=]() mutable { + auto apply_l = [&](){ + gcmpld(std::move(ins), std::move(outs)); + }; + + call_with_callback(apply_l,std::move(callback)); + }; + + impl::the_ctx.add_task(l); +} + +std::future async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs){ + move_through_copy> prms{{}}; + auto f = prms.value.get_future(); + auto l = [=]() mutable { + auto apply_l = [&](){ + gcmpld(std::move(ins), std::move(outs)); + }; + + call_with_futute(apply_l, prms.value); + }; + + impl::the_ctx.add_task(l); + return f; + +} +}}} //namespace wip namespace gapi namespace cv diff --git a/modules/gapi/test/gapi_async_test.cpp b/modules/gapi/test/gapi_async_test.cpp new file mode 100644 index 0000000000..633db0c4b6 --- /dev/null +++ b/modules/gapi/test/gapi_async_test.cpp @@ -0,0 +1,290 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. 
+// +// Copyright (C) 2019 Intel Corporation + + +#include "test_precomp.hpp" +#include "opencv2/gapi/gcomputation_async.hpp" +#include "opencv2/gapi/gcompiled_async.hpp" + +#include +#include + +namespace opencv_test +{ +struct SumOfSum{ + cv::GComputation sum_of_sum; + SumOfSum() : sum_of_sum([]{ + cv::GMat in; + cv::GScalar out = cv::gapi::sum(in + in); + return GComputation{in, out}; + }) + {} +}; + +struct SumOfSum2x2 : SumOfSum { + const cv::Size sz{2, 2}; + cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)}; + cv::Scalar out; + + cv::GCompiled compile(){ + return sum_of_sum.compile(descr_of(in_mat)); + } + + cv::GComputation& computation(){ + return sum_of_sum; + } + + cv::GCompileArgs compile_args(){ + return {}; + } + + cv::GRunArgs in_args(){ + return cv::gin(in_mat); + } + + cv::GRunArgsP out_args(){ + return cv::gout(out); + } + + void verify(){ + EXPECT_EQ(8, out[0]); + } +}; + +namespace { + G_TYPED_KERNEL(GThrow, , "org.opencv.test.throw") + { + static GMatDesc outMeta(GMatDesc in) { return in; } + + }; + + struct gthrow_exception : std::runtime_error { + using std::runtime_error::runtime_error; + }; + + GAPI_OCV_KERNEL(GThrowImpl, GThrow) + { + static void run(const cv::Mat& in, cv::Mat&) + { + //this condition is needed to avoid "Unreachable code" warning on windows inside OCVCallHelper + if (!in.empty()) + { + throw gthrow_exception{"test"}; + } + } + }; +} + +struct ExceptionOnExecution { + cv::GComputation throwing_gcomp; + ExceptionOnExecution() : throwing_gcomp([]{ + cv::GMat in; + auto gout = GThrow::on(in); + return GComputation{in, gout}; + }) + {} + + + const cv::Size sz{2, 2}; + cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)}; + cv::Mat out; + + cv::GCompiled compile(){ + return throwing_gcomp.compile(descr_of(in_mat), compile_args()); + } + + cv::GComputation& computation(){ + return throwing_gcomp; + } + + cv::GRunArgs in_args(){ + return cv::gin(in_mat); + } + + cv::GRunArgsP out_args(){ + return cv::gout(out); + } + + cv::GCompileArgs compile_args(){ 
+ auto pkg = cv::gapi::kernels(); + return cv::compile_args(pkg); + } + +}; + +template +struct crtp_cast { + template + static crtp_final_t* crtp_cast_(crtp_base_t* this_) + { + return static_cast(this_); + } +}; + +template +struct CallBack: crtp_cast { + std::atomic callback_called = {false}; + std::mutex mtx; + std::exception_ptr ep; + + std::condition_variable cv; + + std::function callback(){ + return [&](std::exception_ptr ep_){ + ep = ep_; + callback_called = true; + mtx.lock(); + mtx.unlock(); + cv.notify_one(); + }; + }; + + template + void start_async(Args&&... args){ + this->crtp_cast_(this)->async(callback(), std::forward(args)...); + } + + void wait_for_result() + { + std::unique_lock lck{mtx}; + cv.wait(lck,[&]{return callback_called == true;}); + if (ep) + { + std::rethrow_exception(ep); + } + } +}; + +template +struct Future: crtp_cast { + std::future f; + + template + void start_async(Args&&... args){ + f = this->crtp_cast_(this)->async(std::forward(args)...); + } + + void wait_for_result() + { + f.get(); + } +}; + + +template +struct AsyncCompiled : crtp_cast{ + + template + auto async(Args&&... args) -> decltype(cv::gapi::wip::async(std::declval(), std::forward(args)...)){ + auto gcmpld = this->crtp_cast_(this)->compile(); + return cv::gapi::wip::async(gcmpld, std::forward(args)...); + } +}; + +template +struct AsyncApply : crtp_cast { + + template + auto async(Args&&... 
args) ->decltype(cv::gapi::wip::async_apply(std::declval(), std::forward(args)...)) { + return cv::gapi::wip::async_apply(this->crtp_cast_(this)->computation(), std::forward(args)..., this->crtp_cast_(this)->compile_args()); + } +}; + + +template +struct normal: ::testing::Test, case_t{}; + +TYPED_TEST_CASE_P(normal); + +TYPED_TEST_P(normal, basic){ + this->start_async(this->in_args(), this->out_args()); + this->wait_for_result(); + + this->verify(); +} + +REGISTER_TYPED_TEST_CASE_P(normal, + basic +); + +template +struct exception: ::testing::Test, case_t{}; +TYPED_TEST_CASE_P(exception); + +TYPED_TEST_P(exception, basic){ + this->start_async(this->in_args(), this->out_args()); + EXPECT_THROW(this->wait_for_result(), gthrow_exception); +} + +REGISTER_TYPED_TEST_CASE_P(exception, + basic +); + +template +struct stress : ::testing::Test{}; +TYPED_TEST_CASE_P(stress); + +TYPED_TEST_P(stress, test){ + const std::size_t request_per_thread = 10; + const std::size_t number_of_threads = 4; + + auto thread_body = [&](){ + std::vector requests{request_per_thread}; + for (auto&& r : requests){ + r.start_async(r.in_args(), r.out_args()); + } + + for (auto&& r : requests){ + r.wait_for_result(); + r.verify(); + } + }; + + std::vector pool {number_of_threads}; + for (auto&& t : pool){ + t = std::thread{thread_body}; + } + + for (auto&& t : pool){ + t.join(); + } +} +REGISTER_TYPED_TEST_CASE_P(stress, test); + +template class callback_or_future_t, template class compiled_or_apply_t> +struct Case + : compute_fixture_t, + callback_or_future_t>, + compiled_or_apply_t > +{}; + +template +using cases = ::testing::Types< + Case, + Case, + Case, + Case + >; +INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPINormalFlow_, normal, cases); +INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIExceptionHandling_, exception, cases); + +INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIStress, stress, cases); + +TEST(AsyncAPI, Sample){ + cv::GComputation self_mul([]{ + cv::GMat in; + cv::GMat out = cv::gapi::mul(in, in); + return 
GComputation{in, out}; + }); + + const cv::Size sz{2, 2}; + cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)}; + cv::Mat out; + + auto f = cv::gapi::wip::async_apply(self_mul,cv::gin(in_mat), cv::gout(out)); + f.wait(); +} +} // namespace opencv_test