// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include #include #include #include #if BRPC_WITH_RDMA #include #include "butil/endpoint.h" #include "butil/fd_guard.h" #include "butil/fd_utility.h" #include "butil/iobuf.h" #include "butil/sys_byteorder.h" #include "butil/files/temp_file.h" #include "brpc/acceptor.h" #include "brpc/channel.h" #include "brpc/controller.h" #include "brpc/server.h" #include "brpc/socket.h" #include "brpc/errno.pb.h" #include "brpc/parallel_channel.h" #include "brpc/selective_channel.h" #include "brpc/rdma/block_pool.h" #include "brpc/rdma/rdma_endpoint.h" #include "brpc/rdma/rdma_helper.h" #include "echo.pb.h" static const int PORT = 8713; using namespace brpc; namespace brpc { DECLARE_int64(socket_max_unwritten_bytes); DECLARE_bool(log_idle_connection_close); DEFINE_bool(rdma_test_enable, false, "Enable tests requring rdma runtime."); namespace rdma { struct HelloMessage { void Serialize(void* data) const; void Deserialize(void* data); uint16_t msg_len; uint16_t hello_ver; uint16_t impl_ver; uint16_t block_size; uint16_t sq_size; uint16_t rq_size; ibv_gid gid; uint32_t qp_num; }; DECLARE_bool(rdma_trace_verbose); DECLARE_int32(rdma_memory_pool_max_regions); extern ibv_cq* (*IbvCreateCq)(ibv_context*, 
int, void*, ibv_comp_channel*, int); extern int (*IbvDestroyCq)(ibv_cq*); extern ibv_qp* (*IbvCreateQp)(ibv_pd*, ibv_qp_init_attr*); extern int (*IbvModifyQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask); extern int (*IbvQueryQp)(ibv_qp*, ibv_qp_attr*, ibv_qp_attr_mask, ibv_qp_init_attr*); extern int (*IbvDestroyQp)(ibv_qp*); extern butil::atomic g_rdma_available; extern bool g_skip_rdma_init; } } static std::string g_ip = "127.0.0.1"; static butil::EndPoint g_ep; class MyEchoService : public ::test::EchoService { void Echo(google::protobuf::RpcController* cntl_base, const ::test::EchoRequest* req, ::test::EchoResponse* res, google::protobuf::Closure* done) { Controller* cntl = static_cast(cntl_base); ClosureGuard done_guard(done); if (req->server_fail()) { cntl->SetFailed(req->server_fail(), "Server fail1"); cntl->SetFailed(req->server_fail(), "Server fail2"); return; } if (req->close_fd()) { usleep(1); LOG(INFO) << "close fd..."; cntl->CloseConnection("Close connection according to request"); return; } if (req->sleep_us() > 0) { LOG(INFO) << "sleep " << req->sleep_us() << "us..."; bthread_usleep(req->sleep_us()); } res->set_message(req->message()); if (req->code() != 0) { res->add_code_list(req->code()); } cntl->response_attachment().append(cntl->request_attachment()); } }; class RdmaTest : public ::testing::Test { protected: RdmaTest() { butil::ip_t ip; EXPECT_EQ(0, butil::str2ip(g_ip.c_str(), &ip)); butil::EndPoint ep(ip, PORT); g_ep = ep; EXPECT_EQ(0, _server_list.save(butil::endpoint2str(g_ep).c_str())); _naming_url = std::string("File://") + _server_list.fname(); _server.AddService(&_svc, SERVER_DOESNT_OWN_SERVICE); } ~RdmaTest() { } virtual void SetUp() { } virtual void TearDown() { rdma::DumpMemoryPoolInfo(std::cout); } private: void StartServer(bool use_rdma = true) { ServerOptions options; options.use_rdma = use_rdma; options.idle_timeout_sec = 5; options.max_concurrency = 0; options.internal_port = -1; EXPECT_EQ(0, _server.Start(PORT, &options)); } void 
StopServer() { _server.Stop(0); _server.Join(); } Socket* GetSocketFromServer(size_t index) { std::vector sids; _server._am->ListConnections(&sids); if (index >= sids.size()) { return NULL; } SocketUniquePtr s; if (Socket::Address(sids[index], &s) == 0) { return s.get(); } return NULL; } butil::TempFile _server_list; std::string _naming_url; Server _server; MyEchoService _svc; }; TEST_F(RdmaTest, client_close_before_hello_send) { StartServer(); sockaddr_in addr; bzero((char*)&addr, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(PORT); butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd >= 0); ASSERT_EQ(0, connect(sockfd, (sockaddr*)&addr, sizeof(sockaddr))); usleep(100000); // wait for server to handle the msg Socket* s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); close(sockfd); usleep(100000); // wait for server to handle the msg ASSERT_EQ(NULL, GetSocketFromServer(0)); StopServer(); } TEST_F(RdmaTest, client_hello_msg_invalid_magic_str) { StartServer(); sockaddr_in addr; bzero((char*)&addr, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(PORT); butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd >= 0); ASSERT_EQ(0, connect(sockfd, (sockaddr*)&addr, sizeof(sockaddr))); usleep(100000); // wait for server to handle the msg Socket* s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); uint8_t data[38]; memcpy(data, "PRPC", 4); // send as normal baidu_std protocol memset(data + 4, 0, 32); ASSERT_EQ(38, write(sockfd, data, 38)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); StopServer(); } TEST_F(RdmaTest, client_close_during_hello_send) { StartServer(); sockaddr_in addr; bzero((char*)&addr, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(PORT); Socket* s = NULL; uint8_t data[8]; butil::fd_guard sockfd1(socket(AF_INET, 
SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd1 >= 0);
    ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    // Case 1: only the first two bytes of the magic string are sent.
    memcpy(data, "RD", 2);
    ASSERT_EQ(2, write(sockfd1, data, 2));  // break in magic str
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::S_HELLO_WAIT, s->_rdma_ep->_state);
    close(sockfd1);
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    // Case 2: the full 4-byte magic string is sent, nothing else.
    butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd2 >= 0);
    ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    memcpy(data, "RDMA", 4);
    ASSERT_EQ(4, write(sockfd2, data, 4));  // break after magic str
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::S_HELLO_WAIT, s->_rdma_ep->_state);
    close(sockfd2);
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    // Case 3: the magic string plus four zero bytes are sent.
    butil::fd_guard sockfd3(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd3 >= 0);
    ASSERT_EQ(0, connect(sockfd3, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    memcpy(data, "RDMA", 4);
    memset(data + 4, 0, 4);
    ASSERT_EQ(8, write(sockfd3, data, 8));  // break 4 bytes after the magic str
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::S_HELLO_WAIT, s->_rdma_ep->_state);
    close(sockfd3);
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    StopServer();
}

