diff --git a/mcrouter/lib/IOBufUtil.cpp b/mcrouter/lib/IOBufUtil.cpp index adf9c3dd6..3b94b6e80 100644 --- a/mcrouter/lib/IOBufUtil.cpp +++ b/mcrouter/lib/IOBufUtil.cpp @@ -79,5 +79,6 @@ coalesceIovecs(const struct iovec* iov, size_t iovcnt, size_t destCapacity) { } return coalesceSlow(iov, iovcnt, destCapacity); } + } } // facebook::memcache diff --git a/mcrouter/lib/IOBufUtil.h b/mcrouter/lib/IOBufUtil.h index e29dd27e8..11973a6dd 100644 --- a/mcrouter/lib/IOBufUtil.h +++ b/mcrouter/lib/IOBufUtil.h @@ -104,5 +104,38 @@ copyAsString(const folly::IOBuf& source, const uint8_t* begin, size_t size) { */ folly::IOBuf coalesceIovecs(const struct iovec* iov, size_t iovcnt, size_t destCapacity); + +/** + * Trim IOBuf to reference only data from range [posStart, posEnd). + */ +inline void trimIOBufToRange( + folly::IOBuf& buffer, + const char* posStart, + const char* posEnd) { + buffer.trimStart(posStart - reinterpret_cast(buffer.data())); + buffer.trimEnd(buffer.length() - (posEnd - posStart)); +} + + +inline void appendKeyPiece( + const folly::IOBuf& from, + folly::IOBuf& to, + const char* posStart, + const char* posEnd) { + // No need to process empty piece. + if (UNLIKELY(posEnd == posStart)) { + return; + } + + if (LIKELY(to.length() == 0)) { + from.cloneOneInto(to); + trimIOBufToRange(to, posStart, posEnd); + } else { + auto nextPiece = from.cloneOne(); + trimIOBufToRange(*nextPiece, posStart, posEnd); + to.prependChain(std::move(nextPiece)); + } +} + } } // facebook::memcache diff --git a/mcrouter/lib/Makefile.am b/mcrouter/lib/Makefile.am index 9d5f4598b..690f30c6b 100644 --- a/mcrouter/lib/Makefile.am +++ b/mcrouter/lib/Makefile.am @@ -164,6 +164,8 @@ libmcrouter_a_SOURCES = \ network/McAsciiParser-inl.h \ network/McAsciiParser.cpp \ network/McAsciiParser.h \ + network/McBinaryParser.cpp \ + network/McBinaryParser.h \ network/McClientRequestContext-inl.h \ network/McClientRequestContext.cpp \ network/McClientRequestContext.h \ diff --git a/mcrouter/lib/carbon/RequestCommon.h b/mcrouter/lib/carbon/RequestCommon.h index 0143b5a1b..8a7357f7d 100644 --- a/mcrouter/lib/carbon/RequestCommon.h +++ b/mcrouter/lib/carbon/RequestCommon.h @@ -89,6 +89,19 @@ class RequestCommon { fbtraceInfo_ = McFbtraceRef::moveRef(carbonFbtraceInfo); } #endif + /* + bool quiet() const { + return quiet_; + } + bool& quiet() { + return quiet_; + } + bool returnKey() const { + return returnKey_; + } + bool& returnKey() { + return returnKey_; + }*/ /** * Tells whether or not "serializedBuffer()" is dirty, in which case it can't @@ -130,6 +143,10 @@ class RequestCommon { private: static constexpr size_t kTraceIdSize = 11; + // bool quiet_{false}; + // bool returnKey_{false}; +}; + const folly::IOBuf* serializedBuffer_{nullptr}; diff --git a/mcrouter/lib/mc/msg.h b/mcrouter/lib/mc/msg.h index 8ce6e1e6e..73bcdbbb7 100644 --- a/mcrouter/lib/mc/msg.h +++ b/mcrouter/lib/mc/msg.h @@ -406,4 +406,69 @@ const char* mc_req_err_to_string(const mc_req_err_t err); */ const char* mc_res_to_response_string(const mc_res_t result); +typedef enum mc_opcode_e : uint8_t { + mc_opcode_noop = 0x0a, + mc_opcode_set = 0x01, + mc_opcode_setq = 0x11, + mc_opcode_add = 0x02, + mc_opcode_addq = 0x12, + mc_opcode_replace = 0x03, + mc_opcode_replaceq = 0x13, + mc_opcode_append = 0x0e, + mc_opcode_appendq = 0x19, + mc_opcode_prepend = 0x0f, + mc_opcode_prependq = 0x1a, + mc_opcode_get = 0x00, + mc_opcode_getq = 0x09, + mc_opcode_getk = 0x0c, + mc_opcode_getkq = 0x0d, + mc_opcode_delete = 0x04, + mc_opcode_deleteq = 0x14, + mc_opcode_increment = 0x05, + mc_opcode_incrementq = 0x15, + mc_opcode_decrement = 0x06, + mc_opcode_decrementq = 0x16, + mc_opcode_touch = 0x1c, + mc_opcode_gat = 0x1d, + mc_opcode_gatq = 0x1e, + mc_opcode_stat = 0x10, + mc_opcode_version = 0x0b, + mc_opcode_quit = 0x07, + mc_opcode_quitq = 0x17, + mc_opcode_flush = 0x08, + mc_opcode_flushq = 0x18, + // SASL commands + mc_opcode_sasllistmechs = 0x20, + mc_opcode_saslauth = 0x21, + mc_opcode_saslstep = 0x22, + // Range commands + mc_opcode_rset = 0x31, + mc_opcode_rsetq = 0x32, + mc_opcode_rappend = 0x33, + mc_opcode_rappendq = 0x34, + mc_opcode_rprepend = 0x35, + mc_opcode_rprependq = 0x36, + mc_opcode_rget = 0x30, + mc_opcode_rdelete = 0x37, + mc_opcode_rdeleteq = 0x38, + mc_opcode_rincr = 0x39, + mc_opcode_rincrq = 0x3a, + mc_opcode_rdecr = 0x3b, + mc_opcode_rdecrq = 0x3c, + // v1.6 proposed commands + mc_opcode_setvbucket = 0x3d, + mc_opcode_tapvbucketset = 0x45, + mc_opcode_getvbucket = 0x3e, + mc_opcode_tapdelete = 0x42, + mc_opcode_verbosity = 0x1b, + mc_opcode_tapflush = 0x43, + mc_opcode_delvbucket = 0x3f, + mc_opcode_tapconnect = 0x40, + mc_opcode_tapmutation = 0x41, + mc_opcode_tapopaque = 0x44, + mc_opcode_tapcheckpointstart = 0x46, + mc_opcode_tapcheckpointend = 0x47, +} mc_opcode_t; + + __END_DECLS diff --git a/mcrouter/lib/network/ClientMcParser-inl.h b/mcrouter/lib/network/ClientMcParser-inl.h index d31b15fff..cf1ff9d47 100644 --- a/mcrouter/lib/network/ClientMcParser-inl.h +++ b/mcrouter/lib/network/ClientMcParser-inl.h @@ -275,6 +275,11 @@ void ClientMcParser::handleAscii(folly::IOBuf& readBuffer) { } } +template +void ClientMcParser::handleBinary(folly::IOBuf& readBuffer) { + LOG(ERROR) << "handleBinary() not available to client parser"; +} + template void ClientMcParser::parseError( mc_res_t result, diff --git a/mcrouter/lib/network/ClientMcParser.h b/mcrouter/lib/network/ClientMcParser.h index 360506463..e2093879e 100644 --- a/mcrouter/lib/network/ClientMcParser.h +++ b/mcrouter/lib/network/ClientMcParser.h @@ -113,6 +113,7 @@ class ClientMcParser : private McParser::ParserCallback { const UmbrellaMessageInfo& headerInfo, const folly::IOBuf& buffer) final; void handleAscii(folly::IOBuf& readBuffer) final; + void handleBinary(folly::IOBuf& readBuffer) final; void parseError(mc_res_t result, folly::StringPiece reason) final; bool shouldReadToAsciiBuffer() const; diff --git a/mcrouter/lib/network/McAsciiParser-inl.h b/mcrouter/lib/network/McAsciiParser-inl.h index ca1fae35d..ca7e61e25 100644 --- a/mcrouter/lib/network/McAsciiParser-inl.h +++ b/mcrouter/lib/network/McAsciiParser-inl.h @@ -8,6 +8,7 @@ #include #include "mcrouter/lib/fbi/cpp/util.h" +#include "mcrouter/lib/IOBufUtil.h" namespace facebook { namespace memcache { @@ -136,40 +137,6 @@ void McClientAsciiParser::initializeReplyParser() { typeid(Request).name()); } -/** - * Append piece of IOBuf in range [posStart, posEnd) to destination IOBuf. - */ -inline void McAsciiParserBase::appendKeyPiece( - const folly::IOBuf& from, - folly::IOBuf& to, - const char* posStart, - const char* posEnd) { - // No need to process empty piece. - if (UNLIKELY(posEnd == posStart)) { - return; - } - - if (LIKELY(to.length() == 0)) { - from.cloneOneInto(to); - trimIOBufToRange(to, posStart, posEnd); - } else { - auto nextPiece = from.cloneOne(); - trimIOBufToRange(*nextPiece, posStart, posEnd); - to.prependChain(std::move(nextPiece)); - } -} - -/** - * Trim IOBuf to reference only data from range [posStart, posEnd). - */ -inline void McAsciiParserBase::trimIOBufToRange( - folly::IOBuf& buffer, - const char* posStart, - const char* posEnd) { - buffer.trimStart(posStart - reinterpret_cast(buffer.data())); - buffer.trimEnd(buffer.length() - (posEnd - posStart)); -} - template McServerAsciiParser::McServerAsciiParser(Callback& callback) : callback_( diff --git a/mcrouter/lib/network/McAsciiParser.h b/mcrouter/lib/network/McAsciiParser.h index 516745214..98c7666d3 100644 --- a/mcrouter/lib/network/McAsciiParser.h +++ b/mcrouter/lib/network/McAsciiParser.h @@ -72,16 +72,6 @@ class McAsciiParserBase { bool readValue(folly::IOBuf& buffer, folly::IOBuf& to); bool readValue(folly::IOBuf& buffer, folly::Optional& to); - static void appendKeyPiece( - const folly::IOBuf& from, - folly::IOBuf& to, - const char* posStart, - const char* posEnd); - static void trimIOBufToRange( - folly::IOBuf& buffer, - const char* posStart, - const char* posEnd); - std::string currentErrorDescription_; uint64_t currentUInt_{0}; @@ -177,6 +167,9 @@ class McClientAsciiParser : public McAsciiParserBase { namespace detail { template class CallbackBase; + +template +class CallbackWrapper; } // detail class McServerAsciiParser : public McAsciiParserBase { diff --git a/mcrouter/lib/network/McAsciiParser.rl b/mcrouter/lib/network/McAsciiParser.rl index 552299fdf..669249092 100644 --- a/mcrouter/lib/network/McAsciiParser.rl +++ b/mcrouter/lib/network/McAsciiParser.rl @@ -10,6 +10,7 @@ #include "mcrouter/lib/mc/msg.h" #include "mcrouter/lib/McOperation.h" #include "mcrouter/lib/network/gen/Memcache.h" +#include "mcrouter/lib/IOBufUtil.h" namespace facebook { namespace memcache { diff --git a/mcrouter/lib/network/McBinaryParser.cpp b/mcrouter/lib/network/McBinaryParser.cpp new file mode 100644 index 000000000..a53804ba1 --- /dev/null +++ b/mcrouter/lib/network/McBinaryParser.cpp @@ -0,0 +1,304 @@ +/* + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ +#include "McBinaryParser.h" + +#include + +#include "mcrouter/lib/fbi/cpp/LogFailure.h" +#include "mcrouter/lib/IOBufUtil.h" + +namespace facebook { +namespace memcache { + +McServerBinaryParser::State McServerBinaryParser::consume(folly::IOBuf& buffer) { + assert(state_ != State::ERROR); + assert(state_ != State::COMPLETE); + + uint64_t avaiBytes = buffer.length(); + const char *p_ = reinterpret_cast(buffer.data()); + + if (state_ == State::UNINIT) { + header_ = nullptr; + sectionLength_ = HeaderLength; + sectionStart_ = p_; + state_ = State::PARTIAL_HEADER; + } else { + while (state_ != State::ERROR && state_ != State::COMPLETE + && avaiBytes >= sectionLength_) { + switch (state_) { + case State::PARTIAL_HEADER: + if (!parseHeader(p_)) { + state_ = State::ERROR; + } else { + sectionStart_ += sectionLength_; + sectionLength_ = getExtrasLength(); + state_ = State::PARTIAL_EXTRA; + } + break; + + case State::PARTIAL_EXTRA: + appendKeyPiece( + buffer, currentValue_, + sectionStart_, sectionStart_ + sectionLength_); + sectionStart_ += sectionLength_; + sectionLength_ = getKeyLength(); + state_ = State::PARTIAL_KEY; + break; + + case State::PARTIAL_KEY: + appendKeyPiece( + buffer, currentValue_, + sectionStart_, sectionStart_ + sectionLength_); + sectionStart_ += sectionLength_; + sectionLength_ = getValueLength(); + state_ = State::PARTIAL_VALUE; + break; + + case State::PARTIAL_VALUE: + appendKeyPiece( + buffer, currentValue_, + sectionStart_, sectionStart_ + sectionLength_); + sectionStart_ += sectionLength_; + state_ = State::COMPLETE; + (this->*consumer_)(); + break; + + default: + CHECK(false); + } + } + } + buffer.trimStart(sectionStart_ - p_); + + return state_; +} + +bool McServerBinaryParser::parseHeader(const char * bytes) { + header_ = reinterpret_cast(bytes); + + if (getMagic() != 0x80 || getDataType() != 0x00) { + return false; + } + + // TODO validate command constraint (i.e. no extras, no value) + switch (getOpCode()) { + case mc_opcode_set: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeSetLike; + return true; + case mc_opcode_setq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeSetLike; + return true; + case mc_opcode_add: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeSetLike; + return true; + case mc_opcode_addq + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeSetLike; + return true; + case mc_opcode_replace: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeSetLike; + return true; + case mc_opcode_replaceq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeSetLike; + return true; + case mc_opcode_append: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeAppendLike; + return true; + case mc_opcode_appendq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeAppendLike; + return true; + case mc_opcode_prepend: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeAppendLike; + return true; + case mc_opcode_prependq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeAppendLike; + return true; + case mc_opcode_get: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeGetLike; + return true; + case mc_opcode_getq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeGetLike; + return true; + case mc_opcode_getk: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeGetLike; + return true; + case mc_opcode_getkq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeGetLike; + return true; + case mc_opcode_delete: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeGetLike; + return true; + case mc_opcode_deleteq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeGetLike; + return true; + case mc_opcode_increment: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeArithLike; + return true; + case mc_opcode_incrementq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeArithLike; + return true; + case mc_opcode_decrement: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeArithLike; + return true; + case mc_opcode_decrementq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeArithLike; + return true; + case mc_opcode_touch: + case mc_opcode_gat: + case mc_opcode_gatq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeGetLike; + return true; + case mc_opcode_stat: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeGetLike; + return true; + case mc_opcode_version: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeVersion; + return true; + case mc_opcode_quit: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeQuit; + return true; + case mc_opcode_quitq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeQuit; + return true; + case mc_opcode_flush: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeFlush; + return true; + case mc_opcode_flushq: + currentMessage_.emplace(); + consumer_ = &McServerBinaryParser::consumeFlush; + return true; + case mc_opcode_noop: + // SASL commands + case mc_opcode_sasllistmechs: + case mc_opcode_saslauth: + case mc_opcode_saslstep: + // Range commands + case mc_opcode_rset: + case mc_opcode_rsetq: + case mc_opcode_rappend: + case mc_opcode_rappendq: + case mc_opcode_rprepend: + case mc_opcode_rprependq: + case mc_opcode_rget: + case mc_opcode_rdelete: + case mc_opcode_rdeleteq: + case mc_opcode_rincr: + case mc_opcode_rincrq: + case mc_opcode_rdecr: + case mc_opcode_rdecrq: + // v1.6 proposed commands + case mc_opcode_setvbucket: + case mc_opcode_tapvbucketset: + case mc_opcode_getvbucket: + case mc_opcode_tapdelete: + case mc_opcode_verosity: + case mc_opcode_tapflush: + case mc_opcode_delvbucket: + case mc_opcode_tapconnect: + case mc_opcode_tapmutation: + case mc_opcode_tapopaque: + case mc_opcode_tapcheckpointstart: + case mc_opcode_tapcheckpointend:: + default: + return false; + } +} + +template +void McServerBinaryParser::consumeSetLike() { + auto extras = reinterpret_cast(currentExtras_.data()); + auto& message = currentMessage_.get(); + message.key() = std::move(currentKey_); + message.exptime() = ntohl(extras->exptime); + // message.quiet() = quiet; + callback_->onRequest(std::move(message)); +} + +template +void McServerBinaryParser::consumeAppendLike() { + auto& message = currentMessage_.get(); + message.key() = std::move(currentKey_); + message.value() = std::move(currentValue_); + // message.quiet() = quiet; + callback_->onRequest(std::move(message)); +} + +template +void McServerBinaryParser::consumeGetLike() { + auto& message = currentMessage_.get(); + message.key() = std::move(currentKey_); + // message.quiet() = quiet; + // message.returnKey() = returnKey; + callback_->onRequest(std::move(message)); +} + +template +void McServerBinaryParser::consumeArithLike() { + auto extras = reinterpret_cast(currentExtras_.data()); + auto& message = currentMessage_.get(); + message.key() = std::move(currentKey_); + message.delta() = ntohl(extras->delta); + // These fields are for binary protocol only, we cannot forward them to + // upstream servers because we use the ASCII protocol for upstreams + // message.initialValue() = ntohl(extras->initialValue); + // message.exptime() = ntohl(extras->exptime); + // message.quiet() = quiet; + callback_->onRequest(std::move(message)); +} + +template +void McServerBinaryParser::consumeQuit() { + auto& message = currentMessage_.get(); + // message.quiet() = quiet; + callback_->onRequest(std::move(message)); +} + +void McServerBinaryParser::consumeVersion() { + auto& message = currentMessage_.get(); + callback_->onRequest(std::move(message)); +} + +template +void McServerBinaryParser::consumeFlush() { + // auto extras = reinterpret_cast(currentExtras_.data()); + auto& message = currentMessage_.get(); + // Binary protocol only fields + // message.exptime() = ntohl(extras->exptime); + // message.quiet() = quiet; + callback_->onRequest(std::move(message)); +} + +} +} // facebook::memcache diff --git a/mcrouter/lib/network/McBinaryParser.h b/mcrouter/lib/network/McBinaryParser.h new file mode 100644 index 000000000..5a621944c --- /dev/null +++ b/mcrouter/lib/network/McBinaryParser.h @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ +#pragma once + +#include +#include + +#include +#include + +#include "mcrouter/lib/Operation.h" +#include "mcrouter/lib/carbon/Variant.h" +#include "mcrouter/lib/fbi/cpp/TypeList.h" +#include "mcrouter/lib/network/CarbonMessageList.h" +#include "mcrouter/lib/network/gen/Memcache.h" +#include "mcrouter/lib/network/McAsciiParser.h" + +namespace facebook { +namespace memcache { + +namespace detail { +template +class CallbackBase; +} // detail + +class McServerBinaryParser { + public: + enum class State { + // The parser is not initialized to parse any messages. + UNINIT, + // Have partial message, and need more data to complete it. + PARTIAL_HEADER, + PARTIAL_EXTRA, + PARTIAL_KEY, + PARTIAL_VALUE, + // There was an error on the protocol level. + ERROR, + // Complete message had been parsed and ready to be returned. + COMPLETE, + }; + + McServerBinaryParser() = default; + + McServerBinaryParser(const McServerBinaryParser&) = delete; + McServerBinaryParser& operator=(const McServerBinaryParser&) = delete; + + template + McServerBinaryParser(Callback& callback) + : callback_( + std::make_unique>( + callback)) {} + + State getCurrentState() const noexcept { + return state_; + } + + State consume(folly::IOBuf& buffer); + std::unique_ptr> callback_; + + protected: + + std::string currentErrorDescription_; + + uint64_t currentUInt_{0}; + + folly::IOBuf* currentIOBuf_{nullptr}; + size_t remainingIOBufLength_{0}; + State state_{State::UNINIT}; + bool negative_{false}; + + const char *sectionStart_; + uint64_t sectionLength_; + + uint32_t getMagic() { + return header_->magic; + } + uint8_t getOpCode() { + return header_->opCode; + } + uint16_t getKeyLength() { + return ntohs(header_->keyLen); + } + uint8_t getExtrasLength() { + return header_->extrasLen; + } + uint16_t getValueLength() { + return getTotalBodyLength() - getKeyLength() - getExtrasLength(); + } + uint8_t getDataType() { + return header_->dataType; + } + uint16_t getVBucketId() { + return ntohs(header_->vBucketId); + } + uint64_t getTotalBodyLength() { + return ntohl(header_->totalBodyLen); + } + uint32_t getOpaque() { + return ntohl(header_->opaque); + } + uint32_t getCAS() { + return ntohl(header_->cas); + } + + static constexpr uint64_t HeaderLength = 24; + + typedef struct RequestHeader { + uint8_t magic; + uint8_t opCode; + uint16_t keyLen; + uint8_t extrasLen; + uint8_t dataType; + uint16_t vBucketId; + uint32_t totalBodyLen; + uint32_t opaque; + uint32_t cas; + } __attribute__((__packed__)) RequestHeader_t; + + typedef struct SetExtras { + uint32_t flags; + uint32_t exptime; + } __attribute__((__packed__)) SetExtras_t; + + typedef struct ArithExtras { + uint32_t delta; + uint32_t initialValue; + uint32_t exptime; + } __attribute__((__packed__)) ArithExtras_t; + + typedef struct TouchExtras { + uint32_t exptime; + } __attribute__((__packed__)) TouchExtras_t; + + typedef struct FlushExtras { + uint32_t exptime; + } __attribute__((__packed__)) FlushExtras_t; + + bool parseHeader(const char* bytes); + + template + void consumeSetLike(); + + template + void consumeAppendLike(); + + template + void consumeGetLike(); + + template + void consumeArithLike(); + + template + void consumeQuit(); + + void consumeVersion(); + + template + void consumeFlush(); + + // Network byte-ordered fields + const RequestHeader_t *header_; + + folly::IOBuf currentExtras_; + folly::IOBuf currentKey_; + folly::IOBuf currentValue_; + + using ConsumerFunPtr = void (McServerBinaryParser::*)(); + ConsumerFunPtr consumer_{nullptr}; + + using RequestVariant = carbon::makeVariantFromList; + RequestVariant currentMessage_; +}; + +} +} // facebook::memcache diff --git a/mcrouter/lib/network/McParser.cpp b/mcrouter/lib/network/McParser.cpp index e6c3008f0..f0c8fb8ae 100644 --- a/mcrouter/lib/network/McParser.cpp +++ b/mcrouter/lib/network/McParser.cpp @@ -232,6 +232,10 @@ bool McParser::readDataAvailable(size_t len) { callback_.handleAscii(readBuffer_); return true; } + if (protocol_ == mc_binary_protocol) { + callback_.handleBinary(readBuffer_); + return true; + } return readUmbrellaOrCaretData(); } diff --git a/mcrouter/lib/network/McParser.h b/mcrouter/lib/network/McParser.h index 68914516b..41bc30bf2 100644 --- a/mcrouter/lib/network/McParser.h +++ b/mcrouter/lib/network/McParser.h @@ -25,6 +25,9 @@ inline mc_protocol_t determineProtocol(uint8_t firstByte) { return mc_caret_protocol; case ENTRY_LIST_MAGIC_BYTE: return mc_umbrella_protocol_DONOTUSE; + case 0x80: + case 0x81: + return mc_binary_protocol; default: return mc_ascii_protocol; } @@ -70,6 +73,14 @@ class McParser { */ virtual void handleAscii(folly::IOBuf& readBuffer) = 0; + /** + * Handle binary data read. + * The user is responsible for clearing or advancing the readBuffer. + * + * @param readBuffer buffer with newly read data that needs to be parsed. + */ + virtual void handleBinary(folly::IOBuf& readBuffer) = 0; + /** * Called on fatal parse error (the stream should normally be closed) */ diff --git a/mcrouter/lib/network/ServerMcParser-inl.h b/mcrouter/lib/network/ServerMcParser-inl.h index a9fe450e6..72e124096 100644 --- a/mcrouter/lib/network/ServerMcParser-inl.h +++ b/mcrouter/lib/network/ServerMcParser-inl.h @@ -136,6 +136,26 @@ void ServerMcParser::handleAscii(folly::IOBuf& readBuffer) { } } +template +void ServerMcParser::handleBinary(folly::IOBuf& readBuffer) { + if (UNLIKELY(parser_.protocol() != mc_binary_protocol)) { + std::string reason(folly::sformat( + "Expected {} protocol, but received binary!", + mc_protocol_to_string(parser_.protocol()))); + callback_.parseError(mc_res_local_error, reason); + return; + } + + // Note: McParser never chains IOBufs. + auto result = binaryParser_.consume(readBuffer); + + if (result == McServerBinaryParser::State::ERROR) { + // Note: we could include actual parsing error instead of + // "malformed request" (e.g. asciiParser_.getErrorDescription()). + callback_.parseError(mc_res_client_error, "malformed request"); + } +} + template void ServerMcParser::parseError( mc_res_t result, diff --git a/mcrouter/lib/network/ServerMcParser.h b/mcrouter/lib/network/ServerMcParser.h index 03e384995..ba54a47e4 100644 --- a/mcrouter/lib/network/ServerMcParser.h +++ b/mcrouter/lib/network/ServerMcParser.h @@ -9,6 +9,7 @@ #include "mcrouter/lib/network/AsciiSerialized.h" #include "mcrouter/lib/network/McAsciiParser.h" +#include "mcrouter/lib/network/McBinaryParser.h" #include "mcrouter/lib/network/McParser.h" namespace facebook { @@ -62,6 +63,7 @@ class ServerMcParser : private McParser::ParserCallback { private: McParser parser_; McServerAsciiParser asciiParser_; + McServerBinaryParser binaryParser_; Callback& callback_; @@ -81,6 +83,7 @@ class ServerMcParser : private McParser::ParserCallback { const UmbrellaMessageInfo& headerInfo, const folly::IOBuf& buffer) final; void handleAscii(folly::IOBuf& readBuffer) final; + void handleBinary(folly::IOBuf& readBuffer) final; void parseError(mc_res_t result, folly::StringPiece reason) final; bool shouldReadToAsciiBuffer() const; diff --git a/mcrouter/lib/network/WriteBuffer-inl.h b/mcrouter/lib/network/WriteBuffer-inl.h index 5031e666a..bbc026fca 100644 --- a/mcrouter/lib/network/WriteBuffer-inl.h +++ b/mcrouter/lib/network/WriteBuffer-inl.h @@ -37,12 +37,16 @@ WriteBuffer::prepareTyped( typeId_ = static_cast(Reply::typeId); // The current congestion control only supports mc_caret_protocol. - // May extend to other protocals in the future. + // May extend to other protocols in the future. switch (protocol_) { case mc_ascii_protocol: return asciiReply_.prepare( std::move(reply), ctx_->asciiKey(), iovsBegin_, iovsCount_); + case mc_binary_protocol: + return binaryReply_.prepare( + std::move(reply), ctx_->asciiKey(), iovsBegin_, iovsCount_); + case mc_umbrella_protocol_DONOTUSE: return umbrellaReply_.prepare( std::move(reply), ctx_->reqid_, iovsBegin_, iovsCount_); diff --git a/mcrouter/lib/network/WriteBuffer.cpp b/mcrouter/lib/network/WriteBuffer.cpp index f9e0f054e..92356b706 100644 --- a/mcrouter/lib/network/WriteBuffer.cpp +++ b/mcrouter/lib/network/WriteBuffer.cpp @@ -22,6 +22,10 @@ WriteBuffer::WriteBuffer(mc_protocol_t protocol) : protocol_(protocol) { new (&asciiReply_) AsciiSerializedReply; break; + case mc_binary_protocol: + new (&binaryReply_) BinarySerializedReply; + break; + case mc_umbrella_protocol_DONOTUSE: new (&umbrellaReply_) UmbrellaSerializedMessage; break; @@ -41,6 +45,10 @@ WriteBuffer::~WriteBuffer() { asciiReply_.~AsciiSerializedReply(); break; + case mc_binary_protocol: + binaryReply_.~BinarySerializedReply(); + break; + case mc_umbrella_protocol_DONOTUSE: umbrellaReply_.~UmbrellaSerializedMessage(); break; @@ -65,6 +73,10 @@ void WriteBuffer::clear() { asciiReply_.clear(); break; + case mc_binary_protocol: + binaryReply_.clear(); + break; + case mc_umbrella_protocol_DONOTUSE: umbrellaReply_.clear(); break; diff --git a/mcrouter/lib/network/WriteBuffer.h b/mcrouter/lib/network/WriteBuffer.h index e9a3bec06..681bc865d 100644 --- a/mcrouter/lib/network/WriteBuffer.h +++ b/mcrouter/lib/network/WriteBuffer.h @@ -15,6 +15,7 @@ #include "mcrouter/lib/mc/protocol.h" #include "mcrouter/lib/mc/umbrella.h" #include "mcrouter/lib/network/AsciiSerialized.h" +#include "mcrouter/lib/network/BinarySerialized.h" #include "mcrouter/lib/network/CaretSerializedMessage.h" #include "mcrouter/lib/network/McServerRequestContext.h" #include "mcrouter/lib/network/UmbrellaProtocol.h" @@ -118,6 +119,7 @@ class WriteBuffer { /* Write buffers */ union { AsciiSerializedReply asciiReply_; + BinarySerializedReply binaryeply_; UmbrellaSerializedMessage umbrellaReply_; CaretSerializedMessage caretReply_; }; diff --git a/mcrouter/lib/network/gen/CommonMessages.h b/mcrouter/lib/network/gen/CommonMessages.h index b7e6bdbee..de9bd1ca6 100644 --- a/mcrouter/lib/network/gen/CommonMessages.h +++ b/mcrouter/lib/network/gen/CommonMessages.h @@ -79,7 +79,7 @@ class McVersionRequest : public carbon::RequestCommon { void visitFields(V&& v) const; private: - carbon::Keys key_; + carbon::Keys key_; // TODO: Why? }; class McVersionReply : public carbon::ReplyCommon { diff --git a/mcrouter/lib/network/gen/MemcacheMessages.h b/mcrouter/lib/network/gen/MemcacheMessages.h index b0cdcacfc..e47a877a4 100644 --- a/mcrouter/lib/network/gen/MemcacheMessages.h +++ b/mcrouter/lib/network/gen/MemcacheMessages.h @@ -149,7 +149,7 @@ class McGetReply : public carbon::ReplyCommon { private: folly::Optional value_; - uint64_t flags_{0}; + uint64_t flags_{0}; // FIXME Shouldn't this be 32 bits? std::string message_; carbon::Result result_{mc_res_unknown}; int16_t appSpecificErrorCode_{0};