diff --git a/modules/gapi/CMakeLists.txt b/modules/gapi/CMakeLists.txt index 4a2bb54dfd..233877deb2 100644 --- a/modules/gapi/CMakeLists.txt +++ b/modules/gapi/CMakeLists.txt @@ -19,6 +19,7 @@ set(the_description "OpenCV G-API Core Module") ocv_add_module(gapi opencv_imgproc) + file(GLOB gapi_ext_hdrs "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/*.hpp" @@ -30,6 +31,7 @@ file(GLOB gapi_ext_hdrs "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/fluid/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/own/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/infer/*.hpp" + "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/*.hpp" ) set(gapi_srcs @@ -57,6 +59,7 @@ set(gapi_srcs src/compiler/gislandmodel.cpp src/compiler/gcompiler.cpp src/compiler/gcompiled.cpp + src/compiler/gstreaming.cpp src/compiler/passes/helpers.cpp src/compiler/passes/dump_dot.cpp src/compiler/passes/islands.cpp @@ -66,9 +69,11 @@ set(gapi_srcs src/compiler/passes/transformations.cpp src/compiler/passes/pattern_matching.cpp src/compiler/passes/perform_substitution.cpp + src/compiler/passes/streaming.cpp # Executor src/executor/gexecutor.cpp + src/executor/gstreamingexecutor.cpp src/executor/gasync.cpp # CPU Backend (currently built-in) @@ -127,5 +132,11 @@ if(TARGET opencv_test_gapi) target_link_libraries(opencv_test_gapi PRIVATE ade) endif() +if(MSVC) + # Disable obsollete warning C4503 popping up on MSVC <<2017 + # https://docs.microsoft.com/en-us/cpp/error-messages/compiler-warnings/compiler-warning-level-1-c4503?view=vs-2019 + set_target_properties(${the_module} PROPERTIES COMPILE_FLAGS "/wd4503") +endif() + ocv_add_perf_tests() ocv_add_samples() diff --git a/modules/gapi/cmake/standalone.cmake b/modules/gapi/cmake/standalone.cmake index e8dbcaae45..f448d97daf 100644 --- a/modules/gapi/cmake/standalone.cmake +++ b/modules/gapi/cmake/standalone.cmake @@ -39,6 +39,9 @@ if(MSVC) target_compile_options(${FLUID_TARGET} PUBLIC "/wd4251") target_compile_options(${FLUID_TARGET} PUBLIC "/wd4275") target_compile_definitions(${FLUID_TARGET} PRIVATE _CRT_SECURE_NO_DEPRECATE) + # Disable obsollete warning C4503 popping up on MSVC <<2017 + # https://docs.microsoft.com/en-us/cpp/error-messages/compiler-warnings/compiler-warning-level-1-c4503?view=vs-2019 + set_target_properties(${FLUID_TARGET} PROPERTIES COMPILE_FLAGS "/wd4503") endif() target_link_libraries(${FLUID_TARGET} PRIVATE ade) diff --git a/modules/gapi/include/opencv2/gapi/core.hpp b/modules/gapi/include/opencv2/gapi/core.hpp index de595e3189..a2c3f81ec6 100644 --- a/modules/gapi/include/opencv2/gapi/core.hpp +++ b/modules/gapi/include/opencv2/gapi/core.hpp @@ -436,6 +436,7 @@ namespace core { } }; + // TODO: eliminate the need in this kernel (streaming) G_TYPED_KERNEL(GCrop, , "org.opencv.core.transform.crop") { static GMatDesc outMeta(GMatDesc in, Rect rc) { return in.withSize(Size(rc.width, rc.height)); @@ -1505,11 +1506,11 @@ Output matrix must be of the same depth as input one, size is specified by given */ GAPI_EXPORTS GMat crop(const GMat& src, const Rect& rect); -/** @brief Copies a 2D matrix. +/** @brief Copies a matrix. -The function copies the matrix. - -Output matrix must be of the same size and depth as input one. +Copies an input array. Works as a regular Mat::clone but happens in-graph. +Mainly is used to workaround some existing limitations (e.g. to forward an input frame to outputs +in the streaming mode). Will be deprecated and removed in the future. @note Function textual ID is "org.opencv.core.transform.copy" diff --git a/modules/gapi/include/opencv2/gapi/garg.hpp b/modules/gapi/include/opencv2/gapi/garg.hpp index c8463ea305..9b835d9567 100644 --- a/modules/gapi/include/opencv2/gapi/garg.hpp +++ b/modules/gapi/include/opencv2/gapi/garg.hpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace cv { @@ -92,12 +93,33 @@ using GRunArg = util::variant< cv::Scalar, cv::UMat, #endif // !defined(GAPI_STANDALONE) + cv::gapi::wip::IStreamSource::Ptr, cv::gapi::own::Mat, cv::gapi::own::Scalar, cv::detail::VectorRef >; using GRunArgs = std::vector; +namespace gapi +{ +namespace wip +{ +/** + * @brief This aggregate type represents all types which G-API can handle (via variant). + * + * It only exists to overcome C++ language limitations (where a `using`-defined class can't be forward-declared). + */ +struct Data: public GRunArg +{ + using GRunArg::GRunArg; + template + Data& operator= (const T& t) { GRunArg::operator=(t); return *this; } + template + Data& operator= (T&& t) { GRunArg::operator=(std::move(t)); return *this; } +}; +} // namespace wip +} // namespace gapi + using GRunArgP = util::variant< #if !defined(GAPI_STANDALONE) cv::Mat*, @@ -110,7 +132,6 @@ using GRunArgP = util::variant< >; using GRunArgsP = std::vector; - template inline GRunArgs gin(const Ts&... args) { return GRunArgs{ GRunArg(detail::wrap_host_helper::wrap_in(args))... }; diff --git a/modules/gapi/include/opencv2/gapi/garray.hpp b/modules/gapi/include/opencv2/gapi/garray.hpp index a545535e55..b989071e0e 100644 --- a/modules/gapi/include/opencv2/gapi/garray.hpp +++ b/modules/gapi/include/opencv2/gapi/garray.hpp @@ -114,9 +114,11 @@ namespace detail std::size_t m_elemSize = 0ul; cv::GArrayDesc m_desc; virtual ~BasicVectorRef() {} + + virtual void mov(BasicVectorRef &ref) = 0; }; - template class VectorRefT: public BasicVectorRef + template class VectorRefT final: public BasicVectorRef { using empty_t = util::monostate; using ro_ext_t = const std::vector *; @@ -200,6 +202,12 @@ namespace detail if (isRWOwn()) return util::get(m_ref); util::throw_error(std::logic_error("Impossible happened")); } + + virtual void mov(BasicVectorRef &v) override { + VectorRefT *tv = dynamic_cast*>(&v); + GAPI_Assert(tv != nullptr); + wref() = std::move(tv->wref()); + } }; // This class strips type information from VectorRefT<> and makes it usable @@ -245,6 +253,11 @@ namespace detail return static_cast&>(*m_ref).rref(); } + void mov(VectorRef &v) + { + m_ref->mov(*v.m_ref); + } + cv::GArrayDesc descr_of() const { return m_ref->m_desc; diff --git a/modules/gapi/include/opencv2/gapi/gcompiled.hpp b/modules/gapi/include/opencv2/gapi/gcompiled.hpp index c825edf1a7..0a411126c9 100644 --- a/modules/gapi/include/opencv2/gapi/gcompiled.hpp +++ b/modules/gapi/include/opencv2/gapi/gcompiled.hpp @@ -60,6 +60,8 @@ namespace cv { * At the same time, two different GCompiled objects produced from the * single cv::GComputation are completely independent and can be used * concurrently. + * + * @sa GStreamingCompiled */ class GAPI_EXPORTS GCompiled { diff --git a/modules/gapi/include/opencv2/gapi/gcomputation.hpp b/modules/gapi/include/opencv2/gapi/gcomputation.hpp index 0be5f7edfe..0a4edc1c8a 100644 --- a/modules/gapi/include/opencv2/gapi/gcomputation.hpp +++ b/modules/gapi/include/opencv2/gapi/gcomputation.hpp @@ -15,6 +15,7 @@ #include #include #include +#include namespace cv { @@ -394,6 +395,77 @@ public: typename detail::MkSeq::type()); } + + // FIXME: Document properly in the Doxygen format + // Video-oriented pipeline compilation: + // 1. A generic version + /** + * @brief Compile the computation for streaming mode. + * + * This method triggers compilation process and produces a new + * GStreamingCompiled object which then can process video stream + * data of the given format. Passing a stream in a different + * format to the compiled computation will generate a run-time + * exception. + * + * @param in_metas vector of input metadata configuration. Grab + * metadata from real data objects (like cv::Mat or cv::Scalar) + * using cv::descr_of(), or create it on your own. @param args + * compilation arguments for this compilation process. Compilation + * arguments directly affect what kind of executable object would + * be produced, e.g. which kernels (and thus, devices) would be + * used to execute computation. + * + * @return GStreamingCompiled, a streaming-oriented executable + * computation compiled specifically for the given input + * parameters. + * + * @sa @ref gapi_compile_args + */ + GStreamingCompiled compileStreaming(GMetaArgs &&in_metas, GCompileArgs &&args = {}); + + // 2. Direct metadata version + /** + * @overload + * + * Takes a variadic parameter pack with metadata + * descriptors for which a compiled object needs to be produced. + * + * @return GStreamingCompiled, a streaming-oriented executable + * computation compiled specifically for the given input + * parameters. + */ + template + auto compileStreaming(const Ts&... metas) -> + typename std::enable_if::value, GStreamingCompiled>::type + { + return compileStreaming(GMetaArgs{GMetaArg(metas)...}, GCompileArgs()); + } + + // 2. Direct metadata + compile arguments version + /** + * @overload + * + * Takes a variadic parameter pack with metadata + * descriptors for which a compiled object needs to be produced, + * followed by GCompileArgs object representing compilation + * arguments for this process. + * + * @return GStreamingCompiled, a streaming-oriented executable + * computation compiled specifically for the given input + * parameters. + */ + template + auto compileStreaming(const Ts&... meta_and_compile_args) -> + typename std::enable_if::value + && std::is_same >::value, + GStreamingCompiled>::type + { + //FIXME: wrapping meta_and_compile_args into a tuple to unwrap them inside a helper function is the overkill + return compileStreaming(std::make_tuple(meta_and_compile_args...), + typename detail::MkSeq::type()); + } + // Internal use only /// @private Priv& priv(); @@ -402,7 +474,7 @@ public: protected: - // 4. Helper method for (3) + // 4. Helper methods for (3) /// @private template GCompiled compile(const std::tuple &meta_and_compile_args, detail::Seq) @@ -411,6 +483,13 @@ protected: GCompileArgs comp_args = std::get(meta_and_compile_args); return compile(std::move(meta_args), std::move(comp_args)); } + template + GStreamingCompiled compileStreaming(const std::tuple &meta_and_compile_args, detail::Seq) + { + GMetaArgs meta_args = {GMetaArg(std::get(meta_and_compile_args))...}; + GCompileArgs comp_args = std::get(meta_and_compile_args); + return compileStreaming(std::move(meta_args), std::move(comp_args)); + } /// @private std::shared_ptr m_priv; }; diff --git a/modules/gapi/include/opencv2/gapi/gmat.hpp b/modules/gapi/include/opencv2/gapi/gmat.hpp index 8416a3a6c5..440c433ff6 100644 --- a/modules/gapi/include/opencv2/gapi/gmat.hpp +++ b/modules/gapi/include/opencv2/gapi/gmat.hpp @@ -211,6 +211,7 @@ GAPI_EXPORTS GMatDesc descr_of(const cv::UMat &mat); /** @} */ +// FIXME: WHY??? WHY it is under different namespace? namespace gapi { namespace own { GAPI_EXPORTS GMatDesc descr_of(const Mat &mat); }}//gapi::own diff --git a/modules/gapi/include/opencv2/gapi/gstreaming.hpp b/modules/gapi/include/opencv2/gapi/gstreaming.hpp new file mode 100644 index 0000000000..7079042069 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/gstreaming.hpp @@ -0,0 +1,231 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2018 Intel Corporation + + +#ifndef OPENCV_GAPI_GSTREAMING_COMPILED_HPP +#define OPENCV_GAPI_GSTREAMING_COMPILED_HPP + +#include + +#include +#include +#include +#include + +namespace cv { + +/** + * \addtogroup gapi_main_classes + * @{ + */ +/** + * @brief Represents a computation (graph) compiled for streaming. + * + * This class represents a product of graph compilation (calling + * cv::GComputation::compileStreaming()). Objects of this class + * actually do stream processing, and the whole pipeline execution + * complexity is incapsulated into objects of this class. Execution + * model has two levels: at the very top, the execution of a + * heterogeneous graph is aggressively pipelined; at the very bottom + * the execution of every internal block is determined by its + * associated backend. Backends are selected based on kernel packages + * passed via compilation arguments ( see @ref gapi_compile_args, + * GNetworkPackage, GKernelPackage for details). + * + * GStreamingCompiled objects have a "player" semantics -- there are + * methods like start() and stop(). GStreamingCompiled has a full + * control over a videostream and so is stateful. You need to specify the + * input stream data using setSource() and then call start() to + * actually start processing. After that, use pull() or try_pull() to + * obtain next processed data frame from the graph in a blocking or + * non-blocking way, respectively. + * + * Currently a single GStreamingCompiled can process only one video + * streat at time. Produce multiple GStreamingCompiled objects to run the + * same graph on multiple video streams. + * + * @sa GCompiled + */ +class GAPI_EXPORTS GStreamingCompiled +{ +public: + class GAPI_EXPORTS Priv; + GStreamingCompiled(); + + // FIXME: More overloads? + /** + * @brief Specify the input data to GStreamingCompiled for + * processing, a generic version. + * + * Use gin() to create an input parameter vector. + * + * Input vectors must have the same number of elements as defined + * in the cv::GComputation protocol (at the moment of its + * construction). Shapes of elements also must conform to protocol + * (e.g. cv::Mat needs to be passed where cv::GMat has been + * declared as input, and so on). Run-time exception is generated + * on type mismatch. + * + * In contrast with regular GCompiled, user can also pass an + * object of type GVideoCapture for a GMat parameter of the parent + * GComputation. The compiled pipeline will start fetching data + * from that GVideoCapture and feeding it into the + * pipeline. Pipeline stops when a GVideoCapture marks end of the + * stream (or when stop() is called). + * + * Passing a regular Mat for a GMat parameter makes it "infinite" + * source -- pipeline may run forever feeding with this Mat until + * stopped explicitly. + * + * Currently only a single GVideoCapture is supported as input. If + * the parent GComputation is declared with multiple input GMat's, + * one of those can be specified as GVideoCapture but all others + * must be regular Mat objects. + * + * Throws if pipeline is already running. Use stop() and then + * setSource() to run the graph on a new video stream. + * + * @note This method is not thread-safe (with respect to the user + * side) at the moment. Protect the access if + * start()/stop()/setSource() may be called on the same object in + * multiple threads in your application. + * + * @param ins vector of inputs to process. + * @sa gin + */ + void setSource(GRunArgs &&ins); + + /** + * @brief Specify an input video stream for a single-input + * computation pipeline. + * + * Throws if pipeline is already running. Use stop() and then + * setSource() to run the graph on a new video stream. + * + * @overload + * @param s a shared pointer to IStreamSource representing the + * input video stream. + */ + void setSource(const gapi::wip::IStreamSource::Ptr& s); + + /** + * @brief Start the pipeline execution. + * + * Use pull()/try_pull() to obtain data. Throws an exception if + * a video source was not specified. + * + * setSource() must be called first, even if the pipeline has been + * working already and then stopped (explicitly via stop() or due + * stream completion) + * + * @note This method is not thread-safe (with respect to the user + * side) at the moment. Protect the access if + * start()/stop()/setSource() may be called on the same object in + * multiple threads in your application. + */ + void start(); + + /** + * @brief Get the next processed frame from the pipeline. + * + * Use gout() to create an output parameter vector. + * + * Output vectors must have the same number of elements as defined + * in the cv::GComputation protocol (at the moment of its + * construction). Shapes of elements also must conform to protocol + * (e.g. cv::Mat needs to be passed where cv::GMat has been + * declared as output, and so on). Run-time exception is generated + * on type mismatch. + * + * This method writes new data into objects passed via output + * vector. If there is no data ready yet, this method blocks. Use + * try_pull() if you need a non-blocking version. + * + * @param outs vector of output parameters to obtain. + * @return true if next result has been obtained, + * false marks end of the stream. + */ + bool pull(cv::GRunArgsP &&outs); + + /** + * @brief Try to get the next processed frame from the pipeline. + * + * Use gout() to create an output parameter vector. + * + * This method writes new data into objects passed via output + * vector. If there is no data ready yet, the output vector + * remains unchanged and false is returned. + * + * @return true if data has been obtained, and false if it was + * not. Note: false here doesn't mark the end of the stream. + */ + bool try_pull(cv::GRunArgsP &&outs); + + /** + * @brief Stop (abort) processing the pipeline. + * + * Note - it is not pause but a complete stop. Calling start() + * will cause G-API to start processing the stream from the early beginning. + * + * Throws if the pipeline is not running. + */ + void stop(); + + /** + * @brief Test if the pipeline is running. + * + * @note This method is not thread-safe (with respect to the user + * side) at the moment. Protect the access if + * start()/stop()/setSource() may be called on the same object in + * multiple threads in your application. + * + * @return true if the current stream is not over yet. + */ + bool running() const; + + /// @private + Priv& priv(); + + /** + * @brief Check if compiled object is valid (non-empty) + * + * @return true if the object is runnable (valid), false otherwise + */ + explicit operator bool () const; + + /** + * @brief Vector of metadata this graph was compiled for. + * + * @return Unless _reshape_ is not supported, return value is the + * same vector which was passed to cv::GComputation::compile() to + * produce this compiled object. Otherwise, it is the latest + * metadata vector passed to reshape() (if that call was + * successful). + */ + const GMetaArgs& metas() const; // Meta passed to compile() + + /** + * @brief Vector of metadata descriptions of graph outputs + * + * @return vector with formats/resolutions of graph's output + * objects, auto-inferred from input metadata vector by + * operations which form this computation. + * + * @note GCompiled objects produced from the same + * cv::GComputiation graph with different input metas may return + * different values in this vector. + */ + const GMetaArgs& outMetas() const; + +protected: + /// @private + std::shared_ptr m_priv; +}; +/** @} */ + +} + +#endif // OPENCV_GAPI_GSTREAMING_COMPILED_HPP diff --git a/modules/gapi/include/opencv2/gapi/gtype_traits.hpp b/modules/gapi/include/opencv2/gapi/gtype_traits.hpp index 694c032d4e..8cea478813 100644 --- a/modules/gapi/include/opencv2/gapi/gtype_traits.hpp +++ b/modules/gapi/include/opencv2/gapi/gtype_traits.hpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -99,6 +100,9 @@ namespace detail template<> struct GTypeOf { using type = cv::GMat; }; template<> struct GTypeOf { using type = cv::GScalar; }; template struct GTypeOf > { using type = cv::GArray; }; + // FIXME: This is not quite correct since IStreamSource may produce not only Mat but also Scalar + // and vector data. TODO: Extend the type dispatchig on these types too. + template<> struct GTypeOf { using type = cv::GMat;}; template using g_type_of_t = typename GTypeOf::type; // Marshalling helper for G-types and its Host types. Helps G-API diff --git a/modules/gapi/include/opencv2/gapi/ocl/goclkernel.hpp b/modules/gapi/include/opencv2/gapi/ocl/goclkernel.hpp index 1cf4a61eb9..e409652ed2 100644 --- a/modules/gapi/include/opencv2/gapi/ocl/goclkernel.hpp +++ b/modules/gapi/include/opencv2/gapi/ocl/goclkernel.hpp @@ -120,7 +120,8 @@ struct tracked_cv_umat{ //TODO Think if T - API could reallocate UMat to a proper size - how do we handle this ? //tracked_cv_umat(cv::UMat& m) : r{(m)}, original_data{m.getMat(ACCESS_RW).data} {} tracked_cv_umat(cv::UMat& m) : r{ (m) }, original_data{ nullptr } {} - cv::UMat r; + cv::UMat &r; // FIXME: It was a value (not a reference) before. + // Actually OCL backend should allocate its internal data! uchar* original_data; operator cv::UMat& (){ return r;} diff --git a/modules/gapi/include/opencv2/gapi/own/convert.hpp b/modules/gapi/include/opencv2/gapi/own/convert.hpp index 576b5ee791..a315444456 100644 --- a/modules/gapi/include/opencv2/gapi/own/convert.hpp +++ b/modules/gapi/include/opencv2/gapi/own/convert.hpp @@ -28,6 +28,7 @@ namespace cv } cv::gapi::own::Mat to_own(Mat&&) = delete; + inline cv::gapi::own::Mat to_own(Mat const& m) { return (m.dims == 2) ? cv::gapi::own::Mat{m.rows, m.cols, m.type(), m.data, m.step} @@ -42,7 +43,6 @@ namespace cv inline cv::gapi::own::Rect to_own (const Rect& r) { return {r.x, r.y, r.width, r.height}; }; - namespace gapi { namespace own diff --git a/modules/gapi/include/opencv2/gapi/streaming/cap.hpp b/modules/gapi/include/opencv2/gapi/streaming/cap.hpp new file mode 100644 index 0000000000..1879929c06 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/streaming/cap.hpp @@ -0,0 +1,79 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_CAP_HPP +#define OPENCV_GAPI_STREAMING_CAP_HPP + +/** + * YOUR ATTENTION PLEASE! + * + * This is a header-only implementation of cv::VideoCapture-based + * Stream source. It is not built by default with G-API as G-API + * doesn't depend on videoio module. + * + * If you want to use it in your application, please make sure + * videioio is available in your OpenCV package and is linked to your + * application. + * + * Note for developers: please don't put videoio dependency in G-API + * because of this file. + */ + +#include +#include + +namespace cv { +namespace gapi { +namespace wip { + +/** + * @brief OpenCV's VideoCapture-based streaming source. + * + * This class implements IStreamSource interface. + * Its constructor takes the same parameters as cv::VideoCapture does. + * + * Please make sure that videoio OpenCV module is avaiable before using + * this in your application (G-API doesn't depend on it directly). + * + * @note stream sources are passed to G-API via shared pointers, so + * please gapi::make_src<> to create objects and ptr() to pass a + * GCaptureSource to cv::gin(). + */ +class GCaptureSource: public IStreamSource +{ +public: + explicit GCaptureSource(int id) : cap(id) {} + explicit GCaptureSource(const std::string &path) : cap(path) {} + + // TODO: Add more constructor overloads to make it + // fully compatible with VideoCapture's interface. + +protected: + cv::VideoCapture cap; + virtual bool pull(cv::gapi::wip::Data &data) override + { + if (!cap.isOpened()) return false; + cv::Mat frame; + if (!cap.read(frame)) + { + // end-of-stream happened + return false; + } + + // NOTE: Some decode/media VideoCapture backends continue + // owning the video buffer under cv::Mat so in order to + // process it safely in a highly concurrent pipeline, clone() + // is the only right way. + data = frame.clone(); + return true; + } +}; + +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_STREAMING_CAP_HPP diff --git a/modules/gapi/include/opencv2/gapi/streaming/source.hpp b/modules/gapi/include/opencv2/gapi/streaming/source.hpp new file mode 100644 index 0000000000..04650ffa63 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/streaming/source.hpp @@ -0,0 +1,57 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_SOURCE_HPP +#define OPENCV_GAPI_STREAMING_SOURCE_HPP + +#include // shared_ptr +#include // is_base_of + +namespace cv { +namespace gapi { +namespace wip { + struct Data; // "forward-declaration" of GRunArg + +/** + * @brief Abstract streaming pipeline source. + * + * Implement this interface if you want customize the way how data is + * streaming into GStreamingCompiled. + * + * Objects implementing this interface can be passes to + * GStreamingCompiled via setSource()/cv::gin(). Regular compiled + * graphs (GCompiled) don't support input objects of this type. + * + * Default cv::VideoCapture-based implementation is available, see + * cv::gapi::GCaptureSource. + * + * @note stream sources are passed to G-API via shared pointers, so + * please use ptr() when passing a IStreamSource implementation to + * cv::gin(). + */ +class IStreamSource: public std::enable_shared_from_this +{ +public: + using Ptr = std::shared_ptr; + Ptr ptr() { return shared_from_this(); } + virtual bool pull(Data &data) = 0; + virtual ~IStreamSource() = default; +}; + +template +IStreamSource::Ptr inline make_src(Args&&... args) +{ + static_assert(std::is_base_of::value, + "T must implement the cv::gapi::IStreamSource interface!"); + auto src_ptr = std::make_shared(std::forward(args)...); + return src_ptr->ptr(); +} + +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_STREAMING_SOURCE_HPP diff --git a/modules/gapi/src/api/gbackend.cpp b/modules/gapi/src/api/gbackend.cpp index a1015d3880..c88d58116e 100644 --- a/modules/gapi/src/api/gbackend.cpp +++ b/modules/gapi/src/api/gbackend.cpp @@ -118,7 +118,7 @@ void bindInArg(Mag& mag, const RcDesc &rc, const GRunArg &arg, bool is_umat) if (is_umat) { auto& mag_umat = mag.template slot()[rc.id]; - mag_umat = (util::get(arg)); + mag_umat = util::get(arg).getUMat(ACCESS_READ); } else { @@ -185,7 +185,7 @@ void bindOutArg(Mag& mag, const RcDesc &rc, const GRunArgP &arg, bool is_umat) if (is_umat) { auto& mag_umat = mag.template slot()[rc.id]; - mag_umat = (*util::get(arg)); + mag_umat = util::get(arg)->getUMat(ACCESS_RW); } else { diff --git a/modules/gapi/src/api/gcomputation.cpp b/modules/gapi/src/api/gcomputation.cpp index 64a449530b..2f7b8dbda9 100644 --- a/modules/gapi/src/api/gcomputation.cpp +++ b/modules/gapi/src/api/gcomputation.cpp @@ -76,6 +76,12 @@ cv::GCompiled cv::GComputation::compile(GMetaArgs &&metas, GCompileArgs &&args) return comp.compile(); } +cv::GStreamingCompiled cv::GComputation::compileStreaming(GMetaArgs &&metas, GCompileArgs &&args) +{ + cv::gimpl::GCompiler comp(*this, std::move(metas), std::move(args)); + return comp.compileStreaming(); +} + // FIXME: Introduce similar query/test method for GMetaArgs as a building block // for functions like this? static bool formats_are_same(const cv::GMetaArgs& metas1, const cv::GMetaArgs& metas2) diff --git a/modules/gapi/src/api/gproto.cpp b/modules/gapi/src/api/gproto.cpp index 1106af96da..90e2ce255f 100644 --- a/modules/gapi/src/api/gproto.cpp +++ b/modules/gapi/src/api/gproto.cpp @@ -169,6 +169,7 @@ bool cv::can_describe(const GMetaArg& meta, const GRunArg& arg) util::get(meta).canDescribe(util::get(arg)); case GRunArg::index_of(): return meta == cv::GMetaArg(descr_of(util::get(arg))); case GRunArg::index_of(): return meta == cv::GMetaArg(util::get(arg).descr_of()); + case GRunArg::index_of(): return util::holds_alternative(meta); // FIXME(?) may be not the best option default: util::throw_error(std::logic_error("Unsupported GRunArg type")); } } diff --git a/modules/gapi/src/backends/cpu/gcpucore.cpp b/modules/gapi/src/backends/cpu/gcpucore.cpp index 27c17097a1..24e612aa78 100644 --- a/modules/gapi/src/backends/cpu/gcpucore.cpp +++ b/modules/gapi/src/backends/cpu/gcpucore.cpp @@ -505,7 +505,7 @@ GAPI_OCV_KERNEL(GCPUCopy, cv::gapi::core::GCopy) { static void run(const cv::Mat& in, cv::Mat& out) { - cv::Mat(in).copyTo(out); + in.copyTo(out); } }; diff --git a/modules/gapi/src/backends/fluid/gfluidbackend.cpp b/modules/gapi/src/backends/fluid/gfluidbackend.cpp index 78bc20229b..1a8a3c8178 100644 --- a/modules/gapi/src/backends/fluid/gfluidbackend.cpp +++ b/modules/gapi/src/backends/fluid/gfluidbackend.cpp @@ -1243,22 +1243,38 @@ void cv::gimpl::GFluidExecutable::bindInArg(const cv::gimpl::RcDesc &rc, const G case GShape::GMAT: m_buffers[m_id_map.at(rc.id)].priv().bindTo(util::get(arg), true); break; case GShape::GSCALAR: m_res.slot()[rc.id] = util::get(arg); break; case GShape::GARRAY: m_res.slot()[rc.id] = util::get(arg); break; - default: util::throw_error(std::logic_error("Unsupported GShape type")); } } void cv::gimpl::GFluidExecutable::bindOutArg(const cv::gimpl::RcDesc &rc, const GRunArgP &arg) { // Only GMat is supported as return type + using T = GRunArgP; switch (rc.shape) { case GShape::GMAT: { cv::GMatDesc desc = m_buffers[m_id_map.at(rc.id)].meta(); - auto &outMat = *util::get(arg); - GAPI_Assert(outMat.data != nullptr); - GAPI_Assert(descr_of(outMat) == desc && "Output argument was not preallocated as it should be ?"); - m_buffers[m_id_map.at(rc.id)].priv().bindTo(outMat, false); + auto &bref = m_buffers[m_id_map.at(rc.id)].priv(); + + switch (arg.index()) { + // FIXME: See the bindInArg comment on Streaming-related changes + case T::index_of(): { + auto &outMat = *util::get(arg); + GAPI_Assert(outMat.data != nullptr); + GAPI_Assert(descr_of(outMat) == desc && "Output argument was not preallocated as it should be ?"); + bref.bindTo(outMat, false); + } break; +#if !defined(GAPI_STANDALONE) + case T::index_of(): { + auto &outMat = *util::get(arg); + GAPI_Assert(outMat.data != nullptr); + GAPI_Assert(descr_of(outMat) == desc && "Output argument was not preallocated as it should be ?"); + bref.bindTo(cv::to_own(outMat), false); + } break; +#endif // GAPI_STANDALONE + default: GAPI_Assert(false); + } // switch(arg.index()) break; } default: util::throw_error(std::logic_error("Unsupported return GShape type")); @@ -1446,6 +1462,13 @@ void GFluidBackendImpl::addBackendPasses(ade::ExecutionEngineSetupContext &ectx) // regardless if it is one fluid island (both writing to and reading from this object) // or two distinct islands (both fluid) auto isFluidIsland = [&](const ade::NodeHandle& node) { + // With Streaming, Emitter islands may have no FusedIsland thing in meta. + // FIXME: Probably this is a concept misalignment + if (!gim.metadata(node).contains()) { + const auto kind = gim.metadata(node).get().k; + GAPI_Assert(kind == NodeKind::EMIT || kind == NodeKind::SINK); + return false; + } const auto isl = gim.metadata(node).get().object; return isl->backend() == cv::gapi::fluid::backend(); }; @@ -1456,6 +1479,9 @@ void GFluidBackendImpl::addBackendPasses(ade::ExecutionEngineSetupContext &ectx) setFluidData(data_node, false); } } break; // case::SLOT + case NodeKind::EMIT: + case NodeKind::SINK: + break; // do nothing for Streaming nodes default: GAPI_Assert(false); } // switch } // for (gim.nodes()) diff --git a/modules/gapi/src/backends/ocl/goclcore.cpp b/modules/gapi/src/backends/ocl/goclcore.cpp index af0e311d75..ec6ab18f84 100644 --- a/modules/gapi/src/backends/ocl/goclcore.cpp +++ b/modules/gapi/src/backends/ocl/goclcore.cpp @@ -486,7 +486,7 @@ GAPI_OCL_KERNEL(GOCLCopy, cv::gapi::core::GCopy) { static void run(const cv::UMat& in, cv::UMat& out) { - cv::UMat(in).copyTo(out); + in.copyTo(out); } }; diff --git a/modules/gapi/src/compiler/gcompiler.cpp b/modules/gapi/src/compiler/gcompiler.cpp index 82aeb21209..7fc8394a8f 100644 --- a/modules/gapi/src/compiler/gcompiler.cpp +++ b/modules/gapi/src/compiler/gcompiler.cpp @@ -28,21 +28,23 @@ #include "compiler/gmodelbuilder.hpp" #include "compiler/gcompiler.hpp" #include "compiler/gcompiled_priv.hpp" +#include "compiler/gstreaming_priv.hpp" #include "compiler/passes/passes.hpp" #include "compiler/passes/pattern_matching.hpp" #include "executor/gexecutor.hpp" +#include "executor/gstreamingexecutor.hpp" #include "backends/common/gbackend.hpp" // #if !defined(GAPI_STANDALONE) #include // Also directly refer to Core #include // ...and Imgproc kernel implementations +#include // render::ocv::backend() #endif // !defined(GAPI_STANDALONE) // #include // compound::backend() -#include // render::ocv::backend() #include "logger.hpp" @@ -272,6 +274,12 @@ cv::gimpl::GCompiler::GCompiler(const cv::GComputation &c, m_e.addPass("exec", "fuse_islands", passes::fuseIslands); m_e.addPass("exec", "sync_islands", passes::syncIslandTags); + // FIXME: Since a set of passes is shared between + // GCompiled/GStreamingCompiled, this pass is added here unconditionally + // (even if it is not actually required to produce a GCompiled). + // FIXME: add a better way to do that! + m_e.addPass("exec", "add_streaming", passes::addStreaming); + if (dump_path.has_value()) { m_e.addPass("exec", "dump_dot", std::bind(passes::dumpGraph, _1, @@ -407,6 +415,17 @@ cv::GCompiled cv::gimpl::GCompiler::produceCompiled(GPtr &&pg) return compiled; } +cv::GStreamingCompiled cv::gimpl::GCompiler::produceStreamingCompiled(GPtr &&pg) +{ + const auto &outMetas = GModel::ConstGraph(*pg).metadata() + .get().outMeta; + std::unique_ptr pE(new GStreamingExecutor(std::move(pg))); + + GStreamingCompiled compiled; + compiled.priv().setup(m_metas, outMetas, std::move(pE)); + return compiled; +} + cv::GCompiled cv::gimpl::GCompiler::compile() { std::unique_ptr pG = generateGraph(); @@ -414,3 +433,13 @@ cv::GCompiled cv::gimpl::GCompiler::compile() compileIslands(*pG); return produceCompiled(std::move(pG)); } + +cv::GStreamingCompiled cv::gimpl::GCompiler::compileStreaming() +{ + // FIXME: self-note to DM: now keep these compile()/compileStreaming() in sync! + std::unique_ptr pG = generateGraph(); + GModel::Graph(*pG).metadata().set(Streaming{}); + runPasses(*pG); + compileIslands(*pG); + return produceStreamingCompiled(std::move(pG)); +} diff --git a/modules/gapi/src/compiler/gcompiler.hpp b/modules/gapi/src/compiler/gcompiler.hpp index d55f84d56e..92a5d3a407 100644 --- a/modules/gapi/src/compiler/gcompiler.hpp +++ b/modules/gapi/src/compiler/gcompiler.hpp @@ -42,12 +42,16 @@ public: // The method which does everything... GCompiled compile(); - // But is actually composed of this: + // This too. + GStreamingCompiled compileStreaming(); + + // But those are actually composed of this: using GPtr = std::unique_ptr; GPtr generateGraph(); // Unroll GComputation into a GModel void runPasses(ade::Graph &g); // Apply all G-API passes on a GModel void compileIslands(ade::Graph &g); // Instantiate GIslandExecutables in GIslandModel GCompiled produceCompiled(GPtr &&pg); // Produce GCompiled from processed GModel + GStreamingCompiled produceStreamingCompiled(GPtr &&pg); // Produce GStreamingCompiled from processed GMbodel }; }} diff --git a/modules/gapi/src/compiler/gislandmodel.cpp b/modules/gapi/src/compiler/gislandmodel.cpp index 8e20302a35..7f03ee9a37 100644 --- a/modules/gapi/src/compiler/gislandmodel.cpp +++ b/modules/gapi/src/compiler/gislandmodel.cpp @@ -215,6 +215,22 @@ ade::NodeHandle GIslandModel::mkIslandNode(Graph &g, std::shared_ptr&& return nh; } +ade::NodeHandle GIslandModel::mkEmitNode(Graph &g, std::size_t in_idx) +{ + ade::NodeHandle nh = g.createNode(); + g.metadata(nh).set(cv::gimpl::NodeKind{cv::gimpl::NodeKind::EMIT}); + g.metadata(nh).set(cv::gimpl::Emitter{in_idx, {}}); + return nh; +} + +ade::NodeHandle GIslandModel::mkSinkNode(Graph &g, std::size_t out_idx) +{ + ade::NodeHandle nh = g.createNode(); + g.metadata(nh).set(cv::gimpl::NodeKind{cv::gimpl::NodeKind::SINK}); + g.metadata(nh).set(cv::gimpl::Sink{out_idx}); + return nh; +} + void GIslandModel::syncIslandTags(Graph &g, ade::Graph &orig_g) { GModel::Graph gm(orig_g); diff --git a/modules/gapi/src/compiler/gislandmodel.hpp b/modules/gapi/src/compiler/gislandmodel.hpp index d25db58300..a3dcdbbd43 100644 --- a/modules/gapi/src/compiler/gislandmodel.hpp +++ b/modules/gapi/src/compiler/gislandmodel.hpp @@ -2,7 +2,7 @@ // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // -// Copyright (C) 2018 Intel Corporation +// Copyright (C) 2018-2019 Intel Corporation #ifndef OPENCV_GAPI_GISLANDMODEL_HPP @@ -115,13 +115,21 @@ public: virtual ~GIslandExecutable() = default; }; - +// GIslandEmitter - a backend-specific thing which feeds data into +// the pipeline. This one is just an interface, implementations are executor-defined. +class GIslandEmitter +{ +public: + // Obtain next value from the emitter + virtual bool pull(GRunArg &) = 0; + virtual ~GIslandEmitter() = default; +}; // Couldn't reuse NodeType here - FIXME unify (move meta to a shared place) struct NodeKind { static const char *name() { return "NodeKind"; } - enum { ISLAND, SLOT} k; + enum { ISLAND, SLOT, EMIT, SINK} k; }; // FIXME: Rename to Island (as soon as current GModel::Island is renamed @@ -144,6 +152,19 @@ struct IslandExec std::shared_ptr object; }; +struct Emitter +{ + static const char *name() { return "Emitter"; } + std::size_t proto_index; + std::shared_ptr object; +}; + +struct Sink +{ + static const char *name() { return "Sink"; } + std::size_t proto_index; +}; + namespace GIslandModel { using Graph = ade::TypedGraph @@ -151,6 +172,8 @@ namespace GIslandModel , FusedIsland , DataSlot , IslandExec + , Emitter + , Sink , ade::passes::TopologicalSortData >; @@ -160,6 +183,8 @@ namespace GIslandModel , FusedIsland , DataSlot , IslandExec + , Emitter + , Sink , ade::passes::TopologicalSortData >; @@ -169,6 +194,8 @@ namespace GIslandModel ade::NodeHandle mkSlotNode(Graph &g, const ade::NodeHandle &data_nh); ade::NodeHandle mkIslandNode(Graph &g, const gapi::GBackend &bknd, const ade::NodeHandle &op_nh, const ade::Graph &orig_g); ade::NodeHandle mkIslandNode(Graph &g, std::shared_ptr&& isl); + ade::NodeHandle mkEmitNode(Graph &g, std::size_t in_idx); // streaming-related + ade::NodeHandle mkSinkNode(Graph &g, std::size_t out_idx); // streaming-related // GIslandModel API void syncIslandTags(Graph &g, ade::Graph &orig_g); diff --git a/modules/gapi/src/compiler/gmodel.cpp b/modules/gapi/src/compiler/gmodel.cpp index a5e4f2c760..74345f16b1 100644 --- a/modules/gapi/src/compiler/gmodel.cpp +++ b/modules/gapi/src/compiler/gmodel.cpp @@ -50,21 +50,11 @@ ade::NodeHandle GModel::mkDataNode(GModel::Graph &g, const GOrigin& origin) storage = Data::Storage::CONST_VAL; g.metadata(data_h).set(ConstValue{value}); } - g.metadata(data_h).set(Data{origin.shape, id, meta, origin.ctor, storage}); - return data_h; -} - -ade::NodeHandle GModel::mkDataNode(GModel::Graph &g, const GShape shape) -{ - ade::NodeHandle data_h = g.createNode(); - g.metadata(data_h).set(NodeType{NodeType::DATA}); - - const auto id = g.metadata().get().GetNewId(shape); - GMetaArg meta; - HostCtor ctor; - Data::Storage storage = Data::Storage::INTERNAL; // By default, all objects are marked INTERNAL - - g.metadata(data_h).set(Data{shape, id, meta, ctor, storage}); + // FIXME: Sometimes a GArray-related node may be created w/o the + // associated host-type constructor (e.g. when the array is + // somewhere in the middle of the graph). + auto ctor_copy = origin.ctor; + g.metadata(data_h).set(Data{origin.shape, id, meta, ctor_copy, storage}); return data_h; } diff --git a/modules/gapi/src/compiler/gmodel.hpp b/modules/gapi/src/compiler/gmodel.hpp index d5e03d5497..626770d881 100644 --- a/modules/gapi/src/compiler/gmodel.hpp +++ b/modules/gapi/src/compiler/gmodel.hpp @@ -145,6 +145,16 @@ struct ActiveBackends std::unordered_set backends; }; +// This is a graph-global flag indicating this graph is compiled for +// the streaming case. Streaming-neutral passes (i.e. nearly all of +// them) can ignore this flag safely. +// +// FIXME: Probably a better design can be suggested. +struct Streaming +{ + static const char *name() { return "StreamingFlag"; } +}; + // Backend-specific inference parameters for a neural network. // Since these parameters are set on compilation stage (not // on a construction stage), these parameters are bound lately @@ -190,6 +200,7 @@ namespace GModel , IslandModel , ActiveBackends , CustomMetaFunction + , Streaming >; // FIXME: How to define it based on GModel??? @@ -209,6 +220,7 @@ namespace GModel , IslandModel , ActiveBackends , CustomMetaFunction + , Streaming >; // FIXME: @@ -220,8 +232,6 @@ namespace GModel GAPI_EXPORTS ade::NodeHandle mkOpNode(Graph &g, const GKernel &k, const std::vector& args, const std::string &island); - GAPI_EXPORTS ade::NodeHandle mkDataNode(Graph &g, const GShape shape); - // Adds a string message to a node. Any node can be subject of log, messages then // appear in the dumped .dot file.x GAPI_EXPORTS void log(Graph &g, ade::NodeHandle op, std::string &&message, ade::NodeHandle updater = ade::NodeHandle()); diff --git a/modules/gapi/src/compiler/gmodelbuilder.cpp b/modules/gapi/src/compiler/gmodelbuilder.cpp index bc97045931..c53c7b23fd 100644 --- a/modules/gapi/src/compiler/gmodelbuilder.cpp +++ b/modules/gapi/src/compiler/gmodelbuilder.cpp @@ -303,5 +303,14 @@ ade::NodeHandle cv::gimpl::GModelBuilder::put_DataNode(const GOrigin &origin) m_graph_data[origin] = nh; return nh; } - else return it->second; + else + { + // FIXME: One of the ugliest workarounds ever + if (it->first.ctor.index() == it->first.ctor.index_of() + && origin.ctor.index() != origin.ctor.index_of()) { + // meanwhile update existing object + m_gm.metadata(it->second).get().ctor = origin.ctor; + } + return it->second; + } } diff --git a/modules/gapi/src/compiler/gstreaming.cpp b/modules/gapi/src/compiler/gstreaming.cpp new file mode 100644 index 0000000000..0288eb6b20 --- /dev/null +++ b/modules/gapi/src/compiler/gstreaming.cpp @@ -0,0 +1,148 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + + +#include "precomp.hpp" + +#if !defined(GAPI_STANDALONE) + +#include + +#include // can_describe +#include + +#include "compiler/gstreaming_priv.hpp" +#include "backends/common/gbackend.hpp" + +// GStreamingCompiled private implementation /////////////////////////////////// +void cv::GStreamingCompiled::Priv::setup(const GMetaArgs &_metaArgs, + const GMetaArgs &_outMetas, + std::unique_ptr &&_pE) +{ + m_metas = _metaArgs; + m_outMetas = _outMetas; + m_exec = std::move(_pE); +} + +bool cv::GStreamingCompiled::Priv::isEmpty() const +{ + return !m_exec; +} + +const cv::GMetaArgs& cv::GStreamingCompiled::Priv::metas() const +{ + return m_metas; +} + +const cv::GMetaArgs& cv::GStreamingCompiled::Priv::outMetas() const +{ + return m_outMetas; +} + +// FIXME: What is the reason in having Priv here if Priv actually dispatches +// everything to the underlying executable?? May be this executable may become +// the G*Compiled's priv? +void cv::GStreamingCompiled::Priv::setSource(cv::GRunArgs &&args) +{ + // FIXME: This metadata checking should be removed at all + // for the streaming case. + if (!can_describe(m_metas, args)) + { + util::throw_error(std::logic_error("This object was compiled " + "for different metadata!")); + } + GAPI_Assert(m_exec != nullptr); + m_exec->setSource(std::move(args)); +} + +void cv::GStreamingCompiled::Priv::start() +{ + m_exec->start(); +} + +bool cv::GStreamingCompiled::Priv::pull(cv::GRunArgsP &&outs) +{ + return m_exec->pull(std::move(outs)); +} + +bool cv::GStreamingCompiled::Priv::try_pull(cv::GRunArgsP &&outs) +{ + return m_exec->try_pull(std::move(outs)); +} + +void cv::GStreamingCompiled::Priv::stop() +{ + m_exec->stop(); +} + +bool cv::GStreamingCompiled::Priv::running() const +{ + return m_exec->running(); +} + +// GStreamingCompiled public implementation //////////////////////////////////// +cv::GStreamingCompiled::GStreamingCompiled() + : m_priv(new Priv()) +{ +} + +void cv::GStreamingCompiled::setSource(GRunArgs &&ins) +{ + // FIXME: verify these input parameters according to the graph input meta + m_priv->setSource(std::move(ins)); +} + +void cv::GStreamingCompiled::setSource(const cv::gapi::wip::IStreamSource::Ptr &s) +{ + setSource(cv::gin(s)); +} + +void cv::GStreamingCompiled::start() +{ + m_priv->start(); +} + +bool cv::GStreamingCompiled::pull(cv::GRunArgsP &&outs) +{ + return m_priv->pull(std::move(outs)); +} + +bool cv::GStreamingCompiled::try_pull(cv::GRunArgsP &&outs) +{ + return m_priv->try_pull(std::move(outs)); +} + +void cv::GStreamingCompiled::stop() +{ + m_priv->stop(); +} + +bool cv::GStreamingCompiled::running() const +{ + return m_priv->running(); +} + +cv::GStreamingCompiled::operator bool() const +{ + return !m_priv->isEmpty(); +} + +const cv::GMetaArgs& cv::GStreamingCompiled::metas() const +{ + return m_priv->metas(); +} + +const cv::GMetaArgs& cv::GStreamingCompiled::outMetas() const +{ + return m_priv->outMetas(); +} + +cv::GStreamingCompiled::Priv& cv::GStreamingCompiled::priv() +{ + return *m_priv; +} + +#endif // GAPI_STANDALONE diff --git a/modules/gapi/src/compiler/gstreaming_priv.hpp b/modules/gapi/src/compiler/gstreaming_priv.hpp new file mode 100644 index 0000000000..10e836e667 --- /dev/null +++ b/modules/gapi/src/compiler/gstreaming_priv.hpp @@ -0,0 +1,51 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + + +#ifndef OPENCV_GAPI_GSTREAMING_COMPILED_PRIV_HPP +#define OPENCV_GAPI_GSTREAMING_COMPILED_PRIV_HPP + +#include // unique_ptr +#include "executor/gstreamingexecutor.hpp" + +namespace cv { + +namespace gimpl +{ + struct GRuntimeArgs; +}; + +// FIXME: GAPI_EXPORTS is here only due to tests and Windows linker issues +// FIXME: It seems it clearly duplicates the GStreamingCompiled and +// GStreamingExecutable APIs so is highly redundant now. +// Same applies to GCompiled/GCompiled::Priv/GExecutor. +class GAPI_EXPORTS GStreamingCompiled::Priv +{ + GMetaArgs m_metas; // passed by user + GMetaArgs m_outMetas; // inferred by compiler + std::unique_ptr m_exec; + +public: + void setup(const GMetaArgs &metaArgs, + const GMetaArgs &outMetas, + std::unique_ptr &&pE); + bool isEmpty() const; + + const GMetaArgs& metas() const; + const GMetaArgs& outMetas() const; + + void setSource(GRunArgs &&args); + void start(); + bool pull(cv::GRunArgsP &&outs); + bool try_pull(cv::GRunArgsP &&outs); + void stop(); + + bool running() const; +}; + +} // namespace cv + +#endif // OPENCV_GAPI_GSTREAMING_COMPILED_PRIV_HPP diff --git a/modules/gapi/src/compiler/passes/dump_dot.cpp b/modules/gapi/src/compiler/passes/dump_dot.cpp index 1eefe723b8..86e008b962 100644 --- a/modules/gapi/src/compiler/passes/dump_dot.cpp +++ b/modules/gapi/src/compiler/passes/dump_dot.cpp @@ -172,20 +172,40 @@ void dumpDot(const ade::Graph &g, std::ostream& os) } } break; - case NodeKind::SLOT: { const auto obj_name = format_obj(gim.metadata(nh).get() .original_data_node); for (auto cons_nh : nh->outNodes()) { - os << "\"slot:" << obj_name << "\" -> \"" - << gim.metadata(cons_nh).get().object->name() - << "\"\n"; + if (gim.metadata(cons_nh).get().k == NodeKind::ISLAND) { + os << "\"slot:" << obj_name << "\" -> \"" + << gim.metadata(cons_nh).get().object->name() + << "\"\n"; + } // other data consumers -- sinks -- are processed separately + } + } + break; + case NodeKind::EMIT: + { + for (auto out_nh : nh->outNodes()) + { + const auto obj_name = format_obj(gim.metadata(out_nh).get() + .original_data_node); + os << "\"emit:" << nh << "\" -> \"slot:" << obj_name << "\"\n"; + } + } + break; + case NodeKind::SINK: + { + for (auto in_nh : nh->inNodes()) + { + const auto obj_name = format_obj(gim.metadata(in_nh).get() + .original_data_node); + os << "\"slot:" << obj_name << "\" -> \"sink:" << nh << "\"\n"; } } break; - default: GAPI_Assert(false); break; diff --git a/modules/gapi/src/compiler/passes/passes.hpp b/modules/gapi/src/compiler/passes/passes.hpp index e102a2f0e9..bed77227ba 100644 --- a/modules/gapi/src/compiler/passes/passes.hpp +++ b/modules/gapi/src/compiler/passes/passes.hpp @@ -63,6 +63,8 @@ void applyTransformations(ade::passes::PassContext &ctx, const gapi::GKernelPackage &pkg, const std::vector> &preGeneratedPatterns); +void addStreaming(ade::passes::PassContext &ctx); + }} // namespace gimpl::passes } // namespace cv diff --git a/modules/gapi/src/compiler/passes/streaming.cpp b/modules/gapi/src/compiler/passes/streaming.cpp new file mode 100644 index 0000000000..6e982e2553 --- /dev/null +++ b/modules/gapi/src/compiler/passes/streaming.cpp @@ -0,0 +1,84 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + +#include "precomp.hpp" + +#include // cout +#include // stringstream +#include // ofstream +#include + +#include +#include // indexed() + +#include +#include "compiler/gmodel.hpp" +#include "compiler/gislandmodel.hpp" +#include "compiler/passes/passes.hpp" + +namespace cv { namespace gimpl { namespace passes { + +/** + * This pass extends a GIslandModel with streaming-oriented + * information. + * + * Every input data object (according to the protocol) is connected to + * a new "Emitter" node which becomes its _consumer_. + * + * Every output data object (again, according to the protocol) is + * connected to a new "Sink" node which becomes its _consumer_. + * + * These extra nodes are required to streamline the queues + * initialization by the GStreamingExecutable and its derivatives. + */ +void addStreaming(ade::passes::PassContext &ctx) +{ + GModel::Graph gm(ctx.graph); + if (!gm.metadata().contains()) { + return; + } + + // Note: This pass is working on a GIslandModel. + // FIXME: May be introduce a new variant of GIslandModel to + // deal with streams? + auto igr = gm.metadata().get().model; + GIslandModel::Graph igm(*igr); + + // First collect all data slots & their respective original + // data objects + using M = std::unordered_map + < ade::NodeHandle // key: a GModel's data object node + , ade::NodeHandle // value: an appropriate GIslandModel's slot node + , ade::HandleHasher + >; + M orig_to_isl; + for (auto &&nh : igm.nodes()) { + if (igm.metadata(nh).get().k == NodeKind::SLOT) { + const auto &orig_nh = igm.metadata(nh).get().original_data_node; + orig_to_isl[orig_nh] = nh; + } + } + + // Now walk through the list of input slots and connect those + // to a Streaming source. + const auto proto = gm.metadata().get(); + for (auto &&it : ade::util::indexed(proto.in_nhs)) { + const auto in_idx = ade::util::index(it); + const auto in_nh = ade::util::value(it); + auto emit_nh = GIslandModel::mkEmitNode(igm, in_idx); + igm.link(emit_nh, orig_to_isl.at(in_nh)); + } + + // Same for output slots + for (auto &&it : ade::util::indexed(proto.out_nhs)) { + const auto out_idx = ade::util::index(it); + const auto out_nh = ade::util::value(it); + auto sink_nh = GIslandModel::mkSinkNode(igm, out_idx); + igm.link(orig_to_isl.at(out_nh), sink_nh); + } +} + +}}} // cv::gimpl::passes diff --git a/modules/gapi/src/executor/conc_queue.hpp b/modules/gapi/src/executor/conc_queue.hpp new file mode 100644 index 0000000000..5de50ef34b --- /dev/null +++ b/modules/gapi/src/executor/conc_queue.hpp @@ -0,0 +1,129 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + +#ifndef OPENCV_GAPI_EXECUTOR_CONC_QUEUE_HPP +#define OPENCV_GAPI_EXECUTOR_CONC_QUEUE_HPP + +#include +#include +#include + +#include + +namespace cv { +namespace gapi { +namespace own { + +// This class implements a bare minimum interface of TBB's +// concurrent_bounded_queue with only std:: stuff to make streaming +// API work without TBB. +// +// Highly inefficient, please use it as a last resort if TBB is not +// available in the build. +template +class concurrent_bounded_queue { + std::queue m_data; + std::size_t m_capacity; + + std::mutex m_mutex; + std::condition_variable m_cond_empty; + std::condition_variable m_cond_full; + + void unsafe_pop(T &t); + +public: + concurrent_bounded_queue() : m_capacity(0) {} + concurrent_bounded_queue(const concurrent_bounded_queue &cc) + : m_data(cc.m_data), m_capacity(cc.m_capacity) { + // FIXME: what to do with all that locks, etc? + } + concurrent_bounded_queue(concurrent_bounded_queue &&cc) + : m_data(std::move(cc.m_data)), m_capacity(cc.m_capacity) { + // FIXME: what to do with all that locks, etc? + } + + // FIXME: && versions + void push(const T &t); + void pop(T &t); + bool try_pop(T &t); + + void set_capacity(std::size_t capacity); + + // Not thread-safe - as in TBB + void clear(); +}; + +// Internal: do shared pop things assuming the lock is already there +template +void concurrent_bounded_queue::unsafe_pop(T &t) { + GAPI_Assert(!m_data.empty()); + t = m_data.front(); + m_data.pop(); +} + +// Push an element to the queue. Blocking if there's no space left +template +void concurrent_bounded_queue::push(const T& t) { + std::unique_lock lock(m_mutex); + + if (m_capacity && m_capacity == m_data.size()) { + // if there is a limit and it is reached, wait + m_cond_full.wait(lock, [&](){return m_capacity > m_data.size();}); + GAPI_Assert(m_capacity > m_data.size()); + } + m_data.push(t); + lock.unlock(); + m_cond_empty.notify_one(); +} + +// Pop an element from the queue. Blocking if there's no items +template +void concurrent_bounded_queue::pop(T &t) { + std::unique_lock lock(m_mutex); + if (m_data.empty()) { + // if there is no data, wait + m_cond_empty.wait(lock, [&](){return !m_data.empty();}); + } + unsafe_pop(t); + lock.unlock(); + m_cond_full.notify_one(); +} + +// Try pop an element from the queue. Returns false if queue is empty +template +bool concurrent_bounded_queue::try_pop(T &t) { + std::unique_lock lock(m_mutex); + if (m_data.empty()) { + // if there is no data, return + return false; + } + unsafe_pop(t); + lock.unlock(); + m_cond_full.notify_one(); + return true; +} + +// Specify the upper limit to the queue. Assumed to be called after +// queue construction but before any real use, any other case is UB +template +void concurrent_bounded_queue::set_capacity(std::size_t capacity) { + GAPI_Assert(m_data.empty()); + GAPI_Assert(m_capacity == 0u); + GAPI_Assert(capacity != 0u); + m_capacity = capacity; +} + +// Clear the queue. Similar to the TBB version, this method is not +// thread-safe. +template +void concurrent_bounded_queue::clear() +{ + m_data = std::queue{}; +} + +}}} // namespace cv::gapi::own + +#endif // OPENCV_GAPI_EXECUTOR_CONC_QUEUE_HPP diff --git a/modules/gapi/src/executor/gstreamingexecutor.cpp b/modules/gapi/src/executor/gstreamingexecutor.cpp new file mode 100644 index 0000000000..cd5f1452ca --- /dev/null +++ b/modules/gapi/src/executor/gstreamingexecutor.cpp @@ -0,0 +1,771 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + +#include "precomp.hpp" + +#include + +#include + +#include + +#include "executor/gstreamingexecutor.hpp" +#include "compiler/passes/passes.hpp" +#include "backends/common/gbackend.hpp" // createMat + +namespace +{ +using namespace cv::gimpl::stream; + +#if !defined(GAPI_STANDALONE) +class VideoEmitter final: public cv::gimpl::GIslandEmitter { + cv::gapi::wip::IStreamSource::Ptr src; + + virtual bool pull(cv::GRunArg &arg) override { + // FIXME: probably we can maintain a pool of (then) pre-allocated + // buffers to avoid runtime allocations. + // Pool size can be determined given the internal queue size. + cv::gapi::wip::Data newData; + if (!src->pull(newData)) { + return false; + } + arg = std::move(static_cast(newData)); + return true; + } +public: + explicit VideoEmitter(const cv::GRunArg &arg) { + src = cv::util::get(arg); + } +}; +#endif // GAPI_STANDALONE + +class ConstEmitter final: public cv::gimpl::GIslandEmitter { + cv::GRunArg m_arg; + + virtual bool pull(cv::GRunArg &arg) override { + arg = const_cast(m_arg); // FIXME: variant workaround + return true; + } +public: + + explicit ConstEmitter(const cv::GRunArg &arg) : m_arg(arg) { + } +}; + +struct DataQueue { + static const char *name() { return "StreamingDataQueue"; } + + explicit DataQueue(std::size_t capacity) { + if (capacity) { + q.set_capacity(capacity); + } + } + + cv::gimpl::stream::Q q; +}; + +std::vector reader_queues( ade::Graph &g, + const ade::NodeHandle &obj) +{ + ade::TypedGraph qgr(g); + std::vector result; + for (auto &&out_eh : obj->outEdges()) + { + result.push_back(&qgr.metadata(out_eh).get().q); + } + return result; +} + +std::vector input_queues( ade::Graph &g, + const ade::NodeHandle &obj) +{ + ade::TypedGraph qgr(g); + std::vector result; + for (auto &&in_eh : obj->inEdges()) + { + result.push_back(qgr.metadata(in_eh).contains() + ? &qgr.metadata(in_eh).get().q + : nullptr); + } + return result; +} + +void sync_data(cv::GRunArgs &results, cv::GRunArgsP &outputs) +{ + namespace own = cv::gapi::own; + + for (auto && it : ade::util::zip(ade::util::toRange(outputs), + ade::util::toRange(results))) + { + auto &out_obj = std::get<0>(it); + auto &res_obj = std::get<1>(it); + + // FIXME: this conversion should be unified + using T = cv::GRunArgP; + switch (out_obj.index()) + { +#if !defined(GAPI_STANDALONE) + case T::index_of(): + *cv::util::get(out_obj) = std::move(cv::util::get(res_obj)); + break; + case T::index_of(): + *cv::util::get(out_obj) = std::move(cv::util::get(res_obj)); + break; +#endif // !GAPI_STANDALONE + case T::index_of(): + *cv::util::get(out_obj) = std::move(cv::util::get(res_obj)); + break; + case T::index_of(): + *cv::util::get(out_obj) = std::move(cv::util::get(res_obj)); + break; + case T::index_of(): + cv::util::get(out_obj).mov(cv::util::get(res_obj)); + break; + default: + GAPI_Assert(false && "This value type is not supported!"); // ...maybe because of STANDALONE mode. + break; + } + } +} + +// This thread is a plain dump source actor. What it do is just: +// - Check input queue (the only one) for a control command +// - Depending on the state, obtains next data object and pushes it to the +// pipeline. +void emitterActorThread(std::shared_ptr emitter, + Q& in_queue, + std::vector out_queues, + std::function cb_completion) +{ + // Wait for the explicit Start command. + // ...or Stop command, this also happens. + Cmd cmd; + in_queue.pop(cmd); + GAPI_Assert( cv::util::holds_alternative(cmd) + || cv::util::holds_alternative(cmd)); + if (cv::util::holds_alternative(cmd)) + { + for (auto &&oq : out_queues) oq->push(cmd); + return; + } + + // Now start emitting the data from the source to the pipeline. + while (true) + { + Cmd cancel; + if (in_queue.try_pop(cancel)) + { + // if we just popped a cancellation command... + GAPI_Assert(cv::util::holds_alternative(cancel)); + // Broadcast it to the readers and quit. + for (auto &&oq : out_queues) oq->push(cancel); + return; + } + + // Try to obrain next data chunk from the source + cv::GRunArg data; + if (emitter->pull(data)) + { + // // On success, broadcast it to our readers + for (auto &&oq : out_queues) + { + // FIXME: FOR SOME REASON, oq->push(Cmd{data}) doesn't work!! + // empty mats are arrived to the receivers! + // There may be a fatal bug in our variant! + const auto tmp = data; + oq->push(Cmd{tmp}); + } + } + else + { + // Otherwise, broadcast STOP message to our readers and quit. + // This usually means end-of-stream, so trigger a callback + for (auto &&oq : out_queues) oq->push(Cmd{Stop{}}); + if (cb_completion) cb_completion(); + return; + } + } +} + +// This thread is a plain dumb processing actor. What it do is just: +// - Reads input from the input queue(s), sleeps if there's nothing to read +// - Once a full input vector is obtained, passes it to the underlying island +// executable for processing. +// - Pushes processing results down to consumers - to the subsequent queues. +// Note: Every data object consumer has its own queue. +void islandActorThread(std::vector in_rcs, // FIXME: this is... + std::vector out_rcs, // FIXME: ...basically just... + cv::GMetaArgs out_metas, // ... + std::shared_ptr island, // FIXME: ...a copy of OpDesc{}. + std::vector in_queues, + std::vector in_constants, + std::vector< std::vector > out_queues) +{ + GAPI_Assert(in_queues.size() == in_rcs.size()); + GAPI_Assert(out_queues.size() == out_rcs.size()); + GAPI_Assert(out_queues.size() == out_metas.size()); + while (true) + { + std::vector isl_inputs; + isl_inputs.resize(in_rcs.size()); + + // Try to obtain the full input vector. + // Note this may block us. We also may get Stop signal here + // and then exit the thread. + // NOTE: in order to maintain the GRunArg's underlying object + // lifetime, keep the whole cmd vector (of size == # of inputs) + // in memory. + std::vector cmd(in_queues.size()); + for (auto &&it : ade::util::indexed(in_queues)) + { + auto id = ade::util::index(it); + auto &q = ade::util::value(it); + + isl_inputs[id].first = in_rcs[id]; + if (q == nullptr) + { + // NULL queue means a graph-constant value + // (like a value-initialized scalar) + // FIXME: Variant move problem + isl_inputs[id].second = const_cast(in_constants[id]); + } + else + { + q->pop(cmd[id]); + if (cv::util::holds_alternative(cmd[id])) + { + // FIXME: This logic must be unified with what collectorThread is doing! + // Just got a stop sign. Reiterate through all queues + // and rewind data to every Stop sign per queue + for (auto &&qit : ade::util::indexed(in_queues)) + { + auto id2 = ade::util::index(qit); + auto &q2 = ade::util::value(qit); + if (id == id2) continue; + + Cmd cmd2; + while (q2 && !cv::util::holds_alternative(cmd2)) + q2->pop(cmd2); + } + // Broadcast Stop down to the pipeline and quit + for (auto &&out_qq : out_queues) + { + for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}}); + } + return; + } + // FIXME: MOVE PROBLEM + const cv::GRunArg &in_arg = cv::util::get(cmd[id]); +#if defined(GAPI_STANDALONE) + // Standalone mode - simply store input argument in the vector as-is + isl_inputs[id].second = in_arg; +#else + // Make Islands operate on own:: data types (i.e. in the same + // environment as GExecutor provides) + // This way several backends (e.g. Fluid) remain OpenCV-independent. + switch (in_arg.index()) { + case cv::GRunArg::index_of(): + isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get(in_arg))}; + break; + case cv::GRunArg::index_of(): + isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get(in_arg))}; + break; + default: + isl_inputs[id].second = in_arg; + break; + } +#endif // GAPI_STANDALONE + } + } + // Once the vector is obtained, prepare data for island execution + // Note - we first allocate output vector via GRunArg! + // Then it is converted to a GRunArgP. + std::vector isl_outputs; + std::vector out_data; + isl_outputs.resize(out_rcs.size()); + out_data.resize(out_rcs.size()); + for (auto &&it : ade::util::indexed(out_rcs)) + { + auto id = ade::util::index(it); + auto &r = ade::util::value(it); + +#if !defined(GAPI_STANDALONE) + using MatType = cv::Mat; + using SclType = cv::Scalar; +#else + using MatType = cv::gapi::own::Mat; + using SclType = cv::gapi::own::Scalar; +#endif // GAPI_STANDALONE + + switch (r.shape) { + // Allocate a data object based on its shape & meta, and put it into our vectors. + // Yes, first we put a cv::Mat GRunArg, and then specify _THAT_ + // pointer as an output parameter - to make sure that after island completes, + // our GRunArg still has the right (up-to-date) value. + // Same applies to other types. + // FIXME: This is absolutely ugly but seem to work perfectly for its purpose. + case cv::GShape::GMAT: + { + MatType newMat; + cv::gimpl::createMat(cv::util::get(out_metas[id]), newMat); + out_data[id] = cv::GRunArg(std::move(newMat)); + isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get(out_data[id])) }; + } + break; + case cv::GShape::GSCALAR: + { + SclType newScl; + out_data[id] = cv::GRunArg(std::move(newScl)); + isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get(out_data[id])) }; + } + break; + case cv::GShape::GARRAY: + { + cv::detail::VectorRef newVec; + cv::util::get(r.ctor)(newVec); + out_data[id] = cv::GRunArg(std::move(newVec)); + // VectorRef is implicitly shared so no pointer is taken here + const auto &rr = cv::util::get(out_data[id]); // FIXME: that variant MOVE problem again + isl_outputs[id] = { r, cv::GRunArgP(rr) }; + } + break; + default: + cv::util::throw_error(std::logic_error("Unsupported GShape")); + break; + } + } + // Now ask Island to execute on this data + island->run(std::move(isl_inputs), std::move(isl_outputs)); + + // Once executed, dispatch our results down to the pipeline. + for (auto &&it : ade::util::zip(ade::util::toRange(out_queues), + ade::util::toRange(out_data))) + { + for (auto &&q : std::get<0>(it)) + { + // FIXME: FATAL VARIANT ISSUE!! + const auto tmp = std::get<1>(it); + q->push(Cmd{tmp}); + } + } + } +} + +// The idea of collectorThread is easy. If there're multiple outputs +// in the graph, we need to pull an object from every associated queue +// and then put the resulting vector into one single queue. While it +// looks redundant, it simplifies dramatically the way how try_pull() +// is implemented - we need to check one queue instead of many. +void collectorThread(std::vector in_queues, + Q& out_queue) +{ + while (true) + { + cv::GRunArgs this_result(in_queues.size()); + for (auto &&it : ade::util::indexed(in_queues)) + { + Cmd cmd; + ade::util::value(it)->pop(cmd); + if (cv::util::holds_alternative(cmd)) + { + // FIXME: Unify this code with island thread + for (auto &&qit : ade::util::indexed(in_queues)) + { + if (ade::util::index(qit) == ade::util::index(it)) continue; + Cmd cmd2; + while (!cv::util::holds_alternative(cmd2)) + ade::util::value(qit)->pop(cmd2); + } + out_queue.push(Cmd{Stop{}}); + return; + } + else + { + // FIXME: MOVE_PROBLEM + const cv::GRunArg &in_arg = cv::util::get(cmd); + this_result[ade::util::index(it)] = in_arg; + // FIXME: Check for other message types. + } + } + out_queue.push(Cmd{this_result}); + } +} +} // anonymous namespace + +cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr &&g_model) + : m_orig_graph(std::move(g_model)) + , m_island_graph(GModel::Graph(*m_orig_graph).metadata() + .get().model) + , m_gim(*m_island_graph) +{ + GModel::Graph gm(*m_orig_graph); + // NB: Right now GIslandModel is acyclic, and all the below code assumes that. + // NB: This naive execution code is taken from GExecutor nearly "as-is" + + const auto proto = gm.metadata().get(); + m_emitters .resize(proto.in_nhs.size()); + m_emitter_queues.resize(proto.in_nhs.size()); + m_sinks .resize(proto.out_nhs.size()); + m_sink_queues .resize(proto.out_nhs.size()); + + // Very rough estimation to limit internal queue sizes. + // Pipeline depth is equal to number of its (pipeline) steps. + const auto queue_capacity = std::count_if + (m_gim.nodes().begin(), + m_gim.nodes().end(), + [&](ade::NodeHandle nh) { + return m_gim.metadata(nh).get().k == NodeKind::ISLAND; + }); + + auto sorted = m_gim.metadata().get(); + for (auto nh : sorted.nodes()) + { + switch (m_gim.metadata(nh).get().k) + { + case NodeKind::ISLAND: + { + std::vector input_rcs; + std::vector output_rcs; + std::vector in_constants; + cv::GMetaArgs output_metas; + input_rcs.reserve(nh->inNodes().size()); + in_constants.reserve(nh->inNodes().size()); // FIXME: Ugly + output_rcs.reserve(nh->outNodes().size()); + output_metas.reserve(nh->outNodes().size()); + + std::unordered_set > const_ins; + + // FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER! + // FIXME: SAME APPLIES TO THE REGULAR GEEXECUTOR!! + auto xtract_in = [&](ade::NodeHandle slot_nh, std::vector &vec) { + const auto orig_data_nh + = m_gim.metadata(slot_nh).get().original_data_node; + const auto &orig_data_info + = gm.metadata(orig_data_nh).get(); + if (orig_data_info.storage == Data::Storage::CONST_VAL) { + const_ins.insert(slot_nh); + // FIXME: Variant move issue + in_constants.push_back(const_cast(gm.metadata(orig_data_nh).get().arg)); + } else in_constants.push_back(cv::GRunArg{}); // FIXME: Make it in some smarter way pls + if (orig_data_info.shape == GShape::GARRAY) { + // FIXME: GArray lost host constructor problem + GAPI_Assert(!cv::util::holds_alternative(orig_data_info.ctor)); + } + vec.emplace_back(RcDesc{ orig_data_info.rc + , orig_data_info.shape + , orig_data_info.ctor}); + }; + auto xtract_out = [&](ade::NodeHandle slot_nh, std::vector &vec, cv::GMetaArgs &metas) { + const auto orig_data_nh + = m_gim.metadata(slot_nh).get().original_data_node; + const auto &orig_data_info + = gm.metadata(orig_data_nh).get(); + if (orig_data_info.shape == GShape::GARRAY) { + // FIXME: GArray lost host constructor problem + GAPI_Assert(!cv::util::holds_alternative(orig_data_info.ctor)); + } + vec.emplace_back(RcDesc{ orig_data_info.rc + , orig_data_info.shape + , orig_data_info.ctor}); + metas.emplace_back(orig_data_info.meta); + }; + // FIXME: JEZ IT WAS SO AWFUL!!!! + for (auto in_slot_nh : nh->inNodes()) xtract_in(in_slot_nh, input_rcs); + for (auto out_slot_nh : nh->outNodes()) xtract_out(out_slot_nh, output_rcs, output_metas); + + m_ops.emplace_back(OpDesc{ std::move(input_rcs) + , std::move(output_rcs) + , std::move(output_metas) + , nh + , in_constants + , m_gim.metadata(nh).get().object}); + + // Initialize queues for every operation's input + ade::TypedGraph qgr(*m_island_graph); + for (auto eh : nh->inEdges()) + { + // ...only if the data is not compile-const + if (const_ins.count(eh->srcNode()) == 0) { + qgr.metadata(eh).set(DataQueue(queue_capacity)); + m_internal_queues.insert(&qgr.metadata(eh).get().q); + } + } + } + break; + case NodeKind::SLOT: + { + const auto orig_data_nh + = m_gim.metadata(nh).get().original_data_node; + m_slots.emplace_back(DataDesc{nh, orig_data_nh}); + } + break; + case NodeKind::EMIT: + { + const auto emitter_idx + = m_gim.metadata(nh).get().proto_index; + GAPI_Assert(emitter_idx < m_emitters.size()); + m_emitters[emitter_idx] = nh; + } + break; + case NodeKind::SINK: + { + const auto sink_idx + = m_gim.metadata(nh).get().proto_index; + GAPI_Assert(sink_idx < m_sinks.size()); + m_sinks[sink_idx] = nh; + + // Also initialize Sink's input queue + ade::TypedGraph qgr(*m_island_graph); + GAPI_Assert(nh->inEdges().size() == 1u); + qgr.metadata(nh->inEdges().front()).set(DataQueue(queue_capacity)); + m_sink_queues[sink_idx] = &qgr.metadata(nh->inEdges().front()).get().q; + } + break; + default: + GAPI_Assert(false); + break; + } // switch(kind) + } // for(gim nodes) + m_out_queue.set_capacity(queue_capacity); +} + +cv::gimpl::GStreamingExecutor::~GStreamingExecutor() +{ + if (state == State::READY || state == State::RUNNING) + stop(); +} + +void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) +{ + GAPI_Assert(state == State::READY || state == State::STOPPED); + + const auto is_video = [](const GRunArg &arg) { + return util::holds_alternative(arg); + }; + const auto num_videos = std::count_if(ins.begin(), ins.end(), is_video); + if (num_videos > 1u) + { + // See below why (another reason - no documented behavior + // on handling videos streams of different length) + util::throw_error(std::logic_error("Only one video source is" + " currently supported!")); + } + + // Walk through the protocol, set-up emitters appropriately + // There's a 1:1 mapping between emitters and corresponding data inputs. + for (auto it : ade::util::zip(ade::util::toRange(m_emitters), + ade::util::toRange(ins), + ade::util::iota(m_emitters.size()))) + { + auto emit_nh = std::get<0>(it); + auto& emit_arg = std::get<1>(it); + auto emit_idx = std::get<2>(it); + auto& emitter = m_gim.metadata(emit_nh).get().object; + + using T = GRunArg; + switch (emit_arg.index()) + { + // Create a streaming emitter. + // Produces the next video frame when pulled. + case T::index_of(): +#if !defined(GAPI_STANDALONE) + emitter.reset(new VideoEmitter{emit_arg}); +#else + util::throw_error(std::logic_error("Video is not supported in the " + "standalone mode")); +#endif + break; + default: + // Create a constant emitter. + // Produces always the same ("constant") value when pulled. + emitter.reset(new ConstEmitter{emit_arg}); + m_const_emitter_queues.push_back(&m_emitter_queues[emit_idx]); + break; + } + } + + // FIXME: The below code assumes our graph may have only one + // real video source (and so, only one stream which may really end) + // all other inputs are "constant" generators. + // Craft here a completion callback to notify Const emitters that + // a video source is over + auto real_video_completion_cb = [this]() { + for (auto q : m_const_emitter_queues) q->push(Cmd{Stop{}}); + }; + + // FIXME: ONLY now, after all executable objects are created, + // we can set up our execution threads. Let's do it. + // First create threads for all the emitters. + // FIXME: One way to avoid this may be including an Emitter object as a part of + // START message. Why not? + if (state == State::READY) + { + stop(); + } + + for (auto it : ade::util::indexed(m_emitters)) + { + const auto id = ade::util::index(it); // = index in GComputation's protocol + const auto eh = ade::util::value(it); + + // Prepare emitter thread parameters + auto emitter = m_gim.metadata(eh).get().object; + + // Collect all reader queues from the emitter's the only output object + auto out_queues = reader_queues(*m_island_graph, eh->outNodes().front()); + + m_threads.emplace_back(emitterActorThread, + emitter, + std::ref(m_emitter_queues[id]), + out_queues, + real_video_completion_cb); + } + + // Now do this for every island (in a topological order) + for (auto &&op : m_ops) + { + // Prepare island thread parameters + auto island = m_gim.metadata(op.nh).get().object; + + // Collect actor's input queues + auto in_queues = input_queues(*m_island_graph, op.nh); + + // Collect actor's output queues. + // This may be tricky... + std::vector< std::vector > out_queues; + for (auto &&out_eh : op.nh->outNodes()) { + out_queues.push_back(reader_queues(*m_island_graph, out_eh)); + } + + m_threads.emplace_back(islandActorThread, + op.in_objects, + op.out_objects, + op.out_metas, + island, + in_queues, + op.in_constants, + out_queues); + } + + // Finally, start a collector thread. + m_threads.emplace_back(collectorThread, + m_sink_queues, + std::ref(m_out_queue)); + state = State::READY; +} + +void cv::gimpl::GStreamingExecutor::start() +{ + if (state == State::STOPPED) + { + util::throw_error(std::logic_error("Please call setSource() before start() " + "if the pipeline has been already stopped")); + } + GAPI_Assert(state == State::READY); + + // Currently just trigger our emitters to work + state = State::RUNNING; + for (auto &q : m_emitter_queues) + { + q.push(stream::Cmd{stream::Start{}}); + } +} + +void cv::gimpl::GStreamingExecutor::wait_shutdown() +{ + // This utility is used by pull/try_pull/stop() to uniformly + // shutdown the worker threads. + // FIXME: Of course it can be designed much better + for (auto &t : m_threads) t.join(); + m_threads.clear(); + + // Clear all queues + // If there are constant emitters, internal queues + // may be polluted with constant values and have extra + // data at the point of shutdown. + // It usually happens when there's multiple inputs, + // one constant and one is not, and the latter ends (e.g. + // with end-of-stream). + for (auto &q : m_emitter_queues) q.clear(); + for (auto &q : m_sink_queues) q->clear(); + for (auto &q : m_internal_queues) q->clear(); + m_out_queue.clear(); + + state = State::STOPPED; +} + +bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs) +{ + if (state == State::STOPPED) + return false; + GAPI_Assert(state == State::RUNNING); + GAPI_Assert(m_sink_queues.size() == outs.size()); + + Cmd cmd; + m_out_queue.pop(cmd); + if (cv::util::holds_alternative(cmd)) + { + wait_shutdown(); + return false; + } + + GAPI_Assert(cv::util::holds_alternative(cmd)); + cv::GRunArgs &this_result = cv::util::get(cmd); + sync_data(this_result, outs); + return true; +} + +bool cv::gimpl::GStreamingExecutor::try_pull(cv::GRunArgsP &&outs) +{ + if (state == State::STOPPED) + return false; + + GAPI_Assert(m_sink_queues.size() == outs.size()); + + Cmd cmd; + if (!m_out_queue.try_pop(cmd)) { + return false; + } + if (cv::util::holds_alternative(cmd)) + { + wait_shutdown(); + return false; + } + + GAPI_Assert(cv::util::holds_alternative(cmd)); + cv::GRunArgs &this_result = cv::util::get(cmd); + sync_data(this_result, outs); + return true; +} + +void cv::gimpl::GStreamingExecutor::stop() +{ + if (state == State::STOPPED) + return; + + // FIXME: ...and how to deal with still-unread data then? + // Push a Stop message to the every emitter, + // wait until it broadcasts within the pipeline, + // FIXME: worker threads could stuck on push()! + // need to read the output queues until Stop! + for (auto &q : m_emitter_queues) { + q.push(stream::Cmd{stream::Stop{}}); + } + + // Pull messages from the final queue to ensure completion + Cmd cmd; + while (!cv::util::holds_alternative(cmd)) + { + m_out_queue.pop(cmd); + } + GAPI_Assert(cv::util::holds_alternative(cmd)); + wait_shutdown(); +} + +bool cv::gimpl::GStreamingExecutor::running() const +{ + return (state == State::RUNNING); +} diff --git a/modules/gapi/src/executor/gstreamingexecutor.hpp b/modules/gapi/src/executor/gstreamingexecutor.hpp new file mode 100644 index 0000000000..dd1dea9e92 --- /dev/null +++ b/modules/gapi/src/executor/gstreamingexecutor.hpp @@ -0,0 +1,132 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + +#ifndef OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP +#define OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP + +#ifdef _MSC_VER +#pragma warning(disable: 4503) // "decorated name length exceeded" + // on concurrent_bounded_queue +#endif + +#include // unique_ptr, shared_ptr +#include // thread + +#if defined(HAVE_TBB) +# include // FIXME: drop it from here! +template using QueueClass = tbb::concurrent_bounded_queue; +#else +# include "executor/conc_queue.hpp" +template using QueueClass = cv::gapi::own::concurrent_bounded_queue; +#endif // TBB + +#include + +#include "backends/common/gbackend.hpp" + +namespace cv { +namespace gimpl { + +namespace stream { +struct Start {}; +struct Stop {}; + +using Cmd = cv::util::variant + < cv::util::monostate + , Start // Tells emitters to start working. Not broadcasted to workers. + , Stop // Tells emitters to stop working. Broadcasted to workers. + , cv::GRunArg // Workers data payload to process. + , cv::GRunArgs // Full results vector + >; +using Q = QueueClass; +} // namespace stream + +// FIXME: Currently all GExecutor comments apply also +// to this one. Please document it separately in the future. + +class GStreamingExecutor final +{ +protected: + // GStreamingExecutor is a state machine described as follows + // + // setSource() called + // STOPPED: - - - - - - - - - ->READY: + // -------- ------ + // Initial state Input data specified + // No threads running Threads are created and IDLE + // ^ (Currently our emitter threads + // : are bounded to input data) + // : stop() called No processing happending + // : OR : + // : end-of-stream reached : start() called + // : during pull()/try_pull() V + // : RUNNING: + // : -------- + // : Actual pipeline execution + // - - - - - - - - - - - - - - Threads are running + // + enum class State { + STOPPED, + READY, + RUNNING, + } state = State::STOPPED; + + std::unique_ptr m_orig_graph; + std::shared_ptr m_island_graph; + + cv::gimpl::GIslandModel::Graph m_gim; // FIXME: make const? + + // FIXME: Naive executor details are here for now + // but then it should be moved to another place + struct OpDesc + { + std::vector in_objects; + std::vector out_objects; + cv::GMetaArgs out_metas; + ade::NodeHandle nh; + + std::vector in_constants; + + // FIXME: remove it as unused + std::shared_ptr isl_exec; + }; + std::vector m_ops; + + struct DataDesc + { + ade::NodeHandle slot_nh; + ade::NodeHandle data_nh; + }; + std::vector m_slots; + + // Order in these vectors follows the GComputaion's protocol + std::vector m_emitters; + std::vector m_sinks; + + std::vector m_threads; + std::vector m_emitter_queues; + std::vector m_const_emitter_queues; // a view over m_emitter_queues + std::vector m_sink_queues; + std::unordered_set m_internal_queues; + stream::Q m_out_queue; + + void wait_shutdown(); + +public: + explicit GStreamingExecutor(std::unique_ptr &&g_model); + ~GStreamingExecutor(); + void setSource(GRunArgs &&args); + void start(); + bool pull(cv::GRunArgsP &&outs); + bool try_pull(cv::GRunArgsP &&outs); + void stop(); + bool running() const; +}; + +} // namespace gimpl +} // namespace cv + +#endif // OPENCV_GAPI_GSTREAMING_EXECUTOR_HPP diff --git a/modules/gapi/src/precomp.hpp b/modules/gapi/src/precomp.hpp index 6106cd9bce..f72d0a1ecc 100644 --- a/modules/gapi/src/precomp.hpp +++ b/modules/gapi/src/precomp.hpp @@ -18,4 +18,7 @@ #include #include +// FIXME: Should this file be extended with our new headers? +// (which sometimes may be implicitly included here already?) + #endif // __OPENCV_GAPI_PRECOMP_HPP__ diff --git a/modules/gapi/test/common/gapi_core_tests_inl.hpp b/modules/gapi/test/common/gapi_core_tests_inl.hpp index 7fb6bda33e..601d39d777 100644 --- a/modules/gapi/test/common/gapi_core_tests_inl.hpp +++ b/modules/gapi/test/common/gapi_core_tests_inl.hpp @@ -1293,6 +1293,8 @@ TEST_P(NormalizeTest, Test) } } +// PLEASE DO NOT PUT NEW ACCURACY TESTS BELOW THIS POINT! ////////////////////// + TEST_P(BackendOutputAllocationTest, EmptyOutput) { // G-API code ////////////////////////////////////////////////////////////// diff --git a/modules/gapi/test/cpu/gapi_core_tests_cpu.cpp b/modules/gapi/test/cpu/gapi_core_tests_cpu.cpp index 44f440dabd..07f176f6d7 100644 --- a/modules/gapi/test/cpu/gapi_core_tests_cpu.cpp +++ b/modules/gapi/test/cpu/gapi_core_tests_cpu.cpp @@ -447,6 +447,8 @@ INSTANTIATE_TEST_CASE_P(NormalizeTestCPU, NormalizeTest, Values(NORM_MINMAX, NORM_INF, NORM_L1, NORM_L2), Values(-1, CV_8U, CV_16U, CV_16S, CV_32F))); +// PLEASE DO NOT PUT NEW ACCURACY TESTS BELOW THIS POINT! ////////////////////// + INSTANTIATE_TEST_CASE_P(BackendOutputAllocationTestCPU, BackendOutputAllocationTest, Combine(Values(CV_8UC3, CV_16SC2, CV_32FC1), Values(cv::Size(50, 50)), @@ -467,4 +469,5 @@ INSTANTIATE_TEST_CASE_P(ReInitOutTestCPU, ReInitOutTest, Values(CORE_CPU), Values(cv::Size(640, 400), cv::Size(10, 480)))); + } diff --git a/modules/gapi/test/cpu/gapi_core_tests_fluid.cpp b/modules/gapi/test/cpu/gapi_core_tests_fluid.cpp index 8bc967544e..e20fb11db9 100644 --- a/modules/gapi/test/cpu/gapi_core_tests_fluid.cpp +++ b/modules/gapi/test/cpu/gapi_core_tests_fluid.cpp @@ -518,6 +518,8 @@ INSTANTIATE_TEST_CASE_P(LUTTestCustomCPU, LUTTest, cv::Size(128, 128)), /*init output matrices or not*/ Values(true))); +// PLEASE DO NOT PUT NEW ACCURACY TESTS BELOW THIS POINT! ////////////////////// + INSTANTIATE_TEST_CASE_P(ConvertToCPU, ConvertToTest, Combine(Values(CV_8UC3, CV_8UC1, CV_16UC1, CV_32FC1), Values(CV_8U, CV_16U, CV_32F), diff --git a/modules/gapi/test/gapi_array_tests.cpp b/modules/gapi/test/gapi_array_tests.cpp index e5765624cf..922a2ebddb 100644 --- a/modules/gapi/test/gapi_array_tests.cpp +++ b/modules/gapi/test/gapi_array_tests.cpp @@ -163,4 +163,28 @@ TEST(GArray, TestIntermediateOutput) EXPECT_EQ(10u, out_points.size()); EXPECT_EQ(10, out_count[0]); } + +TEST(GArray_VectorRef, TestMov) +{ + // Warning: this test is testing some not-very-public APIs + // Test how VectorRef's mov() (aka poor man's move()) is working. + + using I = int; + using V = std::vector; + const V vgold = { 1, 2, 3}; + V vtest = vgold; + const I* vptr = vtest.data(); + + cv::detail::VectorRef vref(vtest); + cv::detail::VectorRef vmov; + vmov.reset(); + + EXPECT_EQ(vgold, vref.rref()); + + vmov.mov(vref); + EXPECT_EQ(vgold, vmov.rref()); + EXPECT_EQ(vptr, vmov.rref().data()); + EXPECT_EQ(V{}, vref.rref()); + EXPECT_EQ(V{}, vtest); +} } // namespace opencv_test diff --git a/modules/gapi/test/gpu/gapi_core_tests_gpu.cpp b/modules/gapi/test/gpu/gapi_core_tests_gpu.cpp index 5ee292a735..2f766824eb 100644 --- a/modules/gapi/test/gpu/gapi_core_tests_gpu.cpp +++ b/modules/gapi/test/gpu/gapi_core_tests_gpu.cpp @@ -390,6 +390,8 @@ INSTANTIATE_TEST_CASE_P(ConcatVertTestGPU, ConcatVertTest, Values(-1), Values(CORE_GPU))); +// PLEASE DO NOT PUT NEW ACCURACY TESTS BELOW THIS POINT! ////////////////////// + INSTANTIATE_TEST_CASE_P(BackendOutputAllocationTestGPU, BackendOutputAllocationTest, Combine(Values(CV_8UC3, CV_16SC2, CV_32FC1), Values(cv::Size(50, 50)), diff --git a/modules/gapi/test/infer/gapi_infer_ie_test.cpp b/modules/gapi/test/infer/gapi_infer_ie_test.cpp index 5fc065add7..d81285fff0 100644 --- a/modules/gapi/test/infer/gapi_infer_ie_test.cpp +++ b/modules/gapi/test/infer/gapi_infer_ie_test.cpp @@ -101,7 +101,6 @@ TEST(TestAgeGenderIE, InferBasicTensor) const auto &iedims = net.getInputsInfo().begin()->second->getTensorDesc().getDims(); auto cvdims = cv::gapi::ie::util::to_ocv(iedims); - std::reverse(cvdims.begin(), cvdims.end()); in_mat.create(cvdims, CV_32F); cv::randu(in_mat, -1, 1); diff --git a/modules/gapi/test/own/conc_queue_tests.cpp b/modules/gapi/test/own/conc_queue_tests.cpp new file mode 100644 index 0000000000..c3e6fd6e08 --- /dev/null +++ b/modules/gapi/test/own/conc_queue_tests.cpp @@ -0,0 +1,197 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + +#include "../test_precomp.hpp" + +#include +#include + +#include "executor/conc_queue.hpp" + +namespace opencv_test +{ +using namespace cv::gapi; + +TEST(ConcQueue, PushPop) +{ + own::concurrent_bounded_queue q; + for (int i = 0; i < 100; i++) + { + q.push(i); + } + + for (int i = 0; i < 100; i++) + { + int x; + q.pop(x); + EXPECT_EQ(x, i); + } +} + +TEST(ConcQueue, TryPop) +{ + own::concurrent_bounded_queue q; + int x = 0; + EXPECT_FALSE(q.try_pop(x)); + + q.push(1); + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(1, x); +} + +TEST(ConcQueue, Clear) +{ + own::concurrent_bounded_queue q; + for (int i = 0; i < 10; i++) + { + q.push(i); + } + + q.clear(); + int x = 0; + EXPECT_FALSE(q.try_pop(x)); +} + +// In this test, every writer thread produce its own range of integer +// numbers, writing those to a shared queue. +// +// Every reader thread pops elements from the queue (until -1 is +// reached) and stores those in its own associated set. +// +// Finally, the master thread waits for completion of all other +// threads and verifies that all the necessary data is +// produced/obtained. +using StressParam = std::tuple; // Queue capacity +namespace +{ +constexpr int STOP_SIGN = -1; +constexpr int BASE = 1000; +} +struct ConcQueue_: public ::testing::TestWithParam +{ + using Q = own::concurrent_bounded_queue; + using S = std::unordered_set; + + static void writer(int base, int writes, Q& q) + { + for (int i = 0; i < writes; i++) + { + q.push(base + i); + } + q.push(STOP_SIGN); + } + + static void reader(Q& q, S& s) + { + int x = 0; + while (true) + { + q.pop(x); + if (x == STOP_SIGN) return; + s.insert(x); + } + } +}; + +TEST_P(ConcQueue_, Test) +{ + int num_writers = 0; + int num_writes = 0; + int num_readers = 0; + std::size_t capacity = 0u; + std::tie(num_writers, num_writes, num_readers, capacity) = GetParam(); + + CV_Assert(num_writers < 20); + CV_Assert(num_writes < BASE); + + Q q; + if (capacity) + { + // see below (2) + CV_Assert(static_cast(capacity) > (num_writers - num_readers)); + q.set_capacity(capacity); + } + + // Start reader threads + std::vector storage(num_readers); + std::vector readers; + for (S& s : storage) + { + readers.emplace_back(reader, std::ref(q), std::ref(s)); + } + + // Start writer threads, also pre-generate reference numbers + S reference; + std::vector writers; + for (int w = 0; w < num_writers; w++) + { + writers.emplace_back(writer, w*BASE, num_writes, std::ref(q)); + for (int r = 0; r < num_writes; r++) + { + reference.insert(w*BASE + r); + } + } + + // Every writer puts a STOP_SIGN at the end, + // There are three cases: + // 1) num_writers == num_readers + // every reader should get its own STOP_SIGN from any + // of the writers + // + // 2) num_writers > num_readers + // every reader will get a STOP_SIGN but there're more + // STOP_SIGNs may be pushed to the queue - and if this + // number exceeds capacity, writers block (to a deadlock). + // The latter situation must be avoided at parameters level. + // [a] Also not every data produced by writers will be consumed + // by a reader in this case. Master thread will read the rest + // + // 3) num_readers > num_writers + // in this case, some readers will stuck and will never get + // a STOP_SIGN. Master thread will push extra STOP_SIGNs to the + // queue. + + // Solution to (2a) + S remnants; + if (num_writers > num_readers) + { + int extra = num_writers - num_readers; + while (extra) + { + int x = 0; + q.pop(x); + if (x == STOP_SIGN) extra--; + else remnants.insert(x); + } + } + + // Solution to (3) + if (num_readers > num_writers) + { + int extra = num_readers - num_writers; + while (extra--) q.push(STOP_SIGN); + } + + // Wait for completions + for (auto &t : readers) t.join(); + for (auto &t : writers) t.join(); + + // Accumulate and validate the result + S result(remnants.begin(), remnants.end()); + for (const auto &s : storage) result.insert(s.begin(), s.end()); + + EXPECT_EQ(reference, result); +} + +INSTANTIATE_TEST_CASE_P(ConcQueueStress, ConcQueue_, + Combine( Values(1, 2, 4, 8, 16) // writers + , Values(1, 32, 96, 256) // writes + , Values(1, 2, 10) // readers + , Values(0u, 16u, 32u))); // capacity +} // namespace opencv_test diff --git a/modules/gapi/test/streaming/gapi_streaming_tests.cpp b/modules/gapi/test/streaming/gapi_streaming_tests.cpp new file mode 100644 index 0000000000..216258d524 --- /dev/null +++ b/modules/gapi/test/streaming/gapi_streaming_tests.cpp @@ -0,0 +1,827 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + + +#include "../test_precomp.hpp" + +#include +#include + +#include +#include +#include + +#include +#include + +#include + +namespace opencv_test +{ +namespace +{ +void initTestDataPath() +{ +#ifndef WINRT + static bool initialized = false; + if (!initialized) + { + // Since G-API has no own test data (yet), it is taken from the common space + const char* testDataPath = getenv("OPENCV_TEST_DATA_PATH"); + GAPI_Assert(testDataPath != nullptr); + + cvtest::addDataSearchPath(testDataPath); + initialized = true; + } +#endif // WINRT +} + +cv::gapi::GKernelPackage OCV_KERNELS() +{ + static cv::gapi::GKernelPackage pkg = + cv::gapi::combine(cv::gapi::core::cpu::kernels(), + cv::gapi::imgproc::cpu::kernels()); + return pkg; +} + +cv::gapi::GKernelPackage OCV_FLUID_KERNELS() +{ + static cv::gapi::GKernelPackage pkg = + cv::gapi::combine(OCV_KERNELS(), + cv::gapi::core::fluid::kernels()); + return pkg; +} + +#if 0 +// FIXME: OpenCL backend seem to work fine with Streaming +// however the results are not very bit exact with CPU +// It may be a problem but may be just implementation innacuracy. +// Need to customize the comparison function in tests where OpenCL +// is involved. +cv::gapi::GKernelPackage OCL_KERNELS() +{ + static cv::gapi::GKernelPackage pkg = + cv::gapi::combine(cv::gapi::core::ocl::kernels(), + cv::gapi::imgproc::ocl::kernels()); + return pkg; +} + +cv::gapi::GKernelPackage OCL_FLUID_KERNELS() +{ + static cv::gapi::GKernelPackage pkg = + cv::gapi::combine(OCL_KERNELS(), + cv::gapi::core::fluid::kernels()); + return pkg; +} +#endif // 0 + +struct GAPI_Streaming: public ::testing::TestWithParam { + GAPI_Streaming() { initTestDataPath(); } +}; + +} // anonymous namespace + +TEST_P(GAPI_Streaming, SmokeTest_ConstInput_GMat) +{ + // This graph models the following use-case: + // Canny here is used as some "feature detector" + // + // Island/device layout may be different given the contents + // of the passed kernel package. + // + // The expectation is that we get as much islands in the + // graph as backends the GKernelPackage contains. + // + // [Capture] --> Crop --> Resize --> Canny --> [out] + + const auto crop_rc = cv::Rect(13, 75, 377, 269); + const auto resample_sz = cv::Size(224, 224); + const auto thr_lo = 64.; + const auto thr_hi = 192.; + + cv::GMat in; + auto roi = cv::gapi::crop(in, crop_rc); + auto res = cv::gapi::resize(roi, resample_sz); + auto out = cv::gapi::Canny(res, thr_lo, thr_hi); + cv::GComputation c(in, out); + + // Input data + cv::Mat in_mat = cv::imread(findDataFile("cv/edgefilter/kodim23.png")); + cv::Mat out_mat_gapi; + + // OpenCV reference image + cv::Mat out_mat_ocv; + { + cv::Mat tmp; + cv::resize(in_mat(crop_rc), tmp, resample_sz); + cv::Canny(tmp, out_mat_ocv, thr_lo, thr_hi); + } + + // Compilation & testing + auto ccomp = c.compileStreaming(cv::descr_of(in_mat), + cv::compile_args(cv::gapi::use_only{GetParam()})); + EXPECT_TRUE(ccomp); + EXPECT_FALSE(ccomp.running()); + + ccomp.setSource(cv::gin(in_mat)); + + ccomp.start(); + EXPECT_TRUE(ccomp.running()); + + // Fetch the result 15 times + for (int i = 0; i < 15; i++) { + // With constant inputs, the stream is endless so + // the blocking pull() should never return `false`. + EXPECT_TRUE(ccomp.pull(cv::gout(out_mat_gapi))); + EXPECT_EQ(0, cv::countNonZero(out_mat_gapi != out_mat_ocv)); + } + + EXPECT_TRUE(ccomp.running()); + ccomp.stop(); + + EXPECT_FALSE(ccomp.running()); +} + +TEST_P(GAPI_Streaming, SmokeTest_VideoInput_GMat) +{ + const auto crop_rc = cv::Rect(13, 75, 377, 269); + const auto resample_sz = cv::Size(224, 224); + const auto thr_lo = 64.; + const auto thr_hi = 192.; + + cv::GMat in; + auto roi = cv::gapi::crop(in, crop_rc); + auto res = cv::gapi::resize(roi, resample_sz); + auto out = cv::gapi::Canny(res, thr_lo, thr_hi); + cv::GComputation c(cv::GIn(in), cv::GOut(cv::gapi::copy(in), out)); + + // OpenCV reference image code + auto opencv_ref = [&](const cv::Mat &in_mat, cv::Mat &out_mat) { + cv::Mat tmp; + cv::resize(in_mat(crop_rc), tmp, resample_sz); + cv::Canny(tmp, out_mat, thr_lo, thr_hi); + }; + + // Compilation & testing + auto ccomp = c.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, + cv::compile_args(cv::gapi::use_only{GetParam()})); + EXPECT_TRUE(ccomp); + EXPECT_FALSE(ccomp.running()); + + ccomp.setSource(gapi::wip::make_src(findDataFile("cv/video/768x576.avi"))); + + ccomp.start(); + EXPECT_TRUE(ccomp.running()); + + // Process the full video + cv::Mat in_mat_gapi, out_mat_gapi; + + std::size_t frames = 0u; + while (ccomp.pull(cv::gout(in_mat_gapi, out_mat_gapi))) { + frames++; + cv::Mat out_mat_ocv; + opencv_ref(in_mat_gapi, out_mat_ocv); + EXPECT_EQ(0, cv::countNonZero(out_mat_gapi != out_mat_ocv)); + } + EXPECT_LT(0u, frames); + EXPECT_FALSE(ccomp.running()); + + // Stop can be called at any time (even if the pipeline is not running) + ccomp.stop(); + + EXPECT_FALSE(ccomp.running()); +} + +TEST_P(GAPI_Streaming, Regression_CompileTimeScalar) +{ + // There was a bug with compile-time GScalars. Compile-time + // GScalars generate their own DATA nodes at GModel/GIslandModel + // level, resulting in an extra link at the GIslandModel level, so + // GStreamingExecutor automatically assigned an input queue to + // such edges. Since there were no in-graph producer for that + // data, no data were pushed to such queue what lead to a + // deadlock. + + cv::GMat in; + cv::GMat tmp = cv::gapi::copy(in); + for (int i = 0; i < 3; i++) { + tmp = tmp & cv::gapi::blur(in, cv::Size(3,3)); + } + cv::GComputation c(cv::GIn(in), cv::GOut(tmp, tmp + 1)); + + auto ccomp = c.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,512}}, + cv::compile_args(cv::gapi::use_only{GetParam()})); + + cv::Mat in_mat = cv::imread(findDataFile("cv/edgefilter/kodim23.png")); + cv::Mat out_mat1, out_mat2; + + // Fetch the result 15 times + ccomp.setSource(cv::gin(in_mat)); + ccomp.start(); + for (int i = 0; i < 15; i++) { + EXPECT_TRUE(ccomp.pull(cv::gout(out_mat1, out_mat2))); + } + + ccomp.stop(); +} + +TEST_P(GAPI_Streaming, SmokeTest_StartRestart) +{ + cv::GMat in; + auto res = cv::gapi::resize(in, cv::Size{300,200}); + auto out = cv::gapi::Canny(res, 95, 220); + cv::GComputation c(cv::GIn(in), cv::GOut(cv::gapi::copy(in), out)); + + auto ccomp = c.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, + cv::compile_args(cv::gapi::use_only{GetParam()})); + EXPECT_TRUE(ccomp); + EXPECT_FALSE(ccomp.running()); + + // Run 1 + std::size_t num_frames1 = 0u; + ccomp.setSource(gapi::wip::make_src(findDataFile("cv/video/768x576.avi"))); + ccomp.start(); + EXPECT_TRUE(ccomp.running()); + + cv::Mat out1, out2; + while (ccomp.pull(cv::gout(out1, out2))) num_frames1++; + + EXPECT_FALSE(ccomp.running()); + + // Run 2 + std::size_t num_frames2 = 0u; + ccomp.setSource(gapi::wip::make_src(findDataFile("cv/video/768x576.avi"))); + ccomp.start(); + EXPECT_TRUE(ccomp.running()); + while (ccomp.pull(cv::gout(out1, out2))) num_frames2++; + + EXPECT_FALSE(ccomp.running()); + + EXPECT_LT(0u, num_frames1); + EXPECT_LT(0u, num_frames2); + EXPECT_EQ(num_frames1, num_frames2); +} + +TEST_P(GAPI_Streaming, SmokeTest_VideoConstSource_NoHang) +{ + // A video source is a finite one, while const source is not. + // Check that pipeline completes when a video source completes. + auto refc = cv::GComputation([](){ + cv::GMat in; + return cv::GComputation(in, cv::gapi::copy(in)); + }).compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, + cv::compile_args(cv::gapi::use_only{GetParam()})); + + refc.setSource(gapi::wip::make_src(findDataFile("cv/video/768x576.avi"))); + refc.start(); + std::size_t ref_frames = 0u; + cv::Mat tmp; + while (refc.pull(cv::gout(tmp))) ref_frames++; + EXPECT_EQ(100u, ref_frames); + + cv::GMat in; + cv::GMat in2; + cv::GMat roi = cv::gapi::crop(in2, cv::Rect{1,1,256,256}); + cv::GMat blr = cv::gapi::blur(roi, cv::Size(3,3)); + cv::GMat out = blr - in; + auto testc = cv::GComputation(cv::GIn(in, in2), cv::GOut(out)) + .compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{256,256}}, + cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, + cv::compile_args(cv::gapi::use_only{GetParam()})); + + cv::Mat in_const = cv::Mat::eye(cv::Size(256,256), CV_8UC3); + testc.setSource(cv::gin(in_const, + gapi::wip::make_src(findDataFile("cv/video/768x576.avi")))); + testc.start(); + std::size_t test_frames = 0u; + while (testc.pull(cv::gout(tmp))) test_frames++; + + EXPECT_EQ(ref_frames, test_frames); +} + +INSTANTIATE_TEST_CASE_P(TestStreaming, GAPI_Streaming, + Values( OCV_KERNELS() + //, OCL_KERNELS() // FIXME: Fails bit-exactness check, maybe relax it? + , OCV_FLUID_KERNELS() + //, OCL_FLUID_KERNELS() // FIXME: Fails bit-exactness check, maybe relax it? + )); + +namespace TypesTest +{ + G_API_OP(SumV, (cv::GMat)>, "test.gapi.sumv") { + static cv::GArrayDesc outMeta(const cv::GMatDesc &) { + return cv::empty_array_desc(); + } + }; + G_API_OP(AddV, )>, "test.gapi.addv") { + static cv::GMatDesc outMeta(const cv::GMatDesc &in, const cv::GArrayDesc &) { + return in; + } + }; + + GAPI_OCV_KERNEL(OCVSumV, SumV) { + static void run(const cv::Mat &in, std::vector &out) { + CV_Assert(in.depth() == CV_8U); + const auto length = in.cols * in.channels(); + out.resize(length); + + const uchar *ptr = in.ptr(0); + for (int c = 0; c < length; c++) { + out[c] = ptr[c]; + } + for (int r = 1; r < in.rows; r++) { + ptr = in.ptr(r); + for (int c = 0; c < length; c++) { + out[c] += ptr[c]; + } + } + } + }; + + GAPI_OCV_KERNEL(OCVAddV, AddV) { + static void run(const cv::Mat &in, const std::vector &inv, cv::Mat &out) { + CV_Assert(in.depth() == CV_8U); + const auto length = in.cols * in.channels(); + CV_Assert(length == static_cast(inv.size())); + + for (int r = 0; r < in.rows; r++) { + const uchar *in_ptr = in.ptr(r); + uchar *out_ptr = out.ptr(r); + + for (int c = 0; c < length; c++) { + out_ptr[c] = cv::saturate_cast(in_ptr[c] + inv[c]); + } + } + } + }; + + GAPI_FLUID_KERNEL(FluidAddV, AddV, false) { + static const int Window = 1; + + static void run(const cv::gapi::fluid::View &in, + const std::vector &inv, + cv::gapi::fluid::Buffer &out) { + const uchar *in_ptr = in.InLineB(0); + uchar *out_ptr = out.OutLineB(0); + + const auto length = in.meta().size.width * in.meta().chan; + CV_Assert(length == static_cast(inv.size())); + + for (int c = 0; c < length; c++) { + out_ptr[c] = cv::saturate_cast(in_ptr[c] + inv[c]); + } + } + }; +} // namespace TypesTest + +TEST(GAPI_Streaming_Types, InputScalar) +{ + // This test verifies if Streaming works with Scalar data @ input. + + cv::GMat in_m; + cv::GScalar in_s; + cv::GMat out_m = in_m * in_s; + cv::GComputation c(cv::GIn(in_m, in_s), cv::GOut(out_m)); + + // Input data + cv::Mat in_mat = cv::Mat::eye(256, 256, CV_8UC1); + cv::Scalar in_scl = 32; + + // Run pipeline + auto sc = c.compileStreaming(cv::descr_of(in_mat), cv::descr_of(in_scl)); + sc.setSource(cv::gin(in_mat, in_scl)); + sc.start(); + + for (int i = 0; i < 10; i++) + { + cv::Mat out; + EXPECT_TRUE(sc.pull(cv::gout(out))); + EXPECT_EQ(0., cv::norm(out, in_mat.mul(in_scl), cv::NORM_INF)); + } +} + +TEST(GAPI_Streaming_Types, InputVector) +{ + // This test verifies if Streaming works with Vector data @ input. + + cv::GMat in_m; + cv::GArray in_v; + cv::GMat out_m = TypesTest::AddV::on(in_m, in_v) - in_m; + cv::GComputation c(cv::GIn(in_m, in_v), cv::GOut(out_m)); + + // Input data + cv::Mat in_mat = cv::Mat::eye(256, 256, CV_8UC1); + std::vector in_vec; + TypesTest::OCVSumV::run(in_mat, in_vec); + EXPECT_EQ(std::vector(256,1), in_vec); // self-sanity-check + + auto opencv_ref = [&](const cv::Mat &in, const std::vector &inv, cv::Mat &out) { + cv::Mat tmp = in_mat.clone(); // allocate the same amount of memory as graph does + TypesTest::OCVAddV::run(in, inv, tmp); + out = tmp - in; + }; + + // Run pipeline + auto sc = c.compileStreaming(cv::descr_of(in_mat), + cv::descr_of(in_vec), + cv::compile_args(cv::gapi::kernels())); + sc.setSource(cv::gin(in_mat, in_vec)); + sc.start(); + + for (int i = 0; i < 10; i++) + { + cv::Mat out_mat; + EXPECT_TRUE(sc.pull(cv::gout(out_mat))); + + cv::Mat ref_mat; + opencv_ref(in_mat, in_vec, ref_mat); + EXPECT_EQ(0., cv::norm(ref_mat, out_mat, cv::NORM_INF)); + } +} + +TEST(GAPI_Streaming_Types, XChangeScalar) +{ + // This test verifies if Streaming works when pipeline steps + // (islands) exchange Scalar data. + + initTestDataPath(); + + cv::GMat in; + cv::GScalar m = cv::gapi::mean(in); + cv::GMat tmp = cv::gapi::convertTo(in, CV_32F) - m; + cv::GMat out = cv::gapi::blur(tmp, cv::Size(3,3)); + cv::GComputation c(cv::GIn(in), cv::GOut(cv::gapi::copy(in), + cv::gapi::convertTo(out, CV_8U))); + + auto ocv_ref = [](const cv::Mat &in_mat, cv::Mat &out_mat) { + cv::Scalar ocv_m = cv::mean(in_mat); + cv::Mat ocv_tmp; + in_mat.convertTo(ocv_tmp, CV_32F); + ocv_tmp -= ocv_m; + cv::blur(ocv_tmp, ocv_tmp, cv::Size(3,3)); + ocv_tmp.convertTo(out_mat, CV_8U); + }; + + // Here we want mean & convertTo run on OCV + // and subC & blur3x3 on Fluid. + // FIXME: With the current API it looks quite awful: + auto ocv_kernels = cv::gapi::core::cpu::kernels(); // convertTo + ocv_kernels.remove(); + + auto fluid_kernels = cv::gapi::combine(cv::gapi::core::fluid::kernels(), // subC + cv::gapi::imgproc::fluid::kernels()); // box3x3 + fluid_kernels.remove(); + fluid_kernels.remove(); + + // FIXME: Now + // - fluid kernels take over ocv kernels (including Copy, SubC, & Box3x3) + // - selected kernels (which were removed from the fluid package) remain in OCV + // (ConvertTo + some others) + // FIXME: This is completely awful. User should easily pick up specific kernels + // to an empty kernel package to craft his own but not do it via exclusion. + // Need to expose kernel declarations to public headers to enable kernels<..>() + // on user side. + auto kernels = cv::gapi::combine(ocv_kernels, fluid_kernels); + + // Compile streaming pipeline + auto sc = c.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, + cv::compile_args(cv::gapi::use_only{kernels})); + sc.setSource(gapi::wip::make_src(findDataFile("cv/video/768x576.avi"))); + sc.start(); + + cv::Mat in_frame; + cv::Mat out_mat_gapi; + cv::Mat out_mat_ref; + + std::size_t num_frames = 0u; + while (sc.pull(cv::gout(in_frame, out_mat_gapi))) { + num_frames++; + ocv_ref(in_frame, out_mat_ref); + EXPECT_EQ(0., cv::norm(out_mat_gapi, out_mat_ref, cv::NORM_INF)); + } + EXPECT_LT(0u, num_frames); +} + +TEST(GAPI_Streaming_Types, XChangeVector) +{ + // This test verifies if Streaming works when pipeline steps + // (islands) exchange Vector data. + + initTestDataPath(); + + cv::GMat in1, in2; + cv::GMat in = cv::gapi::crop(in1, cv::Rect{0,0,576,576}); + cv::GScalar m = cv::gapi::mean(in); + cv::GArray s = TypesTest::SumV::on(in2); // (in2 = eye, so s = [1,0,0,1,..]) + cv::GMat out = TypesTest::AddV::on(in - m, s); + + cv::GComputation c(cv::GIn(in1, in2), cv::GOut(cv::gapi::copy(in), out)); + + auto ocv_ref = [](const cv::Mat &in_mat1, const cv::Mat &in_mat2, cv::Mat &out_mat) { + cv::Mat in_roi = in_mat1(cv::Rect{0,0,576,576}); + cv::Scalar ocv_m = cv::mean(in_roi); + std::vector ocv_v; + TypesTest::OCVSumV::run(in_mat2, ocv_v); + + out_mat.create(cv::Size(576,576), CV_8UC3); + cv::Mat in_tmp = in_roi - ocv_m; + TypesTest::OCVAddV::run(in_tmp, ocv_v, out_mat); + }; + + // Let crop/mean/sumV be calculated via OCV, + // and AddV/subC be calculated via Fluid + auto ocv_kernels = cv::gapi::core::cpu::kernels(); + ocv_kernels.remove(); + ocv_kernels.include(); + + auto fluid_kernels = cv::gapi::core::fluid::kernels(); + fluid_kernels.include(); + + // Here OCV takes precedense over Fluid, whith SubC & SumV remaining + // in Fluid. + auto kernels = cv::gapi::combine(fluid_kernels, ocv_kernels); + + // Compile streaming pipeline + cv::Mat in_eye = cv::Mat::eye(cv::Size(576, 576), CV_8UC3); + auto sc = c.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, + cv::GMatDesc{CV_8U,3,cv::Size{576,576}}, + cv::compile_args(cv::gapi::use_only{kernels})); + sc.setSource(cv::gin(gapi::wip::make_src(findDataFile("cv/video/768x576.avi")), + in_eye)); + sc.start(); + + cv::Mat in_frame; + cv::Mat out_mat_gapi; + cv::Mat out_mat_ref; + + std::size_t num_frames = 0u; + while (sc.pull(cv::gout(in_frame, out_mat_gapi))) { + num_frames++; + ocv_ref(in_frame, in_eye, out_mat_ref); + EXPECT_EQ(0., cv::norm(out_mat_gapi, out_mat_ref, cv::NORM_INF)); + } + EXPECT_LT(0u, num_frames); +} + +TEST(GAPI_Streaming_Types, OutputScalar) +{ + // This test verifies if Streaming works when pipeline + // produces scalar data only + + initTestDataPath(); + + cv::GMat in; + cv::GScalar out = cv::gapi::mean(in); + auto sc = cv::GComputation(cv::GIn(in), cv::GOut(out)) + .compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}}); + + const auto video_path = findDataFile("cv/video/768x576.avi"); + sc.setSource(gapi::wip::make_src(video_path)); + sc.start(); + + cv::VideoCapture cap; + cap.open(video_path); + + cv::Mat tmp; + cv::Scalar out_scl; + std::size_t num_frames = 0u; + while (sc.pull(cv::gout(out_scl))) + { + num_frames++; + cap >> tmp; + cv::Scalar out_ref = cv::mean(tmp); + EXPECT_EQ(out_ref, out_scl); + } + EXPECT_LT(0u, num_frames); +} + +TEST(GAPI_Streaming_Types, OutputVector) +{ + // This test verifies if Streaming works when pipeline + // produces vector data only + + initTestDataPath(); + auto pkg = cv::gapi::kernels(); + + cv::GMat in1, in2; + cv::GMat roi = cv::gapi::crop(in2, cv::Rect(3,3,256,256)); + cv::GArray out = TypesTest::SumV::on(cv::gapi::mul(roi, in1)); + auto sc = cv::GComputation(cv::GIn(in1, in2), cv::GOut(out)) + .compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{256,256}}, + cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, + cv::compile_args(pkg)); + + auto ocv_ref = [](const cv::Mat &ocv_in1, + const cv::Mat &ocv_in2, + std::vector &ocv_out) { + auto ocv_roi = ocv_in2(cv::Rect{3,3,256,256}); + TypesTest::OCVSumV::run(ocv_roi.mul(ocv_in1), ocv_out); + }; + + cv::Mat in_eye = cv::Mat::eye(cv::Size(256, 256), CV_8UC3); + const auto video_path = findDataFile("cv/video/768x576.avi"); + sc.setSource(cv::gin(in_eye, gapi::wip::make_src(video_path))); + sc.start(); + + cv::VideoCapture cap; + cap.open(video_path); + + cv::Mat tmp; + std::vector ref_vec; + std::vector out_vec; + std::size_t num_frames = 0u; + while (sc.pull(cv::gout(out_vec))) + { + num_frames++; + cap >> tmp; + ref_vec.clear(); + ocv_ref(in_eye, tmp, ref_vec); + EXPECT_EQ(ref_vec, out_vec); + } + EXPECT_LT(0u, num_frames); +} + +struct GAPI_Streaming_Unit: public ::testing::Test { + cv::Mat m; + + cv::GComputation cc; + cv::GStreamingCompiled sc; + + cv::GCompiled ref; + + GAPI_Streaming_Unit() + : m(cv::Mat::ones(224,224,CV_8UC3)) + , cc([]{ + cv::GMat a, b; + cv::GMat c = a + b*2; + return cv::GComputation(cv::GIn(a, b), cv::GOut(c)); + }) + { + initTestDataPath(); + + + const auto a_desc = cv::descr_of(m); + const auto b_desc = cv::descr_of(m); + sc = cc.compileStreaming(a_desc, b_desc); + ref = cc.compile(a_desc, b_desc); + } +}; + +TEST_F(GAPI_Streaming_Unit, TestTwoVideoSourcesFail) +{ + // FIXME: Meta check doesn't fail here (but ideally it should) + const auto c_ptr = gapi::wip::make_src(findDataFile("cv/video/768x576.avi")); + EXPECT_NO_THROW(sc.setSource(cv::gin(c_ptr, m))); + EXPECT_NO_THROW(sc.setSource(cv::gin(m, c_ptr))); + EXPECT_ANY_THROW(sc.setSource(cv::gin(c_ptr, c_ptr))); +} + +TEST_F(GAPI_Streaming_Unit, TestStartWithoutnSetSource) +{ + EXPECT_ANY_THROW(sc.start()); +} + +TEST_F(GAPI_Streaming_Unit, TestStopWithoutStart1) +{ + // It is ok! + EXPECT_NO_THROW(sc.stop()); +} + +TEST_F(GAPI_Streaming_Unit, TestStopWithoutStart2) +{ + // It should be ok as well + sc.setSource(cv::gin(m, m)); + EXPECT_NO_THROW(sc.stop()); +} + +TEST_F(GAPI_Streaming_Unit, StopStartStop) +{ + cv::Mat out; + EXPECT_NO_THROW(sc.stop()); + EXPECT_NO_THROW(sc.setSource(cv::gin(m, m))); + EXPECT_NO_THROW(sc.start()); + + std::size_t i = 0u; + while (i++ < 10u) {EXPECT_TRUE(sc.pull(cv::gout(out)));}; + + EXPECT_NO_THROW(sc.stop()); +} + +TEST_F(GAPI_Streaming_Unit, ImplicitStop) +{ + EXPECT_NO_THROW(sc.setSource(cv::gin(m, m))); + EXPECT_NO_THROW(sc.start()); + // No explicit stop here - pipeline stops successfully at the test exit +} + +TEST_F(GAPI_Streaming_Unit, StartStopStart_NoSetSource) +{ + EXPECT_NO_THROW(sc.setSource(cv::gin(m, m))); + EXPECT_NO_THROW(sc.start()); + EXPECT_NO_THROW(sc.stop()); + EXPECT_ANY_THROW(sc.start()); // Should fails since setSource was not called +} + +TEST_F(GAPI_Streaming_Unit, StartStopStress_Const) +{ + // Runs 100 times with no deadlock - assumed stable (robust) enough + for (int i = 0; i < 100; i++) + { + sc.stop(); + sc.setSource(cv::gin(m, m)); + sc.start(); + cv::Mat out; + for (int j = 0; j < 5; j++) EXPECT_TRUE(sc.pull(cv::gout(out))); + } +} + +TEST_F(GAPI_Streaming_Unit, StartStopStress_Video) +{ + // Runs 100 times with no deadlock - assumed stable (robust) enough + sc = cc.compileStreaming(cv::GMatDesc{CV_8U,3,cv::Size{768,576}}, + cv::GMatDesc{CV_8U,3,cv::Size{768,576}}); + m = cv::Mat::eye(cv::Size{768,576}, CV_8UC3); + for (int i = 0; i < 100; i++) + { + auto src = cv::gapi::wip::make_src(findDataFile("cv/video/768x576.avi")); + sc.stop(); + sc.setSource(cv::gin(src, m)); + sc.start(); + cv::Mat out; + for (int j = 0; j < 5; j++) EXPECT_TRUE(sc.pull(cv::gout(out))); + } +} + +TEST_F(GAPI_Streaming_Unit, PullNoStart) +{ + sc.setSource(cv::gin(m, m)); + + cv::Mat out; + EXPECT_ANY_THROW(sc.pull(cv::gout(out))); +} + + +TEST_F(GAPI_Streaming_Unit, SetSource_Multi_BeforeStart) +{ + cv::Mat eye = cv::Mat::eye (224, 224, CV_8UC3); + cv::Mat zrs = cv::Mat::zeros(224, 224, CV_8UC3); + + // Call setSource two times, data specified last time + // should be actually processed. + sc.setSource(cv::gin(zrs, zrs)); + sc.setSource(cv::gin(eye, eye)); + + // Run the pipeline, acquire result once + sc.start(); + cv::Mat out, out_ref; + EXPECT_TRUE(sc.pull(cv::gout(out))); + sc.stop(); + + // Pipeline should process `eye` mat, not `zrs` + ref(cv::gin(eye, eye), cv::gout(out_ref)); + EXPECT_EQ(0., cv::norm(out, out_ref, cv::NORM_INF)); +} + +TEST_F(GAPI_Streaming_Unit, SetSource_During_Execution) +{ + cv::Mat zrs = cv::Mat::zeros(224, 224, CV_8UC3); + + sc.setSource(cv::gin(m, m)); + sc.start(); + EXPECT_ANY_THROW(sc.setSource(cv::gin(zrs, zrs))); + EXPECT_ANY_THROW(sc.setSource(cv::gin(zrs, zrs))); + EXPECT_ANY_THROW(sc.setSource(cv::gin(zrs, zrs))); + sc.stop(); +} + +TEST_F(GAPI_Streaming_Unit, SetSource_After_Completion) +{ + sc.setSource(cv::gin(m, m)); + + // Test pipeline with `m` input + sc.start(); + cv::Mat out, out_ref; + EXPECT_TRUE(sc.pull(cv::gout(out))); + sc.stop(); + + // Test against ref + ref(cv::gin(m, m), cv::gout(out_ref)); + EXPECT_EQ(0., cv::norm(out, out_ref, cv::NORM_INF)); + + // Now set another source + cv::Mat eye = cv::Mat::eye(224, 224, CV_8UC3); + sc.setSource(cv::gin(eye, m)); + sc.start(); + EXPECT_TRUE(sc.pull(cv::gout(out))); + sc.stop(); + + // Test against new ref + ref(cv::gin(eye, m), cv::gout(out_ref)); + EXPECT_EQ(0., cv::norm(out, out_ref, cv::NORM_INF)); +} + + +} // namespace opencv_test