Enable stateful kernels in G-API OCV Backend
This commit is contained in:
@@ -33,13 +33,13 @@
|
||||
//
|
||||
// If not, we need to introduce that!
|
||||
using GCPUModel = ade::TypedGraph
|
||||
< cv::gimpl::Unit
|
||||
< cv::gimpl::CPUUnit
|
||||
, cv::gimpl::Protocol
|
||||
>;
|
||||
|
||||
// FIXME: Same issue with Typed and ConstTyped
|
||||
using GConstGCPUModel = ade::ConstTypedGraph
|
||||
< cv::gimpl::Unit
|
||||
< cv::gimpl::CPUUnit
|
||||
, cv::gimpl::Protocol
|
||||
>;
|
||||
|
||||
@@ -53,7 +53,7 @@ namespace
|
||||
{
|
||||
GCPUModel gm(graph);
|
||||
auto cpu_impl = cv::util::any_cast<cv::GCPUKernel>(impl.opaque);
|
||||
gm.metadata(op_node).set(cv::gimpl::Unit{cpu_impl});
|
||||
gm.metadata(op_node).set(cv::gimpl::CPUUnit{cpu_impl});
|
||||
}
|
||||
|
||||
virtual EPtr compile(const ade::Graph &graph,
|
||||
@@ -78,11 +78,23 @@ cv::gimpl::GCPUExecutable::GCPUExecutable(const ade::Graph &g,
|
||||
{
|
||||
// Convert list of operations (which is topologically sorted already)
|
||||
// into an execution script.
|
||||
GConstGCPUModel gcm(m_g);
|
||||
for (auto &nh : nodes)
|
||||
{
|
||||
switch (m_gm.metadata(nh).get<NodeType>().t)
|
||||
{
|
||||
case NodeType::OP: m_script.push_back({nh, GModel::collectOutputMeta(m_gm, nh)}); break;
|
||||
case NodeType::OP:
|
||||
{
|
||||
m_script.push_back({nh, GModel::collectOutputMeta(m_gm, nh)});
|
||||
|
||||
// If kernel is stateful then prepare storage for its state.
|
||||
GCPUKernel k = gcm.metadata(nh).get<CPUUnit>().k;
|
||||
if (k.m_isStateful)
|
||||
{
|
||||
m_nodesToStates[nh] = GArg{ };
|
||||
}
|
||||
break;
|
||||
}
|
||||
case NodeType::DATA:
|
||||
{
|
||||
m_dataNodes.push_back(nh);
|
||||
@@ -104,6 +116,9 @@ cv::gimpl::GCPUExecutable::GCPUExecutable(const ade::Graph &g,
|
||||
default: util::throw_error(std::logic_error("Unsupported NodeType type"));
|
||||
}
|
||||
}
|
||||
|
||||
// For each stateful kernel call 'setup' user callback to initialize state.
|
||||
setupKernelStates();
|
||||
}
|
||||
|
||||
// FIXME: Document what it does
|
||||
@@ -140,6 +155,26 @@ cv::GArg cv::gimpl::GCPUExecutable::packArg(const GArg &arg)
|
||||
}
|
||||
}
|
||||
|
||||
void cv::gimpl::GCPUExecutable::setupKernelStates()
|
||||
{
|
||||
GConstGCPUModel gcm(m_g);
|
||||
for (auto& nodeToState : m_nodesToStates)
|
||||
{
|
||||
auto& kernelNode = nodeToState.first;
|
||||
auto& kernelState = nodeToState.second;
|
||||
|
||||
const GCPUKernel& kernel = gcm.metadata(kernelNode).get<CPUUnit>().k;
|
||||
kernel.m_setupF(GModel::collectInputMeta(m_gm, kernelNode),
|
||||
m_gm.metadata(kernelNode).get<Op>().args,
|
||||
kernelState);
|
||||
}
|
||||
}
|
||||
|
||||
void cv::gimpl::GCPUExecutable::handleNewStream()
|
||||
{
|
||||
m_newStreamStarted = true;
|
||||
}
|
||||
|
||||
void cv::gimpl::GCPUExecutable::run(std::vector<InObj> &&input_objs,
|
||||
std::vector<OutObj> &&output_objs)
|
||||
{
|
||||
@@ -167,6 +202,14 @@ void cv::gimpl::GCPUExecutable::run(std::vector<InObj> &&input_objs,
|
||||
}
|
||||
}
|
||||
|
||||
// In case if new video-stream happens - for each stateful kernel
|
||||
// call 'setup' user callback to re-initialize state.
|
||||
if (m_newStreamStarted)
|
||||
{
|
||||
setupKernelStates();
|
||||
m_newStreamStarted = false;
|
||||
}
|
||||
|
||||
// OpenCV backend execution is not a rocket science at all.
|
||||
// Simply invoke our kernels in the proper order.
|
||||
GConstGCPUModel gcm(m_g);
|
||||
@@ -176,7 +219,7 @@ void cv::gimpl::GCPUExecutable::run(std::vector<InObj> &&input_objs,
|
||||
|
||||
// Obtain our real execution unit
|
||||
// TODO: Should kernels be copyable?
|
||||
GCPUKernel k = gcm.metadata(op_info.nh).get<Unit>().k;
|
||||
GCPUKernel k = gcm.metadata(op_info.nh).get<CPUUnit>().k;
|
||||
|
||||
// Initialize kernel's execution context:
|
||||
// - Input parameters
|
||||
@@ -185,8 +228,8 @@ void cv::gimpl::GCPUExecutable::run(std::vector<InObj> &&input_objs,
|
||||
|
||||
using namespace std::placeholders;
|
||||
ade::util::transform(op.args,
|
||||
std::back_inserter(context.m_args),
|
||||
std::bind(&GCPUExecutable::packArg, this, _1));
|
||||
std::back_inserter(context.m_args),
|
||||
std::bind(&GCPUExecutable::packArg, this, _1));
|
||||
|
||||
// - Output parameters.
|
||||
// FIXME: pre-allocate internal Mats, etc, according to the known meta
|
||||
@@ -198,8 +241,14 @@ void cv::gimpl::GCPUExecutable::run(std::vector<InObj> &&input_objs,
|
||||
context.m_results[out_port] = magazine::getObjPtr(m_res, out_desc);
|
||||
}
|
||||
|
||||
// For stateful kernel add state to its execution context
|
||||
if (k.m_isStateful)
|
||||
{
|
||||
context.m_state = m_nodesToStates.at(op_info.nh);
|
||||
}
|
||||
|
||||
// Now trigger the executable unit
|
||||
k.apply(context);
|
||||
k.m_runF(context);
|
||||
|
||||
//As Kernels are forbidden to allocate memory for (Mat) outputs,
|
||||
//this code seems redundant, at least for Mats
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
namespace cv { namespace gimpl {
|
||||
|
||||
struct Unit
|
||||
struct CPUUnit
|
||||
{
|
||||
static const char *name() { return "HostKernel"; }
|
||||
GCPUKernel k;
|
||||
@@ -48,6 +48,13 @@ class GCPUExecutable final: public GIslandExecutable
|
||||
// Actual data of all resources in graph (both internal and external)
|
||||
Mag m_res;
|
||||
GArg packArg(const GArg &arg);
|
||||
void setupKernelStates();
|
||||
|
||||
// TODO: Check that it is thread-safe
|
||||
std::unordered_map<ade::NodeHandle, GArg,
|
||||
ade::HandleHasher<ade::Node>> m_nodesToStates;
|
||||
|
||||
bool m_newStreamStarted = false;
|
||||
|
||||
public:
|
||||
GCPUExecutable(const ade::Graph &graph,
|
||||
@@ -62,6 +69,8 @@ public:
|
||||
util::throw_error(std::logic_error("GCPUExecutable::reshape() should never be called"));
|
||||
}
|
||||
|
||||
virtual void handleNewStream() override;
|
||||
|
||||
virtual void run(std::vector<InObj> &&input_objs,
|
||||
std::vector<OutObj> &&output_objs) override;
|
||||
};
|
||||
|
||||
@@ -45,13 +45,7 @@ cv::GCPUKernel::GCPUKernel()
|
||||
{
|
||||
}
|
||||
|
||||
cv::GCPUKernel::GCPUKernel(const GCPUKernel::F &f)
|
||||
: m_f(f)
|
||||
cv::GCPUKernel::GCPUKernel(const GCPUKernel::RunF &runF, const GCPUKernel::SetupF &setupF)
|
||||
: m_runF(runF), m_setupF(setupF), m_isStateful(m_setupF != nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
void cv::GCPUKernel::apply(GCPUContext &ctx)
|
||||
{
|
||||
GAPI_Assert(m_f);
|
||||
m_f(ctx);
|
||||
}
|
||||
|
||||
@@ -33,13 +33,13 @@
|
||||
//
|
||||
// If not, we need to introduce that!
|
||||
using GOCLModel = ade::TypedGraph
|
||||
< cv::gimpl::Unit
|
||||
< cv::gimpl::OCLUnit
|
||||
, cv::gimpl::Protocol
|
||||
>;
|
||||
|
||||
// FIXME: Same issue with Typed and ConstTyped
|
||||
using GConstGOCLModel = ade::ConstTypedGraph
|
||||
< cv::gimpl::Unit
|
||||
< cv::gimpl::OCLUnit
|
||||
, cv::gimpl::Protocol
|
||||
>;
|
||||
|
||||
@@ -53,7 +53,7 @@ namespace
|
||||
{
|
||||
GOCLModel gm(graph);
|
||||
auto ocl_impl = cv::util::any_cast<cv::GOCLKernel>(impl.opaque);
|
||||
gm.metadata(op_node).set(cv::gimpl::Unit{ocl_impl});
|
||||
gm.metadata(op_node).set(cv::gimpl::OCLUnit{ocl_impl});
|
||||
}
|
||||
|
||||
virtual EPtr compile(const ade::Graph &graph,
|
||||
@@ -198,7 +198,7 @@ void cv::gimpl::GOCLExecutable::run(std::vector<InObj> &&input_objs,
|
||||
|
||||
// Obtain our real execution unit
|
||||
// TODO: Should kernels be copyable?
|
||||
GOCLKernel k = gcm.metadata(op_info.nh).get<Unit>().k;
|
||||
GOCLKernel k = gcm.metadata(op_info.nh).get<OCLUnit>().k;
|
||||
|
||||
// Initialize kernel's execution context:
|
||||
// - Input parameters
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
|
||||
namespace cv { namespace gimpl {
|
||||
|
||||
struct Unit
|
||||
struct OCLUnit
|
||||
{
|
||||
static const char *name() { return "OCLKernel"; }
|
||||
GOCLKernel k;
|
||||
|
||||
@@ -93,7 +93,7 @@ void cv::gimpl::render::ocv::GRenderExecutable::run(std::vector<InObj> &&input_
|
||||
|
||||
context.m_args.emplace_back(m_ftpr.get());
|
||||
|
||||
k.apply(context);
|
||||
k.m_runF(context);
|
||||
|
||||
for (auto &it : output_objs) magazine::writeBack(m_res, it.first, it.second);
|
||||
}
|
||||
|
||||
@@ -72,6 +72,12 @@ void cv::GCompiled::Priv::reshape(const GMetaArgs& inMetas, const GCompileArgs&
|
||||
m_metas = inMetas;
|
||||
}
|
||||
|
||||
void cv::GCompiled::Priv::prepareForNewStream()
|
||||
{
|
||||
GAPI_Assert(m_exec);
|
||||
m_exec->prepareForNewStream();
|
||||
}
|
||||
|
||||
const cv::gimpl::GModel::Graph& cv::GCompiled::Priv::model() const
|
||||
{
|
||||
GAPI_Assert(nullptr != m_exec);
|
||||
@@ -155,3 +161,8 @@ void cv::GCompiled::reshape(const GMetaArgs& inMetas, const GCompileArgs& args)
|
||||
{
|
||||
m_priv->reshape(inMetas, args);
|
||||
}
|
||||
|
||||
void cv::GCompiled::prepareForNewStream()
|
||||
{
|
||||
m_priv->prepareForNewStream();
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ public:
|
||||
|
||||
bool canReshape() const;
|
||||
void reshape(const GMetaArgs& inMetas, const GCompileArgs &args);
|
||||
void prepareForNewStream();
|
||||
|
||||
void run(cv::gimpl::GRuntimeArgs &&args);
|
||||
const GMetaArgs& metas() const;
|
||||
|
||||
@@ -265,3 +265,11 @@ void cv::gimpl::GExecutor::reshape(const GMetaArgs& inMetas, const GCompileArgs&
|
||||
passes::inferMeta(ctx, true);
|
||||
m_ops[0].isl_exec->reshape(g, args);
|
||||
}
|
||||
|
||||
void cv::gimpl::GExecutor::prepareForNewStream()
|
||||
{
|
||||
for (auto &op : m_ops)
|
||||
{
|
||||
op.isl_exec->handleNewStream();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,6 +91,8 @@ public:
|
||||
bool canReshape() const;
|
||||
void reshape(const GMetaArgs& inMetas, const GCompileArgs& args);
|
||||
|
||||
void prepareForNewStream();
|
||||
|
||||
const GModel::Graph& model() const; // FIXME: make it ConstGraph?
|
||||
};
|
||||
|
||||
|
||||
@@ -800,6 +800,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
||||
}
|
||||
}
|
||||
};
|
||||
bool islandsRecompiled = false;
|
||||
const auto new_meta = cv::descr_of(ins); // 0
|
||||
if (gm.metadata().contains<OriginalInputMeta>()) // (1)
|
||||
{
|
||||
@@ -821,6 +822,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
||||
}
|
||||
update_int_metas(); // (7)
|
||||
m_reshapable = util::make_optional(is_reshapable);
|
||||
|
||||
islandsRecompiled = true;
|
||||
}
|
||||
else // (8)
|
||||
{
|
||||
@@ -929,7 +932,15 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
|
||||
for (auto &&out_eh : op.nh->outNodes()) {
|
||||
out_queues.push_back(reader_queues(*m_island_graph, out_eh));
|
||||
}
|
||||
op.isl_exec->handleNewStream();
|
||||
|
||||
// If Island Executable is recompiled, all its stuff including internal kernel states
|
||||
// are recreated and re-initialized automatically.
|
||||
// But if not, we should notify Island Executable about new started stream to let it update
|
||||
// its internal variables.
|
||||
if (!islandsRecompiled)
|
||||
{
|
||||
op.isl_exec->handleNewStream();
|
||||
}
|
||||
|
||||
m_threads.emplace_back(islandActorThread,
|
||||
op.in_objects,
|
||||
|
||||
Reference in New Issue
Block a user