/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef _THRIFT_PROCESSOR_TEST_HANDLERS_H_ #define _THRIFT_PROCESSOR_TEST_HANDLERS_H_ 1 #include "EventLog.h" #include "gen-cpp/ParentService.h" #include "gen-cpp/ChildService.h" namespace apache { namespace thrift { namespace test { class ParentHandler : virtual public ParentServiceIf { public: ParentHandler(const std::shared_ptr& log) : triggerMonitor(&mutex_), generation_(0), wait_(false), log_(log) {} int32_t incrementGeneration() override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_INCREMENT_GENERATION, 0, 0); return ++generation_; } int32_t getGeneration() override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_GET_GENERATION, 0, 0); return generation_; } void addString(const std::string& s) override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_ADD_STRING, 0, 0); strings_.push_back(s); } void getStrings(std::vector& _return) override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_GET_STRINGS, 0, 0); _return = strings_; } void getDataWait(std::string& _return, const int32_t length) override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_GET_DATA_WAIT, 0, 0); blockUntilTriggered(); _return.append(length, 'a'); } void onewayWait() override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_ONEWAY_WAIT, 0, 0); blockUntilTriggered(); } void exceptionWait(const std::string& message) override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_EXCEPTION_WAIT, 0, 0); blockUntilTriggered(); MyError e; e.message = message; throw e; } void unexpectedExceptionWait(const std::string& message) override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_UNEXPECTED_EXCEPTION_WAIT, 0, 0); blockUntilTriggered(); MyError e; e.message = message; throw e; } /** * After prepareTriggeredCall() is invoked, calls to any of the *Wait() * functions won't return until triggerPendingCalls() is invoked * * This has to be a separate function invoked by the main test thread * in order to to avoid race conditions. */ void prepareTriggeredCall() { concurrency::Guard g(mutex_); wait_ = true; } /** * Wake up all calls waiting in blockUntilTriggered() */ void triggerPendingCalls() { concurrency::Guard g(mutex_); wait_ = false; triggerMonitor.notifyAll(); } protected: /** * blockUntilTriggered() won't return until triggerPendingCalls() is invoked * in another thread. * * This should only be called when already holding mutex_. */ void blockUntilTriggered() { while (wait_) { triggerMonitor.waitForever(); } // Log an event when we return log_->append(EventLog::ET_WAIT_RETURN, 0, 0); } concurrency::Mutex mutex_; concurrency::Monitor triggerMonitor; int32_t generation_; bool wait_; std::vector strings_; std::shared_ptr log_; }; #ifdef _MSC_VER #pragma warning( push ) #pragma warning (disable : 4250 ) //inheriting methods via dominance #endif class ChildHandler : public ParentHandler, virtual public ChildServiceIf { public: ChildHandler(const std::shared_ptr& log) : ParentHandler(log), value_(0) {} int32_t setValue(const int32_t value) override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_SET_VALUE, 0, 0); int32_t oldValue = value_; value_ = value; return oldValue; } int32_t getValue() override { concurrency::Guard g(mutex_); log_->append(EventLog::ET_CALL_GET_VALUE, 0, 0); return value_; } protected: int32_t value_; }; #ifdef _MSC_VER #pragma warning( pop ) #endif struct ConnContext { public: ConnContext(std::shared_ptr in, std::shared_ptr out, uint32_t id) : input(in), output(out), id(id) {} std::shared_ptr input; std::shared_ptr output; uint32_t id; }; struct CallContext { public: CallContext(ConnContext* context, uint32_t id, const std::string& name) : connContext(context), name(name), id(id) {} ConnContext* connContext; std::string name; uint32_t id; }; class ServerEventHandler : public server::TServerEventHandler { public: ServerEventHandler(const std::shared_ptr& log) : nextId_(1), log_(log) {} void preServe() override {} void* createContext(std::shared_ptr input, std::shared_ptr output) override { ConnContext* context = new ConnContext(input, output, nextId_); ++nextId_; log_->append(EventLog::ET_CONN_CREATED, context->id, 0); return context; } void deleteContext(void* serverContext, std::shared_ptr input, std::shared_ptr output) override { auto* context = reinterpret_cast(serverContext); if (input != context->input) { abort(); } if (output != context->output) { abort(); } log_->append(EventLog::ET_CONN_DESTROYED, context->id, 0); delete context; } void processContext(void* serverContext, std::shared_ptr transport) override { // TODO: We currently don't test the behavior of the processContext() // calls. The various server implementations call processContext() at // slightly different times, and it is too annoying to try and account for // their various differences. // // TThreadedServer, TThreadPoolServer, and TSimpleServer usually wait until // they see the first byte of a request before calling processContext(). // However, they don't wait for the first byte of the very first request, // and instead immediately call processContext() before any data is // received. // // TNonblockingServer always waits until receiving the full request before // calling processContext(). #if 0 ConnContext* context = reinterpret_cast(serverContext); log_->append(EventLog::ET_PROCESS, context->id, 0); #else THRIFT_UNUSED_VARIABLE(serverContext); THRIFT_UNUSED_VARIABLE(transport); #endif } protected: uint32_t nextId_; std::shared_ptr log_; }; class ProcessorEventHandler : public TProcessorEventHandler { public: ProcessorEventHandler(const std::shared_ptr& log) : nextId_(1), log_(log) {} void* getContext(const char* fnName, void* serverContext) override { auto* connContext = reinterpret_cast(serverContext); CallContext* context = new CallContext(connContext, nextId_, fnName); ++nextId_; log_->append(EventLog::ET_CALL_STARTED, connContext->id, context->id, fnName); return context; } void freeContext(void* ctx, const char* fnName) override { auto* context = reinterpret_cast(ctx); checkName(context, fnName); log_->append(EventLog::ET_CALL_FINISHED, context->connContext->id, context->id, fnName); delete context; } void preRead(void* ctx, const char* fnName) override { auto* context = reinterpret_cast(ctx); checkName(context, fnName); log_->append(EventLog::ET_PRE_READ, context->connContext->id, context->id, fnName); } void postRead(void* ctx, const char* fnName, uint32_t bytes) override { THRIFT_UNUSED_VARIABLE(bytes); auto* context = reinterpret_cast(ctx); checkName(context, fnName); log_->append(EventLog::ET_POST_READ, context->connContext->id, context->id, fnName); } void preWrite(void* ctx, const char* fnName) override { auto* context = reinterpret_cast(ctx); checkName(context, fnName); log_->append(EventLog::ET_PRE_WRITE, context->connContext->id, context->id, fnName); } void postWrite(void* ctx, const char* fnName, uint32_t bytes) override { THRIFT_UNUSED_VARIABLE(bytes); auto* context = reinterpret_cast(ctx); checkName(context, fnName); log_->append(EventLog::ET_POST_WRITE, context->connContext->id, context->id, fnName); } void asyncComplete(void* ctx, const char* fnName) override { auto* context = reinterpret_cast(ctx); checkName(context, fnName); log_->append(EventLog::ET_ASYNC_COMPLETE, context->connContext->id, context->id, fnName); } void handlerError(void* ctx, const char* fnName) override { auto* context = reinterpret_cast(ctx); checkName(context, fnName); log_->append(EventLog::ET_HANDLER_ERROR, context->connContext->id, context->id, fnName); } protected: void checkName(const CallContext* context, const char* fnName) { // Note: we can't use BOOST_CHECK_EQUAL here, since the handler runs in a // different thread from the test functions. Just abort if the names are // different if (context->name != fnName) { fprintf(stderr, "call context name mismatch: \"%s\" != \"%s\"\n", context->name.c_str(), fnName); fflush(stderr); abort(); } } uint32_t nextId_; std::shared_ptr log_; }; } } } // apache::thrift::test #endif // _THRIFT_PROCESSOR_TEST_HANDLERS_H_