brpc/include/butil/details/extended_endpoint.hpp
2022-12-14 19:05:52 +08:00

384 lines
13 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef BUTIL_DETAILS_EXTENDED_ENDPOINT_H
#define BUTIL_DETAILS_EXTENDED_ENDPOINT_H
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/un.h>
#include <mutex>
#include <unordered_set>
#include "butil/endpoint.h"
#include "butil/logging.h"
#include "butil/strings/string_piece.h"
#include "butil/resource_pool.h"
#include "butil/memory/singleton_on_pthread_once.h"
namespace butil {
namespace details {
// Compile-time guard (only when the compiler advertises C++11 or later):
// EndPoint must be exactly as large as its `ip` and `port` members together.
// The extended-endpoint scheme in this file stores its data in those two
// fields, so any change in EndPoint's size is reported at build time.
#if __cplusplus >= 201103L
static_assert(sizeof(EndPoint) == sizeof(EndPoint::ip) + sizeof(EndPoint::port),
"EndPoint size mismatch with the one in POD-style, may cause ABI problem");
#endif
// Support for ipv6 / unix domain socket addresses.
//
// butil::EndPoint has to stay ABI compatible because it is used so widely, and it is too small to hold
// anything larger than an ipv4 address (e.g. an ipv6 address).
// To fit such addresses into that tiny struct, the real data is kept in a separate, bigger object that is
// held by a ResourcePool. EndPoint::ip stores the ResourceId of that object, while EndPoint::port tells
// whether the EndPoint is an old style ipv4 endpoint or an extended one.
// Note that since ResourcePool has been implemented in bthread, we copy it into this repo and change its namespace to
// butil::details. Those two headers will not be published.
// NOTE(review): the code in this file refers to ::butil::ResourceId / ::butil::get_resource, so the
// namespace mentioned above may be out of date -- confirm.
//
// Marker value for EndPoint::port: if EndPoint::port equals this value, the EndPoint is an extended one and
// the real address must be looked up in the resource pool. The value lies outside the 0..65535 range of
// real port numbers.
const static int EXTENDED_ENDPOINT_PORT = 123456789;
// Forward declaration: GlobalEndPointSet only handles pointers to ExtendedEndPoint.
class ExtendedEndPoint;
// A global unordered set to dedup ExtendedEndPoint
// ExtendedEndPoints which have same ipv6/unix socket address must have same id,
// so that user can simply use the value of EndPoint for comparision.
class GlobalEndPointSet {
public:
ExtendedEndPoint* insert(ExtendedEndPoint* p);
void erase(ExtendedEndPoint* p);
static GlobalEndPointSet* instance() {
return ::butil::get_leaky_singleton<GlobalEndPointSet>();
}
private:
struct Hash {
size_t operator()(ExtendedEndPoint* const& p) const;
};
struct Equals {
bool operator()(ExtendedEndPoint* const& p1, ExtendedEndPoint* const& p2) const;
};
typedef std::unordered_set<ExtendedEndPoint*, Hash, Equals> SetType;
SetType _set;
std::mutex _mutex;
};
class ExtendedEndPoint {
public:
// Construct ExtendedEndPoint.
// User should use create() functions to get ExtendedEndPoint instance.
ExtendedEndPoint(void) {
_ref_count.store(0, butil::memory_order_relaxed);
_u.sa.sa_family = AF_UNSPEC;
}
public:
// Create ExtendedEndPoint.
// If creation is successful, create()s will embed the ExtendedEndPoint instance in the given EndPoint*,
// and return it as well. Or else, the given EndPoint* won't be touched.
//
// The format of the parameter is inspired by nginx.
// Valid forms are:
// - ipv6
// without port: [2400:da00::3b0b]
// with port: [2400:da00::3b0b]:8080
// - unix domain socket
// abslute path : unix:/path/to/file.sock
// relative path: unix:path/to/file.sock
static ExtendedEndPoint* create(StringPiece sp, EndPoint* ep) {
sp.trim_spaces();
if (sp.empty()) {
return NULL;
}
if (sp[0] == '[') {
size_t colon_pos = sp.find(']');
if (colon_pos == StringPiece::npos || colon_pos == 1 /* [] is invalid */ || ++colon_pos >= sp.size()) {
return NULL;
}
StringPiece port_sp = sp.substr(colon_pos);
if (port_sp.size() < 2 /* colon and at least one integer */ || port_sp[0] != ':') {
return NULL;
}
port_sp.remove_prefix(1); // remove `:'
if (port_sp.size() > 5) { // max 65535
return NULL;
}
char buf[6];
buf[port_sp.copy(buf, port_sp.size())] = '\0';
char* end = NULL;
int port = ::strtol(buf, &end, 10 /* base */);
if (end != buf + port_sp.size()) {
return NULL;
}
return create(sp.substr(0, colon_pos), port, ep);
} else if (sp.starts_with("unix:")) {
return create(sp, EXTENDED_ENDPOINT_PORT, ep);
}
return NULL;
}
static ExtendedEndPoint* create(StringPiece sp, int port, EndPoint* ep) {
sp.trim_spaces();
if (sp.empty()) {
return NULL;
}
ExtendedEndPoint* eep = NULL;
if (sp[0] == '[' && port >= 0 && port <= 65535) {
if (sp.back() != ']' || sp.size() == 2 || sp.size() - 2 >= INET6_ADDRSTRLEN) {
return NULL;
}
char buf[INET6_ADDRSTRLEN];
buf[sp.copy(buf, sp.size() - 2 /* skip `[' and `]' */, 1 /* skip `[' */)] = '\0';
in6_addr addr;
if (inet_pton(AF_INET6, buf, &addr) != 1 /* succ */) {
return NULL;
}
eep = new_extended_endpoint(AF_INET6);
if (eep) {
eep->_u.in6.sin6_addr = addr;
eep->_u.in6.sin6_port = htons(port);
eep->_u.in6.sin6_flowinfo = 0u;
eep->_u.in6.sin6_scope_id = 0u;
eep->_socklen = sizeof(_u.in6);
#if defined(OS_MACOSX)
eep->_u.in6.sin6_len = eep->_socklen;
#endif
}
} else if (sp.starts_with("unix:")) { // ignore port
sp.remove_prefix(5); // remove `unix:'
if (sp.empty() || sp.size() >= UDS_PATH_SIZE) {
return NULL;
}
eep = new_extended_endpoint(AF_UNIX);
if (eep) {
int size = sp.copy(eep->_u.un.sun_path, sp.size());
eep->_u.un.sun_path[size] = '\0';
eep->_socklen = offsetof(sockaddr_un, sun_path) + size + 1;
#if defined(OS_MACOSX)
eep->_u.un.sun_len = eep->_socklen;
#endif
}
}
if (eep) {
eep = dedup(eep);
eep->embed_to(ep);
}
return eep;
}
static ExtendedEndPoint* create(sockaddr_storage* ss, socklen_t size, EndPoint* ep) {
ExtendedEndPoint* eep = NULL;
if (ss->ss_family == AF_INET6 || ss->ss_family == AF_UNIX) {
eep = new_extended_endpoint(ss->ss_family);
}
if (eep) {
memcpy(&eep->_u.ss, ss, size);
eep->_socklen = size;
if (ss->ss_family == AF_UNIX && size == offsetof(sockaddr_un, sun_path)) {
// See unix(7): When the address of an unnamed socket is returned,
// its length is sizeof(sa_family_t), and sun_path should not be inspected.
eep->_u.un.sun_path[0] = '\0';
}
eep = dedup(eep);
eep->embed_to(ep);
}
return eep;
}
// Get ExtendedEndPoint instance from EndPoint
static ExtendedEndPoint* address(const EndPoint& ep) {
if (!is_extended(ep)) {
return NULL;
}
::butil::ResourceId<ExtendedEndPoint> id;
id.value = ep.ip.s_addr;
ExtendedEndPoint* eep = ::butil::address_resource<ExtendedEndPoint>(id);
CHECK(eep) << "fail to address ExtendedEndPoint from EndPoint";
return eep;
}
// Check if an EndPoint has embedded ExtendedEndPoint
static bool is_extended(const butil::EndPoint& ep) {
return ep.port == EXTENDED_ENDPOINT_PORT;
}
private:
friend class GlobalEndPointSet;
static GlobalEndPointSet* global_set() {
return GlobalEndPointSet::instance();
}
static ExtendedEndPoint* new_extended_endpoint(sa_family_t family) {
::butil::ResourceId<ExtendedEndPoint> id;
ExtendedEndPoint* eep = ::butil::get_resource(&id);
if (eep) {
int64_t old_ref = eep->_ref_count.load(butil::memory_order_relaxed);
CHECK(old_ref == 0) << "new ExtendedEndPoint has reference " << old_ref;
CHECK(eep->_u.sa.sa_family == AF_UNSPEC) << "new ExtendedEndPoint has family " << eep->_u.sa.sa_family << " set";
eep->_ref_count.store(1, butil::memory_order_relaxed);
eep->_id = id;
eep->_u.sa.sa_family = family;
}
return eep;
}
void embed_to(EndPoint* ep) const {
CHECK(0 == _id.value >> 32) << "ResourceId beyond index";
ep->reset();
ep->ip = ip_t{static_cast<uint32_t>(_id.value)};
ep->port = EXTENDED_ENDPOINT_PORT;
}
static ExtendedEndPoint* dedup(ExtendedEndPoint* eep) {
eep->_hash = std::hash<std::string>()(std::string((const char*)&eep->_u, eep->_socklen));
ExtendedEndPoint* first_eep = global_set()->insert(eep);
if (first_eep != eep) {
eep->_ref_count.store(0, butil::memory_order_relaxed);
eep->_u.sa.sa_family = AF_UNSPEC;
::butil::return_resource(eep->_id);
}
return first_eep;
}
public:
void dec_ref(void) {
int64_t old_ref = _ref_count.fetch_sub(1, butil::memory_order_relaxed);
CHECK(old_ref >= 1) << "ExtendedEndPoint has unexpected reference " << old_ref;
if (old_ref == 1) {
global_set()->erase(this);
_u.sa.sa_family = AF_UNSPEC;
::butil::return_resource(_id);
}
}
void inc_ref(void) {
int64_t old_ref = _ref_count.fetch_add(1, butil::memory_order_relaxed);
CHECK(old_ref >= 1) << "ExtendedEndPoint has unexpected reference " << old_ref;
}
sa_family_t family(void) const {
return _u.sa.sa_family;
}
int to(sockaddr_storage* ss) const {
memcpy(ss, &_u.ss, _socklen);
return _socklen;
}
void to(EndPointStr* ep_str) const {
if (_u.sa.sa_family == AF_UNIX) {
snprintf(ep_str->_buf, sizeof(ep_str->_buf), "unix:%s", _u.un.sun_path);
} else if (_u.sa.sa_family == AF_INET6) {
char buf[INET6_ADDRSTRLEN] = {0};
const char* ret = inet_ntop(_u.sa.sa_family, &_u.in6.sin6_addr, buf, sizeof(buf));
CHECK(ret) << "fail to do inet_ntop";
snprintf(ep_str->_buf, sizeof(ep_str->_buf), "[%s]:%d", buf, ntohs(_u.in6.sin6_port));
} else {
CHECK(0) << "family " << _u.sa.sa_family << " not supported";
}
}
int to_hostname(char* host, size_t host_len) const {
if (_u.sa.sa_family == AF_UNIX) {
snprintf(host, host_len, "unix:%s", _u.un.sun_path);
return 0;
} else if (_u.sa.sa_family == AF_INET6) {
sockaddr_in6 sa = _u.in6;
if (getnameinfo((const sockaddr*) &sa, sizeof(sa), host, host_len, NULL, 0, NI_NAMEREQD) != 0) {
return -1;
}
size_t len = ::strlen(host);
if (len + 1 < host_len) {
snprintf(host + len, host_len - len, ":%d", _u.in6.sin6_port);
}
return 0;
} else {
CHECK(0) << "family " << _u.sa.sa_family << " not supported";
return -1;
}
}
private:
static const size_t UDS_PATH_SIZE = sizeof(sockaddr_un::sun_path);
butil::atomic<int64_t> _ref_count;
butil::ResourceId<ExtendedEndPoint> _id;
size_t _hash; // pre-compute hash code of sockaddr for saving unordered_set query time
socklen_t _socklen; // valid data length of sockaddr
union {
sockaddr sa;
sockaddr_in6 in6;
sockaddr_un un;
sockaddr_storage ss;
} _u;
};
// Registers `p` in the set while holding _mutex.
// Returns `p` when it ends up stored in the set, or the previously stored
// equal endpoint (with one reference added for the caller) when that one is
// still alive.
inline ExtendedEndPoint* GlobalEndPointSet::insert(ExtendedEndPoint* p) {
    std::lock_guard<std::mutex> guard(_mutex);

    const auto found = _set.find(p);
    if (found == _set.end()) {
        // No equal endpoint registered yet.
        _set.insert(p);
        return p;
    }

    ExtendedEndPoint* const existing = *found;
    const int64_t previous_refs = existing->_ref_count.fetch_add(1, butil::memory_order_relaxed);
    if (previous_refs != 0) {
        // the ExtendedEndPoint is valid, reuse it
        return existing;
    }

    // another thread is calling dec_ref(), do not reuse it:
    // undo the reference taken above and let `p` take over the slot.
    existing->_ref_count.fetch_sub(1, butil::memory_order_relaxed);
    _set.erase(found);
    _set.insert(p);
    return p;
}
// Removes `p` from the set while holding _mutex.
// Nothing happens when no equal endpoint is stored, or when the stored one is
// a different object than `p`.
inline void GlobalEndPointSet::erase(ExtendedEndPoint* p) {
    std::lock_guard<std::mutex> guard(_mutex);

    const auto pos = _set.find(p);
    // Lookup compares by address content, so make sure the entry really is
    // this object; otherwise another thread has already erased it.
    const bool holds_this_object = (pos != _set.end()) && (*pos == p);
    if (holds_this_object) {
        _set.erase(pos);
    }
}
// Hash of an endpoint: the value cached in its _hash member, so nothing is
// recomputed per lookup.
inline size_t GlobalEndPointSet::Hash::operator()(ExtendedEndPoint* const& p) const {
    const size_t cached_hash = p->_hash;
    return cached_hash;
}
// Two endpoints are equal when their addresses have the same valid length and
// those valid bytes are identical.
inline bool GlobalEndPointSet::Equals::operator()(ExtendedEndPoint* const& p1, ExtendedEndPoint* const& p2) const {
    if (p1->_socklen != p2->_socklen) {
        return false;
    }
    return memcmp(&p1->_u, &p2->_u, p1->_socklen) == 0;
}
} // namespace details
} // namespace butil
#endif // BUTIL_DETAILS_EXTENDED_ENDPOINT_H