// The client sends a 38-byte hello whose length field is wrong; the server is
// expected to drop the connection in both cases.
TEST_F(RdmaTest, client_hello_msg_invalid_len) {
    StartServer();

    sockaddr_in addr;
    bzero((char*)&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port =
htons(PORT);

    Socket* s = NULL;
    uint8_t data[38];

    // Case 1: everything after the magic string, including the bytes at
    // offset 4 that case 2 uses as the length field, is zero.
    butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd1 >= 0);
    ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    memcpy(data, "RDMA", 4);
    memset(data + 4, 0, 34);
    ASSERT_EQ(38, write(sockfd1, data, 38));  // write invalid length
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    // Case 2: the length field says 35 although 38 bytes are written.
    butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd2 >= 0);
    ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    memcpy(data, "RDMA", 4);
    uint16_t len = butil::HostToNet16(35);
    memcpy(data + 4, &len, 2);
    memset(data + 6, 0, 32);
    ASSERT_EQ(38, write(sockfd2, data, 38));  // write invalid length
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    StopServer();
}

// The client sends a hello with a version pair of (1, 0) and then (0, 1). The
// server still reaches S_ACK_WAIT; the outcome then depends on the 4-byte ack
// the client sends.
TEST_F(RdmaTest, client_hello_msg_invalid_version) {
    StartServer();

    sockaddr_in addr;
    bzero((char*)&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PORT);

    Socket* s = NULL;
    uint8_t data[38];
    uint16_t len = butil::HostToNet16(38);
    uint16_t ver = butil::HostToNet16(1);

    // Case 1: version 1 written at offset 6, offset 8 left zero.
    butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd1 >= 0);
    ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    memcpy(data, "RDMA", 4);
    memcpy(data + 4, &len, 2);
    memset(data + 6, 0, 32);
    memcpy(data + 6, &ver, 2);  // hello_ver == 1, impl_ver == 0
    ASSERT_EQ(38, write(sockfd1, data, 38));
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT,
s->_rdma_ep->_state);
    // A 4-byte ack of all zeros: the server is expected to fall back to TCP.
    memset(data, 0, 4);
    ASSERT_EQ(4, write(sockfd1, data, 4));
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
    close(sockfd1);
    usleep(100000);
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    // Case 2: version 1 written at offset 8, offset 6 left zero.
    butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd2 >= 0);
    ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    memcpy(data, "RDMA", 4);
    memcpy(data + 4, &len, 2);
    memset(data + 6, 0, 32);
    memcpy(data + 8, &ver, 2);  // hello_ver == 0, impl_ver == 1
    ASSERT_EQ(38, write(sockfd2, data, 38));
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
    // An ack with value 1 after this hello: the server is expected to drop
    // the connection.
    uint32_t flag = butil::HostToNet32(1);
    ASSERT_EQ(4, write(sockfd2, &flag, 4));
    usleep(100000);
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    StopServer();
}

