* G-API-NG/Streaming: Introduced a Streaming API Now a GComputation can be compiled in a special "streaming" way and then "played" on a video stream. Currently only VideoCapture is supported as an input source. * G-API-NG/Streaming: added threading & real streaming * G-API-NG/Streaming: Added tests & docs on Copy kernel - Added very simple pipeline tests, not all data types are covered yet (in fact, only GMat is tested now); - Started testing non-OCV backends in the streaming mode; - Added required fixes to Fluid backend, likely it works OK now; - Added required fixes to OCL backend, and now it is likely broken - Also added a UMat-based (OCL) version of Copy kernel * G-API-NG/Streaming: Added own concurrent queue class - Used only if TBB is not available * G-API-NG/Streaming: Fixing various issues - Added missing header to CMakeLists.txt - Fixed various CI issues and warnings * G-API-NG/Streaming: Fixed a compile-time GScalar queue deadlock - GStreamingExecutor blindly created island's input queues for compile-time (value-initialized) GScalars which didn't have any producers, making island actor threads wait there forever * G-API-NG/Streaming: Dropped own version of Copy kernel One was added into master already * G-API-NG/Streaming: Addressed GArray<T> review comments - Added tests on mov() - Removed unnecessary changes in garray.hpp * G-API-NG/Streaming: Added Doxygen comments to new public APIs Also fixed some other comments in the code * G-API-NG/Streaming: Removed debug info, added some comments & renamed vars * G-API-NG/Streaming: Fixed own-vs-cv abstraction leak - Now every island is triggered with own:: (instead of cv::) data objects as inputs; - Changes in Fluid backend required to support cv::Mat/Scalar were reverted; * G-API-NG/Streaming: use holds_alternative<> instead of index/index_of test - Also fixed regression test comments - Also added metadata check comments for GStreamingCompiled * G-API-NG/Streaming: Made start()/stop() more robust - Fixed 
various possible deadlocks - Unified the shutdown code - Added more tests covering different corner cases on start/stop * G-API-NG/Streaming: Finally fixed Windows crashes In fact the problem hasn't been Windows-only. Island thread popped data from queues without preserving the Cmd objects and without taking the ownership over data acquired so when islands started to process the data, this data may be already freed. Linux version worked only by occasion. * G-API-NG/Streaming: Fixed (I hope so) Windows warnings * G-API-NG/Streaming: fixed typos in internal comments - Also added some more explanation on Streaming/OpenCL status * G-API-NG/Streaming: Added more unit tests on streaming - Various start()/stop()/setSource() call flow combinations * G-API-NG/Streaming: Added tests on own concurrent bounded queue * G-API-NG/Streaming: Added more tests on various data types, + more - Vector/Scalar passed as input; - Vector/Scalar passed in-between islands; - Some more assertions; - Also fixed a deadlock problem when inputs are mixed (1 constant, 1 stream) * G-API-NG/Streaming: Added tests on output data types handling - Vector - Scalar * G-API-NG/Streaming: Fixed test issues with IE + Windows warnings * G-API-NG/Streaming: Decoupled G-API from videoio - Now the core G-API doesn't use a cv::VideoCapture directly, it comes in via an abstract interface; - Polished a little bit the setSource()/start()/stop() semantics, now setSource() is mandatory before ANY call to start(). * G-API-NG/Streaming: Fix STANDALONE build (errors brought by render)
772 lines
29 KiB
C++
772 lines
29 KiB
C++
// This file is part of OpenCV project.
|
|
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
|
// of this distribution and at http://opencv.org/license.html.
|
|
//
|
|
// Copyright (C) 2019 Intel Corporation
|
|
|
|
#include "precomp.hpp"
|
|
|
|
#include <iostream>
|
|
|
|
#include <ade/util/zip_range.hpp>
|
|
|
|
#include <opencv2/gapi/opencv_includes.hpp>
|
|
|
|
#include "executor/gstreamingexecutor.hpp"
|
|
#include "compiler/passes/passes.hpp"
|
|
#include "backends/common/gbackend.hpp" // createMat
|
|
|
|
namespace
|
|
{
|
|
using namespace cv::gimpl::stream;
|
|
|
|
#if !defined(GAPI_STANDALONE)
|
|
class VideoEmitter final: public cv::gimpl::GIslandEmitter {
|
|
cv::gapi::wip::IStreamSource::Ptr src;
|
|
|
|
virtual bool pull(cv::GRunArg &arg) override {
|
|
// FIXME: probably we can maintain a pool of (then) pre-allocated
|
|
// buffers to avoid runtime allocations.
|
|
// Pool size can be determined given the internal queue size.
|
|
cv::gapi::wip::Data newData;
|
|
if (!src->pull(newData)) {
|
|
return false;
|
|
}
|
|
arg = std::move(static_cast<cv::GRunArg&>(newData));
|
|
return true;
|
|
}
|
|
public:
|
|
explicit VideoEmitter(const cv::GRunArg &arg) {
|
|
src = cv::util::get<cv::gapi::wip::IStreamSource::Ptr>(arg);
|
|
}
|
|
};
|
|
#endif // GAPI_STANDALONE
|
|
|
|
class ConstEmitter final: public cv::gimpl::GIslandEmitter {
|
|
cv::GRunArg m_arg;
|
|
|
|
virtual bool pull(cv::GRunArg &arg) override {
|
|
arg = const_cast<const cv::GRunArg&>(m_arg); // FIXME: variant workaround
|
|
return true;
|
|
}
|
|
public:
|
|
|
|
explicit ConstEmitter(const cv::GRunArg &arg) : m_arg(arg) {
|
|
}
|
|
};
|
|
|
|
struct DataQueue {
|
|
static const char *name() { return "StreamingDataQueue"; }
|
|
|
|
explicit DataQueue(std::size_t capacity) {
|
|
if (capacity) {
|
|
q.set_capacity(capacity);
|
|
}
|
|
}
|
|
|
|
cv::gimpl::stream::Q q;
|
|
};
|
|
|
|
// Collects pointers to the queues attached to every outgoing edge of `obj`,
// in the order obj->outEdges() enumerates them.
// Every outgoing edge is expected to carry DataQueue metadata.
std::vector<cv::gimpl::stream::Q*> reader_queues(      ade::Graph &g,
                                                 const ade::NodeHandle &obj)
{
    ade::TypedGraph<DataQueue> qgr(g);

    std::vector<cv::gimpl::stream::Q*> queues;
    for (auto &&edge : obj->outEdges())
    {
        cv::gimpl::stream::Q *edge_q = &qgr.metadata(edge).get<DataQueue>().q;
        queues.push_back(edge_q);
    }
    return queues;
}
|
|
|
|
// Collects pointers to the queues attached to every incoming edge of `obj`,
// in the order obj->inEdges() enumerates them.
// An edge without DataQueue metadata yields a nullptr entry, so the result
// always has one element per incoming edge.
std::vector<cv::gimpl::stream::Q*> input_queues(      ade::Graph &g,
                                                const ade::NodeHandle &obj)
{
    ade::TypedGraph<DataQueue> qgr(g);

    std::vector<cv::gimpl::stream::Q*> queues;
    for (auto &&edge : obj->inEdges())
    {
        if (qgr.metadata(edge).contains<DataQueue>())
        {
            queues.push_back(&qgr.metadata(edge).get<DataQueue>().q);
        }
        else
        {
            queues.push_back(nullptr);
        }
    }
    return queues;
}
|
|
|
|
// Transfers every element of `results` into the user-provided destination
// referenced by the matching element of `outputs`.
// The destination's alternative selects which alternative is read from the
// result; Mat/Scalar values are move-assigned, vectors go through
// VectorRef::mov(). An unsupported destination type triggers an assertion.
void sync_data(cv::GRunArgs &results, cv::GRunArgsP &outputs)
{
    namespace own = cv::gapi::own;

    for (auto &&pair : ade::util::zip(ade::util::toRange(outputs),
                                      ade::util::toRange(results)))
    {
        auto &dst = std::get<0>(pair);
        auto &src = std::get<1>(pair);

        // FIXME: this conversion should be unified
#if !defined(GAPI_STANDALONE)
        if (cv::util::holds_alternative<cv::Mat*>(dst))
        {
            *cv::util::get<cv::Mat*>(dst) = std::move(cv::util::get<cv::Mat>(src));
            continue;
        }
        if (cv::util::holds_alternative<cv::Scalar*>(dst))
        {
            *cv::util::get<cv::Scalar*>(dst) = std::move(cv::util::get<cv::Scalar>(src));
            continue;
        }
#endif // !GAPI_STANDALONE
        if (cv::util::holds_alternative<own::Mat*>(dst))
        {
            *cv::util::get<own::Mat*>(dst) = std::move(cv::util::get<own::Mat>(src));
            continue;
        }
        if (cv::util::holds_alternative<own::Scalar*>(dst))
        {
            *cv::util::get<own::Scalar*>(dst) = std::move(cv::util::get<own::Scalar>(src));
            continue;
        }
        if (cv::util::holds_alternative<cv::detail::VectorRef>(dst))
        {
            cv::util::get<cv::detail::VectorRef>(dst).mov(cv::util::get<cv::detail::VectorRef>(src));
            continue;
        }

        // Nothing matched ...maybe because of STANDALONE mode.
        GAPI_Assert(false && "This value type is not supported!");
    }
}
|
|
|
|
// This thread is a plain dump source actor. What it do is just:
|
|
// - Check input queue (the only one) for a control command
|
|
// - Depending on the state, obtains next data object and pushes it to the
|
|
// pipeline.
|
|
void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
|
|
Q& in_queue,
|
|
std::vector<Q*> out_queues,
|
|
std::function<void()> cb_completion)
|
|
{
|
|
// Wait for the explicit Start command.
|
|
// ...or Stop command, this also happens.
|
|
Cmd cmd;
|
|
in_queue.pop(cmd);
|
|
GAPI_Assert( cv::util::holds_alternative<Start>(cmd)
|
|
|| cv::util::holds_alternative<Stop>(cmd));
|
|
if (cv::util::holds_alternative<Stop>(cmd))
|
|
{
|
|
for (auto &&oq : out_queues) oq->push(cmd);
|
|
return;
|
|
}
|
|
|
|
// Now start emitting the data from the source to the pipeline.
|
|
while (true)
|
|
{
|
|
Cmd cancel;
|
|
if (in_queue.try_pop(cancel))
|
|
{
|
|
// if we just popped a cancellation command...
|
|
GAPI_Assert(cv::util::holds_alternative<Stop>(cancel));
|
|
// Broadcast it to the readers and quit.
|
|
for (auto &&oq : out_queues) oq->push(cancel);
|
|
return;
|
|
}
|
|
|
|
// Try to obrain next data chunk from the source
|
|
cv::GRunArg data;
|
|
if (emitter->pull(data))
|
|
{
|
|
// // On success, broadcast it to our readers
|
|
for (auto &&oq : out_queues)
|
|
{
|
|
// FIXME: FOR SOME REASON, oq->push(Cmd{data}) doesn't work!!
|
|
// empty mats are arrived to the receivers!
|
|
// There may be a fatal bug in our variant!
|
|
const auto tmp = data;
|
|
oq->push(Cmd{tmp});
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Otherwise, broadcast STOP message to our readers and quit.
|
|
// This usually means end-of-stream, so trigger a callback
|
|
for (auto &&oq : out_queues) oq->push(Cmd{Stop{}});
|
|
if (cb_completion) cb_completion();
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
// This thread is a plain dumb processing actor. What it do is just:
|
|
// - Reads input from the input queue(s), sleeps if there's nothing to read
|
|
// - Once a full input vector is obtained, passes it to the underlying island
|
|
// executable for processing.
|
|
// - Pushes processing results down to consumers - to the subsequent queues.
|
|
// Note: Every data object consumer has its own queue.
|
|
void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs, // FIXME: this is...
|
|
std::vector<cv::gimpl::RcDesc> out_rcs, // FIXME: ...basically just...
|
|
cv::GMetaArgs out_metas, // ...
|
|
std::shared_ptr<cv::gimpl::GIslandExecutable> island, // FIXME: ...a copy of OpDesc{}.
|
|
std::vector<Q*> in_queues,
|
|
std::vector<cv::GRunArg> in_constants,
|
|
std::vector< std::vector<Q*> > out_queues)
|
|
{
|
|
GAPI_Assert(in_queues.size() == in_rcs.size());
|
|
GAPI_Assert(out_queues.size() == out_rcs.size());
|
|
GAPI_Assert(out_queues.size() == out_metas.size());
|
|
while (true)
|
|
{
|
|
std::vector<cv::gimpl::GIslandExecutable::InObj> isl_inputs;
|
|
isl_inputs.resize(in_rcs.size());
|
|
|
|
// Try to obtain the full input vector.
|
|
// Note this may block us. We also may get Stop signal here
|
|
// and then exit the thread.
|
|
// NOTE: in order to maintain the GRunArg's underlying object
|
|
// lifetime, keep the whole cmd vector (of size == # of inputs)
|
|
// in memory.
|
|
std::vector<Cmd> cmd(in_queues.size());
|
|
for (auto &&it : ade::util::indexed(in_queues))
|
|
{
|
|
auto id = ade::util::index(it);
|
|
auto &q = ade::util::value(it);
|
|
|
|
isl_inputs[id].first = in_rcs[id];
|
|
if (q == nullptr)
|
|
{
|
|
// NULL queue means a graph-constant value
|
|
// (like a value-initialized scalar)
|
|
// FIXME: Variant move problem
|
|
isl_inputs[id].second = const_cast<const cv::GRunArg&>(in_constants[id]);
|
|
}
|
|
else
|
|
{
|
|
q->pop(cmd[id]);
|
|
if (cv::util::holds_alternative<Stop>(cmd[id]))
|
|
{
|
|
// FIXME: This logic must be unified with what collectorThread is doing!
|
|
// Just got a stop sign. Reiterate through all queues
|
|
// and rewind data to every Stop sign per queue
|
|
for (auto &&qit : ade::util::indexed(in_queues))
|
|
{
|
|
auto id2 = ade::util::index(qit);
|
|
auto &q2 = ade::util::value(qit);
|
|
if (id == id2) continue;
|
|
|
|
Cmd cmd2;
|
|
while (q2 && !cv::util::holds_alternative<Stop>(cmd2))
|
|
q2->pop(cmd2);
|
|
}
|
|
// Broadcast Stop down to the pipeline and quit
|
|
for (auto &&out_qq : out_queues)
|
|
{
|
|
for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
|
|
}
|
|
return;
|
|
}
|
|
// FIXME: MOVE PROBLEM
|
|
const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd[id]);
|
|
#if defined(GAPI_STANDALONE)
|
|
// Standalone mode - simply store input argument in the vector as-is
|
|
isl_inputs[id].second = in_arg;
|
|
#else
|
|
// Make Islands operate on own:: data types (i.e. in the same
|
|
// environment as GExecutor provides)
|
|
// This way several backends (e.g. Fluid) remain OpenCV-independent.
|
|
switch (in_arg.index()) {
|
|
case cv::GRunArg::index_of<cv::Mat>():
|
|
isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
|
|
break;
|
|
case cv::GRunArg::index_of<cv::Scalar>():
|
|
isl_inputs[id].second = cv::GRunArg{cv::to_own(cv::util::get<cv::Scalar>(in_arg))};
|
|
break;
|
|
default:
|
|
isl_inputs[id].second = in_arg;
|
|
break;
|
|
}
|
|
#endif // GAPI_STANDALONE
|
|
}
|
|
}
|
|
// Once the vector is obtained, prepare data for island execution
|
|
// Note - we first allocate output vector via GRunArg!
|
|
// Then it is converted to a GRunArgP.
|
|
std::vector<cv::gimpl::GIslandExecutable::OutObj> isl_outputs;
|
|
std::vector<cv::GRunArg> out_data;
|
|
isl_outputs.resize(out_rcs.size());
|
|
out_data.resize(out_rcs.size());
|
|
for (auto &&it : ade::util::indexed(out_rcs))
|
|
{
|
|
auto id = ade::util::index(it);
|
|
auto &r = ade::util::value(it);
|
|
|
|
#if !defined(GAPI_STANDALONE)
|
|
using MatType = cv::Mat;
|
|
using SclType = cv::Scalar;
|
|
#else
|
|
using MatType = cv::gapi::own::Mat;
|
|
using SclType = cv::gapi::own::Scalar;
|
|
#endif // GAPI_STANDALONE
|
|
|
|
switch (r.shape) {
|
|
// Allocate a data object based on its shape & meta, and put it into our vectors.
|
|
// Yes, first we put a cv::Mat GRunArg, and then specify _THAT_
|
|
// pointer as an output parameter - to make sure that after island completes,
|
|
// our GRunArg still has the right (up-to-date) value.
|
|
// Same applies to other types.
|
|
// FIXME: This is absolutely ugly but seem to work perfectly for its purpose.
|
|
case cv::GShape::GMAT:
|
|
{
|
|
MatType newMat;
|
|
cv::gimpl::createMat(cv::util::get<cv::GMatDesc>(out_metas[id]), newMat);
|
|
out_data[id] = cv::GRunArg(std::move(newMat));
|
|
isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<MatType>(out_data[id])) };
|
|
}
|
|
break;
|
|
case cv::GShape::GSCALAR:
|
|
{
|
|
SclType newScl;
|
|
out_data[id] = cv::GRunArg(std::move(newScl));
|
|
isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<SclType>(out_data[id])) };
|
|
}
|
|
break;
|
|
case cv::GShape::GARRAY:
|
|
{
|
|
cv::detail::VectorRef newVec;
|
|
cv::util::get<cv::detail::ConstructVec>(r.ctor)(newVec);
|
|
out_data[id] = cv::GRunArg(std::move(newVec));
|
|
// VectorRef is implicitly shared so no pointer is taken here
|
|
const auto &rr = cv::util::get<cv::detail::VectorRef>(out_data[id]); // FIXME: that variant MOVE problem again
|
|
isl_outputs[id] = { r, cv::GRunArgP(rr) };
|
|
}
|
|
break;
|
|
default:
|
|
cv::util::throw_error(std::logic_error("Unsupported GShape"));
|
|
break;
|
|
}
|
|
}
|
|
// Now ask Island to execute on this data
|
|
island->run(std::move(isl_inputs), std::move(isl_outputs));
|
|
|
|
// Once executed, dispatch our results down to the pipeline.
|
|
for (auto &&it : ade::util::zip(ade::util::toRange(out_queues),
|
|
ade::util::toRange(out_data)))
|
|
{
|
|
for (auto &&q : std::get<0>(it))
|
|
{
|
|
// FIXME: FATAL VARIANT ISSUE!!
|
|
const auto tmp = std::get<1>(it);
|
|
q->push(Cmd{tmp});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// The idea of collectorThread is easy. If there're multiple outputs
|
|
// in the graph, we need to pull an object from every associated queue
|
|
// and then put the resulting vector into one single queue. While it
|
|
// looks redundant, it simplifies dramatically the way how try_pull()
|
|
// is implemented - we need to check one queue instead of many.
|
|
void collectorThread(std::vector<Q*> in_queues,
|
|
Q& out_queue)
|
|
{
|
|
while (true)
|
|
{
|
|
cv::GRunArgs this_result(in_queues.size());
|
|
for (auto &&it : ade::util::indexed(in_queues))
|
|
{
|
|
Cmd cmd;
|
|
ade::util::value(it)->pop(cmd);
|
|
if (cv::util::holds_alternative<Stop>(cmd))
|
|
{
|
|
// FIXME: Unify this code with island thread
|
|
for (auto &&qit : ade::util::indexed(in_queues))
|
|
{
|
|
if (ade::util::index(qit) == ade::util::index(it)) continue;
|
|
Cmd cmd2;
|
|
while (!cv::util::holds_alternative<Stop>(cmd2))
|
|
ade::util::value(qit)->pop(cmd2);
|
|
}
|
|
out_queue.push(Cmd{Stop{}});
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
// FIXME: MOVE_PROBLEM
|
|
const cv::GRunArg &in_arg = cv::util::get<cv::GRunArg>(cmd);
|
|
this_result[ade::util::index(it)] = in_arg;
|
|
// FIXME: Check for other message types.
|
|
}
|
|
}
|
|
out_queue.push(Cmd{this_result});
|
|
}
|
|
}
|
|
} // anonymous namespace
|
|
|
|
// Takes ownership of the compiled graph and prepares the static part of
// the streaming pipeline: walks the island graph in its topological
// order, records island/slot/emitter/sink descriptors, and attaches
// bounded queues to island inputs and sink inputs.
// No threads are created here.
cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&g_model)
    : m_orig_graph(std::move(g_model))
    , m_island_graph(GModel::Graph(*m_orig_graph).metadata()
                     .get<IslandModel>().model)
    , m_gim(*m_island_graph)
{
    GModel::Graph gm(*m_orig_graph);
    // NB: Right now GIslandModel is acyclic, and all the below code assumes that.
    // NB: This naive execution code is taken from GExecutor nearly "as-is"

    // Emitters/sinks are indexed by their position in the protocol
    const auto proto = gm.metadata().get<Protocol>();
    const auto num_proto_ins  = proto.in_nhs.size();
    const auto num_proto_outs = proto.out_nhs.size();
    m_emitters      .resize(num_proto_ins);
    m_emitter_queues.resize(num_proto_ins);
    m_sinks         .resize(num_proto_outs);
    m_sink_queues   .resize(num_proto_outs);

    // Very rough estimation to limit internal queue sizes.
    // Pipeline depth is equal to number of its (pipeline) steps.
    const auto is_island = [&](ade::NodeHandle nh) {
        return m_gim.metadata(nh).get<NodeKind>().k == NodeKind::ISLAND;
    };
    const auto queue_capacity = std::count_if(m_gim.nodes().begin(),
                                              m_gim.nodes().end(),
                                              is_island);

    // Builds a resource descriptor out of the original data node info.
    // FIXME: GArray lost host constructor problem - GARRAY data
    // must carry a host constructor.
    const auto make_rc = [](const Data &orig_data_info) {
        if (orig_data_info.shape == GShape::GARRAY) {
            GAPI_Assert(!cv::util::holds_alternative<cv::util::monostate>(orig_data_info.ctor));
        }
        return RcDesc{ orig_data_info.rc
                     , orig_data_info.shape
                     , orig_data_info.ctor};
    };

    auto sorted = m_gim.metadata().get<ade::passes::TopologicalSortData>();
    for (auto nh : sorted.nodes())
    {
        switch (m_gim.metadata(nh).get<NodeKind>().k)
        {
        case NodeKind::ISLAND:
        {
            const auto num_in_slots  = nh->inNodes().size();
            const auto num_out_slots = nh->outNodes().size();

            std::vector<RcDesc>  input_rcs;
            std::vector<RcDesc>  output_rcs;
            std::vector<GRunArg> in_constants;
            cv::GMetaArgs        output_metas;
            input_rcs   .reserve(num_in_slots);
            in_constants.reserve(num_in_slots); // FIXME: Ugly
            output_rcs  .reserve(num_out_slots);
            output_metas.reserve(num_out_slots);

            // Input slots backed by compile-time constant values
            std::unordered_set<ade::NodeHandle, ade::HandleHasher<ade::Node> > const_ins;

            // FIXME: THIS ORDER IS IRRELEVANT TO PROTOCOL OR ANY OTHER ORDER!
            // FIXME: SAME APPLIES TO THE REGULAR GEEXECUTOR!!
            // FIXME: JEZ IT WAS SO AWFUL!!!!
            for (auto in_slot_nh : nh->inNodes())
            {
                const auto orig_data_nh
                    = m_gim.metadata(in_slot_nh).get<DataSlot>().original_data_node;
                const auto &orig_data_info
                    = gm.metadata(orig_data_nh).get<Data>();

                // in_constants keeps one entry per input: the constant
                // value itself, or an empty GRunArg for non-constant inputs.
                if (orig_data_info.storage == Data::Storage::CONST_VAL)
                {
                    const_ins.insert(in_slot_nh);
                    // FIXME: Variant move issue
                    in_constants.push_back(const_cast<const cv::GRunArg&>(gm.metadata(orig_data_nh).get<ConstValue>().arg));
                }
                else
                {
                    // FIXME: Make it in some smarter way pls
                    in_constants.push_back(cv::GRunArg{});
                }
                input_rcs.emplace_back(make_rc(orig_data_info));
            }
            for (auto out_slot_nh : nh->outNodes())
            {
                const auto orig_data_nh
                    = m_gim.metadata(out_slot_nh).get<DataSlot>().original_data_node;
                const auto &orig_data_info
                    = gm.metadata(orig_data_nh).get<Data>();

                output_rcs.emplace_back(make_rc(orig_data_info));
                output_metas.emplace_back(orig_data_info.meta);
            }

            m_ops.emplace_back(OpDesc{ std::move(input_rcs)
                                     , std::move(output_rcs)
                                     , std::move(output_metas)
                                     , nh
                                     , in_constants
                                     , m_gim.metadata(nh).get<IslandExec>().object});

            // Initialize queues for every operation's input
            // ...only if the data is not compile-const
            ade::TypedGraph<DataQueue> qgr(*m_island_graph);
            for (auto eh : nh->inEdges())
            {
                if (const_ins.count(eh->srcNode()) != 0) continue;

                qgr.metadata(eh).set(DataQueue(queue_capacity));
                m_internal_queues.insert(&qgr.metadata(eh).get<DataQueue>().q);
            }
            break;
        }
        case NodeKind::SLOT:
        {
            const auto orig_data_nh
                = m_gim.metadata(nh).get<DataSlot>().original_data_node;
            m_slots.emplace_back(DataDesc{nh, orig_data_nh});
            break;
        }
        case NodeKind::EMIT:
        {
            const auto emitter_idx
                = m_gim.metadata(nh).get<Emitter>().proto_index;
            GAPI_Assert(emitter_idx < m_emitters.size());
            m_emitters[emitter_idx] = nh;
            break;
        }
        case NodeKind::SINK:
        {
            const auto sink_idx
                = m_gim.metadata(nh).get<Sink>().proto_index;
            GAPI_Assert(sink_idx < m_sinks.size());
            m_sinks[sink_idx] = nh;

            // Also initialize Sink's input queue
            ade::TypedGraph<DataQueue> qgr(*m_island_graph);
            GAPI_Assert(nh->inEdges().size() == 1u);
            auto sink_in_eh = nh->inEdges().front();
            qgr.metadata(sink_in_eh).set(DataQueue(queue_capacity));
            m_sink_queues[sink_idx] = &qgr.metadata(sink_in_eh).get<DataQueue>().q;
            break;
        }
        default:
            GAPI_Assert(false);
            break;
        } // switch(kind)
    } // for(gim nodes)

    // The final (collector) queue is bounded the same way
    m_out_queue.set_capacity(queue_capacity);
}
|
|
|
|
// Stops the pipeline on destruction if it is in the READY or RUNNING
// state; does nothing in any other state.
cv::gimpl::GStreamingExecutor::~GStreamingExecutor()
{
    if (state != State::READY && state != State::RUNNING)
    {
        return;
    }
    stop();
}
|
|
|
|
// Binds input data to the pipeline and spawns all worker threads.
//
// ins - one argument per graph input, in protocol order. An argument
//       holding an IStreamSource::Ptr becomes a streaming (video) emitter;
//       any other argument becomes a constant emitter which produces
//       the same value on every pull. At most one streaming source is
//       supported (std::logic_error otherwise).
//
// May only be called in the READY or STOPPED state (asserted).
// If called in the READY state, the previously created worker threads
// are shut down via stop() before the new ones are created.
// On return the executor is in the READY state: the threads exist but
// emitters wait for the Start command sent by start().
void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
{
    GAPI_Assert(state == State::READY || state == State::STOPPED);
    // There's a 1:1 mapping between emitters and data inputs; with fewer
    // inputs some emitter nodes would be left without an emitter object
    // (or with a stale one) while a thread is still started for them.
    GAPI_Assert(ins.size() == m_emitters.size());

    const auto is_video = [](const GRunArg &arg) {
        return util::holds_alternative<cv::gapi::wip::IStreamSource::Ptr>(arg);
    };
    // NB: count_if returns a signed difference type, so compare against
    // a signed literal.
    const auto num_videos = std::count_if(ins.begin(), ins.end(), is_video);
    if (num_videos > 1)
    {
        // See below why (another reason - no documented behavior
        // on handling videos streams of different length)
        util::throw_error(std::logic_error("Only one video source is"
                                           " currently supported!"));
    }

    // Forget the constant emitter queues registered by a previous
    // setSource() call. Otherwise the list grows on every call and the
    // end-of-stream callback pushes Stop into stale/duplicated entries.
    // It is safe to do here: worker threads (if any) are still waiting
    // for Start, and the list is only read from the completion callback.
    m_const_emitter_queues.clear();

    // Walk through the protocol, set-up emitters appropriately
    for (auto it : ade::util::zip(ade::util::toRange(m_emitters),
                                  ade::util::toRange(ins),
                                  ade::util::iota(m_emitters.size())))
    {
        auto  emit_nh  = std::get<0>(it);
        auto& emit_arg = std::get<1>(it);
        auto  emit_idx = std::get<2>(it);
        auto& emitter  = m_gim.metadata(emit_nh).get<Emitter>().object;

        using T = GRunArg;
        switch (emit_arg.index())
        {
        // Create a streaming emitter.
        // Produces the next video frame when pulled.
        case T::index_of<cv::gapi::wip::IStreamSource::Ptr>():
#if !defined(GAPI_STANDALONE)
            emitter.reset(new VideoEmitter{emit_arg});
#else
            util::throw_error(std::logic_error("Video is not supported in the "
                                               "standalone mode"));
#endif
            break;
        default:
            // Create a constant emitter.
            // Produces always the same ("constant") value when pulled.
            emitter.reset(new ConstEmitter{emit_arg});
            // Remember its control queue: it has to be stopped
            // explicitly when the video source is over.
            m_const_emitter_queues.push_back(&m_emitter_queues[emit_idx]);
            break;
        }
    }

    // FIXME: The below code assumes our graph may have only one
    // real video source (and so, only one stream which may really end)
    // all other inputs are "constant" generators.
    // Craft here a completion callback to notify Const emitters that
    // a video source is over
    auto real_video_completion_cb = [this]() {
        for (auto q : m_const_emitter_queues) q->push(Cmd{Stop{}});
    };

    // FIXME: ONLY now, after all executable objects are created,
    // we can set up our execution threads. Let's do it.
    // First create threads for all the emitters.
    // FIXME: One way to avoid this may be including an Emitter object as a part of
    // START message. Why not?
    if (state == State::READY)
    {
        // Threads of the previous setSource() call are still waiting for
        // Start; shut them down before spawning the new ones.
        stop();
    }

    for (auto it : ade::util::indexed(m_emitters))
    {
        const auto id = ade::util::index(it); // = index in GComputation's protocol
        const auto eh = ade::util::value(it);

        // Prepare emitter thread parameters
        auto emitter = m_gim.metadata(eh).get<Emitter>().object;

        // Collect all reader queues from the emitter's only output object
        auto out_queues = reader_queues(*m_island_graph, eh->outNodes().front());

        m_threads.emplace_back(emitterActorThread,
                               emitter,
                               std::ref(m_emitter_queues[id]),
                               out_queues,
                               real_video_completion_cb);
    }

    // Now do this for every island (in a topological order)
    for (auto &&op : m_ops)
    {
        // Prepare island thread parameters
        auto island = m_gim.metadata(op.nh).get<IslandExec>().object;

        // Collect actor's input queues
        auto in_queues = input_queues(*m_island_graph, op.nh);

        // Collect actor's output queues:
        // one vector of reader queues per island output.
        std::vector< std::vector<stream::Q*> > out_queues;
        for (auto &&out_eh : op.nh->outNodes()) {
            out_queues.push_back(reader_queues(*m_island_graph, out_eh));
        }

        m_threads.emplace_back(islandActorThread,
                               op.in_objects,
                               op.out_objects,
                               op.out_metas,
                               island,
                               in_queues,
                               op.in_constants,
                               out_queues);
    }

    // Finally, start a collector thread.
    m_threads.emplace_back(collectorThread,
                           m_sink_queues,
                           std::ref(m_out_queue));
    state = State::READY;
}
|
|
|
|
void cv::gimpl::GStreamingExecutor::start()
|
|
{
|
|
if (state == State::STOPPED)
|
|
{
|
|
util::throw_error(std::logic_error("Please call setSource() before start() "
|
|
"if the pipeline has been already stopped"));
|
|
}
|
|
GAPI_Assert(state == State::READY);
|
|
|
|
// Currently just trigger our emitters to work
|
|
state = State::RUNNING;
|
|
for (auto &q : m_emitter_queues)
|
|
{
|
|
q.push(stream::Cmd{stream::Start{}});
|
|
}
|
|
}
|
|
|
|
void cv::gimpl::GStreamingExecutor::wait_shutdown()
|
|
{
|
|
// This utility is used by pull/try_pull/stop() to uniformly
|
|
// shutdown the worker threads.
|
|
// FIXME: Of course it can be designed much better
|
|
for (auto &t : m_threads) t.join();
|
|
m_threads.clear();
|
|
|
|
// Clear all queues
|
|
// If there are constant emitters, internal queues
|
|
// may be polluted with constant values and have extra
|
|
// data at the point of shutdown.
|
|
// It usually happens when there's multiple inputs,
|
|
// one constant and one is not, and the latter ends (e.g.
|
|
// with end-of-stream).
|
|
for (auto &q : m_emitter_queues) q.clear();
|
|
for (auto &q : m_sink_queues) q->clear();
|
|
for (auto &q : m_internal_queues) q->clear();
|
|
m_out_queue.clear();
|
|
|
|
state = State::STOPPED;
|
|
}
|
|
|
|
// Blocking read of the next full result vector.
// Returns false if the pipeline is already STOPPED, or if a Stop command
// is read from the output queue (in which case the worker threads are
// shut down first). Otherwise the results are transferred into `outs`
// and true is returned.
// Must be called in the RUNNING state with one output object per graph
// output (both asserted).
bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs)
{
    if (state == State::STOPPED)
    {
        return false;
    }
    GAPI_Assert(state == State::RUNNING);
    GAPI_Assert(m_sink_queues.size() == outs.size());

    Cmd cmd;
    m_out_queue.pop(cmd);

    if (!cv::util::holds_alternative<Stop>(cmd))
    {
        // A regular message: must be a complete vector of results
        GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(cmd));
        sync_data(cv::util::get<cv::GRunArgs>(cmd), outs);
        return true;
    }

    // The stream is over
    wait_shutdown();
    return false;
}
|
|
|
|
// Non-blocking counterpart of pull().
// Returns false if the pipeline is STOPPED, if no message is available
// in the output queue right now, or if the message read is Stop (the
// worker threads are shut down in the latter case). Otherwise the
// results are transferred into `outs` and true is returned.
bool cv::gimpl::GStreamingExecutor::try_pull(cv::GRunArgsP &&outs)
{
    if (state == State::STOPPED)
    {
        return false;
    }

    GAPI_Assert(m_sink_queues.size() == outs.size());

    Cmd cmd;
    const bool got_message = m_out_queue.try_pop(cmd);
    if (!got_message)
    {
        return false;
    }

    if (!cv::util::holds_alternative<Stop>(cmd))
    {
        // A regular message: must be a complete vector of results
        GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(cmd));
        sync_data(cv::util::get<cv::GRunArgs>(cmd), outs);
        return true;
    }

    // The stream is over
    wait_shutdown();
    return false;
}
|
|
|
|
void cv::gimpl::GStreamingExecutor::stop()
|
|
{
|
|
if (state == State::STOPPED)
|
|
return;
|
|
|
|
// FIXME: ...and how to deal with still-unread data then?
|
|
// Push a Stop message to the every emitter,
|
|
// wait until it broadcasts within the pipeline,
|
|
// FIXME: worker threads could stuck on push()!
|
|
// need to read the output queues until Stop!
|
|
for (auto &q : m_emitter_queues) {
|
|
q.push(stream::Cmd{stream::Stop{}});
|
|
}
|
|
|
|
// Pull messages from the final queue to ensure completion
|
|
Cmd cmd;
|
|
while (!cv::util::holds_alternative<Stop>(cmd))
|
|
{
|
|
m_out_queue.pop(cmd);
|
|
}
|
|
GAPI_Assert(cv::util::holds_alternative<Stop>(cmd));
|
|
wait_shutdown();
|
|
}
|
|
|
|
bool cv::gimpl::GStreamingExecutor::running() const
|
|
{
|
|
return (state == State::RUNNING);
|
|
}
|