Fluid Internal Parallelism

- Added new graph compile time argument to specify multiple independent
ROIs (Tiles)
 - Added new "executable" with a serial loop over user-specified
ROIs (Tiles)
 - refactored graph traversal code into separate function to be called
once
 - added saturate cast to Fluid AddCsimple test kernel
This commit is contained in:
Anton Potapov
2019-07-03 11:35:54 +03:00
parent 097d81363b
commit 97e88bd769
5 changed files with 370 additions and 34 deletions
+108 -29
View File
@@ -91,7 +91,13 @@ namespace
cv::util::throw_error(std::logic_error("GFluidOutputRois feature supports only one-island graphs"));
auto rois = out_rois.value_or(cv::GFluidOutputRois());
return EPtr{new cv::gimpl::GFluidExecutable(graph, nodes, std::move(rois.rois))};
auto graph_data = fluidExtractInputDataFromGraph(graph, nodes);
const auto parallel_out_rois = cv::gimpl::getCompileArg<cv::GFluidParallelOutputRois>(args);
return parallel_out_rois.has_value() ?
EPtr{new cv::gimpl::GParallelFluidExecutable (graph, graph_data, std::move(parallel_out_rois.value().parallel_rois))}
: EPtr{new cv::gimpl::GFluidExecutable (graph, graph_data, std::move(rois.rois))}
;
}
virtual void addBackendPasses(ade::ExecutionEngineSetupContext &ectx) override;
@@ -700,27 +706,31 @@ void cv::gimpl::GFluidExecutable::initBufferRois(std::vector<int>& readStarts,
} // while (!nodesToVisit.empty())
}
cv::gimpl::GFluidExecutable::GFluidExecutable(const ade::Graph &g,
const std::vector<ade::NodeHandle> &nodes,
const std::vector<cv::gapi::own::Rect> &outputRois)
: m_g(g), m_gm(m_g)
cv::gimpl::FluidGraphInputData cv::gimpl::fluidExtractInputDataFromGraph(const ade::Graph &g, const std::vector<ade::NodeHandle> &nodes)
{
GConstFluidModel fg(m_g);
decltype(FluidGraphInputData::m_agents_data) agents_data;
decltype(FluidGraphInputData::m_scratch_users) scratch_users;
decltype(FluidGraphInputData::m_id_map) id_map;
decltype(FluidGraphInputData::m_all_gmat_ids) all_gmat_ids;
std::size_t mat_count = 0;
GConstFluidModel fg(g);
GModel::ConstGraph m_gm(g);
// Initialize vector of data buffers, build list of operations
// FIXME: There _must_ be a better way to [query] count number of DATA nodes
std::size_t mat_count = 0;
std::size_t last_agent = 0;
auto grab_mat_nh = [&](ade::NodeHandle nh) {
auto rc = m_gm.metadata(nh).get<Data>().rc;
if (m_id_map.count(rc) == 0)
if (id_map.count(rc) == 0)
{
m_all_gmat_ids[mat_count] = nh;
m_id_map[rc] = mat_count++;
all_gmat_ids[mat_count] = nh;
id_map[rc] = mat_count++;
}
};
std::size_t last_agent = 0;
for (const auto &nh : nodes)
{
switch (m_gm.metadata(nh).get<NodeType>().t)
@@ -733,15 +743,10 @@ cv::gimpl::GFluidExecutable::GFluidExecutable(const ade::Graph &g,
case NodeType::OP:
{
const auto& fu = fg.metadata(nh).get<FluidUnit>();
switch (fu.k.m_kind)
{
case GFluidKernel::Kind::Filter: m_agents.emplace_back(new FluidFilterAgent(m_g, nh)); break;
case GFluidKernel::Kind::Resize: m_agents.emplace_back(new FluidResizeAgent(m_g, nh)); break;
case GFluidKernel::Kind::NV12toRGB: m_agents.emplace_back(new FluidNV12toRGBAgent(m_g, nh)); break;
default: GAPI_Assert(false);
}
agents_data.push_back({fu.k.m_kind, nh, {}, {}});
// NB.: in_buffer_ids size is equal to Arguments size, not Edges size!!!
m_agents.back()->in_buffer_ids.resize(m_gm.metadata(nh).get<Op>().args.size(), -1);
agents_data.back().in_buffer_ids.resize(m_gm.metadata(nh).get<Op>().args.size(), -1);
for (auto eh : nh->inEdges())
{
// FIXME Only GMats are currently supported (which can be represented
@@ -751,23 +756,23 @@ cv::gimpl::GFluidExecutable::GFluidExecutable(const ade::Graph &g,
const auto in_port = m_gm.metadata(eh).get<Input>().port;
const int in_buf = m_gm.metadata(eh->srcNode()).get<Data>().rc;
m_agents.back()->in_buffer_ids[in_port] = in_buf;
agents_data.back().in_buffer_ids[in_port] = in_buf;
grab_mat_nh(eh->srcNode());
}
}
// FIXME: Assumption that all operation outputs MUST be connected
m_agents.back()->out_buffer_ids.resize(nh->outEdges().size(), -1);
agents_data.back().out_buffer_ids.resize(nh->outEdges().size(), -1);
for (auto eh : nh->outEdges())
{
const auto& data = m_gm.metadata(eh->dstNode()).get<Data>();
const auto out_port = m_gm.metadata(eh).get<Output>().port;
const int out_buf = data.rc;
m_agents.back()->out_buffer_ids[out_port] = out_buf;
agents_data.back().out_buffer_ids[out_port] = out_buf;
if (data.shape == GShape::GMAT) grab_mat_nh(eh->dstNode());
}
if (fu.k.m_scratch)
m_scratch_users.push_back(last_agent);
scratch_users.push_back(last_agent);
last_agent++;
break;
}
@@ -776,12 +781,50 @@ cv::gimpl::GFluidExecutable::GFluidExecutable(const ade::Graph &g,
}
// Check that IDs form a continuous set (important for further indexing)
GAPI_Assert(m_id_map.size() > 0);
GAPI_Assert(m_id_map.size() == static_cast<size_t>(mat_count));
GAPI_Assert(id_map.size() > 0);
GAPI_Assert(id_map.size() == static_cast<size_t>(mat_count));
return FluidGraphInputData {std::move(agents_data), std::move(scratch_users), std::move(id_map), std::move(all_gmat_ids), mat_count};
}
cv::gimpl::GFluidExecutable::GFluidExecutable(const ade::Graph &g,
const cv::gimpl::FluidGraphInputData &traverse_res,
const std::vector<cv::gapi::own::Rect> &outputRois)
: m_g(g), m_gm(m_g)
{
GConstFluidModel fg(m_g);
auto tie_traverse_res = [&traverse_res](){
auto& r = traverse_res;
return std::tie(r.m_scratch_users, r.m_id_map, r.m_all_gmat_ids, r.m_mat_count);
};
auto tie_this = [this](){
return std::tie(m_scratch_users, m_id_map, m_all_gmat_ids, m_num_int_buffers);
};
tie_this() = tie_traverse_res();
auto create_fluid_agent = [&g](agent_data_t const& agent_data) -> std::unique_ptr<FluidAgent> {
std::unique_ptr<FluidAgent> agent_ptr;
switch (agent_data.kind)
{
case GFluidKernel::Kind::Filter: agent_ptr.reset(new FluidFilterAgent(g, agent_data.nh)); break;
case GFluidKernel::Kind::Resize: agent_ptr.reset(new FluidResizeAgent(g, agent_data.nh)); break;
case GFluidKernel::Kind::NV12toRGB: agent_ptr.reset(new FluidNV12toRGBAgent(g, agent_data.nh)); break;
default: GAPI_Assert(false);
}
std::tie(agent_ptr->in_buffer_ids, agent_ptr->out_buffer_ids) = std::tie(agent_data.in_buffer_ids, agent_data.out_buffer_ids);
return agent_ptr;
};
for (auto const& agent_data : traverse_res.m_agents_data){
m_agents.push_back(create_fluid_agent(agent_data));
}
// Actually initialize Fluid buffers
GAPI_LOG_INFO(NULL, "Initializing " << mat_count << " fluid buffer(s)" << std::endl);
m_num_int_buffers = mat_count;
GAPI_LOG_INFO(NULL, "Initializing " << m_num_int_buffers << " fluid buffer(s)" << std::endl);
const std::size_t num_scratch = m_scratch_users.size();
m_buffers.resize(m_num_int_buffers + num_scratch);
@@ -847,6 +890,12 @@ cv::gimpl::GFluidExecutable::GFluidExecutable(const ade::Graph &g,
makeReshape(outputRois);
GAPI_LOG_INFO(NULL, "Internal buffers: " << std::fixed << std::setprecision(2) << static_cast<float>(total_buffers_size())/1024 << " KB\n");
}
std::size_t cv::gimpl::GFluidExecutable::total_buffers_size() const
{
GConstFluidModel fg(m_g);
std::size_t total_size = 0;
for (const auto &i : ade::util::indexed(m_buffers))
{
@@ -854,7 +903,7 @@ cv::gimpl::GFluidExecutable::GFluidExecutable(const ade::Graph &g,
const auto idx = ade::util::index(i);
const auto b = ade::util::value(i);
if (idx >= m_num_int_buffers ||
fg.metadata(m_all_gmat_ids[idx]).get<FluidData>().internal == true)
fg.metadata(m_all_gmat_ids.at(idx)).get<FluidData>().internal == true)
{
GAPI_Assert(b.priv().size() > 0);
}
@@ -863,7 +912,7 @@ cv::gimpl::GFluidExecutable::GFluidExecutable(const ade::Graph &g,
// (There can be non-zero sized const border buffer allocated in such buffers)
total_size += b.priv().size();
}
GAPI_LOG_INFO(NULL, "Internal buffers: " << std::fixed << std::setprecision(2) << static_cast<float>(total_size)/1024 << " KB\n");
return total_size;
}
namespace
@@ -1196,6 +1245,11 @@ void cv::gimpl::GFluidExecutable::packArg(cv::GArg &in_arg, const cv::GArg &op_a
void cv::gimpl::GFluidExecutable::run(std::vector<InObj> &&input_objs,
std::vector<OutObj> &&output_objs)
{
run(input_objs, output_objs);
}
void cv::gimpl::GFluidExecutable::run(std::vector<InObj> &input_objs,
std::vector<OutObj> &output_objs)
{
// Bind input buffers from parameters
for (auto& it : input_objs) bindInArg(it.first, it.second);
@@ -1269,6 +1323,31 @@ void cv::gimpl::GFluidExecutable::run(std::vector<InObj> &&input_objs,
}
}
// Builds one GFluidExecutable ("tile") per entry of parallelOutputRois.
// Every tile is constructed from the same graph and the same pre-computed
// traversal data; only the set of output ROIs differs between tiles.
//  g                  - graph the executables operate on
//  graph_data         - result of the one-time graph traversal
//  parallelOutputRois - one set of output ROIs per tile
cv::gimpl::GParallelFluidExecutable::GParallelFluidExecutable(const ade::Graph &g,
                                                              const FluidGraphInputData &graph_data,
                                                              const std::vector<GFluidOutputRois> &parallelOutputRois)
{
    // The number of tiles is known up front: allocate storage once so that
    // already constructed executables are not relocated while the vector grows.
    tiles.reserve(parallelOutputRois.size());
    for (const auto& rois : parallelOutputRois)
    {
        tiles.emplace_back(g, graph_data, rois.rois);
    }
}
// Reshape is not implemented for this executable: both arguments are ignored
// and the body consists of a single assertion on a constant-false condition.
// (The class declares canReshape() as returning false.)
void cv::gimpl::GParallelFluidExecutable::reshape(ade::Graph&, const GCompileArgs& )
{
//TODO: implement reshape support, or confirm that it is not needed here
GAPI_Assert(false && "Not Implemented;");
}
void cv::gimpl::GParallelFluidExecutable::run(std::vector<InObj> &&input_objs,
std::vector<OutObj> &&output_objs)
{
for (auto& tile : tiles ){
tile.run(input_objs, output_objs);
}
}
// FIXME: these passes operate on graph global level!!!
// Need to fix this for heterogeneous (island-based) processing
void GFluidBackendImpl::addBackendPasses(ade::ExecutionEngineSetupContext &ectx)
@@ -51,6 +51,13 @@ struct FluidData
gapi::fluid::BorderOpt border;
};
// Plain description of a single Fluid agent, recorded for every operation
// node during graph traversal so that the actual FluidAgent object can be
// created later from it.
// NB: the structure is brace-initialized positionally, keep the member order.
struct agent_data_t {
// Kernel kind; used to select which FluidAgent subclass gets instantiated
GFluidKernel::Kind kind;
// Handle of the operation node this agent is created for
ade::NodeHandle nh;
// Per input port: Data rc of the source node. Sized by the number of
// operation arguments (not edges); entries which are not filled in stay -1
std::vector<int> in_buffer_ids;
// Per output port: Data rc of the destination node. Sized by the number of
// output edges of the operation node
std::vector<int> out_buffer_ids;
};
struct FluidAgent
{
public:
@@ -96,6 +103,19 @@ private:
virtual std::pair<int,int> linesReadAndnextWindow(std::size_t inPort) const = 0;
};
// Helper data structure accumulating the results of one graph
// traversal/analysis pass (see fluidExtractInputDataFromGraph()).
// NB: the structure is brace-initialized positionally, keep the member order.
struct FluidGraphInputData {
// One entry per visited operation node, in the order the nodes were visited
std::vector<agent_data_t> m_agents_data;
// Indices (positions in m_agents_data) of agents whose kernel has its scratch flag set
std::vector<std::size_t> m_scratch_users;
std::unordered_map<int, std::size_t> m_id_map; // GMat id -> buffer idx map
// Buffer idx -> handle of the data node that was assigned this index
std::map<std::size_t, ade::NodeHandle> m_all_gmat_ids;
// Number of buffer indices handed out, i.e. number of distinct data nodes registered
std::size_t m_mat_count;
};
// Helper function which traverses the graph once and collects everything
// needed to construct a GFluidExecutable, so that the result can be passed
// to multiple instances of GFluidExecutable.
//  g     - graph the nodes belong to
//  nodes - nodes to analyse
// Returns the collected agent descriptions and buffer index maps.
FluidGraphInputData fluidExtractInputDataFromGraph(const ade::Graph &g, const std::vector<ade::NodeHandle> &nodes);
class GFluidExecutable final: public GIslandExecutable
{
const ade::Graph &m_g;
@@ -121,15 +141,36 @@ class GFluidExecutable final: public GIslandExecutable
void initBufferRois(std::vector<int>& readStarts, std::vector<cv::gapi::own::Rect>& rois, const std::vector<gapi::own::Rect> &out_rois);
void makeReshape(const std::vector<cv::gapi::own::Rect>& out_rois);
std::size_t total_buffers_size() const;
public:
GFluidExecutable(const ade::Graph &g,
const std::vector<ade::NodeHandle> &nodes,
const std::vector<cv::gapi::own::Rect> &outputRois);
virtual inline bool canReshape() const override { return true; }
virtual void reshape(ade::Graph& g, const GCompileArgs& args) override;
virtual void run(std::vector<InObj> &&input_objs,
std::vector<OutObj> &&output_objs) override;
void run(std::vector<InObj> &input_objs,
std::vector<OutObj> &output_objs);
GFluidExecutable(const ade::Graph &g,
const FluidGraphInputData &graph_data,
const std::vector<cv::gapi::own::Rect> &outputRois);
};
class GParallelFluidExecutable final: public GIslandExecutable {
std::vector<GFluidExecutable> tiles;
public:
GParallelFluidExecutable(const ade::Graph &g,
const FluidGraphInputData &graph_data,
const std::vector<GFluidOutputRois> &parallelOutputRois);
virtual inline bool canReshape() const override { return false; }
virtual void reshape(ade::Graph& g, const GCompileArgs& args) override;
virtual void run(std::vector<InObj> &&input_objs,
std::vector<OutObj> &&output_objs) override;
};