// The client sends hellos with sq_size 10, then rq_size 10, then block_size
// 1000, each followed by an all-zero ack; the server is expected to end up in
// FALLBACK_TCP every time.
// NOTE(review): msg.gid and msg.qp_num are never assigned in this test before
// Serialize() is called -- confirm whether Serialize() reads them.
TEST_F(RdmaTest, client_hello_msg_invalid_sq_rq_block_size) {
    StartServer();

    sockaddr_in addr;
    bzero((char*)&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PORT);

    Socket* s = NULL;
    rdma::HelloMessage msg;
    uint8_t data[38];

    // Case 1: sq_size = 10.
    msg.msg_len = 38;
    msg.hello_ver = 1;
    msg.impl_ver = 1;
    msg.sq_size = 10;
    msg.rq_size = 16;
    msg.block_size = 8192;
    memcpy(data, "RDMA", 4);
    msg.Serialize(data + 4);

    butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd1 >= 0);
    ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    ASSERT_EQ(38, write(sockfd1, data, 38));
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
    memset(data, 0, 4);
    ASSERT_EQ(4, write(sockfd1, data, 4));
    usleep(100000);  // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); close(sockfd1); msg.sq_size = 16; msg.rq_size = 10; msg.block_size = 8192; memcpy(data, "RDMA", 4); msg.Serialize(data + 4); butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd2 >= 0); ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr))); usleep(100000); // wait for server to handle the msg s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); ASSERT_EQ(38, write(sockfd2, data, 38)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state); memset(data, 0, 4); ASSERT_EQ(4, write(sockfd1, data, 4)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); close(sockfd2); msg.sq_size = 16; msg.rq_size = 16; msg.block_size = 1000; memcpy(data, "RDMA", 4); msg.Serialize(data + 4); butil::fd_guard sockfd3(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd3 >= 0); ASSERT_EQ(0, connect(sockfd3, (sockaddr*)&addr, sizeof(sockaddr))); usleep(100000); // wait for server to handle the msg s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); ASSERT_EQ(38, write(sockfd3, data, 38)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state); memset(data, 0, 4); ASSERT_EQ(4, write(sockfd3, data, 4)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); StopServer(); } TEST_F(RdmaTest, client_close_after_qp_build) { StartServer(); sockaddr_in addr; bzero((char*)&addr, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(PORT); Socket* s = NULL; rdma::HelloMessage msg; uint8_t data[38]; msg.msg_len = 38; msg.hello_ver = 1; msg.impl_ver = 1; msg.sq_size = 16; msg.rq_size = 16; msg.block_size = 8192; msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, 
"RDMA", 4); msg.Serialize(data + 4); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr))); usleep(100000); // wait for server to handle the msg s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); ASSERT_EQ(38, write(sockfd1, data, 38)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state); close(sockfd1); usleep(100000); // wait for server to handle the msg ASSERT_EQ(NULL, GetSocketFromServer(0)); StopServer(); } TEST_F(RdmaTest, client_close_during_ack_send) { StartServer(); sockaddr_in addr; bzero((char*)&addr, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(PORT); Socket* s = NULL; rdma::HelloMessage msg; uint8_t data[38]; msg.msg_len = 38; msg.hello_ver = 1; msg.impl_ver = 1; msg.sq_size = 16; msg.rq_size = 16; msg.block_size = 8192; msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); msg.Serialize(data + 4); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr))); usleep(100000); // wait for server to handle the msg s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); ASSERT_EQ(38, write(sockfd1, data, 38)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state); uint32_t flags = butil::HostToNet32(0); ASSERT_EQ(2, write(sockfd1, &flags, 2)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state); close(sockfd1); usleep(100000); // wait for server to handle the msg ASSERT_EQ(NULL, GetSocketFromServer(0)); StopServer(); } TEST_F(RdmaTest, client_close_after_ack_send) { StartServer(); sockaddr_in addr; bzero((char*)&addr, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = 
htons(PORT);

    Socket* s = NULL;
    rdma::HelloMessage msg;
    uint8_t data[38];
    msg.msg_len = 38;
    msg.hello_ver = 1;
    msg.impl_ver = 1;
    msg.sq_size = 16;
    msg.rq_size = 16;
    msg.block_size = 8192;
    msg.qp_num = 0;
    msg.gid = rdma::GetRdmaGid();
    memcpy(data, "RDMA", 4);
    msg.Serialize(data + 4);

    // Case 1: ack value 0 -> FALLBACK_TCP, then close.
    butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd1 >= 0);
    ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    ASSERT_EQ(38, write(sockfd1, data, 38));
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
    uint32_t flags = butil::HostToNet32(0);
    ASSERT_EQ(4, write(sockfd1, &flags, 4));
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
    close(sockfd1);
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    // Case 2: ack value 1 -> ESTABLISHED, then close.
    butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
    ASSERT_TRUE(sockfd2 >= 0);
    ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr)));
    usleep(100000);  // wait for server to handle the msg
    s = GetSocketFromServer(0);
    ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state);
    ASSERT_EQ(38, write(sockfd2, data, 38));
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state);
    flags = butil::HostToNet32(1);
    ASSERT_EQ(4, write(sockfd2, &flags, 4));
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state);
    close(sockfd2);
    usleep(100000);  // wait for server to handle the msg
    ASSERT_EQ(NULL, GetSocketFromServer(0));

    StopServer();
}

