398 lines
12 KiB
C++
398 lines
12 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include <gtest/gtest.h>
|
|
#include "butil/atomicops.h"
|
|
#include "butil/time.h"
|
|
#include "butil/macros.h"
|
|
#include "butil/logging.h"
|
|
#include "bthread/butex.h"
|
|
#include "bthread/task_control.h"
|
|
#include "bthread/task_group.h"
|
|
#include "bthread/bthread.h"
|
|
#include "bthread/unstable.h"
|
|
|
|
namespace bthread {
|
|
extern butil::atomic<TaskControl*> g_task_control;
|
|
inline TaskControl* get_task_control() {
|
|
return g_task_control.load(butil::memory_order_consume);
|
|
}
|
|
} // namespace bthread
|
|
|
|
namespace {
|
|
TEST(ButexTest, wait_on_already_timedout_butex) {
|
|
uint32_t* butex = bthread::butex_create_checked<uint32_t>();
|
|
ASSERT_TRUE(butex);
|
|
timespec now;
|
|
ASSERT_EQ(0, clock_gettime(CLOCK_REALTIME, &now));
|
|
*butex = 1;
|
|
ASSERT_EQ(-1, bthread::butex_wait(butex, 1, &now));
|
|
ASSERT_EQ(ETIMEDOUT, errno);
|
|
}
|
|
|
|
void* sleeper(void* arg) {
|
|
bthread_usleep((uint64_t)arg);
|
|
return NULL;
|
|
}
|
|
|
|
void* joiner(void* arg) {
|
|
const long t1 = butil::gettimeofday_us();
|
|
for (bthread_t* th = (bthread_t*)arg; *th; ++th) {
|
|
if (0 != bthread_join(*th, NULL)) {
|
|
LOG(FATAL) << "fail to join thread_" << th - (bthread_t*)arg;
|
|
}
|
|
long elp = butil::gettimeofday_us() - t1;
|
|
EXPECT_LE(labs(elp - (th - (bthread_t*)arg + 1) * 100000L), 15000L)
|
|
<< "timeout when joining thread_" << th - (bthread_t*)arg;
|
|
LOG(INFO) << "Joined thread " << *th << " at " << elp << "us ["
|
|
<< bthread_self() << "]";
|
|
}
|
|
for (bthread_t* th = (bthread_t*)arg; *th; ++th) {
|
|
EXPECT_EQ(0, bthread_join(*th, NULL));
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
struct A {
|
|
uint64_t a;
|
|
char dummy[0];
|
|
};
|
|
|
|
struct B {
|
|
uint64_t a;
|
|
};
|
|
|
|
|
|
TEST(ButexTest, with_or_without_array_zero) {
|
|
ASSERT_EQ(sizeof(B), sizeof(A));
|
|
}
|
|
|
|
|
|
TEST(ButexTest, join) {
|
|
const size_t N = 6;
|
|
const size_t M = 6;
|
|
bthread_t th[N+1];
|
|
bthread_t jth[M];
|
|
pthread_t pth[M];
|
|
for (size_t i = 0; i < N; ++i) {
|
|
bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
|
|
ASSERT_EQ(0, bthread_start_urgent(
|
|
&th[i], &attr, sleeper,
|
|
(void*)(100000L/*100ms*/ * (i + 1))));
|
|
}
|
|
th[N] = 0; // joiner will join tids in `th' until seeing 0.
|
|
for (size_t i = 0; i < M; ++i) {
|
|
ASSERT_EQ(0, bthread_start_urgent(&jth[i], NULL, joiner, th));
|
|
}
|
|
for (size_t i = 0; i < M; ++i) {
|
|
ASSERT_EQ(0, pthread_create(&pth[i], NULL, joiner, th));
|
|
}
|
|
|
|
for (size_t i = 0; i < M; ++i) {
|
|
ASSERT_EQ(0, bthread_join(jth[i], NULL))
|
|
<< "i=" << i << " error=" << berror();
|
|
}
|
|
for (size_t i = 0; i < M; ++i) {
|
|
ASSERT_EQ(0, pthread_join(pth[i], NULL));
|
|
}
|
|
}
|
|
|
|
|
|
struct WaiterArg {
|
|
int expected_result;
|
|
int expected_value;
|
|
butil::atomic<int> *butex;
|
|
const timespec *ptimeout;
|
|
};
|
|
|
|
void* waiter(void* arg) {
|
|
WaiterArg * wa = (WaiterArg*)arg;
|
|
const long t1 = butil::gettimeofday_us();
|
|
const int rc = bthread::butex_wait(
|
|
wa->butex, wa->expected_value, wa->ptimeout);
|
|
const long t2 = butil::gettimeofday_us();
|
|
if (rc == 0) {
|
|
EXPECT_EQ(wa->expected_result, 0) << bthread_self();
|
|
} else {
|
|
EXPECT_EQ(wa->expected_result, errno) << bthread_self();
|
|
}
|
|
LOG(INFO) << "after wait, time=" << (t2-t1) << "us";
|
|
return NULL;
|
|
}
|
|
|
|
TEST(ButexTest, sanity) {
|
|
const size_t N = 5;
|
|
WaiterArg args[N * 4];
|
|
pthread_t t1, t2;
|
|
butil::atomic<int>* b1 =
|
|
bthread::butex_create_checked<butil::atomic<int> >();
|
|
ASSERT_TRUE(b1);
|
|
bthread::butex_destroy(b1);
|
|
|
|
b1 = bthread::butex_create_checked<butil::atomic<int> >();
|
|
*b1 = 1;
|
|
ASSERT_EQ(0, bthread::butex_wake(b1));
|
|
|
|
WaiterArg *unmatched_arg = new WaiterArg;
|
|
unmatched_arg->expected_value = *b1 + 1;
|
|
unmatched_arg->expected_result = EWOULDBLOCK;
|
|
unmatched_arg->butex = b1;
|
|
unmatched_arg->ptimeout = NULL;
|
|
pthread_create(&t2, NULL, waiter, unmatched_arg);
|
|
bthread_t th;
|
|
ASSERT_EQ(0, bthread_start_urgent(&th, NULL, waiter, unmatched_arg));
|
|
|
|
const timespec abstime = butil::seconds_from_now(1);
|
|
for (size_t i = 0; i < 4*N; ++i) {
|
|
args[i].expected_value = *b1;
|
|
args[i].butex = b1;
|
|
if ((i % 2) == 0) {
|
|
args[i].expected_result = 0;
|
|
args[i].ptimeout = NULL;
|
|
} else {
|
|
args[i].expected_result = ETIMEDOUT;
|
|
args[i].ptimeout = &abstime;
|
|
}
|
|
if (i < 2*N) {
|
|
pthread_create(&t1, NULL, waiter, &args[i]);
|
|
} else {
|
|
ASSERT_EQ(0, bthread_start_urgent(&th, NULL, waiter, &args[i]));
|
|
}
|
|
}
|
|
|
|
sleep(2);
|
|
for (size_t i = 0; i < 2*N; ++i) {
|
|
ASSERT_EQ(1, bthread::butex_wake(b1));
|
|
}
|
|
ASSERT_EQ(0, bthread::butex_wake(b1));
|
|
sleep(1);
|
|
bthread::butex_destroy(b1);
|
|
}
|
|
|
|
|
|
struct ButexWaitArg {
|
|
int* butex;
|
|
int expected_val;
|
|
long wait_msec;
|
|
int error_code;
|
|
};
|
|
|
|
void* wait_butex(void* void_arg) {
|
|
ButexWaitArg* arg = static_cast<ButexWaitArg*>(void_arg);
|
|
const timespec ts = butil::milliseconds_from_now(arg->wait_msec);
|
|
int rc = bthread::butex_wait(arg->butex, arg->expected_val, &ts);
|
|
int saved_errno = errno;
|
|
if (arg->error_code) {
|
|
EXPECT_EQ(-1, rc);
|
|
EXPECT_EQ(arg->error_code, saved_errno);
|
|
} else {
|
|
EXPECT_EQ(0, rc);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
TEST(ButexTest, wait_without_stop) {
|
|
int* butex = bthread::butex_create_checked<int>();
|
|
*butex = 7;
|
|
butil::Timer tm;
|
|
const long WAIT_MSEC = 500;
|
|
for (int i = 0; i < 2; ++i) {
|
|
const bthread_attr_t attr =
|
|
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
|
|
ButexWaitArg arg = { butex, *butex, WAIT_MSEC, ETIMEDOUT };
|
|
bthread_t th;
|
|
|
|
tm.start();
|
|
ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg));
|
|
ASSERT_EQ(0, bthread_join(th, NULL));
|
|
tm.stop();
|
|
|
|
ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 250);
|
|
}
|
|
bthread::butex_destroy(butex);
|
|
}
|
|
|
|
TEST(ButexTest, stop_after_running) {
|
|
int* butex = bthread::butex_create_checked<int>();
|
|
*butex = 7;
|
|
butil::Timer tm;
|
|
const long WAIT_MSEC = 500;
|
|
const long SLEEP_MSEC = 10;
|
|
for (int i = 0; i < 2; ++i) {
|
|
const bthread_attr_t attr =
|
|
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
|
|
bthread_t th;
|
|
ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR };
|
|
|
|
tm.start();
|
|
ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg));
|
|
ASSERT_EQ(0, bthread_usleep(SLEEP_MSEC * 1000L));
|
|
ASSERT_EQ(0, bthread_stop(th));
|
|
ASSERT_EQ(0, bthread_join(th, NULL));
|
|
tm.stop();
|
|
|
|
ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 25);
|
|
// ASSERT_TRUE(bthread::get_task_control()->
|
|
// timer_thread()._idset.empty());
|
|
ASSERT_EQ(EINVAL, bthread_stop(th));
|
|
}
|
|
bthread::butex_destroy(butex);
|
|
}
|
|
|
|
TEST(ButexTest, stop_before_running) {
|
|
int* butex = bthread::butex_create_checked<int>();
|
|
*butex = 7;
|
|
butil::Timer tm;
|
|
const long WAIT_MSEC = 500;
|
|
|
|
for (int i = 0; i < 2; ++i) {
|
|
const bthread_attr_t attr =
|
|
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
|
|
bthread_t th;
|
|
ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR };
|
|
|
|
tm.start();
|
|
ASSERT_EQ(0, bthread_start_background(&th, &attr, wait_butex, &arg));
|
|
ASSERT_EQ(0, bthread_stop(th));
|
|
bthread_flush();
|
|
ASSERT_EQ(0, bthread_join(th, NULL));
|
|
tm.stop();
|
|
|
|
ASSERT_LT(tm.m_elapsed(), 5);
|
|
// ASSERT_TRUE(bthread::get_task_control()->
|
|
// timer_thread()._idset.empty());
|
|
ASSERT_EQ(EINVAL, bthread_stop(th));
|
|
}
|
|
bthread::butex_destroy(butex);
|
|
}
|
|
|
|
void* join_the_waiter(void* arg) {
|
|
EXPECT_EQ(0, bthread_join((bthread_t)arg, NULL));
|
|
return NULL;
|
|
}
|
|
|
|
TEST(ButexTest, join_cant_be_wakeup) {
|
|
const long WAIT_MSEC = 100;
|
|
int* butex = bthread::butex_create_checked<int>();
|
|
*butex = 7;
|
|
butil::Timer tm;
|
|
ButexWaitArg arg = { butex, *butex, 1000, EINTR };
|
|
|
|
for (int i = 0; i < 2; ++i) {
|
|
const bthread_attr_t attr =
|
|
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
|
|
tm.start();
|
|
bthread_t th, th2;
|
|
ASSERT_EQ(0, bthread_start_urgent(&th, NULL, wait_butex, &arg));
|
|
ASSERT_EQ(0, bthread_start_urgent(&th2, &attr, join_the_waiter, (void*)th));
|
|
ASSERT_EQ(0, bthread_stop(th2));
|
|
ASSERT_EQ(0, bthread_usleep(WAIT_MSEC / 2 * 1000L));
|
|
ASSERT_TRUE(bthread::TaskGroup::exists(th));
|
|
ASSERT_TRUE(bthread::TaskGroup::exists(th2));
|
|
ASSERT_EQ(0, bthread_usleep(WAIT_MSEC / 2 * 1000L));
|
|
ASSERT_EQ(0, bthread_stop(th));
|
|
ASSERT_EQ(0, bthread_join(th2, NULL));
|
|
ASSERT_EQ(0, bthread_join(th, NULL));
|
|
tm.stop();
|
|
ASSERT_LT(tm.m_elapsed(), WAIT_MSEC + 15);
|
|
ASSERT_EQ(EINVAL, bthread_stop(th));
|
|
ASSERT_EQ(EINVAL, bthread_stop(th2));
|
|
}
|
|
bthread::butex_destroy(butex);
|
|
}
|
|
|
|
TEST(ButexTest, stop_after_slept) {
|
|
butil::Timer tm;
|
|
const long SLEEP_MSEC = 100;
|
|
const long WAIT_MSEC = 10;
|
|
|
|
for (int i = 0; i < 2; ++i) {
|
|
const bthread_attr_t attr =
|
|
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
|
|
tm.start();
|
|
bthread_t th;
|
|
ASSERT_EQ(0, bthread_start_urgent(
|
|
&th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L)));
|
|
ASSERT_EQ(0, bthread_usleep(WAIT_MSEC * 1000L));
|
|
ASSERT_EQ(0, bthread_stop(th));
|
|
ASSERT_EQ(0, bthread_join(th, NULL));
|
|
tm.stop();
|
|
if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
|
|
ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 15);
|
|
} else {
|
|
ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 15);
|
|
}
|
|
// ASSERT_TRUE(bthread::get_task_control()->
|
|
// timer_thread()._idset.empty());
|
|
ASSERT_EQ(EINVAL, bthread_stop(th));
|
|
}
|
|
}
|
|
|
|
TEST(ButexTest, stop_just_when_sleeping) {
|
|
butil::Timer tm;
|
|
const long SLEEP_MSEC = 100;
|
|
|
|
for (int i = 0; i < 2; ++i) {
|
|
const bthread_attr_t attr =
|
|
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
|
|
tm.start();
|
|
bthread_t th;
|
|
ASSERT_EQ(0, bthread_start_urgent(
|
|
&th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L)));
|
|
ASSERT_EQ(0, bthread_stop(th));
|
|
ASSERT_EQ(0, bthread_join(th, NULL));
|
|
tm.stop();
|
|
if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
|
|
ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 15);
|
|
} else {
|
|
ASSERT_LT(tm.m_elapsed(), 15);
|
|
}
|
|
// ASSERT_TRUE(bthread::get_task_control()->
|
|
// timer_thread()._idset.empty());
|
|
ASSERT_EQ(EINVAL, bthread_stop(th));
|
|
}
|
|
}
|
|
|
|
TEST(ButexTest, stop_before_sleeping) {
|
|
butil::Timer tm;
|
|
const long SLEEP_MSEC = 100;
|
|
|
|
for (int i = 0; i < 2; ++i) {
|
|
bthread_t th;
|
|
const bthread_attr_t attr =
|
|
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
|
|
|
|
tm.start();
|
|
ASSERT_EQ(0, bthread_start_background(&th, &attr, sleeper,
|
|
(void*)(SLEEP_MSEC*1000L)));
|
|
ASSERT_EQ(0, bthread_stop(th));
|
|
bthread_flush();
|
|
ASSERT_EQ(0, bthread_join(th, NULL));
|
|
tm.stop();
|
|
|
|
if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) {
|
|
ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 10);
|
|
} else {
|
|
ASSERT_LT(tm.m_elapsed(), 10);
|
|
}
|
|
// ASSERT_TRUE(bthread::get_task_control()->
|
|
// timer_thread()._idset.empty());
|
|
ASSERT_EQ(EINVAL, bthread_stop(th));
|
|
}
|
|
}
|
|
} // namespace
|