// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Date 2014/09/24 16:01:08 #ifndef BVAR_REDUCER_H #define BVAR_REDUCER_H #include // std::numeric_limits #include "butil/logging.h" // LOG() #include "butil/type_traits.h" // butil::add_cr_non_integral #include "butil/class_name.h" // class_name_str #include "bvar/variable.h" // Variable #include "bvar/detail/combiner.h" // detail::AgentCombiner #include "bvar/detail/sampler.h" // ReducerSampler #include "bvar/detail/series.h" #include "bvar/window.h" namespace bvar { // Reduce multiple values into one with `Op': e1 Op e2 Op e3 ... // `Op' shall satisfy: // - associative: a Op (b Op c) == (a Op b) Op c // - commutative: a Op b == b Op a; // - no side effects: a Op b never changes if a and b are fixed. // otherwise the result is undefined. // // For performance issues, we don't let Op return value, instead it shall // set the result to the first parameter in-place. Namely to add two values, // "+=" should be implemented rather than "+". // // Reducer works for non-primitive T which satisfies: // - T() should be the identity of Op. // - stream << v should compile and put description of v into the stream // Example: // class MyType { // friend std::ostream& operator<<(std::ostream& os, const MyType&); // public: // MyType() : _x(0) {} // explicit MyType(int x) : _x(x) {} // void operator+=(const MyType& rhs) const { // _x += rhs._x; // } // private: // int _x; // }; // std::ostream& operator<<(std::ostream& os, const MyType& value) { // return os << "MyType{" << value._x << "}"; // } // bvar::Adder my_type_sum; // my_type_sum << MyType(1) << MyType(2) << MyType(3); // LOG(INFO) << my_type_sum; // "MyType{6}" template class Reducer : public Variable { public: typedef typename detail::AgentCombiner combiner_type; typedef typename combiner_type::Agent agent_type; typedef detail::ReducerSampler sampler_type; class SeriesSampler : public detail::Sampler { public: SeriesSampler(Reducer* owner, const Op& op) : _owner(owner), _series(op) {} ~SeriesSampler() {} void take_sample() override { _series.append(_owner->get_value()); } void describe(std::ostream& os) { _series.describe(os, NULL); } private: Reducer* _owner; detail::Series _series; }; public: // The `identify' must satisfy: identity Op a == a Reducer(typename butil::add_cr_non_integral::type identity = T(), const Op& op = Op(), const InvOp& inv_op = InvOp()) : _combiner(identity, identity, op) , _sampler(NULL) , _series_sampler(NULL) , _inv_op(inv_op) { } ~Reducer() { // Calling hide() manually is a MUST required by Variable. hide(); if (_sampler) { _sampler->destroy(); _sampler = NULL; } if (_series_sampler) { _series_sampler->destroy(); _series_sampler = NULL; } } // Add a value. // Returns self reference for chaining. Reducer& operator<<(typename butil::add_cr_non_integral::type value); // Get reduced value. // Notice that this function walks through threads that ever add values // into this reducer. You should avoid calling it frequently. T get_value() const { CHECK(!(butil::is_same::value) || _sampler == NULL) << "You should not call Reducer<" << butil::class_name_str() << ", " << butil::class_name_str() << ">::get_value() when a" << " Window<> is used because the operator does not have inverse."; return _combiner.combine_agents(); } // Reset the reduced value to T(). // Returns the reduced value before reset. T reset() { return _combiner.reset_all_agents(); } void describe(std::ostream& os, bool quote_string) const override { if (butil::is_same::value && quote_string) { os << '"' << get_value() << '"'; } else { os << get_value(); } } #ifdef BAIDU_INTERNAL void get_value(boost::any* value) const override { *value = get_value(); } #endif // True if this reducer is constructed successfully. bool valid() const { return _combiner.valid(); } // Get instance of Op. const Op& op() const { return _combiner.op(); } const InvOp& inv_op() const { return _inv_op; } sampler_type* get_sampler() { if (NULL == _sampler) { _sampler = new sampler_type(this); _sampler->schedule(); } return _sampler; } int describe_series(std::ostream& os, const SeriesOptions& options) const override { if (_series_sampler == NULL) { return 1; } if (!options.test_only) { _series_sampler->describe(os); } return 0; } protected: int expose_impl(const butil::StringPiece& prefix, const butil::StringPiece& name, DisplayFilter display_filter) override { const int rc = Variable::expose_impl(prefix, name, display_filter); if (rc == 0 && _series_sampler == NULL && !butil::is_same::value && !butil::is_same::value && FLAGS_save_series) { _series_sampler = new SeriesSampler(this, _combiner.op()); _series_sampler->schedule(); } return rc; } private: combiner_type _combiner; sampler_type* _sampler; SeriesSampler* _series_sampler; InvOp _inv_op; }; template inline Reducer& Reducer::operator<<( typename butil::add_cr_non_integral::type value) { // It's wait-free for most time agent_type* agent = _combiner.get_or_create_tls_agent(); if (__builtin_expect(!agent, 0)) { LOG(FATAL) << "Fail to create agent"; return *this; } agent->element.modify(_combiner.op(), value); return *this; } // =================== Common reducers =================== // bvar::Adder sum; // sum << 1 << 2 << 3 << 4; // LOG(INFO) << sum.get_value(); // 10 // Commonly used functors namespace detail { template struct AddTo { void operator()(Tp & lhs, typename butil::add_cr_non_integral::type rhs) const { lhs += rhs; } }; template struct MinusFrom { void operator()(Tp & lhs, typename butil::add_cr_non_integral::type rhs) const { lhs -= rhs; } }; } template class Adder : public Reducer, detail::MinusFrom > { public: typedef Reducer, detail::MinusFrom > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; public: Adder() : Base() {} explicit Adder(const butil::StringPiece& name) : Base() { this->expose(name); } Adder(const butil::StringPiece& prefix, const butil::StringPiece& name) : Base() { this->expose_as(prefix, name); } ~Adder() { Variable::hide(); } }; // bvar::Maxer max_value; // max_value << 1 << 2 << 3 << 4; // LOG(INFO) << max_value.get_value(); // 4 namespace detail { template struct MaxTo { void operator()(Tp & lhs, typename butil::add_cr_non_integral::type rhs) const { // Use operator< as well. if (lhs < rhs) { lhs = rhs; } } }; class LatencyRecorderBase; } template class Maxer : public Reducer > { public: typedef Reducer > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; public: Maxer() : Base(std::numeric_limits::min()) {} explicit Maxer(const butil::StringPiece& name) : Base(std::numeric_limits::min()) { this->expose(name); } Maxer(const butil::StringPiece& prefix, const butil::StringPiece& name) : Base(std::numeric_limits::min()) { this->expose_as(prefix, name); } ~Maxer() { Variable::hide(); } private: friend class detail::LatencyRecorderBase; // The following private funcition a now used in LatencyRecorder, // it's dangerous so we don't make them public explicit Maxer(T default_value) : Base(default_value) { } Maxer(T default_value, const butil::StringPiece& prefix, const butil::StringPiece& name) : Base(default_value) { this->expose_as(prefix, name); } Maxer(T default_value, const butil::StringPiece& name) : Base(default_value) { this->expose(name); } }; // bvar::Miner min_value; // min_value << 1 << 2 << 3 << 4; // LOG(INFO) << min_value.get_value(); // 1 namespace detail { template struct MinTo { void operator()(Tp & lhs, typename butil::add_cr_non_integral::type rhs) const { if (rhs < lhs) { lhs = rhs; } } }; } // namespace detail template class Miner : public Reducer > { public: typedef Reducer > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; public: Miner() : Base(std::numeric_limits::max()) {} explicit Miner(const butil::StringPiece& name) : Base(std::numeric_limits::max()) { this->expose(name); } Miner(const butil::StringPiece& prefix, const butil::StringPiece& name) : Base(std::numeric_limits::max()) { this->expose_as(prefix, name); } ~Miner() { Variable::hide(); } }; } // namespace bvar #endif //BVAR_REDUCER_H