// After the handshake the client writes 4 more bytes on the TCP connection:
// in FALLBACK_TCP the state is unchanged; in ESTABLISHED the server is
// expected to drop the connection.
TEST_F(RdmaTest, client_send_data_on_tcp_after_ack_send) {
    StartServer();

    sockaddr_in addr;
    bzero((char*)&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(PORT);
Socket* s = NULL; rdma::HelloMessage msg; uint8_t data[38]; msg.msg_len = 38; msg.hello_ver = 1; msg.impl_ver = 1; msg.sq_size = 16; msg.rq_size = 16; msg.block_size = 8192; msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); msg.Serialize(data + 4); butil::fd_guard sockfd1(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd1 >= 0); ASSERT_EQ(0, connect(sockfd1, (sockaddr*)&addr, sizeof(sockaddr))); usleep(100000); // wait for server to handle the msg s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); ASSERT_EQ(38, write(sockfd1, data, 38)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state); uint32_t flags = butil::HostToNet32(0); ASSERT_EQ(4, write(sockfd1, &flags, 4)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); ASSERT_EQ(4, write(sockfd1, &flags, 4)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); close(sockfd1); usleep(100000); // wait for server to handle the msg ASSERT_EQ(NULL, GetSocketFromServer(0)); butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0)); ASSERT_TRUE(sockfd2 >= 0); ASSERT_EQ(0, connect(sockfd2, (sockaddr*)&addr, sizeof(sockaddr))); usleep(100000); // wait for server to handle the msg s = GetSocketFromServer(0); ASSERT_EQ(rdma::RdmaEndpoint::UNINIT, s->_rdma_ep->_state); ASSERT_EQ(38, write(sockfd2, data, 38)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::S_ACK_WAIT, s->_rdma_ep->_state); flags = butil::HostToNet32(1); ASSERT_EQ(4, write(sockfd2, &flags, 4)); usleep(100000); // wait for server to handle the msg ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state); ASSERT_EQ(4, write(sockfd1, &flags, 4)); usleep(100000); ASSERT_EQ(NULL, GetSocketFromServer(0)); StopServer(); } TEST_F(RdmaTest, server_miss_before_hello_send) { butil::fd_guard 
sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    // Issue Echo with a done closure; the result is collected further down via
    // bthread_id_join().
    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}

// The fake server reads the client's 38-byte hello and closes without
// replying: the client endpoint must be FAILED and the call must end with EEOF.
TEST_F(RdmaTest, server_close_before_hello_send) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));
    close(acc_fd);
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(EEOF, cntl.ErrorCode());
}

// The fake server replies with only the first two bytes of the magic string
// and then stays silent: the call must time out.
TEST_F(RdmaTest, server_miss_during_magic_str) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));
    ASSERT_EQ(2, write(acc_fd, "RD", 2));  // partial magic string only
    usleep(100000);

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}

// The fake server replies with two bytes of the magic string and closes: the
// client endpoint must be FAILED and the call must end with EEOF.
TEST_F(RdmaTest, server_close_during_magic_str) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));
    ASSERT_EQ(2, write(acc_fd, "RD", 2));
    close(acc_fd);
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(EEOF, cntl.ErrorCode());
}

// The fake server replies with "ABCD" instead of the magic string: the client
// endpoint must be FAILED and the call must end with EPROTO.
TEST_F(RdmaTest, server_hello_invalid_magic_str) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel
channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));
    ASSERT_EQ(4, write(acc_fd, "ABCD", 4));  // wrong magic string
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(EPROTO, cntl.ErrorCode());
}

// The fake server sends the magic string plus two more bytes and then stays
// silent: the call must time out.
TEST_F(RdmaTest, server_miss_during_hello_msg) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));
    ASSERT_EQ(4, write(acc_fd, "RDMA", 4));
    ASSERT_EQ(2, write(acc_fd, "00", 2));  // hello body cut short

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}

// The fake server sends the magic string plus two more bytes and closes: the
// client endpoint must be FAILED and the call must end with EEOF.
TEST_F(RdmaTest, server_close_during_hello_msg) {
    butil::fd_guard
sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));
    ASSERT_EQ(4, write(acc_fd, "RDMA", 4));
    ASSERT_EQ(2, write(acc_fd, "00", 2));
    close(acc_fd);
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(EEOF, cntl.ErrorCode());
}

// The fake server replies with a 38-byte hello whose length field says 35: the
// client endpoint must be FAILED and the call must end with EPROTO.
TEST_F(RdmaTest, server_hello_invalid_msg_len) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));
    // Reuse the buffer for the reply: magic, length 35, zeros.
    memcpy(data, "RDMA", 4);
    uint16_t len = butil::HostToNet16(35);
    memcpy(data + 4, &len, 2);
    memset(data + 6, 0, 32);
ASSERT_EQ(38, write(acc_fd, data, 38));
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::FAILED, s->_rdma_ep->_state);

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(EPROTO, cntl.ErrorCode());
}

// The fake server replies with a hello of correct length but all-zero body
// (so both version fields are 0): the client must fall back to TCP and send
// an ack of 0. The fake server never answers the RPC, so the call times out.
TEST_F(RdmaTest, server_hello_invalid_version) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));
    memcpy(data, "RDMA", 4);
    uint16_t len = butil::HostToNet16(38);
    memcpy(data + 4, &len, 2);
    memset(data + 6, 0, 32);
    ASSERT_EQ(38, write(acc_fd, data, 38));
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);

    // Read the client's 4-byte ack and check that its value is 0.
    ASSERT_EQ(4, read(acc_fd, data, 4));
    uint32_t* tmp = (uint32_t*)data;
    ASSERT_EQ(0, butil::NetToHost32(*tmp));

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}

// The fake server replies with a hello advertising sq_size = rq_size = 0: the
// client must fall back to TCP and send an ack of 0.
TEST_F(RdmaTest, server_hello_invalid_sq_rq_size) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));

    // Reply hello with zero-sized send/receive queues.
    rdma::HelloMessage msg;
    msg.msg_len = 38;
    msg.hello_ver = 1;
    msg.impl_ver = 1;
    msg.sq_size = 0;
    msg.rq_size = 0;
    msg.block_size = 8192;
    msg.qp_num = 0;
    msg.gid = rdma::GetRdmaGid();
    memcpy(data, "RDMA", 4);
    msg.Serialize(data + 4);
    ASSERT_EQ(38, write(acc_fd, data, 38));
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);

    // Read the client's 4-byte ack and check that its value is 0.
    ASSERT_EQ(4, read(acc_fd, data, 4));
    uint32_t* tmp = (uint32_t*)data;
    ASSERT_EQ(0, butil::NetToHost32(*tmp));

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}

