brpc/include/butil/recordio.h
2022-12-14 19:05:52 +08:00

144 lines
4.4 KiB
C++

// recordio - A binary format to transport data from end to end.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Date: Thu Nov 22 13:57:56 CST 2012
#ifndef BUTIL_RECORDIO_H
#define BUTIL_RECORDIO_H
#include "butil/iobuf.h"
#include <memory>
namespace butil {
// 0-or-1 Payload + 0-or-multiple Metas.
// Payload and metas are often serialized form of protobuf messages. As a
// correspondence, the implementation is not optimized for very small blobs,
// which should be batched properly before inserting(e.g. using repeated
// field in pb)
class Record {
public:
struct NamedMeta {
std::string name;
std::shared_ptr<butil::IOBuf> data;
};
// Number of metas. Could be 0.
size_t MetaCount() const { return _metas.size(); }
// Get i-th Meta, out-of-range accesses may crash.
// This method is mainly for iterating all metas.
const NamedMeta& MetaAt(size_t i) const { return _metas[i]; }
// Get meta by |name|. NULL on not found.
const butil::IOBuf* Meta(const char* name) const;
// Returns a mutable pointer to the meta with |name|. If the meta does
// not exist, add it first.
// If |null_on_found| is true and meta with |name| is present, NULL is
// returned. This is useful for detecting uniqueness of meta names in some
// scenarios.
// NOTE: With the assumption that there won't be many metas, the impl.
// tests presence by scaning all fields, which may perform badly when metas
// are a lot.
butil::IOBuf* MutableMeta(const char* name, bool null_on_found = false);
butil::IOBuf* MutableMeta(const std::string& name, bool null_on_found = false);
// Remove meta with the name. The impl. may scan all fields.
// Returns true on erased, false on absent.
bool RemoveMeta(const butil::StringPiece& name);
// Get the payload. Empty by default.
const butil::IOBuf& Payload() const { return _payload; }
// Get a mutable pointer to the payload.
butil::IOBuf* MutablePayload() { return &_payload; }
// Clear payload and remove all meta.
void Clear();
// Serialized size of this record.
size_t ByteSize() const;
private:
butil::IOBuf _payload;
std::vector<NamedMeta> _metas;
};
// Parse records from the IReader, corrupted records will be skipped.
// Example:
// RecordReader rd(...);
// Record rec;
// while (rd.ReadNext(&rec)) {
// // Handle the rec
// }
// if (rd.last_error() != RecordReader::END_OF_READER) {
// LOG(FATAL) << "Critical error occurred";
// }
class RecordReader {
public:
// A special error code to mark end of input data.
static const int END_OF_READER = -1;
explicit RecordReader(IReader* reader);
// Returns true on success and |out| is overwritten by the record.
// False otherwise and last_error() is the error which is treated as permanent.
bool ReadNext(Record* out);
// 0 means no error.
// END_OF_READER means all data in the IReader are successfully consumed.
int last_error() const { return _last_error; }
// Total bytes consumed.
// NOTE: this value may not equal to read bytes from the IReader even if
// the reader runs out, due to parsing errors.
size_t offset() const { return _ncut; }
private:
bool CutUntilNextRecordCandidate();
int CutRecord(Record* rec);
private:
IReader* _reader;
IOPortal _portal;
IOBufCutter _cutter;
size_t _ncut;
int _last_error;
};
// Write records into the IWriter.
class RecordWriter {
public:
explicit RecordWriter(IWriter* writer);
// Serialize |record| into internal buffer and NOT flush into the IWriter.
int WriteWithoutFlush(const Record& record);
// Serialize |record| into internal buffer and flush into the IWriter.
int Write(const Record& record);
// Flush internal buffer into the IWriter.
// Returns 0 on success, error code otherwise.
int Flush();
private:
IOBuf _buf;
IWriter* _writer;
};
} // namespace butil
#endif // BUTIL_RECORDIO_H