1105 lines
41 KiB
C++
1105 lines
41 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
// brpc - A framework to host and access services throughout Baidu.
|
|
|
|
// Date: Sun Jul 13 15:04:18 CST 2014
|
|
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
#include <map>
|
|
#include <gtest/gtest.h>
|
|
#include "bthread/bthread.h"
|
|
#include "butil/gperftools_profiler.h"
|
|
#include "butil/time.h"
|
|
#include "butil/fast_rand.h"
|
|
#include "butil/containers/doubly_buffered_data.h"
|
|
#include "brpc/describable.h"
|
|
#include "brpc/socket.h"
|
|
#include "brpc/socket_map.h"
|
|
#include "brpc/global.h"
|
|
#include "brpc/details/load_balancer_with_naming.h"
|
|
#include "butil/strings/string_number_conversions.h"
|
|
#include "brpc/excluded_servers.h"
|
|
#include "brpc/policy/weighted_round_robin_load_balancer.h"
|
|
#include "brpc/policy/round_robin_load_balancer.h"
|
|
#include "brpc/policy/weighted_randomized_load_balancer.h"
|
|
#include "brpc/policy/randomized_load_balancer.h"
|
|
#include "brpc/policy/locality_aware_load_balancer.h"
|
|
#include "brpc/policy/consistent_hashing_load_balancer.h"
|
|
#include "brpc/policy/hasher.h"
|
|
#include "brpc/errno.pb.h"
|
|
#include "echo.pb.h"
|
|
#include "brpc/channel.h"
|
|
#include "brpc/controller.h"
|
|
#include "brpc/server.h"
|
|
|
|
namespace brpc {
|
|
DECLARE_int32(health_check_interval);
|
|
DECLARE_int64(detect_available_server_interval_ms);
|
|
namespace policy {
|
|
extern uint32_t CRCHash32(const char *key, size_t len);
|
|
extern const char* GetHashName(uint32_t (*hasher)(const void* key, size_t len));
|
|
}}
|
|
|
|
namespace {
|
|
void initialize_random() {
|
|
srand(time(0));
|
|
}
|
|
pthread_once_t initialize_random_control = PTHREAD_ONCE_INIT;
|
|
|
|
class LoadBalancerTest : public ::testing::Test{
|
|
protected:
|
|
LoadBalancerTest(){
|
|
pthread_once(&initialize_random_control, initialize_random);
|
|
};
|
|
virtual ~LoadBalancerTest(){};
|
|
virtual void SetUp() {
|
|
};
|
|
virtual void TearDown() {
|
|
};
|
|
};
|
|
|
|
size_t TLS_ctor = 0;
|
|
size_t TLS_dtor = 0;
|
|
struct TLS {
|
|
TLS() {
|
|
++TLS_ctor;
|
|
}
|
|
~TLS() {
|
|
++TLS_dtor;
|
|
}
|
|
|
|
};
|
|
|
|
struct Foo {
|
|
Foo() : x(0) {}
|
|
int x;
|
|
};
|
|
|
|
bool AddN(Foo& f, int n) {
|
|
f.x += n;
|
|
return true;
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, doubly_buffered_data) {
|
|
// test doubly_buffered_data TLS limits
|
|
{
|
|
std::cout << "current PTHREAD_KEYS_MAX: " << PTHREAD_KEYS_MAX << std::endl;
|
|
butil::DoublyBufferedData<Foo> data[PTHREAD_KEYS_MAX + 1];
|
|
butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
|
|
ASSERT_EQ(0, data[PTHREAD_KEYS_MAX].Read(&ptr));
|
|
ASSERT_EQ(0, ptr->x);
|
|
}
|
|
|
|
butil::DoublyBufferedData<Foo> d;
|
|
{
|
|
butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
|
|
ASSERT_EQ(0, d.Read(&ptr));
|
|
ASSERT_EQ(0, ptr->x);
|
|
}
|
|
{
|
|
butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
|
|
ASSERT_EQ(0, d.Read(&ptr));
|
|
ASSERT_EQ(0, ptr->x);
|
|
}
|
|
|
|
d.Modify(AddN, 10);
|
|
{
|
|
butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
|
|
ASSERT_EQ(0, d.Read(&ptr));
|
|
ASSERT_EQ(10, ptr->x);
|
|
}
|
|
}
|
|
|
|
typedef brpc::policy::LocalityAwareLoadBalancer LALB;
|
|
|
|
static void ValidateWeightTree(
|
|
std::vector<LALB::ServerInfo> & weight_tree) {
|
|
std::vector<int64_t> weight_sum;
|
|
weight_sum.resize(weight_tree.size());
|
|
for (ssize_t i = weight_tree.size() - 1; i >= 0; --i) {
|
|
const size_t left_child = i * 2 + 1;
|
|
const size_t right_child = i * 2 + 2;
|
|
weight_sum[i] = weight_tree[i].weight->volatile_value();
|
|
if (left_child < weight_sum.size()) {
|
|
weight_sum[i] += weight_sum[left_child];
|
|
}
|
|
if (right_child < weight_sum.size()) {
|
|
weight_sum[i] += weight_sum[right_child];
|
|
}
|
|
}
|
|
for (size_t i = 0; i < weight_tree.size(); ++i) {
|
|
const int64_t left = weight_tree[i].left->load(butil::memory_order_relaxed);
|
|
size_t left_child = i * 2 + 1;
|
|
if (left_child < weight_tree.size()) {
|
|
ASSERT_EQ(weight_sum[left_child], left) << "i=" << i;
|
|
} else {
|
|
ASSERT_EQ(0, left);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void ValidateLALB(LALB& lalb, size_t N) {
|
|
LALB::Servers* d = lalb._db_servers._data;
|
|
for (size_t R = 0; R < 2; ++R) {
|
|
ASSERT_EQ(d[R].weight_tree.size(), N);
|
|
ASSERT_EQ(d[R].server_map.size(), N);
|
|
}
|
|
ASSERT_EQ(lalb._left_weights.size(), N);
|
|
int64_t total = 0;
|
|
for (size_t i = 0; i < N; ++i) {
|
|
ASSERT_EQ(d[0].weight_tree[i].server_id, d[1].weight_tree[i].server_id);
|
|
ASSERT_EQ(d[0].weight_tree[i].weight, d[1].weight_tree[i].weight);
|
|
for (size_t R = 0; R < 2; ++R) {
|
|
ASSERT_EQ((int64_t*)d[R].weight_tree[i].left, &lalb._left_weights[i]);
|
|
size_t* pindex = d[R].server_map.seek(d[R].weight_tree[i].server_id);
|
|
ASSERT_TRUE(pindex != NULL && *pindex == i);
|
|
}
|
|
total += d[0].weight_tree[i].weight->volatile_value();
|
|
}
|
|
ValidateWeightTree(d[0].weight_tree);
|
|
ASSERT_EQ(total, lalb._total.load());
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, la_sanity) {
|
|
LALB lalb;
|
|
ASSERT_EQ(0, lalb._total.load());
|
|
std::vector<brpc::ServerId> ids;
|
|
const size_t N = 256;
|
|
size_t cur_count = 0;
|
|
|
|
for (int REP = 0; REP < 5; ++REP) {
|
|
const size_t before_adding = cur_count;
|
|
for (; cur_count < N; ++cur_count) {
|
|
char addr[32];
|
|
snprintf(addr, sizeof(addr), "192.168.1.%d:8080", (int)cur_count);
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(addr, &dummy));
|
|
brpc::ServerId id(8888);
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
ids.push_back(id);
|
|
ASSERT_TRUE(lalb.AddServer(id));
|
|
}
|
|
std::cout << "Added " << cur_count - before_adding << std::endl;
|
|
ValidateLALB(lalb, cur_count);
|
|
|
|
const size_t before_removal = cur_count;
|
|
std::random_shuffle(ids.begin(), ids.end());
|
|
for (size_t i = 0; i < N / 2; ++i) {
|
|
const brpc::ServerId id = ids.back();
|
|
ids.pop_back();
|
|
--cur_count;
|
|
ASSERT_TRUE(lalb.RemoveServer(id)) << "i=" << i;
|
|
ASSERT_EQ(0, brpc::Socket::SetFailed(id.id));
|
|
}
|
|
std::cout << "Removed " << before_removal - cur_count << std::endl;
|
|
ValidateLALB(lalb, cur_count);
|
|
}
|
|
|
|
for (size_t i = 0; i < ids.size(); ++i) {
|
|
ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
|
|
}
|
|
}
|
|
|
|
typedef std::map<brpc::SocketId, int> CountMap;
|
|
volatile bool global_stop = false;
|
|
|
|
struct SelectArg {
|
|
brpc::LoadBalancer *lb;
|
|
uint32_t (*hash)(const void*, size_t);
|
|
};
|
|
|
|
void* select_server(void* arg) {
|
|
SelectArg *sa = (SelectArg *)arg;
|
|
brpc::LoadBalancer* c = sa->lb;
|
|
brpc::SocketUniquePtr ptr;
|
|
CountMap *selected_count = new CountMap;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
uint32_t rand_seed = rand();
|
|
if (sa->hash) {
|
|
uint32_t rd = ++rand_seed;
|
|
in.has_request_code = true;
|
|
in.request_code = sa->hash((const char *)&rd, sizeof(uint32_t));
|
|
}
|
|
int ret = 0;
|
|
while (!global_stop && (ret = c->SelectServer(in, &out)) == 0) {
|
|
if (sa->hash) {
|
|
uint32_t rd = ++rand_seed;
|
|
in.has_request_code = true;
|
|
in.request_code = sa->hash((const char *)&rd, sizeof(uint32_t));
|
|
}
|
|
++(*selected_count)[ptr->id()];
|
|
}
|
|
LOG_IF(INFO, ret != 0) << "select_server[" << pthread_self()
|
|
<< "] quits before of " << berror(ret);
|
|
return selected_count;
|
|
}
|
|
|
|
brpc::SocketId recycled_sockets[1024];
|
|
butil::atomic<size_t> nrecycle(0);
|
|
class SaveRecycle : public brpc::SocketUser {
|
|
void BeforeRecycle(brpc::Socket* s) {
|
|
recycled_sockets[nrecycle.fetch_add(1, butil::memory_order_relaxed)] = s->id();
|
|
delete this;
|
|
}
|
|
};
|
|
|
|
TEST_F(LoadBalancerTest, update_while_selection) {
|
|
for (size_t round = 0; round < 5; ++round) {
|
|
brpc::LoadBalancer* lb = NULL;
|
|
SelectArg sa = { NULL, NULL};
|
|
bool is_lalb = false;
|
|
if (round == 0) {
|
|
lb = new brpc::policy::RoundRobinLoadBalancer;
|
|
} else if (round == 1) {
|
|
lb = new brpc::policy::RandomizedLoadBalancer;
|
|
} else if (round == 2) {
|
|
lb = new LALB;
|
|
is_lalb = true;
|
|
} else if (round == 3) {
|
|
lb = new brpc::policy::WeightedRoundRobinLoadBalancer;
|
|
} else {
|
|
lb = new brpc::policy::ConsistentHashingLoadBalancer(brpc::policy::CONS_HASH_LB_MURMUR3);
|
|
sa.hash = ::brpc::policy::MurmurHash32;
|
|
}
|
|
sa.lb = lb;
|
|
|
|
// Accessing empty lb should result in error.
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
ASSERT_EQ(ENODATA, lb->SelectServer(in, &out));
|
|
|
|
nrecycle = 0;
|
|
global_stop = false;
|
|
pthread_t th[8];
|
|
std::vector<brpc::ServerId> ids;
|
|
brpc::SocketId wrr_sid_logoff = -1;
|
|
for (int i = 0; i < 256; ++i) {
|
|
char addr[32];
|
|
snprintf(addr, sizeof(addr), "192.%d.1.%d:8080", i, i);
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(addr, &dummy));
|
|
brpc::ServerId id(8888);
|
|
if (3 == round) {
|
|
if (i < 255) {
|
|
id.tag = "1";
|
|
} else {
|
|
id.tag = "200000000";
|
|
}
|
|
}
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
options.user = new SaveRecycle;
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
ids.push_back(id);
|
|
ASSERT_TRUE(lb->AddServer(id));
|
|
if (round == 3 && i == 255) {
|
|
wrr_sid_logoff = id.id;
|
|
// In case of wrr, set 255th socket with huge weight logoff.
|
|
brpc::SocketUniquePtr ptr;
|
|
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
|
|
ptr->SetLogOff();
|
|
}
|
|
}
|
|
std::cout << "Time " << butil::class_name_str(*lb) << " ..." << std::endl;
|
|
butil::Timer tm;
|
|
tm.start();
|
|
for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
|
|
ASSERT_EQ(0, pthread_create(&th[i], NULL, select_server, &sa));
|
|
}
|
|
std::vector<brpc::ServerId> removed;
|
|
const size_t REP = 200;
|
|
for (size_t k = 0; k < REP; ++k) {
|
|
if (round != 3) {
|
|
removed = ids;
|
|
} else {
|
|
removed.assign(ids.begin(), ids.begin() + 255);
|
|
}
|
|
std::random_shuffle(removed.begin(), removed.end());
|
|
removed.pop_back();
|
|
ASSERT_EQ(removed.size(), lb->RemoveServersInBatch(removed));
|
|
ASSERT_EQ(removed.size(), lb->AddServersInBatch(removed));
|
|
// // 1: Don't remove first server, otherwise select_server would quit.
|
|
// for (size_t i = 1/*1*/; i < removed.size(); ++i) {
|
|
// ASSERT_TRUE(lb->RemoveServer(removed[i]));
|
|
// }
|
|
// for (size_t i = 1; i < removed.size(); ++i) {
|
|
// ASSERT_TRUE(lb->AddServer(removed[i]));
|
|
// }
|
|
if (is_lalb) {
|
|
LALB* lalb = (LALB*)lb;
|
|
ValidateLALB(*lalb, ids.size());
|
|
ASSERT_GT(lalb->_total.load(), 0);
|
|
}
|
|
}
|
|
global_stop = true;
|
|
LOG(INFO) << "Stop all...";
|
|
|
|
void* retval[ARRAY_SIZE(th)];
|
|
for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
|
|
ASSERT_EQ(0, pthread_join(th[i], &retval[i]));
|
|
}
|
|
tm.stop();
|
|
|
|
CountMap total_count;
|
|
for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
|
|
CountMap* selected_count = (CountMap*)retval[i];
|
|
size_t count = 0;
|
|
for (CountMap::const_iterator it = selected_count->begin();
|
|
it != selected_count->end(); ++it) {
|
|
total_count[it->first] += it->second;
|
|
count += it->second;
|
|
}
|
|
delete selected_count;
|
|
|
|
std::cout << "thread " << i << " selected "
|
|
<< count * 1000000L / tm.u_elapsed() << " times/s"
|
|
<< std::endl;
|
|
}
|
|
size_t id_num = ids.size();
|
|
if (round == 3) {
|
|
// Do not include the logoff socket.
|
|
id_num -= 1;
|
|
}
|
|
ASSERT_EQ(id_num, total_count.size());
|
|
for (size_t i = 0; i < id_num; ++i) {
|
|
ASSERT_NE(0, total_count[ids[i].id]) << "i=" << i;
|
|
std::cout << i << "=" << total_count[ids[i].id] << " ";
|
|
}
|
|
std::cout << std::endl;
|
|
|
|
for (size_t i = 0; i < id_num; ++i) {
|
|
ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
|
|
}
|
|
ASSERT_EQ(ids.size(), nrecycle);
|
|
brpc::SocketId id = -1;
|
|
for (size_t i = 0; i < ids.size(); ++i) {
|
|
id = recycled_sockets[i];
|
|
if (id != wrr_sid_logoff) {
|
|
ASSERT_EQ(1UL, total_count.erase(id));
|
|
} else {
|
|
ASSERT_EQ(0UL, total_count.erase(id));
|
|
}
|
|
}
|
|
delete lb;
|
|
}
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, fairness) {
|
|
for (size_t round = 0; round < 6; ++round) {
|
|
brpc::LoadBalancer* lb = NULL;
|
|
SelectArg sa = { NULL, NULL};
|
|
if (round == 0) {
|
|
lb = new brpc::policy::RoundRobinLoadBalancer;
|
|
} else if (round == 1) {
|
|
lb = new brpc::policy::RandomizedLoadBalancer;
|
|
} else if (round == 2) {
|
|
lb = new LALB;
|
|
} else if (3 == round || 4 == round) {
|
|
lb = new brpc::policy::WeightedRoundRobinLoadBalancer;
|
|
} else {
|
|
lb = new brpc::policy::ConsistentHashingLoadBalancer(brpc::policy::CONS_HASH_LB_MURMUR3);
|
|
sa.hash = brpc::policy::MurmurHash32;
|
|
}
|
|
sa.lb = lb;
|
|
|
|
std::string lb_name = butil::class_name_str(*lb);
|
|
// Remove namespace
|
|
size_t ns_pos = lb_name.find_last_of(':');
|
|
if (ns_pos != std::string::npos) {
|
|
lb_name = lb_name.substr(ns_pos + 1);
|
|
}
|
|
|
|
nrecycle = 0;
|
|
global_stop = false;
|
|
pthread_t th[8];
|
|
std::vector<brpc::ServerId> ids;
|
|
for (int i = 0; i < 256; ++i) {
|
|
char addr[32];
|
|
snprintf(addr, sizeof(addr), "192.168.1.%d:8080", i);
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(addr, &dummy));
|
|
brpc::ServerId id(8888);
|
|
if (3 == round) {
|
|
id.tag = "100";
|
|
} else if (4 == round) {
|
|
if ( i % 50 == 0) {
|
|
id.tag = std::to_string(i*2 + butil::fast_rand_less_than(40) + 80);
|
|
} else {
|
|
id.tag = std::to_string(butil::fast_rand_less_than(40) + 80);
|
|
}
|
|
}
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
options.user = new SaveRecycle;
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
ids.push_back(id);
|
|
lb->AddServer(id);
|
|
}
|
|
|
|
for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
|
|
ASSERT_EQ(0, pthread_create(&th[i], NULL, select_server, &sa));
|
|
}
|
|
bthread_usleep(10000);
|
|
ProfilerStart((lb_name + ".prof").c_str());
|
|
bthread_usleep(300000);
|
|
ProfilerStop();
|
|
|
|
global_stop = true;
|
|
|
|
CountMap total_count;
|
|
for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
|
|
void* retval;
|
|
ASSERT_EQ(0, pthread_join(th[i], &retval));
|
|
CountMap* selected_count = (CountMap*)retval;
|
|
ASSERT_TRUE(selected_count);
|
|
int first_count = 0;
|
|
for (CountMap::const_iterator it = selected_count->begin();
|
|
it != selected_count->end(); ++it) {
|
|
if (round == 0) {
|
|
if (first_count == 0) {
|
|
first_count = it->second;
|
|
} else {
|
|
// Load is not ensured to be fair inside each thread
|
|
// ASSERT_LE(abs(first_count - it->second), 1);
|
|
}
|
|
}
|
|
total_count[it->first] += it->second;
|
|
}
|
|
delete selected_count;
|
|
}
|
|
ASSERT_EQ(ids.size(), total_count.size());
|
|
size_t count_sum = 0;
|
|
size_t count_squared_sum = 0;
|
|
std::cout << lb_name << ':' << '\n';
|
|
|
|
if (round != 3 && round !=4) {
|
|
for (size_t i = 0; i < ids.size(); ++i) {
|
|
size_t count = total_count[ids[i].id];
|
|
ASSERT_NE(0ul, count) << "i=" << i;
|
|
std::cout << i << '=' << count << ' ';
|
|
count_sum += count;
|
|
count_squared_sum += count * count;
|
|
}
|
|
|
|
std::cout << '\n'
|
|
<< ": average=" << count_sum/ids.size()
|
|
<< " deviation=" << sqrt(count_squared_sum * ids.size()
|
|
- count_sum * count_sum) / ids.size() << std::endl;
|
|
} else { // for weighted round robin load balancer
|
|
std::cout << "configured weight: " << std::endl;
|
|
std::ostringstream os;
|
|
brpc::DescribeOptions opt;
|
|
lb->Describe(os, opt);
|
|
std::cout << os.str() << std::endl;
|
|
double scaling_count_sum = 0.0;
|
|
double scaling_count_squared_sum = 0.0;
|
|
for (size_t i = 0; i < ids.size(); ++i) {
|
|
size_t count = total_count[ids[i].id];
|
|
ASSERT_NE(0ul, count) << "i=" << i;
|
|
std::cout << i << '=' << count << ' ';
|
|
double scaling_count = static_cast<double>(count) / std::stoi(ids[i].tag);
|
|
scaling_count_sum += scaling_count;
|
|
scaling_count_squared_sum += scaling_count * scaling_count;
|
|
}
|
|
std::cout << '\n'
|
|
<< ": scaling average=" << scaling_count_sum/ids.size()
|
|
<< " scaling deviation=" << sqrt(scaling_count_squared_sum * ids.size()
|
|
- scaling_count_sum * scaling_count_sum) / ids.size() << std::endl;
|
|
}
|
|
for (size_t i = 0; i < ids.size(); ++i) {
|
|
ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
|
|
}
|
|
ASSERT_EQ(ids.size(), nrecycle);
|
|
for (size_t i = 0; i < ids.size(); ++i) {
|
|
ASSERT_EQ(1UL, total_count.erase(recycled_sockets[i]));
|
|
}
|
|
delete lb;
|
|
}
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, consistent_hashing) {
|
|
::brpc::policy::HashFunc hashs[::brpc::policy::CONS_HASH_LB_LAST] = {
|
|
::brpc::policy::MurmurHash32,
|
|
::brpc::policy::MD5Hash32,
|
|
::brpc::policy::MD5Hash32
|
|
// ::brpc::policy::CRCHash32 crc is a bad hash function in test
|
|
};
|
|
|
|
::brpc::policy::ConsistentHashingLoadBalancerType hash_type[::brpc::policy::CONS_HASH_LB_LAST] = {
|
|
::brpc::policy::CONS_HASH_LB_MURMUR3,
|
|
::brpc::policy::CONS_HASH_LB_MD5,
|
|
::brpc::policy::CONS_HASH_LB_KETAMA
|
|
};
|
|
|
|
const char* servers[] = {
|
|
"10.92.115.19:8833",
|
|
"10.42.108.25:8833",
|
|
"10.36.150.32:8833",
|
|
"10.92.149.48:8833",
|
|
"10.42.122.201:8833",
|
|
"[2408:871a:2100:3:0:ff:b025:348d]:8833",
|
|
"unix:test.sock",
|
|
};
|
|
for (size_t round = 0; round < ARRAY_SIZE(hashs); ++round) {
|
|
brpc::policy::ConsistentHashingLoadBalancer chlb(hash_type[round]);
|
|
std::vector<brpc::ServerId> ids;
|
|
std::vector<butil::EndPoint> addrs;
|
|
for (int j = 0;j < 5; ++j) {
|
|
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
|
|
const char *addr = servers[i];
|
|
//snprintf(addr, sizeof(addr), "192.168.1.%d:8080", i);
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(addr, &dummy));
|
|
brpc::ServerId id(8888);
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
options.user = new SaveRecycle;
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
ids.push_back(id);
|
|
addrs.push_back(dummy);
|
|
chlb.AddServer(id);
|
|
}
|
|
}
|
|
std::cout << chlb;
|
|
for (int i = 0; i < 5; ++i) {
|
|
std::vector<brpc::ServerId> empty;
|
|
chlb.AddServersInBatch(empty);
|
|
chlb.RemoveServersInBatch(empty);
|
|
std::cout << chlb;
|
|
}
|
|
const size_t SELECT_TIMES = 1000000;
|
|
std::map<butil::EndPoint, size_t> times;
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
|
|
::brpc::LoadBalancer::SelectOut out(&ptr);
|
|
for (size_t i = 0; i < SELECT_TIMES; ++i) {
|
|
in.has_request_code = true;
|
|
in.request_code = hashs[round]((const char *)&i, sizeof(i));
|
|
chlb.SelectServer(in, &out);
|
|
++times[ptr->remote_side()];
|
|
}
|
|
std::map<butil::EndPoint, double> load_map;
|
|
chlb.GetLoads(&load_map);
|
|
ASSERT_EQ(times.size(), load_map.size());
|
|
double load_sum = 0;;
|
|
double load_sqr_sum = 0;
|
|
for (size_t i = 0; i < addrs.size(); ++i) {
|
|
double normalized_load =
|
|
(double)times[addrs[i]] / SELECT_TIMES / load_map[addrs[i]];
|
|
std::cout << i << '=' << normalized_load << ' ';
|
|
load_sum += normalized_load;
|
|
load_sqr_sum += normalized_load * normalized_load;
|
|
}
|
|
std::cout << '\n';
|
|
std::cout << "average_normalized_load=" << load_sum / addrs.size()
|
|
<< " deviation="
|
|
<< sqrt(load_sqr_sum * addrs.size() - load_sum * load_sum) / addrs.size()
|
|
<< '\n';
|
|
for (size_t i = 0; i < ids.size(); ++i) {
|
|
ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, weighted_round_robin) {
|
|
const char* servers[] = {
|
|
"10.92.115.19:8831",
|
|
"10.42.108.25:8832",
|
|
"10.36.150.32:8833",
|
|
"10.36.150.32:8899",
|
|
"10.92.149.48:8834",
|
|
"10.42.122.201:8835",
|
|
"10.42.122.202:8836"
|
|
};
|
|
std::string weight[] = {"3", "2", "7", "200000000", "1ab", "-1", "0"};
|
|
std::map<butil::EndPoint, int> configed_weight;
|
|
brpc::policy::WeightedRoundRobinLoadBalancer wrrlb;
|
|
|
|
// Add server to selected list. The server with invalid weight will be skipped.
|
|
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
|
|
const char *addr = servers[i];
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(addr, &dummy));
|
|
brpc::ServerId id(8888);
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
options.user = new SaveRecycle;
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
id.tag = weight[i];
|
|
if (i == 3) {
|
|
brpc::SocketUniquePtr ptr;
|
|
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
|
|
ptr->SetLogOff();
|
|
}
|
|
if ( i < 4 ) {
|
|
int weight_num = 0;
|
|
ASSERT_TRUE(butil::StringToInt(weight[i], &weight_num));
|
|
configed_weight[dummy] = weight_num;
|
|
EXPECT_TRUE(wrrlb.AddServer(id));
|
|
} else {
|
|
EXPECT_FALSE(wrrlb.AddServer(id));
|
|
}
|
|
}
|
|
|
|
// Select the best server according to weight configured.
|
|
// There are 3 valid servers with weight 3, 2 and 7 respectively.
|
|
// We run SelectServer for 12 times. The result number of each server seleted should be
|
|
// consistent with weight configured.
|
|
std::map<butil::EndPoint, size_t> select_result;
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
int total_weight = 12;
|
|
std::vector<butil::EndPoint> select_servers;
|
|
for (int i = 0; i != total_weight; ++i) {
|
|
EXPECT_EQ(0, wrrlb.SelectServer(in, &out));
|
|
select_servers.emplace_back(ptr->remote_side());
|
|
++select_result[ptr->remote_side()];
|
|
}
|
|
|
|
for (const auto& s : select_servers) {
|
|
std::cout << "1=" << s << ", ";
|
|
}
|
|
std::cout << std::endl;
|
|
// Check whether slected result is consistent with expected.
|
|
EXPECT_EQ((size_t)3, select_result.size());
|
|
for (const auto& result : select_result) {
|
|
std::cout << result.first << " result=" << result.second
|
|
<< " configured=" << configed_weight[result.first] << std::endl;
|
|
EXPECT_EQ(result.second, (size_t)configed_weight[result.first]);
|
|
}
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
|
|
const char* servers[] = {
|
|
"10.92.115.19:8831",
|
|
"10.42.108.25:8832",
|
|
"10.36.150.32:8833"
|
|
};
|
|
std::string weight[] = {"200000000", "2", "600000"};
|
|
std::map<butil::EndPoint, int> configed_weight;
|
|
brpc::policy::WeightedRoundRobinLoadBalancer wrrlb;
|
|
brpc::ExcludedServers* exclude = brpc::ExcludedServers::Create(3);
|
|
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
|
|
const char *addr = servers[i];
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(addr, &dummy));
|
|
brpc::ServerId id(8888);
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
options.user = new SaveRecycle;
|
|
id.tag = weight[i];
|
|
if (i < 2) {
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
}
|
|
EXPECT_TRUE(wrrlb.AddServer(id));
|
|
if (i == 0) {
|
|
exclude->Add(id.id);
|
|
}
|
|
if (i == 1) {
|
|
brpc::SocketUniquePtr ptr;
|
|
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
|
|
ptr->SetLogOff();
|
|
}
|
|
}
|
|
// The first socket is excluded. The second socket is logfoff.
|
|
// The third socket is invalid.
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
EXPECT_EQ(EHOSTDOWN, wrrlb.SelectServer(in, &out));
|
|
brpc::ExcludedServers::Destroy(exclude);
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, weighted_randomized) {
|
|
const char* servers[] = {
|
|
"10.92.115.19:8831",
|
|
"10.42.108.25:8832",
|
|
"10.36.150.31:8833",
|
|
"10.36.150.32:8899",
|
|
"10.92.149.48:8834",
|
|
"10.42.122.201:8835",
|
|
"10.42.122.202:8836"
|
|
};
|
|
std::string weight[] = {"3", "2", "5", "10", "1ab", "-1", "0"};
|
|
std::map<butil::EndPoint, int> configed_weight;
|
|
uint64_t configed_weight_sum = 0;
|
|
brpc::policy::WeightedRandomizedLoadBalancer wrlb;
|
|
size_t valid_weight_num = 4;
|
|
|
|
// Add server to selected list. The server with invalid weight will be skipped.
|
|
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
|
|
const char *addr = servers[i];
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(addr, &dummy));
|
|
brpc::ServerId id(8888);
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
options.user = new SaveRecycle;
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
id.tag = weight[i];
|
|
if (i < valid_weight_num) {
|
|
int weight_num = 0;
|
|
ASSERT_TRUE(butil::StringToInt(weight[i], &weight_num));
|
|
configed_weight[dummy] = weight_num;
|
|
configed_weight_sum += weight_num;
|
|
EXPECT_TRUE(wrlb.AddServer(id));
|
|
} else {
|
|
EXPECT_FALSE(wrlb.AddServer(id));
|
|
}
|
|
}
|
|
|
|
// Select the best server according to weight configured.
|
|
// There are 4 valid servers with weight 3, 2, 5 and 10 respectively.
|
|
// We run SelectServer for multiple times. The result number of each server seleted should be
|
|
// weight randomized with weight configured.
|
|
std::map<butil::EndPoint, size_t> select_result;
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
int run_times = configed_weight_sum * 10;
|
|
std::vector<butil::EndPoint> select_servers;
|
|
for (int i = 0; i < run_times; ++i) {
|
|
EXPECT_EQ(0, wrlb.SelectServer(in, &out));
|
|
select_servers.emplace_back(ptr->remote_side());
|
|
++select_result[ptr->remote_side()];
|
|
}
|
|
|
|
for (const auto& server : select_servers) {
|
|
std::cout << "weight randomized=" << server << ", ";
|
|
}
|
|
std::cout << std::endl;
|
|
|
|
// Check whether selected result is weight with expected.
|
|
EXPECT_EQ(valid_weight_num, select_result.size());
|
|
std::cout << "configed_weight_sum=" << configed_weight_sum << " run_times=" << run_times << std::endl;
|
|
for (const auto& result : select_result) {
|
|
double actual_rate = result.second * 1.0 / run_times;
|
|
double expect_rate = configed_weight[result.first] * 1.0 / configed_weight_sum;
|
|
std::cout << result.first << " weight=" << configed_weight[result.first]
|
|
<< " select_times=" << result.second
|
|
<< " actual_rate=" << actual_rate << " expect_rate=" << expect_rate
|
|
<< " expect_rate/2=" << expect_rate/2 << " expect_rate*2=" << expect_rate*2
|
|
<< std::endl;
|
|
// actual_rate >= expect_rate / 2
|
|
ASSERT_GE(actual_rate, expect_rate / 2);
|
|
// actual_rate <= expect_rate * 2
|
|
ASSERT_LE(actual_rate, expect_rate * 2);
|
|
}
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, health_check_no_valid_server) {
|
|
const char* servers[] = {
|
|
"10.92.115.19:8832",
|
|
"10.42.122.201:8833",
|
|
};
|
|
std::vector<brpc::LoadBalancer*> lbs;
|
|
lbs.push_back(new brpc::policy::RoundRobinLoadBalancer);
|
|
lbs.push_back(new brpc::policy::RandomizedLoadBalancer);
|
|
lbs.push_back(new brpc::policy::WeightedRoundRobinLoadBalancer);
|
|
|
|
for (int i = 0; i < (int)lbs.size(); ++i) {
|
|
brpc::LoadBalancer* lb = lbs[i];
|
|
std::vector<brpc::ServerId> ids;
|
|
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(servers[i], &dummy));
|
|
brpc::ServerId id(8888);
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
id.tag = "50";
|
|
ids.push_back(id);
|
|
lb->AddServer(id);
|
|
}
|
|
|
|
// Without setting anything, the lb should work fine
|
|
for (int i = 0; i < 4; ++i) {
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
ASSERT_EQ(0, lb->SelectServer(in, &out));
|
|
}
|
|
|
|
brpc::SocketUniquePtr ptr;
|
|
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
|
|
ptr->_ninflight_app_health_check.store(1, butil::memory_order_relaxed);
|
|
for (int i = 0; i < 4; ++i) {
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
ASSERT_EQ(0, lb->SelectServer(in, &out));
|
|
// After putting server[0] into health check state, the only choice is servers[1]
|
|
ASSERT_EQ(ptr->remote_side().port, 8833);
|
|
}
|
|
|
|
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
|
|
ptr->_ninflight_app_health_check.store(1, butil::memory_order_relaxed);
|
|
for (int i = 0; i < 4; ++i) {
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
// There is no server available
|
|
ASSERT_EQ(EHOSTDOWN, lb->SelectServer(in, &out));
|
|
}
|
|
|
|
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
|
|
ptr->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
|
|
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
|
|
ptr->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
|
|
// After reset health check state, the lb should work fine
|
|
bool get_server1 = false;
|
|
bool get_server2 = false;
|
|
for (int i = 0; i < 20; ++i) {
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
ASSERT_EQ(0, lb->SelectServer(in, &out));
|
|
if (ptr->remote_side().port == 8832) {
|
|
get_server1 = true;
|
|
} else {
|
|
get_server2 = true;
|
|
}
|
|
}
|
|
ASSERT_TRUE(get_server1 && get_server2);
|
|
delete lb;
|
|
}
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
|
|
const char* servers[] = {
|
|
"10.92.115.19:8832",
|
|
"10.42.122.201:8833",
|
|
};
|
|
brpc::LoadBalancer* lb = NULL;
|
|
int rand = butil::fast_rand_less_than(2);
|
|
if (rand == 0) {
|
|
brpc::policy::RandomizedLoadBalancer rlb;
|
|
lb = rlb.New("min_working_instances=2 hold_seconds=2");
|
|
} else if (rand == 1) {
|
|
brpc::policy::RoundRobinLoadBalancer rrlb;
|
|
lb = rrlb.New("min_working_instances=2 hold_seconds=2");
|
|
}
|
|
brpc::SocketUniquePtr ptr[2];
|
|
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
|
|
butil::EndPoint dummy;
|
|
ASSERT_EQ(0, str2endpoint(servers[i], &dummy));
|
|
brpc::SocketOptions options;
|
|
options.remote_side = dummy;
|
|
brpc::ServerId id(8888);
|
|
id.tag = "50";
|
|
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
|
|
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr[i]));
|
|
lb->AddServer(id);
|
|
}
|
|
brpc::SocketUniquePtr sptr;
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0u, NULL };
|
|
brpc::LoadBalancer::SelectOut out(&sptr);
|
|
ASSERT_EQ(0, lb->SelectServer(in, &out));
|
|
|
|
ptr[0]->SetFailed();
|
|
ptr[1]->SetFailed();
|
|
ASSERT_EQ(EHOSTDOWN, lb->SelectServer(in, &out));
|
|
// should reject all request since there is no available server
|
|
for (int i = 0; i < 10; ++i) {
|
|
ASSERT_EQ(brpc::EREJECT, lb->SelectServer(in, &out));
|
|
}
|
|
{
|
|
brpc::SocketUniquePtr dummy_ptr;
|
|
ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr));
|
|
dummy_ptr->Revive();
|
|
}
|
|
bthread_usleep(brpc::FLAGS_detect_available_server_interval_ms * 1000);
|
|
// After one server is revived, the reject rate should be 50%
|
|
int num_ereject = 0;
|
|
int num_ok = 0;
|
|
for (int i = 0; i < 100; ++i) {
|
|
int rc = lb->SelectServer(in, &out);
|
|
if (rc == brpc::EREJECT) {
|
|
num_ereject++;
|
|
} else if (rc == 0) {
|
|
num_ok++;
|
|
} else {
|
|
ASSERT_TRUE(false);
|
|
}
|
|
}
|
|
ASSERT_TRUE(abs(num_ereject - num_ok) < 30);
|
|
bthread_usleep((2000 /* hold_seconds */ + 10) * 1000);
|
|
|
|
// After enough waiting time, traffic should be sent to all available servers.
|
|
for (int i = 0; i < 10; ++i) {
|
|
ASSERT_EQ(0, lb->SelectServer(in, &out));
|
|
}
|
|
}
|
|
|
|
class EchoServiceImpl : public test::EchoService {
|
|
public:
|
|
EchoServiceImpl()
|
|
: _num_request(0) {}
|
|
virtual ~EchoServiceImpl() {}
|
|
virtual void Echo(google::protobuf::RpcController* cntl_base,
|
|
const test::EchoRequest* req,
|
|
test::EchoResponse* res,
|
|
google::protobuf::Closure* done) {
|
|
//brpc::Controller* cntl =
|
|
// static_cast<brpc::Controller*>(cntl_base);
|
|
brpc::ClosureGuard done_guard(done);
|
|
int p = _num_request.fetch_add(1, butil::memory_order_relaxed);
|
|
// concurrency in normal case is 50
|
|
if (p < 70) {
|
|
bthread_usleep(100 * 1000);
|
|
_num_request.fetch_sub(1, butil::memory_order_relaxed);
|
|
res->set_message("OK");
|
|
} else {
|
|
_num_request.fetch_sub(1, butil::memory_order_relaxed);
|
|
bthread_usleep(1000 * 1000);
|
|
}
|
|
return;
|
|
}
|
|
|
|
butil::atomic<int> _num_request;
|
|
};
|
|
|
|
butil::atomic<int32_t> num_failed(0);
|
|
butil::atomic<int32_t> num_reject(0);
|
|
|
|
class Done : public google::protobuf::Closure {
|
|
public:
|
|
void Run() {
|
|
if (cntl.Failed()) {
|
|
num_failed.fetch_add(1, butil::memory_order_relaxed);
|
|
if (cntl.ErrorCode() == brpc::EREJECT) {
|
|
num_reject.fetch_add(1, butil::memory_order_relaxed);
|
|
}
|
|
}
|
|
delete this;
|
|
}
|
|
brpc::Controller cntl;
|
|
test::EchoRequest req;
|
|
test::EchoResponse res;
|
|
};
|
|
|
|
TEST_F(LoadBalancerTest, invalid_lb_params) {
|
|
const char* lb_algo[] = { "random:mi_working_instances=2 hold_seconds=2",
|
|
"rr:min_working_instances=2 hold_secon=2" };
|
|
brpc::Channel channel;
|
|
brpc::ChannelOptions options;
|
|
options.protocol = "http";
|
|
ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50, 127.0.0.1:7778 50",
|
|
lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
|
|
&options), -1);
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
|
|
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_size", "20");
|
|
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30");
|
|
// Those two lines force the interval of first hc to 3s
|
|
GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "3000");
|
|
GFLAGS_NS::SetCommandLineOption("circuit_breaker_min_isolation_duration_ms", "3000");
|
|
|
|
const char* lb_algo[] = { "random:min_working_instances=2 hold_seconds=2",
|
|
"rr:min_working_instances=2 hold_seconds=2" };
|
|
brpc::Channel channel;
|
|
brpc::ChannelOptions options;
|
|
options.protocol = "http";
|
|
options.timeout_ms = 300;
|
|
options.enable_circuit_breaker = true;
|
|
// Disable retry to make health check happen one by one
|
|
options.max_retry = 0;
|
|
ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50, 127.0.0.1:7778 50",
|
|
lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
|
|
&options), 0);
|
|
test::EchoRequest req;
|
|
req.set_message("123");
|
|
test::EchoResponse res;
|
|
test::EchoService_Stub stub(&channel);
|
|
{
|
|
// trigger one server to health check
|
|
brpc::Controller cntl;
|
|
stub.Echo(&cntl, &req, &res, NULL);
|
|
}
|
|
// This sleep make one server revived 700ms earlier than the other server, which
|
|
// can make the server down again if no request limit policy are applied here.
|
|
bthread_usleep(700000);
|
|
{
|
|
// trigger the other server to health check
|
|
brpc::Controller cntl;
|
|
stub.Echo(&cntl, &req, &res, NULL);
|
|
}
|
|
|
|
butil::EndPoint point(butil::IP_ANY, 7777);
|
|
brpc::Server server;
|
|
EchoServiceImpl service;
|
|
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
|
|
ASSERT_EQ(0, server.Start(point, NULL));
|
|
|
|
butil::EndPoint point2(butil::IP_ANY, 7778);
|
|
brpc::Server server2;
|
|
EchoServiceImpl service2;
|
|
ASSERT_EQ(0, server2.AddService(&service2, brpc::SERVER_DOESNT_OWN_SERVICE));
|
|
ASSERT_EQ(0, server2.Start(point2, NULL));
|
|
|
|
int64_t start_ms = butil::gettimeofday_ms();
|
|
while ((butil::gettimeofday_ms() - start_ms) < 3500) {
|
|
Done* done = new Done;
|
|
done->req.set_message("123");
|
|
stub.Echo(&done->cntl, &done->req, &done->res, done);
|
|
bthread_usleep(1000);
|
|
}
|
|
// All error code should be equal to EREJECT, except when the situation
|
|
// all servers are down, the very first call that trigger recovering would
|
|
// fail with EHOSTDOWN instead of EREJECT. This is where the number 1 comes
|
|
// in following ASSERT.
|
|
ASSERT_TRUE(num_failed.load(butil::memory_order_relaxed) -
|
|
num_reject.load(butil::memory_order_relaxed) == 1);
|
|
num_failed.store(0, butil::memory_order_relaxed);
|
|
|
|
// should recover now
|
|
for (int i = 0; i < 1000; ++i) {
|
|
Done* done = new Done;
|
|
done->req.set_message("123");
|
|
stub.Echo(&done->cntl, &done->req, &done->res, done);
|
|
bthread_usleep(1000);
|
|
}
|
|
bthread_usleep(500000 /* sleep longer than timeout of channel */);
|
|
ASSERT_EQ(0, num_failed.load(butil::memory_order_relaxed));
|
|
}
|
|
|
|
TEST_F(LoadBalancerTest, la_selection_too_long) {
|
|
brpc::GlobalInitializeOrDie();
|
|
brpc::LoadBalancerWithNaming lb;
|
|
CHECK_EQ(0, lb.Init("list://127.0.0.1:8888", "la", nullptr, nullptr));
|
|
char addr[] = "127.0.0.1:8888";
|
|
butil::EndPoint ep;
|
|
ASSERT_EQ(0, str2endpoint(addr, &ep));
|
|
brpc::SocketId id;
|
|
ASSERT_EQ(0, brpc::SocketMapFind(brpc::SocketMapKey(ep), &id));
|
|
ASSERT_EQ(0, brpc::Socket::SetFailed(id));
|
|
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, nullptr };
|
|
brpc::SocketUniquePtr ptr;
|
|
brpc::LoadBalancer::SelectOut out(&ptr);
|
|
ASSERT_EQ(EHOSTDOWN, lb.SelectServer(in, &out));
|
|
}
|
|
|
|
} //namespace
|