// The fake server replies with a valid-looking hello; the client must become
// ESTABLISHED and send an ack of 1. The fake server then stays silent, so the
// call times out.
TEST_F(RdmaTest, server_miss_after_ack) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));

    rdma::HelloMessage msg;
    msg.msg_len = 38;
    msg.hello_ver = 1;
    msg.impl_ver = 1;
    msg.sq_size = 16;
    msg.rq_size = 16;
    msg.block_size = 8192;
    msg.qp_num = 0;
    msg.gid = rdma::GetRdmaGid();
    memcpy(data, "RDMA", 4);
    msg.Serialize(data + 4);
    ASSERT_EQ(38,
write(acc_fd, data, 38));
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state);

    // Read the client's 4-byte ack and check that its value is 1.
    ASSERT_EQ(4, read(acc_fd, data, 4));
    uint32_t* tmp = (uint32_t*)data;
    ASSERT_EQ(1, butil::NetToHost32(*tmp));

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode());
}

// Same handshake as server_miss_after_ack, but the fake server closes the TCP
// connection after reading the ack: the call must end with EEOF.
TEST_F(RdmaTest, server_close_after_ack) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
    ASSERT_EQ(0, channel.Init(g_ep, &chan_options));

    Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    google::protobuf::Closure* done = DoNothing();
    ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done);

    usleep(100000);

    SocketUniquePtr s;
    ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s));
    ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state);

    butil::fd_guard acc_fd(accept(sockfd, NULL, NULL));
    ASSERT_TRUE(acc_fd >= 0);

    uint8_t data[38];
    ASSERT_EQ(38, read(acc_fd, data, 38));

    rdma::HelloMessage msg;
    msg.msg_len = 38;
    msg.hello_ver = 1;
    msg.impl_ver = 1;
    msg.sq_size = 16;
    msg.rq_size = 16;
    msg.block_size = 8192;
    msg.qp_num = 0;
    msg.gid = rdma::GetRdmaGid();
    memcpy(data, "RDMA", 4);
    msg.Serialize(data + 4);
    ASSERT_EQ(38, write(acc_fd, data, 38));
    usleep(100000);
    ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state);

    ASSERT_EQ(4, read(acc_fd, data, 4));
    uint32_t* tmp = (uint32_t*)data;
    ASSERT_EQ(1, butil::NetToHost32(*tmp));

    close(acc_fd);

    bthread_id_join(cntl.call_id());
    ASSERT_EQ(EEOF, cntl.ErrorCode());
}

