223 lines
5.4 KiB
C++
223 lines
5.4 KiB
C++
// Copyright (C) 2011-2013 Tim Blechmann
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See
|
|
// accompanying file LICENSE_1_0.txt or copy at
|
|
// http://www.boost.org/LICENSE_1_0.txt)
|
|
|
|
#include <boost/lockfree/spsc_queue.hpp>
|
|
#include <boost/thread.hpp>
|
|
|
|
#define BOOST_TEST_MAIN
|
|
#ifdef BOOST_LOCKFREE_INCLUDE_TESTS
|
|
#include <boost/test/included/unit_test.hpp>
|
|
#else
|
|
#include <boost/test/unit_test.hpp>
|
|
#endif
|
|
|
|
#include <iostream>
|
|
#include <memory>
|
|
|
|
#include "test_helpers.hpp"
|
|
#include "test_common.hpp"
|
|
|
|
using namespace boost;
|
|
using namespace boost::lockfree;
|
|
using namespace std;
|
|
|
|
#ifndef BOOST_LOCKFREE_STRESS_TEST
|
|
static const boost::uint32_t nodes_per_thread = 100000;
|
|
#else
|
|
static const boost::uint32_t nodes_per_thread = 100000000;
|
|
#endif
|
|
|
|
struct spsc_queue_tester
|
|
{
|
|
spsc_queue<int, capacity<128> > sf;
|
|
|
|
boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes;
|
|
|
|
// In VxWorks one RTP just supports 65535 objects
|
|
#ifndef __VXWORKS__
|
|
static_hashed_set<int, 1<<16 > working_set;
|
|
#else
|
|
static_hashed_set<int, 1<<15 > working_set;
|
|
#endif
|
|
|
|
spsc_queue_tester(void):
|
|
spsc_queue_cnt(0), received_nodes(0)
|
|
{}
|
|
|
|
void add(void)
|
|
{
|
|
for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) {
|
|
int id = generate_id<int>();
|
|
working_set.insert(id);
|
|
|
|
while (sf.push(id) == false)
|
|
{}
|
|
|
|
++spsc_queue_cnt;
|
|
}
|
|
running = false;
|
|
}
|
|
|
|
bool get_element(void)
|
|
{
|
|
int data;
|
|
bool success = sf.pop(data);
|
|
|
|
if (success) {
|
|
++received_nodes;
|
|
--spsc_queue_cnt;
|
|
bool erased = working_set.erase(data);
|
|
assert(erased);
|
|
return true;
|
|
} else
|
|
return false;
|
|
}
|
|
|
|
boost::lockfree::detail::atomic<bool> running;
|
|
|
|
void get(void)
|
|
{
|
|
for(;;) {
|
|
bool success = get_element();
|
|
if (!running && !success)
|
|
break;
|
|
}
|
|
|
|
while ( get_element() );
|
|
}
|
|
|
|
void run(void)
|
|
{
|
|
running = true;
|
|
|
|
BOOST_REQUIRE(sf.empty());
|
|
|
|
boost::thread reader(boost::bind(&spsc_queue_tester::get, this));
|
|
boost::thread writer(boost::bind(&spsc_queue_tester::add, this));
|
|
cout << "reader and writer threads created" << endl;
|
|
|
|
writer.join();
|
|
cout << "writer threads joined. waiting for readers to finish" << endl;
|
|
|
|
reader.join();
|
|
|
|
BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
|
|
BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
|
|
BOOST_REQUIRE(sf.empty());
|
|
BOOST_REQUIRE(working_set.count_nodes() == 0);
|
|
}
|
|
};
|
|
|
|
BOOST_AUTO_TEST_CASE( spsc_queue_test_caching )
|
|
{
|
|
boost::shared_ptr<spsc_queue_tester> test1(new spsc_queue_tester);
|
|
test1->run();
|
|
}
|
|
|
|
struct spsc_queue_tester_buffering
|
|
{
|
|
spsc_queue<int, capacity<128> > sf;
|
|
|
|
boost::lockfree::detail::atomic<long> spsc_queue_cnt;
|
|
|
|
// In VxWorks one RTP just supports 65535 objects
|
|
#ifndef __VXWORKS__
|
|
static_hashed_set<int, 1<<16 > working_set;
|
|
#else
|
|
static_hashed_set<int, 1<<15 > working_set;
|
|
#endif
|
|
|
|
boost::lockfree::detail::atomic<size_t> received_nodes;
|
|
|
|
spsc_queue_tester_buffering(void):
|
|
spsc_queue_cnt(0), received_nodes(0)
|
|
{}
|
|
|
|
static const size_t buf_size = 5;
|
|
|
|
void add(void)
|
|
{
|
|
boost::array<int, buf_size> input_buffer;
|
|
for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) {
|
|
for (size_t i = 0; i != buf_size; ++i) {
|
|
int id = generate_id<int>();
|
|
working_set.insert(id);
|
|
input_buffer[i] = id;
|
|
}
|
|
|
|
size_t pushed = 0;
|
|
|
|
do {
|
|
pushed += sf.push(input_buffer.c_array() + pushed,
|
|
input_buffer.size() - pushed);
|
|
} while (pushed != buf_size);
|
|
|
|
spsc_queue_cnt+=buf_size;
|
|
}
|
|
running = false;
|
|
}
|
|
|
|
bool get_elements(void)
|
|
{
|
|
boost::array<int, buf_size> output_buffer;
|
|
|
|
size_t popd = sf.pop(output_buffer.c_array(), output_buffer.size());
|
|
|
|
if (popd) {
|
|
received_nodes += popd;
|
|
spsc_queue_cnt -= popd;
|
|
|
|
for (size_t i = 0; i != popd; ++i) {
|
|
bool erased = working_set.erase(output_buffer[i]);
|
|
assert(erased);
|
|
}
|
|
|
|
return true;
|
|
} else
|
|
return false;
|
|
}
|
|
|
|
boost::lockfree::detail::atomic<bool> running;
|
|
|
|
void get(void)
|
|
{
|
|
for(;;) {
|
|
bool success = get_elements();
|
|
if (!running && !success)
|
|
break;
|
|
}
|
|
|
|
while ( get_elements() );
|
|
}
|
|
|
|
void run(void)
|
|
{
|
|
running = true;
|
|
|
|
boost::thread reader(boost::bind(&spsc_queue_tester_buffering::get, this));
|
|
boost::thread writer(boost::bind(&spsc_queue_tester_buffering::add, this));
|
|
cout << "reader and writer threads created" << endl;
|
|
|
|
writer.join();
|
|
cout << "writer threads joined. waiting for readers to finish" << endl;
|
|
|
|
reader.join();
|
|
|
|
BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
|
|
BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
|
|
BOOST_REQUIRE(sf.empty());
|
|
BOOST_REQUIRE(working_set.count_nodes() == 0);
|
|
}
|
|
};
|
|
|
|
|
|
BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering )
|
|
{
|
|
boost::shared_ptr<spsc_queue_tester_buffering> test1(new spsc_queue_tester_buffering);
|
|
test1->run();
|
|
}
|
|
|