329 lines
11 KiB
C++
329 lines
11 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
// Date 2014/09/24 16:01:08
|
|
|
|
#ifndef BVAR_REDUCER_H
|
|
#define BVAR_REDUCER_H
|
|
|
|
#include <limits> // std::numeric_limits
|
|
#include "butil/logging.h" // LOG()
|
|
#include "butil/type_traits.h" // butil::add_cr_non_integral
|
|
#include "butil/class_name.h" // class_name_str
|
|
#include "bvar/variable.h" // Variable
|
|
#include "bvar/detail/combiner.h" // detail::AgentCombiner
|
|
#include "bvar/detail/sampler.h" // ReducerSampler
|
|
#include "bvar/detail/series.h"
|
|
#include "bvar/window.h"
|
|
|
|
namespace bvar {
|
|
|
|
// Reduce multiple values into one with `Op': e1 Op e2 Op e3 ...
|
|
// `Op' shall satisfy:
|
|
// - associative: a Op (b Op c) == (a Op b) Op c
|
|
// - commutative: a Op b == b Op a;
|
|
// - no side effects: a Op b never changes if a and b are fixed.
|
|
// otherwise the result is undefined.
|
|
//
|
|
// For performance reasons, we don't let Op return a value; instead it shall
|
|
// set the result to the first parameter in-place. Namely to add two values,
|
|
// "+=" should be implemented rather than "+".
|
|
//
|
|
// Reducer works for non-primitive T which satisfies:
|
|
// - T() should be the identity of Op.
|
|
// - stream << v should compile and put a description of v into the stream
|
|
// Example:
|
|
// class MyType {
|
|
// friend std::ostream& operator<<(std::ostream& os, const MyType&);
|
|
// public:
|
|
// MyType() : _x(0) {}
|
|
// explicit MyType(int x) : _x(x) {}
|
|
// void operator+=(const MyType& rhs) {
|
|
// _x += rhs._x;
|
|
// }
|
|
// private:
|
|
// int _x;
|
|
// };
|
|
// std::ostream& operator<<(std::ostream& os, const MyType& value) {
|
|
// return os << "MyType{" << value._x << "}";
|
|
// }
|
|
// bvar::Adder<MyType> my_type_sum;
|
|
// my_type_sum << MyType(1) << MyType(2) << MyType(3);
|
|
// LOG(INFO) << my_type_sum; // "MyType{6}"
|
|
|
|
template <typename T, typename Op, typename InvOp = detail::VoidOp>
|
|
class Reducer : public Variable {
|
|
public:
|
|
typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
|
|
typedef typename combiner_type::Agent agent_type;
|
|
typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
|
|
class SeriesSampler : public detail::Sampler {
|
|
public:
|
|
SeriesSampler(Reducer* owner, const Op& op)
|
|
: _owner(owner), _series(op) {}
|
|
~SeriesSampler() {}
|
|
void take_sample() override { _series.append(_owner->get_value()); }
|
|
void describe(std::ostream& os) { _series.describe(os, NULL); }
|
|
private:
|
|
Reducer* _owner;
|
|
detail::Series<T, Op> _series;
|
|
};
|
|
|
|
public:
|
|
// The `identify' must satisfy: identity Op a == a
|
|
Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
|
|
const Op& op = Op(),
|
|
const InvOp& inv_op = InvOp())
|
|
: _combiner(identity, identity, op)
|
|
, _sampler(NULL)
|
|
, _series_sampler(NULL)
|
|
, _inv_op(inv_op) {
|
|
}
|
|
|
|
~Reducer() {
|
|
// Calling hide() manually is a MUST required by Variable.
|
|
hide();
|
|
if (_sampler) {
|
|
_sampler->destroy();
|
|
_sampler = NULL;
|
|
}
|
|
if (_series_sampler) {
|
|
_series_sampler->destroy();
|
|
_series_sampler = NULL;
|
|
}
|
|
}
|
|
|
|
// Add a value.
|
|
// Returns self reference for chaining.
|
|
Reducer& operator<<(typename butil::add_cr_non_integral<T>::type value);
|
|
|
|
// Get reduced value.
|
|
// Notice that this function walks through threads that ever add values
|
|
// into this reducer. You should avoid calling it frequently.
|
|
T get_value() const {
|
|
CHECK(!(butil::is_same<InvOp, detail::VoidOp>::value) || _sampler == NULL)
|
|
<< "You should not call Reducer<" << butil::class_name_str<T>()
|
|
<< ", " << butil::class_name_str<Op>() << ">::get_value() when a"
|
|
<< " Window<> is used because the operator does not have inverse.";
|
|
return _combiner.combine_agents();
|
|
}
|
|
|
|
|
|
// Reset the reduced value to T().
|
|
// Returns the reduced value before reset.
|
|
T reset() { return _combiner.reset_all_agents(); }
|
|
|
|
void describe(std::ostream& os, bool quote_string) const override {
|
|
if (butil::is_same<T, std::string>::value && quote_string) {
|
|
os << '"' << get_value() << '"';
|
|
} else {
|
|
os << get_value();
|
|
}
|
|
}
|
|
|
|
#ifdef BAIDU_INTERNAL
|
|
void get_value(boost::any* value) const override { *value = get_value(); }
|
|
#endif
|
|
|
|
// True if this reducer is constructed successfully.
|
|
bool valid() const { return _combiner.valid(); }
|
|
|
|
// Get instance of Op.
|
|
const Op& op() const { return _combiner.op(); }
|
|
const InvOp& inv_op() const { return _inv_op; }
|
|
|
|
sampler_type* get_sampler() {
|
|
if (NULL == _sampler) {
|
|
_sampler = new sampler_type(this);
|
|
_sampler->schedule();
|
|
}
|
|
return _sampler;
|
|
}
|
|
|
|
int describe_series(std::ostream& os, const SeriesOptions& options) const override {
|
|
if (_series_sampler == NULL) {
|
|
return 1;
|
|
}
|
|
if (!options.test_only) {
|
|
_series_sampler->describe(os);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
protected:
|
|
int expose_impl(const butil::StringPiece& prefix,
|
|
const butil::StringPiece& name,
|
|
DisplayFilter display_filter) override {
|
|
const int rc = Variable::expose_impl(prefix, name, display_filter);
|
|
if (rc == 0 &&
|
|
_series_sampler == NULL &&
|
|
!butil::is_same<InvOp, detail::VoidOp>::value &&
|
|
!butil::is_same<T, std::string>::value &&
|
|
FLAGS_save_series) {
|
|
_series_sampler = new SeriesSampler(this, _combiner.op());
|
|
_series_sampler->schedule();
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
private:
|
|
combiner_type _combiner;
|
|
sampler_type* _sampler;
|
|
SeriesSampler* _series_sampler;
|
|
InvOp _inv_op;
|
|
};
|
|
|
|
template <typename T, typename Op, typename InvOp>
|
|
inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<(
|
|
typename butil::add_cr_non_integral<T>::type value) {
|
|
// It's wait-free for most time
|
|
agent_type* agent = _combiner.get_or_create_tls_agent();
|
|
if (__builtin_expect(!agent, 0)) {
|
|
LOG(FATAL) << "Fail to create agent";
|
|
return *this;
|
|
}
|
|
agent->element.modify(_combiner.op(), value);
|
|
return *this;
|
|
}
|
|
|
|
// =================== Common reducers ===================
|
|
|
|
// bvar::Adder<int> sum;
|
|
// sum << 1 << 2 << 3 << 4;
|
|
// LOG(INFO) << sum.get_value(); // 10
|
|
// Commonly used functors
|
|
namespace detail {
|
|
template <typename Tp>
|
|
struct AddTo {
|
|
void operator()(Tp & lhs,
|
|
typename butil::add_cr_non_integral<Tp>::type rhs) const
|
|
{ lhs += rhs; }
|
|
};
|
|
template <typename Tp>
|
|
struct MinusFrom {
|
|
void operator()(Tp & lhs,
|
|
typename butil::add_cr_non_integral<Tp>::type rhs) const
|
|
{ lhs -= rhs; }
|
|
};
|
|
}
|
|
template <typename T>
|
|
class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > {
|
|
public:
|
|
typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base;
|
|
typedef T value_type;
|
|
typedef typename Base::sampler_type sampler_type;
|
|
public:
|
|
Adder() : Base() {}
|
|
explicit Adder(const butil::StringPiece& name) : Base() {
|
|
this->expose(name);
|
|
}
|
|
Adder(const butil::StringPiece& prefix,
|
|
const butil::StringPiece& name) : Base() {
|
|
this->expose_as(prefix, name);
|
|
}
|
|
~Adder() { Variable::hide(); }
|
|
};
|
|
|
|
// bvar::Maxer<int> max_value;
|
|
// max_value << 1 << 2 << 3 << 4;
|
|
// LOG(INFO) << max_value.get_value(); // 4
|
|
namespace detail {
|
|
template <typename Tp>
|
|
struct MaxTo {
|
|
void operator()(Tp & lhs,
|
|
typename butil::add_cr_non_integral<Tp>::type rhs) const {
|
|
// Use operator< as well.
|
|
if (lhs < rhs) {
|
|
lhs = rhs;
|
|
}
|
|
}
|
|
};
|
|
class LatencyRecorderBase;
|
|
}
|
|
template <typename T>
|
|
class Maxer : public Reducer<T, detail::MaxTo<T> > {
|
|
public:
|
|
typedef Reducer<T, detail::MaxTo<T> > Base;
|
|
typedef T value_type;
|
|
typedef typename Base::sampler_type sampler_type;
|
|
public:
|
|
Maxer() : Base(std::numeric_limits<T>::min()) {}
|
|
explicit Maxer(const butil::StringPiece& name)
|
|
: Base(std::numeric_limits<T>::min()) {
|
|
this->expose(name);
|
|
}
|
|
Maxer(const butil::StringPiece& prefix, const butil::StringPiece& name)
|
|
: Base(std::numeric_limits<T>::min()) {
|
|
this->expose_as(prefix, name);
|
|
}
|
|
~Maxer() { Variable::hide(); }
|
|
private:
|
|
friend class detail::LatencyRecorderBase;
|
|
// The following private funcition a now used in LatencyRecorder,
|
|
// it's dangerous so we don't make them public
|
|
explicit Maxer(T default_value) : Base(default_value) {
|
|
}
|
|
Maxer(T default_value, const butil::StringPiece& prefix,
|
|
const butil::StringPiece& name)
|
|
: Base(default_value) {
|
|
this->expose_as(prefix, name);
|
|
}
|
|
Maxer(T default_value, const butil::StringPiece& name) : Base(default_value) {
|
|
this->expose(name);
|
|
}
|
|
};
|
|
|
|
// bvar::Miner<int> min_value;
|
|
// min_value << 1 << 2 << 3 << 4;
|
|
// LOG(INFO) << min_value.get_value(); // 1
|
|
namespace detail {
|
|
|
|
template <typename Tp>
|
|
struct MinTo {
|
|
void operator()(Tp & lhs,
|
|
typename butil::add_cr_non_integral<Tp>::type rhs) const {
|
|
if (rhs < lhs) {
|
|
lhs = rhs;
|
|
}
|
|
}
|
|
};
|
|
|
|
} // namespace detail
|
|
|
|
template <typename T>
|
|
class Miner : public Reducer<T, detail::MinTo<T> > {
|
|
public:
|
|
typedef Reducer<T, detail::MinTo<T> > Base;
|
|
typedef T value_type;
|
|
typedef typename Base::sampler_type sampler_type;
|
|
public:
|
|
Miner() : Base(std::numeric_limits<T>::max()) {}
|
|
explicit Miner(const butil::StringPiece& name)
|
|
: Base(std::numeric_limits<T>::max()) {
|
|
this->expose(name);
|
|
}
|
|
Miner(const butil::StringPiece& prefix, const butil::StringPiece& name)
|
|
: Base(std::numeric_limits<T>::max()) {
|
|
this->expose_as(prefix, name);
|
|
}
|
|
~Miner() { Variable::hide(); }
|
|
};
|
|
|
|
} // namespace bvar
|
|
|
|
#endif //BVAR_REDUCER_H
|