// After the client is ESTABLISHED the fake server writes more bytes on the TCP
// connection: the call must end with EPROTO.
TEST_F(RdmaTest, server_send_data_on_tcp_after_ack) {
    butil::fd_guard sockfd(butil::tcp_listen(g_ep));
    EXPECT_TRUE(sockfd >= 0);

    Channel channel;
    ChannelOptions chan_options;
    chan_options.use_rdma = true;
    chan_options.connect_timeout_ms = 500;
    chan_options.timeout_ms = 500;
    chan_options.max_retry = 0;
ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(__FUNCTION__); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done); usleep(100000); SocketUniquePtr s; ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s)); ASSERT_EQ(rdma::RdmaEndpoint::C_HELLO_WAIT, s->_rdma_ep->_state); butil::fd_guard acc_fd(accept(sockfd, NULL, NULL)); ASSERT_TRUE(acc_fd >= 0); uint8_t data[38]; ASSERT_EQ(38, read(acc_fd, data, 38)); rdma::HelloMessage msg; msg.msg_len = 38; msg.hello_ver = 1; msg.impl_ver = 1; msg.sq_size = 16; msg.rq_size = 16; msg.block_size = 8192; msg.qp_num = 0; msg.gid = rdma::GetRdmaGid(); memcpy(data, "RDMA", 4); msg.Serialize(data + 4); ASSERT_EQ(38, write(acc_fd, data, 38)); usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state); ASSERT_EQ(38, write(acc_fd, data, 38)); bthread_id_join(cntl.call_id()); ASSERT_EQ(EPROTO, cntl.ErrorCode()); } TEST_F(RdmaTest, try_global_disable_rdma) { StartServer(); rdma::g_rdma_available.store(false, butil::memory_order_relaxed); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 500; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(__FUNCTION__); req.set_sleep_us(200000); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done); usleep(100000); SocketUniquePtr s; ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s)); ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); bthread_id_join(cntl.call_id()); ASSERT_EQ(0, cntl.ErrorCode()); StopServer(); rdma::g_rdma_available.store(true, butil::memory_order_relaxed); } TEST_F(RdmaTest, server_option_invalid) { Server server; ServerOptions options; 
options.use_rdma = true; // rtmp and rdma are incompatible options.rtmp_service = (RtmpService*)1; ASSERT_EQ(-1, server.Start(PORT, &options)); // nshead and rdma are incompatible options.rtmp_service = NULL; options.nshead_service = (NsheadService*)1; ASSERT_EQ(-1, server.Start(PORT, &options)); // mongo and rdma are incompatible options.nshead_service = NULL; options.mongo_service_adaptor = (MongoServiceAdaptor*)1; ASSERT_EQ(-1, server.Start(PORT, &options)); // ssl and rdma are incompatible options.mongo_service_adaptor = NULL; options.mutable_ssl_options()->default_cert.certificate = "test"; ASSERT_EQ(-1, server.Start(PORT, &options)); } TEST_F(RdmaTest, channel_option_invalid) { Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; // rtmp and rdma are incompatible chan_options.protocol = "rtmp"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); chan_options.protocol = "streaming_rpc"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // nshead and rdma are incompatible chan_options.protocol = "nshead"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); chan_options.protocol = "nshead_mcpack"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // nova_pbrpc and rdma are incompatible chan_options.protocol = "nova_pbrpc"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // public_pbrpc and rdma are incompatible chan_options.protocol = "public_pbrpc"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // redis and rdma are incompatible chan_options.protocol = "redis"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // memcache and rdma are incompatible chan_options.protocol = "memcache"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // ubrpc and rdma are incompatible chan_options.protocol = "ubrpc_compack"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // itp and rdma are incompatible chan_options.protocol = "itp"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // esp and rdma are incompatible chan_options.protocol = "esp"; 
ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // hulu_pbrpc and rdma are incompatible chan_options.protocol = "hulu_pbrpc"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // sofa_pbrpc and rdma are incompatible chan_options.protocol = "sofa_pbrpc"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // http and rdma are incompatible chan_options.protocol = "http"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); // ssl and rdma are incompatible chan_options.protocol = "baidu_std"; chan_options.mutable_ssl_options()->sni_name = "test"; ASSERT_EQ(-1, channel.Init(g_ep, &chan_options)); } TEST_F(RdmaTest, rdma_client_to_rdma_server) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 500; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(__FUNCTION__); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done); usleep(100000); bthread_id_join(cntl.call_id()); ASSERT_EQ(0, cntl.ErrorCode()); StopServer(); } TEST_F(RdmaTest, tcp_client_to_tcp_server) { StartServer(false); Channel channel; ChannelOptions chan_options; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 500; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(__FUNCTION__); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done); usleep(100000); bthread_id_join(cntl.call_id()); ASSERT_EQ(0, cntl.ErrorCode()); StopServer(); } TEST_F(RdmaTest, tcp_client_to_rdma_server) { StartServer(); Channel channel; ChannelOptions chan_options; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 500; chan_options.max_retry = 0; 
ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(__FUNCTION__); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done); usleep(100000); bthread_id_join(cntl.call_id()); ASSERT_EQ(0, cntl.ErrorCode()); StopServer(); } TEST_F(RdmaTest, rdma_client_to_tcp_server) { StartServer(false); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 500; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(__FUNCTION__); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done); usleep(100000); bthread_id_join(cntl.call_id()); ASSERT_EQ(EEOF, cntl.ErrorCode()); StopServer(); } static const int RPC_NUM = 1024; void DumpRdmaEndpointInfo(Socket* client, Socket* server) { std::cout << std::endl << "client:"; client->_rdma_ep->DebugInfo(std::cout); std::cout << std::endl << "server:"; server->_rdma_ep->DebugInfo(std::cout); } TEST_F(RdmaTest, send_rpcs_in_one_qp) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 3000; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; LOG(INFO) << "send 0 attachment"; for (int i = 0; i < RPC_NUM; ++i) { req[i].set_message(__FUNCTION__); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); if (cntl[i].ErrorCode() == ERPCTIMEDOUT) { SocketUniquePtr s; ASSERT_EQ(0, 
Socket::Address(cntl[i]._single_server_id, &s)); Socket* m = GetSocketFromServer(0); DumpRdmaEndpointInfo(s.get(), m); } ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]"; } LOG(INFO) << "send 4KB attachment"; butil::IOBuf attach; attach.resize(4096); for (int i = 0; i < RPC_NUM; ++i) { cntl[i].Reset(); cntl[i].request_attachment().append(attach); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); if (cntl[i].ErrorCode() == ERPCTIMEDOUT) { SocketUniquePtr s; ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s)); Socket* m = GetSocketFromServer(0); DumpRdmaEndpointInfo(s.get(), m); } ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]"; } LOG(INFO) << "send 1MB attachment"; attach.resize(1048576); for (int i = 0; i < RPC_NUM; ++i) { cntl[i].Reset(); cntl[i].request_attachment().append(attach); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); if (cntl[i].ErrorCode() == ERPCTIMEDOUT) { SocketUniquePtr s; ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s)); Socket* m = GetSocketFromServer(0); DumpRdmaEndpointInfo(s.get(), m); } ASSERT_TRUE(0 == cntl[i].ErrorCode() || EOVERCROWDED == cntl[i].ErrorCode()) << "req[" << i << "]"; } StopServer(); } TEST_F(RdmaTest, send_rpc_in_many_qp) { if (!FLAGS_rdma_test_enable) { return; } Server server[100]; MyEchoService svc[100]; int num = 100; for (int i = 0; i < num; ++i) { ServerOptions options; options.use_rdma = true; options.idle_timeout_sec = 1; options.max_concurrency = 0; options.internal_port = -1; server[i].AddService(&svc[i], SERVER_DOESNT_OWN_SERVICE); EXPECT_EQ(0, server[i].Start(i + 8000, &options)); } int port = 0; butil::IOBuf attach; attach.resize(4096); ChannelOptions chan_options; 
chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 500; chan_options.max_retry = 0; Channel channel[RPC_NUM]; Server* svr[RPC_NUM]; Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; butil::ip_t ip; butil::str2ip(g_ip.c_str(), &ip); for (int i = 0; i < RPC_NUM; ++i) { svr[i] = &server[i % num]; butil::EndPoint ep(ip, 8000 + ((port++) % num)); ASSERT_EQ(0, channel[i].Init(ep, &chan_options)); req[i].set_message(__FUNCTION__); cntl[i].request_attachment().append(attach); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel[i]).Echo(&cntl[i], &req[i], &res[i], done); } for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); if (cntl[i].ErrorCode() == ERPCTIMEDOUT) { SocketUniquePtr s; ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s)); std::vector sids; svr[i]->_am->ListConnections(&sids); for (size_t i = 0; i < sids.size(); ++i) { SocketUniquePtr m; ASSERT_EQ(0, Socket::AddressFailedAsWell(sids[i], &m)); DumpRdmaEndpointInfo(s.get(), m.get()); } } ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]"; } for (int i = 0; i < num; ++i) { server[i].Stop(0); server[i].Join(); } } TEST_F(RdmaTest, send_rpcs_as_pooled_connection) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 30000; // it may very slow chan_options.timeout_ms = 30000; chan_options.max_retry = 0; chan_options.connection_type = "pooled"; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; butil::IOBuf attach; attach.resize(4096); for (int i = 0; i < RPC_NUM; ++i) { req[i].set_message(__FUNCTION__); cntl[i].request_attachment().append(attach); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } 
for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); if (cntl[i].ErrorCode() == ERPCTIMEDOUT) { SocketUniquePtr s; ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s)); Socket* m = GetSocketFromServer(0); DumpRdmaEndpointInfo(s.get(), m); } ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]"; } StopServer(); } TEST_F(RdmaTest, send_rpcs_as_short_connection) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 30000; // it may very slow chan_options.timeout_ms = 30000; chan_options.max_retry = 0; chan_options.connection_type = "short"; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; butil::IOBuf attach; attach.resize(4096); for (int i = 0; i < RPC_NUM; ++i) { req[i].set_message(__FUNCTION__); cntl[i].request_attachment().append(attach); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); if (cntl[i].ErrorCode() == ERPCTIMEDOUT) { SocketUniquePtr s; ASSERT_EQ(0, Socket::Address(cntl[i]._single_server_id, &s)); Socket* m = GetSocketFromServer(0); DumpRdmaEndpointInfo(s.get(), m); } ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]"; } StopServer(); } TEST_F(RdmaTest, server_stop_during_rpc) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 3000; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; butil::IOBuf attach; attach.resize(4096); for (int i = 0; i < RPC_NUM; ++i) { req[i].set_message(__FUNCTION__); 
cntl[i].request_attachment().append(attach); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); if (i == 0) StopServer(); int error_code = cntl[i].ErrorCode(); ASSERT_TRUE(error_code == 0 || error_code == EEOF || error_code == ELOGOFF || error_code == EHOSTDOWN) << "req[" << i << "]: " << error_code; } } TEST_F(RdmaTest, server_close_during_rpc) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 3000; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; butil::IOBuf attach; attach.resize(4096); for (int i = 0; i < RPC_NUM; ++i) { req[i].set_message(__FUNCTION__); cntl[i].request_attachment().append(attach); if (i == RPC_NUM / 2) { req[i].set_close_fd(true); } google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); int error_code = cntl[i].ErrorCode(); ASSERT_TRUE(error_code == 0 || error_code == EEOF || error_code == EFAILEDSOCKET || error_code == EHOSTDOWN) << "req[" << i << "]: " << error_code; } StopServer(); } TEST_F(RdmaTest, client_close_during_rpc) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 3000; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; butil::IOBuf attach; attach.resize(4096); for (int i = 0; i < RPC_NUM; ++i) { 
req[i].set_message(__FUNCTION__); cntl[i].request_attachment().append(attach); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } cntl[0].CloseConnection("Close connection"); for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); int error_code = cntl[i].ErrorCode(); ASSERT_TRUE(error_code == 0 || error_code == ECLOSE || error_code == EHOSTDOWN) << "req[" << i << "]: " << error_code; } StopServer(); } TEST_F(RdmaTest, verbs_error_handling) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 500; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(__FUNCTION__); req.set_sleep_us(200000); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, done); usleep(100000); // wait for rdma handshake complete SocketUniquePtr s; ASSERT_EQ(0, Socket::Address(cntl._single_server_id, &s)); ibv_send_wr wr; memset(&wr, 0, sizeof(wr)); ibv_sge sge; void* buf = malloc(8192); sge.addr = (uint64_t)buf; sge.length = 8192; sge.lkey = 1; // incorrect lkey wr.sg_list = &sge; wr.num_sge = 1; ibv_send_wr* bad = NULL; ibv_post_send(s->_rdma_ep->_resource->qp, &wr, &bad); bthread_id_join(cntl.call_id()); ASSERT_EQ(ERDMA, cntl.ErrorCode()); free(buf); StopServer(); } TEST_F(RdmaTest, rdma_use_parallel_channel) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); const size_t NCHANS = 8; Channel subchans[NCHANS]; ParallelChannel channel; ChannelOptions opts; opts.use_rdma = true; for (size_t i = 0; i < NCHANS; ++i) { ASSERT_EQ(0, subchans[i].Init(_naming_url.c_str(), "rR", &opts)); ASSERT_EQ(0, channel.AddChannel( &subchans[i], DOESNT_OWN_CHANNEL, NULL, NULL)); } Controller cntl; test::EchoRequest req; 
test::EchoResponse res; req.set_message(__FUNCTION__); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, NULL); ASSERT_EQ(0, cntl.ErrorCode()); ASSERT_EQ(NCHANS, (size_t)cntl.sub_count()); StopServer(); } TEST_F(RdmaTest, rdma_use_selective_channel) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); const size_t NCHANS = 8; SelectiveChannel channel; ChannelOptions opts; opts.use_rdma = true; ASSERT_EQ(0, channel.Init("rr", &opts)); for (size_t i = 0; i < NCHANS; ++i) { Channel* subchan = new Channel; ASSERT_EQ(0, subchan->Init(_naming_url.c_str(), "rR", &opts)); ASSERT_EQ(0, channel.AddChannel(subchan, NULL)); } Controller cntl; test::EchoRequest req; test::EchoResponse res; req.set_message(__FUNCTION__); ::test::EchoService::Stub(&channel).Echo(&cntl, &req, &res, NULL); ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText(); ASSERT_EQ(1, cntl.sub_count()); StopServer(); } static void MockFree(void* buf) { } TEST_F(RdmaTest, send_rpcs_with_user_defined_iobuf) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 500; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; butil::IOBuf attach; void* data = malloc(4096);; attach.append_user_data(data, 4096, NULL); req[0].set_message(__FUNCTION__); cntl[0].request_attachment().append(attach); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[0], &req[0], &res[0], done); bthread_id_join(cntl[0].call_id()); ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode()); attach.clear(); sleep(2); // wait for client recover from EHOSTDOWN cntl[0].Reset(); char* mr[2 * RPC_NUM]; uint32_t lkey[2 * RPC_NUM]; for (size_t i = 0; i < RPC_NUM; ++i) { mr[2 * i] = (char*)malloc(4096); memset(mr[2 * i], i % 100, 4096); lkey[2 * i] = 
rdma::RegisterMemoryForRdma(mr[2 * i], 4096); ASSERT_TRUE(lkey[2 * i] != 0); cntl[i].request_attachment().append_user_data_with_meta(mr[2 * i] + i, 4096 - i, MockFree, lkey[2 * i]); mr[2 * i + 1] = (char*)malloc(4096); memset(mr[2 * i + 1], i % 100, 4096); lkey[2 * i + 1] = rdma::RegisterMemoryForRdma(mr[2 * i + 1], 4096); ASSERT_TRUE(lkey[2 * i + 1] != 0); cntl[i].request_attachment().append_user_data_with_meta(mr[2 * i + 1] + i, 4096 - i, MockFree, lkey[2 * i + 1]); req[i].set_message(__FUNCTION__); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } for (size_t i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]"; rdma::DeregisterMemoryForRdma(mr[i]); ASSERT_EQ(2 * (4096 - i), cntl[i].response_attachment().size()); char tmp[8192]; cntl[i].response_attachment().copy_to(tmp, 2 * (4096 - i)); ASSERT_EQ(0, memcmp(mr[2 * i] + i, tmp, 4096 - i)); ASSERT_EQ(0, memcmp(mr[2 * i + 1] + i, tmp + 4096 - i, 4096 - i)); free(mr[2 * i]); free(mr[2 * i + 1]); } StopServer(); } TEST_F(RdmaTest, try_memory_pool_empty) { if (!FLAGS_rdma_test_enable) { return; } StartServer(); Channel channel; ChannelOptions chan_options; chan_options.use_rdma = true; chan_options.connect_timeout_ms = 500; chan_options.timeout_ms = 60000; chan_options.max_retry = 0; ASSERT_EQ(0, channel.Init(g_ep, &chan_options)); Controller cntl[RPC_NUM]; test::EchoRequest req[RPC_NUM]; test::EchoResponse res[RPC_NUM]; butil::IOBuf iobuf[RPC_NUM]; for (int i = 0; i < 1024; ++i) { if (iobuf[i].resize(1048576 * 8)) { // 8MB for each iobuf break; } } for (int i = 0; i < RPC_NUM; ++i) { req[i].set_message(__FUNCTION__); cntl[i].request_attachment().append(iobuf[i]); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } for (int i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); } StopServer(); } 
#endif // if BRPC_WITH_RDMA int main(int argc, char* argv[]) { testing::InitGoogleTest(&argc, argv); GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); #if BRPC_WITH_RDMA rdma::FLAGS_rdma_trace_verbose = true; rdma::FLAGS_rdma_memory_pool_max_regions = 2; FLAGS_log_idle_connection_close = true; if (!FLAGS_rdma_test_enable) { // skip UT requiring rdma runtime environment rdma::g_rdma_available.store(true, butil::memory_order_relaxed); rdma::g_skip_rdma_init = true; } #endif // if BRPC_WITH_RDMA return RUN_ALL_TESTS(); }