diff --git a/.github/workflows/buildci.yml b/.github/workflows/buildci.yml index 38616ae2..4cc6c35d 100644 --- a/.github/workflows/buildci.yml +++ b/.github/workflows/buildci.yml @@ -39,7 +39,7 @@ jobs: run: | sudo rm -rf /home/ubuntu/.cache/bazel - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: install dependency run: | ./bazel/setup_clang.sh /home/ubuntu/clang+llvm-10.0.0-linux-gnu diff --git a/BUILD b/BUILD index 79b94c94..f4ce3eea 100644 --- a/BUILD +++ b/BUILD @@ -13,6 +13,7 @@ envoy_cc_binary( "//src/application_protocols/dubbo:config", "//src/application_protocols/thrift:config", "//src/application_protocols/brpc:config", + "//src/application_protocols/trpc:config", "@io_istio_proxy//extensions/access_log_policy:access_log_policy_lib", "@io_istio_proxy//extensions/metadata_exchange:metadata_exchange_lib", "@io_istio_proxy//extensions/stackdriver:stackdriver_plugin", diff --git a/clang.bazelrc b/clang.bazelrc index ca15b531..5113fbf4 100644 --- a/clang.bazelrc +++ b/clang.bazelrc @@ -1,16 +1,16 @@ # Generated file, do not edit. If you want to disable clang, just delete this file. -build:clang --action_env='PATH=/root/clang+llvm-10.0.0-linux-gnu/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/root/clang+llvm-10.0.0-linux-gnu/bin' +build:clang --action_env='PATH=/home/ubuntu/clang+llvm-10.0.0-linux-gnu/bin:/home/linuxbrew/.linuxbrew/bin:/home/linuxbrew/.linuxbrew/sbin:/home/ubuntu/clang+llvm-14/bin:/usr/local/go/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/usr/local/go/bin:/home/ubuntu/go/bin/' build:clang --action_env=CC=clang build:clang --action_env=CXX=clang++ -build:clang --action_env='LLVM_CONFIG=/root/clang+llvm-10.0.0-linux-gnu//bin/llvm-config' -build:clang --repo_env='LLVM_CONFIG=/root/clang+llvm-10.0.0-linux-gnu//bin/llvm-config' -build:clang --linkopt='-L/root/clang+llvm-10.0.0-linux-gnu/lib' -build:clang --linkopt='-Wl,-rpath,/root/clang+llvm-10.0.0-linux-gnu/lib' +build:clang --action_env='LLVM_CONFIG=/home/ubuntu/clang+llvm-10.0.0-linux-gnu/bin/llvm-config' +build:clang --repo_env='LLVM_CONFIG=/home/ubuntu/clang+llvm-10.0.0-linux-gnu/bin/llvm-config' +build:clang --linkopt='-L/home/ubuntu/clang+llvm-10.0.0-linux-gnu/lib' +build:clang --linkopt='-Wl,-rpath,/home/ubuntu/clang+llvm-10.0.0-linux-gnu/lib' build:clang-asan --action_env=ENVOY_UBSAN_VPTR=1 build:clang-asan --copt=-fsanitize=vptr,function build:clang-asan --linkopt=-fsanitize=vptr,function -build:clang-asan --linkopt='-L/root/clang+llvm-10.0.0-linux-gnu/lib/clang/10.0.0/lib/x86_64-unknown-linux-gnu' +build:clang-asan --linkopt='-L/home/ubuntu/clang+llvm-10.0.0-linux-gnu/lib/clang/10.0.0/lib/x86_64-unknown-linux-gnu' build:clang-asan --linkopt=-l:libclang_rt.ubsan_standalone.a build:clang-asan --linkopt=-l:libclang_rt.ubsan_standalone_cxx.a diff --git a/src/application_protocols/trpc/BUILD b/src/application_protocols/trpc/BUILD new file mode 100644 index 00000000..ccc53565 --- /dev/null +++ b/src/application_protocols/trpc/BUILD @@ -0,0 +1,86 @@ +load( + "@envoy//bazel:envoy_build_system.bzl", + "envoy_cc_library", +) + +# compile proto +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +api_proto_package( + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], +) + +envoy_cc_library( + name = "config", + visibility = ["//:__pkg__"], + repository = "@envoy", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":pkg_cc_proto", + ":codec_lib", + "@envoy//envoy/registry", + "//src/meta_protocol_proxy/codec:factory_lib", + ], +) + +envoy_cc_library( + name = "codec_lib", + repository = "@envoy", + srcs = ["trpc_codec.cc"], + hdrs = ["trpc_codec.h"], + deps = [ + "@envoy//envoy/buffer:buffer_interface", + "@envoy//source/common/common:logger_lib", + "@envoy//source/common/buffer:buffer_lib", + "//src/meta_protocol_proxy/codec:codec_interface", + ":codec_checker", + ":protocol", + ], +) + +envoy_cc_library( + name = "codec_checker", + srcs = [ + "codec_checker.cc", + ], + hdrs = [ + "codec_checker.h", + ], + repository = "@envoy", + deps = [ + ":pkg_cc_proto", + ":protocol", + "@envoy//envoy/buffer:buffer_interface", + "@envoy//envoy/server:filter_config_interface", + "@envoy//source/common/buffer:buffer_lib", + "@envoy//source/common/common:assert_lib", + "@envoy//source/common/common:minimal_logger_lib", + ], +) + +envoy_cc_library( + name = "metadata", + srcs = ["metadata.cc"], + hdrs = ["metadata.h"], + repository = "@envoy", + deps = [ + ":pkg_cc_proto", + "@envoy//source/common/http:header_map_lib", + ], +) + +envoy_cc_library( + name = "protocol", + srcs = ["protocol.cc"], + hdrs = ["protocol.h"], + repository = "@envoy", + deps = [ + ":metadata", + ":pkg_cc_proto", + "@envoy//source/common/buffer:buffer_lib", + "@envoy//source/common/common:minimal_logger_lib", + "//src/meta_protocol_proxy/codec:codec_interface", + ], +) + diff --git a/src/application_protocols/trpc/codec_checker.cc b/src/application_protocols/trpc/codec_checker.cc new file mode 100644 index 00000000..9443bbf4 --- /dev/null +++ b/src/application_protocols/trpc/codec_checker.cc @@ -0,0 +1,135 @@ +#include "src/application_protocols/trpc/codec_checker.h" + +#include +#include +#include + +#include "source/common/common/assert.h" + +#include "src/application_protocols/trpc/protocol.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +CodecChecker::DecodeStage CodecChecker::onData(Buffer::Instance& buffer) { + ENVOY_LOG(debug, "decoder onData: {}", buffer.length()); + + while (decode_stage_ != DecodeStage::kDecodeDone) { + auto state = handleState(buffer); + if (state == DecodeStage::kWaitForData) { + return DecodeStage::kWaitForData; + } + decode_stage_ = state; + } + + ASSERT(decode_stage_ == DecodeStage::kDecodeDone); + + reset(); + ENVOY_LOG(debug, "trpc decoder: data length {}", buffer.length()); + return DecodeStage::kDecodeDone; +} + +CodecChecker::DecodeStage CodecChecker::handleState(Buffer::Instance& buffer) { + switch (decode_stage_) { + case DecodeStage::kDecodeFixedHeader: + return decodeFixedHeader(buffer); + case DecodeStage::kDecodeUnaryProtocolHeader: + return decodeUnaryProtocolHeader(buffer); + case DecodeStage::KDecodeStreamFrame: + return decodeStreamFrame(buffer); + case DecodeStage::kDecodePayload: + return decodePayload(buffer); + default: + PANIC("not reached"); + } + return DecodeStage::kDecodeDone; +} + +CodecChecker::DecodeStage CodecChecker::decodeFixedHeader(Buffer::Instance& buffer) { + ENVOY_LOG(debug, "decoder FixedHeader: {}", buffer.length()); + if (buffer.length() < TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE) { + ENVOY_LOG(debug, "continue {}", buffer.length()); + return DecodeStage::kWaitForData; + } + + std::unique_ptr fixed_header = std::make_unique(); + if (!fixed_header->decode(buffer, false)) { + throw EnvoyException(fmt::format("protocol invalid")); + } + + total_size_ = fixed_header->data_frame_size; + protocol_header_size_ = fixed_header->pb_header_size; + + auto frame_type = fixed_header->stream_frame_type; + call_backs_.onFixedHeaderDecoded(std::move(fixed_header)); + if (frame_type == trpc::TrpcStreamFrameType::TRPC_UNARY) { + return DecodeStage::kDecodeUnaryProtocolHeader; + } + + return DecodeStage::KDecodeStreamFrame; +} + +CodecChecker::DecodeStage CodecChecker::decodeUnaryProtocolHeader(Buffer::Instance& buffer) { + ENVOY_LOG(debug, "decoder ProtocolHeader: {}", buffer.length()); + + // 数据不全,继续收包 + if (buffer.length() < TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE + protocol_header_size_) { + ENVOY_LOG(debug, "continue {}", buffer.length()); + return DecodeStage::kWaitForData; + } + std::string header_raw; + header_raw.reserve(protocol_header_size_); + header_raw.resize(protocol_header_size_); + buffer.copyOut(TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE, protocol_header_size_, &(header_raw[0])); + + if (!call_backs_.onUnaryHeader(std::move(header_raw))) { + throw EnvoyException("parse header failed"); + } + + return DecodeStage::kDecodePayload; +} + +CodecChecker::DecodeStage CodecChecker::decodeStreamFrame(Buffer::Instance& buffer) { + ENVOY_LOG(debug, "decoder stream frame {} ? {}", total_size_, buffer.length()); + + if (buffer.length() < total_size_) { + ENVOY_LOG(debug, "continue {}", buffer.length()); + return DecodeStage::kWaitForData; + } + + std::string header_raw; + auto frame_size = total_size_ - TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE; + header_raw.reserve(frame_size); + header_raw.resize(frame_size); + buffer.copyOut(TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE, frame_size, &(header_raw[0])); + + if (!call_backs_.onStreamFrame(std::move(header_raw))) { + throw EnvoyException("parse header failed"); + } + + return DecodeStage::kDecodePayload; +} + +CodecChecker::DecodeStage CodecChecker::decodePayload(Buffer::Instance& buffer) { + ENVOY_LOG(debug, "decoder payload {} ? {}", total_size_, buffer.length()); + + if (buffer.length() < total_size_) { + ENVOY_LOG(debug, "continue {}", buffer.length()); + return DecodeStage::kWaitForData; + } + std::unique_ptr msg = std::make_unique(); + msg->move(buffer, static_cast(total_size_)); + + call_backs_.onCompleted(std::move(msg)); + + return DecodeStage::kDecodeDone; +} + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/src/application_protocols/trpc/codec_checker.h b/src/application_protocols/trpc/codec_checker.h new file mode 100644 index 00000000..56dd7ac2 --- /dev/null +++ b/src/application_protocols/trpc/codec_checker.h @@ -0,0 +1,141 @@ + +#pragma once + +#include +#include +#include + +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/logger.h" +#include "envoy/network/filter.h" +#include "envoy/server/filter_config.h" + +#include "src/application_protocols/trpc/protocol.h" +#include "src/application_protocols/trpc/trpc.pb.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +class CodecCheckerCallBacks { +public: + virtual ~CodecCheckerCallBacks() = default; + + /** + * 当帧头解析成功后,会回调该函数。 + * @param fixed_header_ptr 帧头 + */ + virtual void onFixedHeaderDecoded(std::unique_ptr /*fixed_header_ptr*/) {} + + /** + * 当数据满足包头长度后,会回调该函数,用户需要自己执行pb的反序列化。 + * @param str 存储包头序列化后的二进制数据。 + * @return + */ + virtual bool onUnaryHeader(std::string&& /*str*/) { return true; } + + /** + * 当数据满足流式帧长度后,会回调该函数,用户需要自己执行pb的反序列化。 + * @param str 存储包头序列化后的二进制数据。 + * @return + */ + virtual bool onStreamFrame(std::string&& /*str*/) { return true; } + + /** + * 当一个完整的数据包被接收后,返回整个包的原始数据。 + * @param msg 存储一个完整的原始数据包(包括帧头、包头、包体)。 + */ + virtual void onCompleted(std::unique_ptr /*msg*/) {} +}; + +// 数据包检查类 +class CodecChecker : public Logger::Loggable { +public: + // tRPC协议 = 帧头 + 包头 + 包体 + // kDecodeFixedHeader 解析帧头 + // kDecodeUnaryProtocolHeader 解析一元请求包头 + // KDecodeStreamFrame 解析流式请求 + // kDecodePayload 解析包体 + enum class DecodeStage { + kDecodeFixedHeader, + kDecodeUnaryProtocolHeader, + KDecodeStreamFrame, + kDecodePayload, + kDecodeDone, + kWaitForData + }; + +public: + explicit CodecChecker(CodecCheckerCallBacks& call_backs) : call_backs_(call_backs) {} + ~CodecChecker() = default; + + /** + * 对外提供的接口,如果不是trpc协议,则throw EnvoyException。 + * @param data 输入数据。 + * @return DecodeStage + */ + DecodeStage onData(Buffer::Instance& data); + +private: + /** + * + * @param buffer 输入数据 + * @return + */ + DecodeStage handleState(Buffer::Instance& buffer); + + /** + * 检查帧头。 + * @param buffer 输入数据 + * @return + */ + DecodeStage decodeFixedHeader(Buffer::Instance& buffer); + + /** + * 检查一元调用包头。 + * @param buffer 输入数据 + * @return + */ + DecodeStage decodeUnaryProtocolHeader(Buffer::Instance& buffer); + + /** + * 检查流式调用。 + * @param buffer 输入数据 + * @return + */ + DecodeStage decodeStreamFrame(Buffer::Instance& buffer); + + /** + * 检查包体。 + * @param buffer 输入数据 + * @return + */ + DecodeStage decodePayload(Buffer::Instance& buffer); + + /** + * 重置状态;执行后,才可以进行下一个包的解析。 + */ + void reset() { + decode_stage_ = DecodeStage::kDecodeFixedHeader; + total_size_ = 0; + protocol_header_size_ = 0; + } + +private: + // 状态 + DecodeStage decode_stage_{DecodeStage::kDecodeFixedHeader}; + // 总大小 + uint32_t total_size_{0}; + // 包头大小 + uint16_t protocol_header_size_{0}; + // 回调函数 + CodecCheckerCallBacks& call_backs_; +}; + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/src/application_protocols/trpc/config.cc b/src/application_protocols/trpc/config.cc new file mode 100644 index 00000000..429a272e --- /dev/null +++ b/src/application_protocols/trpc/config.cc @@ -0,0 +1,26 @@ +#include "envoy/registry/registry.h" + +#include "src/meta_protocol_proxy/codec/factory.h" +#include "src/application_protocols/trpc/config.h" +#include "src/application_protocols/trpc/trpc_codec.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +MetaProtocolProxy::CodecPtr TrpcCodecConfig::createCodec(const Protobuf::Message&) { + return std::make_unique(); +}; + +/** + * Static registration for the trpc codec. @see RegisterFactory. + */ +REGISTER_FACTORY(TrpcCodecConfig, MetaProtocolProxy::NamedCodecConfigFactory); + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/src/application_protocols/trpc/config.h b/src/application_protocols/trpc/config.h new file mode 100644 index 00000000..cbfc78be --- /dev/null +++ b/src/application_protocols/trpc/config.h @@ -0,0 +1,24 @@ +#pragma once + +#include "src/meta_protocol_proxy/codec/factory.h" +#include "src/application_protocols/trpc/trpc_codec.pb.h" +#include "src/application_protocols/trpc/trpc_codec.pb.validate.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +class TrpcCodecConfig + : public MetaProtocolProxy::CodecFactoryBase { +public: + TrpcCodecConfig() : CodecFactoryBase("aeraki.meta_protocol.codec.trpc") {} + MetaProtocolProxy::CodecPtr createCodec(const Protobuf::Message& config) override; +}; + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/src/application_protocols/trpc/metadata.cc b/src/application_protocols/trpc/metadata.cc new file mode 100644 index 00000000..7c7a29b5 --- /dev/null +++ b/src/application_protocols/trpc/metadata.cc @@ -0,0 +1,61 @@ + +#include "src/application_protocols/trpc/metadata.h" + +#include + +namespace { + +inline std::string convertCalleeToHost(std::string const& callee) { + // tRPC的被调服务的路由名称如下: + // 规范格式,trpc.应用名.服务名.pb的service名[.接口名] + // 其中接口可选 + // 如果有接口的名话,需要去除,只保留到service + if (std::count(callee.begin(), callee.end(), '.') != 4) { + return callee; + } + + std::string host_copy(callee); + host_copy.resize(host_copy.find_last_of('.')); + return host_copy; // copy elision +} + +constexpr char kUserAgentValues[] = "trpc-proxy"; +constexpr char kProtocolValues[] = "trpc"; +constexpr char kContentTypeValues[] = "application/trpc"; + +} // namespace + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +// convert trpc meta to http headers +void MessageMetadata::buildHttpHeaders() { + http_request_headers = Http::RequestHeaderMapImpl::create(); + http_request_headers->addReference(Http::Headers::get().Method, + Http::Headers::get().MethodValues.Post); + http_request_headers->addReference(Http::Headers::get().ForwardedProto, + Http::Headers::get().SchemeValues.Http); + http_request_headers->addReference(Http::Headers::get().UserAgent, kUserAgentValues); + http_request_headers->addReference(Http::Headers::get().Protocol, kProtocolValues); + http_request_headers->addReference(Http::Headers::get().Scheme, kProtocolValues); + http_request_headers->addReference(Http::Headers::get().ContentType, kContentTypeValues); + + http_request_headers->addReferenceKey(Http::Headers::get().Host, + convertCalleeToHost(request_protocol.callee())); + http_request_headers->addReferenceKey(Http::Headers::get().Path, request_protocol.func()); + http_request_headers->addReferenceKey(Http::Headers::get().RequestId, + request_protocol.request_id()); + + for (auto const& kv : request_protocol.trans_info()) { + http_request_headers->addCopy(Http::LowerCaseString(kv.first), kv.second); + } +} + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/src/application_protocols/trpc/metadata.h b/src/application_protocols/trpc/metadata.h new file mode 100644 index 00000000..47af76a0 --- /dev/null +++ b/src/application_protocols/trpc/metadata.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include + +#include "source/common/http/header_map_impl.h" + +#include "src/application_protocols/trpc/trpc.pb.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +struct MessageMetadata { + uint32_t pkg_size{0}; + // tRPC数据包的包头 + trpc::RequestProtocol request_protocol; + // http头,由tRPC协议转换而来的,用于适配http的RDS。 + Http::RequestHeaderMapPtr http_request_headers; + + std::string service_name() const { return request_protocol.callee(); } + + uint32_t request_id() const { return request_protocol.request_id(); } + + void buildHttpHeaders(); + + Http::RequestHeaderMap const* requestHttpHeaders() { + if (!http_request_headers) { + buildHttpHeaders(); + } + return http_request_headers.get(); + } +}; + +using MessageMetadataSharedPtr = std::shared_ptr; + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/src/application_protocols/trpc/protocol.cc b/src/application_protocols/trpc/protocol.cc new file mode 100644 index 00000000..09724852 --- /dev/null +++ b/src/application_protocols/trpc/protocol.cc @@ -0,0 +1,88 @@ + +#include "src/application_protocols/trpc/protocol.h" + +namespace { + +// 使用函数模板类型推导,防止手写出现类型不匹配 +template +inline void getIntFromInstance(T* t, Envoy::Buffer::Instance& buff, uint64_t pos) { + *t = buff.peekBEInt(pos); +} + +template +inline void writeIntToInstance(T* t, Envoy::Buffer::Instance& buff) { + buff.writeBEInt(*t); +} +} + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +const uint32_t TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE = 16; +const uint32_t TrpcFixedHeader::TRPC_PROTO_MAGIC_SPACE = 2; +const uint32_t TrpcFixedHeader::TRPC_PROTO_DATAFRAME_TYPE_SPACE = 1; +const uint32_t TrpcFixedHeader::TRPC_PROTO_DATAFRAME_STATE_SPACE = 1; +const uint32_t TrpcFixedHeader::TRPC_PROTO_DATAFRAME_SIZE_SPACE = 4; +const uint32_t TrpcFixedHeader::TRPC_PROTO_HEADER_SIZE_SPACE = 2; +const uint32_t TrpcFixedHeader::TRPC_PROTO_STREAM_ID_SPACE = 4; +const uint32_t TrpcFixedHeader::TRPC_PROTO_REVERSED_SPACE = 2; + +bool TrpcFixedHeader::decode(Buffer::Instance& buff, bool drain) { + if (buff.length() < TRPC_PROTO_PREFIX_SPACE) { + ENVOY_LOG(debug, "decode buff.length:{} < {}.", buff.length(), TRPC_PROTO_PREFIX_SPACE); + return false; + } + // get magic_value + uint64_t pos = 0; + getIntFromInstance(&magic_value, buff, pos); + // 非trpc协议,断开连接 + if (magic_value != trpc::TrpcMagic::TRPC_MAGIC_VALUE) { + ENVOY_LOG(debug, "decode magic_value:{} != {}.", magic_value, + trpc::TrpcMagic::TRPC_MAGIC_VALUE); + return false; + } + pos += TRPC_PROTO_MAGIC_SPACE; + // get data_frame_type + getIntFromInstance(&data_frame_type, buff, pos); + pos += TRPC_PROTO_DATAFRAME_TYPE_SPACE; + // get stream_frame_type + getIntFromInstance(&stream_frame_type, buff, pos); + pos += TRPC_PROTO_DATAFRAME_STATE_SPACE; + // get data_frame_size + getIntFromInstance(&data_frame_size, buff, pos); + pos += TRPC_PROTO_DATAFRAME_SIZE_SPACE; + // get pb_header_size + getIntFromInstance(&pb_header_size, buff, pos); + pos += TRPC_PROTO_HEADER_SIZE_SPACE; + // get stream_id + getIntFromInstance(&stream_id, buff, pos); + pos += TRPC_PROTO_STREAM_ID_SPACE; + // get reserved + buff.copyOut(pos, sizeof reserved, reserved); + pos += TRPC_PROTO_REVERSED_SPACE; + + if (drain) { + buff.drain(pos); + } + return true; +} + +bool TrpcFixedHeader::encode(Buffer::Instance& buffer) const { + writeIntToInstance(&magic_value, buffer); + writeIntToInstance(&data_frame_type, buffer); + writeIntToInstance(&stream_frame_type, buffer); + writeIntToInstance(&data_frame_size, buffer); + writeIntToInstance(&pb_header_size, buffer); + writeIntToInstance(&stream_id, buffer); + buffer.add(reserved, sizeof reserved); + return true; +} + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/src/application_protocols/trpc/protocol.h b/src/application_protocols/trpc/protocol.h new file mode 100644 index 00000000..835355d1 --- /dev/null +++ b/src/application_protocols/trpc/protocol.h @@ -0,0 +1,242 @@ +#pragma once + +#include +#include + +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/logger.h" + +#include "src/meta_protocol_proxy/codec/codec.h" +#include "src/application_protocols/trpc/metadata.h" +#include "src/application_protocols/trpc/trpc.pb.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +class DirectResponse : public Logger::Loggable { +public: + virtual ~DirectResponse() = default; + + // 用于统计 + enum class ResponseType { + // DirectResponse encodes MessageType::Reply with success payload + SuccessReply, + + // DirectResponse encodes MessageType::Reply with an empty payload + ErrorReply, + }; + + /** + * Encodes the response + * @param metadata the MessageMetadata for the request that generated this response + * @param buffer the Buffer into which the message should be encoded + * @return ResponseType indicating whether the message is a successful or error reply + */ + virtual ResponseType encode(MessageMetadata const& meta, Buffer::Instance& data) const = 0; + + virtual int32_t err_code() const = 0; +}; + +struct TrpcFixedHeader : public Logger::Loggable { + static const uint32_t TRPC_PROTO_PREFIX_SPACE; + static const uint32_t TRPC_PROTO_MAGIC_SPACE; + static const uint32_t TRPC_PROTO_DATAFRAME_TYPE_SPACE; + static const uint32_t TRPC_PROTO_DATAFRAME_STATE_SPACE; + static const uint32_t TRPC_PROTO_DATAFRAME_SIZE_SPACE; + static const uint32_t TRPC_PROTO_HEADER_SIZE_SPACE; + static const uint32_t TRPC_PROTO_STREAM_ID_SPACE; + static const uint32_t TRPC_PROTO_REVERSED_SPACE; + + // 魔数 + uint16_t magic_value = trpc::TrpcMagic::TRPC_MAGIC_VALUE; + + // trpc协议的二进制数据帧类型 + // 计划支持两种类型的二进制数据帧: + // 1. 一应一答模式的二进制数据帧类型 + // 2. 流式模式的二进制数据帧类型 + uint8_t data_frame_type = 0; + + // trpc协议的流式帧的类型 + uint8_t stream_frame_type = 0; + + // trpc协议请求体或响应体的二进制数据总大小 + // 由(固定头 + 请求包头或响应包头 + 业务序列化数据)的大小组成 + uint32_t data_frame_size = 0; + + // 请求包头或响应包头的大小 + uint16_t pb_header_size = 0; + + // 流id + uint32_t stream_id = 0; + + // 保留字段 + char reserved[2] = {0}; + + // tRPC协议头部固定帧头数据的解码 + bool decode(Buffer::Instance& buff, bool drain = true); + + // tRPC协议头部固定帧头数据的编码 + bool encode(Buffer::Instance& buffer) const; + + // 获取头部长度(帧头 + pb协议头) + [[nodiscard]] uint32_t getHeaderSize() const { return pb_header_size + TRPC_PROTO_PREFIX_SPACE; } + + // 获取协议包体长度 + [[nodiscard]] uint32_t getPayloadSize() const { return data_frame_size - getHeaderSize(); } +}; + +template class Protocol : public Logger::Loggable { +public: + Protocol() = default; + virtual ~Protocol() = default; + + /** + * 从协议中获取request id + * @param req_id + * @return + */ + bool getRequestId(uint32_t& req_id) const { + req_id = protocol_header_.request_id(); + return true; + } + + /** + * 设置协议的request id + * @param req_id + * @return + */ + bool setRequestId(uint32_t req_id) { + protocol_header_.set_request_id(req_id); + return true; + } + + /** + * 协议请求的解码(只能解码一个完整的消息) + * @param buff 存储待解码的消息 + * @return 成功则返回true,反之返回false + */ + bool decode(Buffer::Instance& buff) { + auto ptr_size = buff.length(); + + if (!fixed_header_.decode(buff, true)) { + return false; + } + + if (ptr_size < fixed_header_.data_frame_size) { + ENVOY_LOG(error, "decode ptr_size:{} < {}.", ptr_size, fixed_header_.data_frame_size); + return false; + } + std::string header_raw; + header_raw.reserve(fixed_header_.pb_header_size); + header_raw.resize(fixed_header_.pb_header_size); + buff.copyOut(TrpcFixedHeader::TRPC_PROTO_PREFIX_SPACE, fixed_header_.pb_header_size, + &(header_raw[0])); + buff.drain(fixed_header_.pb_header_size); + if (!protocol_header_.ParseFromString(header_raw)) { + ENVOY_LOG(error, "decode req_header parse error."); + return false; + } + + body_.move(buff, static_cast(fixed_header_.getPayloadSize())); + + return true; + } + + /** + * 协议请求的编码 (会计算固定帧头的pb_header_size和data_frame_size) + * @param buff 存储编码后会的消息 + * @return 成功则返回true,反之返回false + */ + bool encode(Buffer::Instance& buff) { + // 计算长度 + fixed_header_.magic_value = trpc::TrpcMagic::TRPC_MAGIC_VALUE; + fixed_header_.pb_header_size = protocol_header_.ByteSizeLong(); + fixed_header_.data_frame_size = fixed_header_.getHeaderSize() + body_.length(); + // encode + fixed_header_.encode(buff); + buff.add(protocol_header_.SerializeAsString()); + if (body_.length() > 0) { + buff.add(body_); + } + return true; + } + +inline unsigned int to_uint(char ch) +{ + // EDIT: multi-cast fix as per David Hammen's comment + return static_cast(static_cast(ch)); +} + /** + * 修改 Header + * @param buff + */ + bool mutateHeader(Buffer::Instance& buff, const MetaProtocolProxy::Mutation& mutation) { + auto ptr_size = buff.length(); + + // 解析原始消息 + // 解析fix header + if (!fixed_header_.decode(buff, true)) { + return false; + } + + if (ptr_size < fixed_header_.data_frame_size) { + ENVOY_LOG(error, "decode ptr_size:{} < {}.", ptr_size, fixed_header_.data_frame_size); + return false; + } + + // 解析 protocol header + std::string header_raw; + header_raw.reserve(fixed_header_.pb_header_size); + header_raw.resize(fixed_header_.pb_header_size); + buff.copyOut(0, fixed_header_.pb_header_size, + &(header_raw[0])); + buff.drain(fixed_header_.pb_header_size); + if (!protocol_header_.ParseFromString(header_raw)) { + ENVOY_LOG(error, "decode protocol header parse error."); + return false; + } + + // 保留 body + Buffer::OwnedImpl body; + body.move(buff, static_cast(fixed_header_.getPayloadSize())); + + // 修改/添加消息头 + auto trans_info = protocol_header_.mutable_trans_info(); + for (const auto& keyValue : mutation) { + (*trans_info)[keyValue.first] = keyValue.second; + } + + // 修改 fixed header 中的协议头长度 + fixed_header_.pb_header_size = protocol_header_.ByteSizeLong(); + fixed_header_.data_frame_size = fixed_header_.getHeaderSize() + body.length(); + + // 编码修改后的消息 + fixed_header_.encode(buff); + buff.add(protocol_header_.SerializeAsString()); + buff.add(body); + + return true; + } + +public: + // tRPC协议头部固定帧头 + TrpcFixedHeader fixed_header_; + // tRPC协议请求包头 + T protocol_header_; + // 业务序列化的二进制数据 + Buffer::OwnedImpl body_; +}; + +using TrpcRequestProtocol = Protocol; +using TrpcResponseProtocol = Protocol; +using TrpcStreamInitMeta = Protocol; + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy + diff --git a/src/application_protocols/trpc/trpc.proto b/src/application_protocols/trpc/trpc.proto new file mode 100644 index 00000000..39182e8e --- /dev/null +++ b/src/application_protocols/trpc/trpc.proto @@ -0,0 +1,529 @@ +// Tencent is pleased to support the open source community by making tRPC available. +// Copyright (C) 2023 THL A29 Limited, a Tencent company. All rights reserved. +// If you have downloaded a copy of the tRPC source code from Tencent, +// please note that tRPC source code is licensed under the Apache 2.0 License, +// A copy of the Apache 2.0 License is included in this file. + +syntax = "proto3"; + +package trpc; + +option go_package = "trpc.group/trpc/trpc-protocol/pb/go/trpc"; +option java_package = "com.tencent.trpc.proto.standard.common"; +option java_outer_classname = "TRPCProtocol"; + +// The magic value of trpc protocol +enum TrpcMagic { + // trpc does not use this value, it is used by the pb gen-code tool + TRPC_DEFAULT_NONE = 0x00; + + // The magic value used by trpc protocol + TRPC_MAGIC_VALUE = 0x930; +} + +// The data frame type of the trpc protocol packet +// Two types are currently supported: +// 1. The data frame type for unary(one-response-one-response) +// 2. The data frame type for stream +enum TrpcDataFrameType { + TRPC_UNARY_FRAME = 0x00; + + TRPC_STREAM_FRAME = 0x01; +} + +// The specific frame type of trpc streaming data frame +// Four types are currently supported: +// `INIT` frame: FIXHEADER + TrpcStreamInitMeta +// `DATA` frame: FIXHEADER + body (business serialized data) +// `FEEDBACK` frame: FIXHEADER + TrpcStreamFeedBackMeta (triggered strategy: high/low water level and timer) +// `CLOSE` frame: FIXHEADER + TrpcStreamCloseMeta +enum TrpcStreamFrameType { + TRPC_UNARY = 0x00; + + TRPC_STREAM_FRAME_INIT = 0x01; + + TRPC_STREAM_FRAME_DATA = 0x02; + + TRPC_STREAM_FRAME_FEEDBACK = 0x03; + + TRPC_STREAM_FRAME_CLOSE = 0x04; +} + +// The message definition of stream `INIT` frame +message TrpcStreamInitMeta { + // request meta information + TrpcStreamInitRequestMeta request_meta = 1; + + // response meta information + TrpcStreamInitResponseMeta response_meta = 2; + + // The window size is notified by the receiver to the sender + uint32 init_window_size = 3; + + // The serialization type of the request data + // eg: proto/json/.., default proto + // The specific value corresponds to `TrpcContentEncodeType` + uint32 content_type = 4; + + // The compression type of the requested data + // eg: gzip/snappy/..., not used by default + // The specific value corresponds to `TrpcCompressType` + uint32 content_encoding = 5; +} + +// The request meta information definition of stream `INIT` frame +message TrpcStreamInitRequestMeta { + // Caller name + // The specification format: trpc.application_name.server_name.proto_service_name, 4 segments + bytes caller = 1; + + // Callee name + // The specification format: trpc.application_name.server_name.proto_service_name[.interface_name] + bytes callee = 2; + + // Interface name of callee + // The specification format: /package.service_name/interface_name + bytes func = 3; + + // The message type of the transparent transmission information + // such as tracing, dyeing key, gray, authentication, multi-environment, set name, etc. + // The specific value corresponds to `TrpcMessageType` + uint32 message_type = 4; + + // The information key-value pair transparently transmitted by the framework + // Currently divided into two parts: + // 1 part is the information to be transparently transmitted by the framework layer, + // and the name of the key must be started with `trpc-`` + // 2 part is the information to be transparently transmitted by the business layer, + // and the business can set it by itself, it is recommended to start with `app-``, not `trpc-` + // Note: The key-value pair in trans_info will be transparently transmitted through the whole link, please use it carefully for business. + map trans_info = 5; +}; + +// The response meta information definition of stream `INIT` frame +message TrpcStreamInitResponseMeta { + // Error code + // The specific value corresponds to `TrpcRetCode` + int32 ret = 1; + + // The result information when the call fails + bytes error_msg = 2; +}; + +// The meta information definition of stream `FEEDBACK` frame +message TrpcStreamFeedBackMeta { + // increased window size + uint32 window_size_increment = 1; +} + +// The closed type of trpc stream protocol +enum TrpcStreamCloseType { + // normal closes unidirectional flow + TRPC_STREAM_CLOSE = 0; + + // Exception closes bidirectional stream + TRPC_STREAM_RESET = 1; +} + +// The meta information definition of trpc stream protocol for closing stream +message TrpcStreamCloseMeta { + // The type of stream closure, close one end, or close all + int32 close_type = 1; + + // Error code + // The specific value corresponds to `TrpcRetCode` + int32 ret = 2; + + // The result information when the call fails + bytes msg = 3; + + // The message type of the transparent transmission information + // such as tracing, dyeing key, gray, authentication, multi-environment, set name, etc. + // The specific value corresponds to `TrpcMessageType` + uint32 message_type = 4; + + // The information key-value pair transparently transmitted by the framework + // Currently divided into two parts: + // 1 part is the information to be transparently transmitted by the framework layer, + // and the name of the key must be started with `trpc-`` + // 2 part is the information to be transparently transmitted by the business layer, + // and the business can set it by itself, it is recommended to start with `app-``, not `trpc-` + // Note: The key-value pair in trans_info will be transparently transmitted through the whole link, please use it carefully for business. + map trans_info = 5; + + // The error code of the interface + // 0 means success, other means failure + int32 func_ret = 6; +} + +// The version of trpc protocol +enum TrpcProtoVersion { + TRPC_PROTO_V1 = 0; +} + +// The call type of trpc protocol +enum TrpcCallType { + // Unary(one-response-one-response), include sync and async + TRPC_UNARY_CALL = 0; + + // Oneway + TRPC_ONEWAY_CALL = 1; +} + +// The message type of the transparent transmission information +enum TrpcMessageType { + // trpc does not use this value, it is used by the pb gen-code tool + TRPC_DEFAULT = 0x00; + + // Dyeing message + TRPC_DYEING_MESSAGE = 0x01; + + // Tracing message + TRPC_TRACE_MESSAGE = 0x02; + + // Multi-Environment message + TRPC_MULTI_ENV_MESSAGE = 0x04; + + // grid message + TRPC_GRID_MESSAGE = 0x08; + + // SetNmae message + TRPC_SETNAME_MESSAGE = 0x10; +} + +// The encoding type of the body data in the trpc protocol +// Use proto by default +// At present, it is agreed that the value in the range of 0-127 is used by the framework +enum TrpcContentEncodeType { + // pb + TRPC_PROTO_ENCODE = 0; + + // jce + TRPC_JCE_ENCODE = 1; + + // json + TRPC_JSON_ENCODE = 2; + + // flatbuffer + TRPC_FLATBUFFER_ENCODE = 3; + + // text or binary + TRPC_NOOP_ENCODE = 4; + + // xml + TRPC_XML_ENCODE = 5; + + // thrift + TRPC_THRIFT_ENCODE = 6; +} + +// The compressor type of the body data in the trpc protocol +// No compression by default +enum TrpcCompressType { + // No compression + TRPC_DEFAULT_COMPRESS = 0; + + // gzip + TRPC_GZIP_COMPRESS = 1; + + // snappy(Deprecated) + // please use `TRPC_SNAPPY_STREAM_COMPRESS`/`TRPC_SNAPPY_BLOCK_COMPRESS`, + // Because trpc-go and trpc-cpp use stream and block modes respectively, + // the two are not compatible, and cross-language calls will make mistakes + TRPC_SNAPPY_COMPRESS = 2; + + // zlib + TRPC_ZLIB_COMPRESS = 3; + + // snappy stream + TRPC_SNAPPY_STREAM_COMPRESS = 4; + + // snappy block + TRPC_SNAPPY_BLOCK_COMPRESS = 5; + + // lz4 frame + TRPC_LZ4_FRAME_COMPRESS = 6; + + // lz4 block + TRPC_LZ4_BLOCK_COMPRESS = 7; +} + +// 框架层接口调用的返回码定义 +// The return code definition of the framework layer interface call +enum TrpcRetCode { + // success + TRPC_INVOKE_SUCCESS = 0; + + // server-side error code + // Mainly divided into several categories: + // 1. protocol related, + // 2. interface call related, + // 3. queue timeout or overload related + // 4. ... + + // server-side protocol error code + + // server-side decode error + TRPC_SERVER_DECODE_ERR = 1; + // server-side encode error + TRPC_SERVER_ENCODE_ERR = 2; + + // interface call error code + + // the server-side does not have a corresponding service implementation + TRPC_SERVER_NOSERVICE_ERR = 11; + // the server-side does not have a corresponding interface implementation + TRPC_SERVER_NOFUNC_ERR = 12; + + // timeout/overload/limiter error code + + // the request timed out on the server-side + TRPC_SERVER_TIMEOUT_ERR = 21; + // the request is overloaded on the server-side and the request is discarded + TRPC_SERVER_OVERLOAD_ERR = 22; + // the request is throttled on the server-side + TRPC_SERVER_LIMITED_ERR = 23; + // The request is timed out on the server-side due to the full link timeout + TRPC_SERVER_FULL_LINK_TIMEOUT_ERR = 24; + + // server-side system error + TRPC_SERVER_SYSTEM_ERR = 31; + + // the server-side request authentication failed error + TRPC_SERVER_AUTH_ERR = 41; + + // the server-side request parameter automatic verification failed error + TRPC_SERVER_VALIDATE_ERR = 51; + + // client-side error code + // Mainly divided into several categories: + // 1. timeout related, + // 2. network or connection related, + // 3. protocol related, + // 4. routing related + // 5. ... + + // timeout error code + + // the request is timed out on the client-side + TRPC_CLIENT_INVOKE_TIMEOUT_ERR = 101; + // the request is timed out on the client-side due to the full link timeout + TRPC_CLIENT_FULL_LINK_TIMEOUT_ERR = 102; + + // network or connection error code + + // client-side connection error + TRPC_CLIENT_CONNECT_ERR = 111; + + // protocol error code + + // client-side encode error + TRPC_CLIENT_ENCODE_ERR = 121; + // client-side decode error + TRPC_CLIENT_DECODE_ERR = 122; + + // client-side overload/limter error code + + // the request is throttled on the client-side + TRPC_CLIENT_LIMITED_ERR = 123; + // The request is overloaded on the client-side and discarded + TRPC_CLIENT_OVERLOAD_ERR = 124; + + // service routing error code + TRPC_CLIENT_ROUTER_ERR = 131; + + // client-side network or connection error code + TRPC_CLIENT_NETWORK_ERR = 141; + + // client-side response parameter automatic verification failed error + TRPC_CLIENT_VALIDATE_ERR = 151; + + // upstream actively disconnected, early cancellation request error code + TRPC_CLIENT_CANCELED_ERR = 161; + + // client-side read data frame error + TRPC_CLIENT_READ_FRAME_ERR = 171; + + // error code of the server-side stream + // mainly divided into several categories: + // 1. network or connection related, + // 2. protocol related, + // 3. write stream data related, + // 4. read stream data related, + // 5. ... + + // server-side streaming network or connection error code + TRPC_STREAM_SERVER_NETWORK_ERR = 201; + + // server-side streaming error code + + // stream message exceeds size limit + TRPC_STREAM_SERVER_MSG_EXCEED_LIMIT_ERR = 211; + + // server-side streaming encode error code + TRPC_STREAM_SERVER_ENCODE_ERR = 221; + // server-side streaming decode error code + TRPC_STREAM_SERVER_DECODE_ERR = 222; + + // server-side stream write error code + TRPC_STREAM_SERVER_WRITE_END = 231; + TRPC_STREAM_SERVER_WRITE_OVERFLOW_ERR = 232; + TRPC_STREAM_SERVER_WRITE_CLOSE_ERR = 233; + TRPC_STREAM_SERVER_WRITE_TIMEOUT_ERR = 234; + + // server-side stream read error code + TRPC_STREAM_SERVER_READ_END = 251; + TRPC_STREAM_SERVER_READ_CLOSE_ERR = 252; + TRPC_STREAM_SERVER_READ_EMPTY_ERR = 253; + TRPC_STREAM_SERVER_READ_TIMEOUT_ERR = 254; + + // error code of the client-side stream + // mainly divided into several categories: + // 1. network or connection related, + // 2. protocol related, + // 3. write stream data related, + // 4. read stream data related, + // 5. ... + + // client-side streaming network or connection error code + TRPC_STREAM_CLIENT_NETWORK_ERR = 301; + + // client-side streaming error code + + // client-side stream message exceeds size limit + TRPC_STREAM_CLIENT_MSG_EXCEED_LIMIT_ERR = 311; + + // client-side streaming encode error code + TRPC_STREAM_CLIENT_ENCODE_ERR = 321; + // client-side streaming decode error code + TRPC_STREAM_CLIENT_DECODE_ERR = 322; + + // client-side stream write error code + TRPC_STREAM_CLIENT_WRITE_END = 331; + TRPC_STREAM_CLIENT_WRITE_OVERFLOW_ERR = 332; + TRPC_STREAM_CLIENT_WRITE_CLOSE_ERR = 333; + TRPC_STREAM_CLIENT_WRITE_TIMEOUT_ERR = 334; + + // client-side stream read error code + TRPC_STREAM_CLIENT_READ_END = 351; + TRPC_STREAM_CLIENT_READ_CLOSE_ERR = 352; + TRPC_STREAM_CLIENT_READ_EMPTY_ERR = 353; + TRPC_STREAM_CLIENT_READ_TIMEOUT_ERR = 354; + + // unspecified error code(unary) + TRPC_INVOKE_UNKNOWN_ERR = 999; + // unspecified error code(stream) + TRPC_STREAM_UNKNOWN_ERR = 1000; +} + +// The following key already used by trans_info, be careful not to repeat it: +// "trpc-dyeing-key": dyeing key + +// The request header for unary +message RequestProtocol { + // The version of protocol + // The specific value corresponds to `TrpcProtoVersion` + uint32 version = 1; + + // Call type + // eg: unary, one-way + // The specific value corresponds to `TrpcCallType` + uint32 call_type = 2; + + // The unique id of the request(on the conneciton) + uint32 request_id = 3; + + // The timeout of the request(ms) + uint32 timeout = 4; + + // Caller name + // The specification format: trpc.application_name.server_name.proto_service_name, 4 segments + bytes caller = 5; + + // Callee name + // The specification format: trpc.application_name.server_name.proto_service_name[.interface_name] + bytes callee = 6; + + // Interface name of callee + // The specification format: /package.service_name/interface_name + bytes func = 7; + + // The message type of the transparent transmission information + // such as tracing, dyeing key, gray, authentication, multi-environment, set name, etc. + // The specific value corresponds to `TrpcMessageType` + uint32 message_type = 8; + + // The information key-value pair transparently transmitted by the framework + // Currently divided into two parts: + // 1 part is the information to be transparently transmitted by the framework layer, + // and the name of the key must be started with `trpc-`` + // 2 part is the information to be transparently transmitted by the business layer, + // and the business can set it by itself, it is recommended to start with `app-``, not `trpc-` + // Note: The key-value pair in trans_info will be transparently transmitted through the whole link, please use it carefully for business. + map trans_info = 9; + + // The serialization type of the request data + // eg: proto/json/.., default proto + // The specific value corresponds to `TrpcContentEncodeType` + uint32 content_type = 10; + + // The compression type of the requested data + // eg: gzip/snappy/..., not used by default + // The specific value corresponds to `TrpcCompressType` + uint32 content_encoding = 11; + + // The size of attachment data + uint32 attachment_size = 12; +} + +// The response header for unary +message ResponseProtocol { + // The version of protocol + // The specific value corresponds to `TrpcProtoVersion` + uint32 version = 1; + + // Call type + // eg: unary, one-way + // The specific value corresponds to `TrpcCallType` + uint32 call_type = 2; + + // The unique id of the request(on the conneciton) + uint32 request_id = 3; + + // Error code + // The specific value corresponds to `TrpcRetCode` + int32 ret = 4; + + // The error code of the interface + // 0 means success, other means failure + int32 func_ret = 5; + + // The result information when the call fails + bytes error_msg = 6; + + // The message type of the transparent transmission information + // such as tracing, dyeing key, gray, authentication, multi-environment, set name, etc. + // The specific value corresponds to `TrpcMessageType` + uint32 message_type = 7; + + // The information key-value pair transparently transmitted by the framework + // Currently divided into two parts: + // 1 part is the information to be transparently transmitted by the framework layer, + // and the name of the key must be started with `trpc-`` + // 2 part is the information to be transparently transmitted by the business layer, + // and the business can set it by itself, it is recommended to start with `app-``, not `trpc-` + map trans_info = 8; + + // The serialization type of the request data + // eg: proto/json/.., default proto + // The specific value corresponds to `TrpcContentEncodeType` + uint32 content_type = 9; + + // The compression type of the requested data + // eg: gzip/snappy/..., not used by default + // The specific value corresponds to `TrpcCompressType` + uint32 content_encoding = 10; + + // The size of attachment data + uint32 attachment_size = 12; +} diff --git a/src/application_protocols/trpc/trpc_codec.cc b/src/application_protocols/trpc/trpc_codec.cc new file mode 100644 index 00000000..11d1893a --- /dev/null +++ b/src/application_protocols/trpc/trpc_codec.cc @@ -0,0 +1,203 @@ +#include + +#include "envoy/buffer/buffer.h" + +#include "source/common/common/logger.h" + +#include "src/meta_protocol_proxy/codec/codec.h" +#include "src/application_protocols/trpc/trpc_codec.h" +#include "src/application_protocols/trpc/protocol.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +MetaProtocolProxy::DecodeStatus TrpcCodec::decode(Buffer::Instance& buffer, + MetaProtocolProxy::Metadata& metadata) { + ENVOY_LOG(debug, "trpc decoder: {} bytes available", buffer.length()); + messageType_ = metadata.getMessageType(); + ASSERT(messageType_ == MetaProtocolProxy::MessageType::Request || + messageType_ == MetaProtocolProxy::MessageType::Response); + + auto state = decoder_base_.onData(buffer); + + if (state == CodecChecker::DecodeStage::kWaitForData) { + ENVOY_LOG(debug, "trpc decoder: wait for data"); + return DecodeStatus::WaitForData; + } + ASSERT(state == CodecChecker::DecodeStage::kDecodeDone); + toMetadata(metadata); + return DecodeStatus::Done; +} + +void TrpcCodec::encode(const MetaProtocolProxy::Metadata& metadata, + const MetaProtocolProxy::Mutation& mutation, Buffer::Instance& buffer) { + if (mutation.size() < 1) { + return; + } + for (const auto& keyValue : mutation) { + ENVOY_LOG(debug, "trpc: codec mutation {} : {}", keyValue.first, keyValue.second); + } + if (metadata.getMessageType() == MetaProtocolProxy::MessageType::Request){ + TrpcRequestProtocol request_protocol; + if (!request_protocol.mutateHeader(buffer, mutation)) { + throw EnvoyException("encode request failed"); + } + }else if(metadata.getMessageType() == MetaProtocolProxy::MessageType::Response) { + TrpcResponseProtocol response_protocol; + if (!response_protocol.mutateHeader(buffer, mutation)) { + throw EnvoyException("encode response failed"); + } + } +} + +void TrpcCodec::onError(const MetaProtocolProxy::Metadata& metadata, + const MetaProtocolProxy::Error& error, Buffer::Instance& buffer) { + int32_t errCode; + switch (error.type) { + case MetaProtocolProxy::ErrorType::RouteNotFound: + errCode = trpc::TRPC_SERVER_NOSERVICE_ERR; + break; + default: + errCode = trpc::TRPC_SERVER_SYSTEM_ERR; + } + + if (metadata.getMessageType() == MetaProtocolProxy::MessageType::Stream_Init) { + TrpcStreamInitMeta stream_init; + stream_init.fixed_header_.stream_id = metadata.getStreamId(); + stream_init.fixed_header_.data_frame_type = trpc::TrpcDataFrameType::TRPC_STREAM_FRAME; + stream_init.fixed_header_.stream_frame_type = trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_INIT; + + auto response_meta = stream_init.protocol_header_.mutable_response_meta(); + response_meta->set_ret(errCode); + response_meta->set_error_msg(error.message); + + stream_init.encode(buffer); + } else { + TrpcResponseProtocol response_protocol; + response_protocol.fixed_header_.data_frame_type = trpc::TrpcStreamFrameType::TRPC_UNARY; + + // rsp_header_ + auto& header = response_protocol.protocol_header_; + header.set_request_id(metadata.getRequestId()); + //header.set_call_type(metadata.getUint32("call_type")); + //header.set_version(metadata.getUint32("version")); + //header.set_content_type(metadata.getUint32("content_type")); + //header.set_content_type(metadata.getUint32("content_encoding")); + header.set_error_msg(error.message); + header.set_ret(errCode); + header.set_func_ret(errCode); + + response_protocol.encode(buffer); + } +} + +void TrpcCodec::onFixedHeaderDecoded(std::unique_ptr fixed_header) { + fixed_header_ = std::move(fixed_header); + ENVOY_LOG(debug, "trpc decoder: stream id {}", fixed_header_->stream_id); +} + +bool TrpcCodec::onUnaryHeader(std::string&& header_raw) { + ASSERT(fixed_header_->stream_frame_type == trpc::TrpcStreamFrameType::TRPC_UNARY); + if (messageType_ == MetaProtocolProxy::MessageType::Request) { + return requestHeader_.ParseFromString(header_raw); + } + return responseHeader_.ParseFromString(header_raw); +} + +bool TrpcCodec::onStreamFrame(std::string&& header_raw) { + ASSERT(fixed_header_->stream_frame_type == trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_INIT || + fixed_header_->stream_frame_type == trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_CLOSE || + fixed_header_->stream_frame_type == + trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_FEEDBACK || + fixed_header_->stream_frame_type == trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_DATA); + switch (fixed_header_->stream_frame_type) { + case trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_INIT: + return streamInitMeta_.ParseFromString(header_raw); + case trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_CLOSE: + return streamCloseMeta_.ParseFromString(header_raw); + case trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_DATA: + case trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_FEEDBACK: + return true; + default: + PANIC("not reached"); + } + return true; +} + +void TrpcCodec::onCompleted(std::unique_ptr buffer) { + origin_msg_ = std::move(buffer); +} + +void TrpcCodec::toMetadata(MetaProtocolProxy::Metadata& metadata) { + metadata.setHeaderSize(fixed_header_->getHeaderSize()); + metadata.setBodySize(fixed_header_->getPayloadSize()); + + switch (fixed_header_->stream_frame_type) { + case trpc::TrpcStreamFrameType::TRPC_UNARY: + if (messageType_ == MetaProtocolProxy::MessageType::Request) { + metadata.setRequestId(requestHeader_.request_id()); + metadata.putString("caller", requestHeader_.caller()); + metadata.putString("callee", requestHeader_.callee()); + metadata.putString("func", requestHeader_.func()); + metadata.put("call_type", requestHeader_.call_type()); + metadata.put("version", requestHeader_.version()); + metadata.put("content_type", requestHeader_.content_type()); + metadata.put("content_encoding", requestHeader_.content_encoding()); + for (auto const& kv : requestHeader_.trans_info()) { + metadata.putString(kv.first, kv.second); + } + } else { + metadata.setRequestId(responseHeader_.request_id()); + metadata.putString("error_msg", responseHeader_.error_msg()); + for (auto const& kv : responseHeader_.trans_info()) { + metadata.putString(kv.first, kv.second); + } + } + break; + // reset messageType of streaming messages to correct types + case trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_INIT: + ENVOY_LOG(debug, "frame type: frame_init"); + metadata.setMessageType(MetaProtocolProxy::MessageType::Stream_Init); + metadata.setStreamId(fixed_header_->stream_id); + metadata.putString("caller", streamInitMeta_.request_meta().caller()); + metadata.putString("callee", streamInitMeta_.request_meta().callee()); + metadata.putString("func", streamInitMeta_.request_meta().func()); + for (auto const& kv : streamInitMeta_.request_meta().trans_info()) { + metadata.putString(kv.first, kv.second); + } + break; + case trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_DATA: + ENVOY_LOG(debug, "frame type: frame_data"); + metadata.setMessageType(MetaProtocolProxy::MessageType::Stream_Data); + metadata.setStreamId(fixed_header_->stream_id); + break; + // Metaprotocol framework just treats feedback frame as a plain data frame + case trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_FEEDBACK: + ENVOY_LOG(debug, "frame type: frame_feedback"); + metadata.setMessageType(MetaProtocolProxy::MessageType::Stream_Data); + metadata.setStreamId(fixed_header_->stream_id); + break; + case trpc::TrpcStreamFrameType::TRPC_STREAM_FRAME_CLOSE: + if (streamCloseMeta_.close_type() == trpc::TrpcStreamCloseType::TRPC_STREAM_CLOSE) { + ENVOY_LOG(debug, "frame type: frame_close_one_way"); + metadata.setMessageType(MetaProtocolProxy::MessageType::Stream_Close_One_Way); + } else { + metadata.setMessageType(MetaProtocolProxy::MessageType::Stream_Close_Two_Way); + ENVOY_LOG(debug, "frame type: frame_close_two_way"); + } + metadata.setStreamId(fixed_header_->stream_id); + break; + } + + metadata.originMessage().move(*origin_msg_); +} + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy + diff --git a/src/application_protocols/trpc/trpc_codec.h b/src/application_protocols/trpc/trpc_codec.h new file mode 100644 index 00000000..7d21c7dc --- /dev/null +++ b/src/application_protocols/trpc/trpc_codec.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include + +#include "envoy/buffer/buffer.h" +#include "envoy/common/optref.h" +#include "envoy/common/pure.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/logger.h" + +#include "src/meta_protocol_proxy/codec/codec.h" +#include "src/application_protocols/trpc/codec_checker.h" +#include "src/application_protocols/trpc/protocol.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace MetaProtocolProxy { +namespace Trpc { + +/** + * Codec for Trpc protocol. + */ +class TrpcCodec : public MetaProtocolProxy::Codec, + public CodecCheckerCallBacks, + public Logger::Loggable { +public: + TrpcCodec() : decoder_base_(*this), messageType_(MetaProtocolProxy::MessageType::Request){}; + ~TrpcCodec() override = default; + + MetaProtocolProxy::DecodeStatus decode(Buffer::Instance& buffer, + MetaProtocolProxy::Metadata& metadata) override; + void encode(const MetaProtocolProxy::Metadata& metadata, + const MetaProtocolProxy::Mutation& mutation, Buffer::Instance& buffer) override; + void onError(const MetaProtocolProxy::Metadata& metadata, const MetaProtocolProxy::Error& error, + Buffer::Instance& buffer) override; + void onFixedHeaderDecoded(std::unique_ptr fixed_header) override; + bool onUnaryHeader(std::string&& header_raw) override; + bool onStreamFrame(std::string&& header_raw) override; + void onCompleted(std::unique_ptr buffer) override; + +private: + void toMetadata(MetaProtocolProxy::Metadata& metadata); + +private: + CodecChecker decoder_base_; + std::unique_ptr fixed_header_; + trpc::RequestProtocol requestHeader_; + trpc::ResponseProtocol responseHeader_; + trpc::TrpcStreamInitMeta streamInitMeta_; + trpc::TrpcStreamCloseMeta streamCloseMeta_; + std::unique_ptr origin_msg_; + MetaProtocolProxy::MessageType messageType_; +}; + +} // namespace Trpc +} // namespace MetaProtocolProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/src/application_protocols/trpc/trpc_codec.proto b/src/application_protocols/trpc/trpc_codec.proto new file mode 100644 index 00000000..9d4d1853 --- /dev/null +++ b/src/application_protocols/trpc/trpc_codec.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package aeraki.meta_protocol.codec; + +import "udpa/annotations/status.proto"; + +option java_package = "io.aeraki.meta_protocol.codec"; +option java_outer_classname = "CodecProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +message TrpcCodec { +} + diff --git a/test/trpc/test.sh b/test/trpc/test.sh new file mode 100755 index 00000000..eaa97f29 --- /dev/null +++ b/test/trpc/test.sh @@ -0,0 +1,12 @@ +#!/bin/bash +BASEDIR=$(dirname "$0") + +docker kill client server +docker rm client server +docker run --network host -d --name server aeraki/trpc-server +docker run --network host -d --env server_addr=127.0.0.1:28000 --env c=1 --env cmd=SayHello --name client aeraki/trpc-client +#docker run --network host --env server_addr=127.0.0.1:28000 --env c=1 --env cmd=SayHelloClientStream --name client aeraki/trpc-client +#docker run --network host --env server_addr=127.0.0.1:28000 --env c=1 --env cmd=SayHelloServerStream --name client aeraki/trpc-client +#docker run --network host --env server_addr=127.0.0.1:28000 --env c=1 --env cmd=SayHelloBidirectionStream --name client aeraki/trpc-client +#docker logs -f server +$BASEDIR/../../bazel-bin/envoy -c $BASEDIR/test.yaml -l debug diff --git a/test/trpc/test.yaml b/test/trpc/test.yaml new file mode 100644 index 00000000..10bd29c5 --- /dev/null +++ b/test/trpc/test.yaml @@ -0,0 +1,102 @@ +admin: + access_log_path: ./envoy_debug.log + address: + socket_address: + address: 127.0.0.1 + port_value: 8080 +node: + cluster: dubbo-sample-consumer.metaprotocol + id: sidecar~10.244.0.14~dubbo-sample-consumer-797c4f7cc4-t678c.meta-dubbo~meta-dubbo.svc.cluster.local + metadata: + ANNOTATIONS: + kubectl.kubernetes.io/default-container: dubbo-sample-consumer + kubectl.kubernetes.io/default-logs-container: dubbo-sample-consumer + APP_CONTAINERS: dubbo-sample-consumer + CLUSTER_ID: Kubernetes + INTERCEPTION_MODE: REDIRECT + LABELS: + app: dubbo-sample-consumer + pod-template-hash: 797c4f7cc4 + security.istio.io/tlsMode: istio + service.istio.io/canonical-name: dubbo-sample-consumer + service.istio.io/canonical-revision: latest + MESH_ID: cluster.local + NAME: dubbo-sample-consumer-797c4f7cc4-t678c + NAMESPACE: meta-dubbo + OWNER: kubernetes://apis/apps/v1/namespaces/meta-dubbo/deployments/dubbo-sample-consumer + PROXY_CONFIG: + binaryPath: /usr/local/bin/envoy + controlPlaneAuthPolicy: MUTUAL_TLS + discoveryAddress: istiod.istio-system.svc:15012 + proxyAdminPort: 15000 + proxyMetadata: + ISTIO_META_DNS_CAPTURE: "true" + serviceCluster: istio-proxy + SERVICE_ACCOUNT: default + WORKLOAD_NAME: dubbo-sample-consumer +static_resources: + listeners: + name: listener_meta_protocol + address: + socket_address: + address: 0.0.0.0 + port_value: 28000 + traffic_direction: OUTBOUND + filter_chains: + - filters: + - name: aeraki.meta_protocol_proxy + typed_config: + '@type': type.googleapis.com/aeraki.meta_protocol_proxy.v1alpha.MetaProtocolProxy + application_protocol: trpc + codec: + name: aeraki.meta_protocol.codec.trpc + metaProtocolFilters: + - name: aeraki.meta_protocol.filters.metadata_exchange + - name: aeraki.meta_protocol.filters.istio_stats + config: + '@type': type.googleapis.com/aeraki.meta_protocol_proxy.filters.istio_stats.v1alpha.IstioStats + destination_service: org.apache.dubbo.samples.basic.api.demoservice + - name: aeraki.meta_protocol.filters.router + routeConfig: + routes: + - name: default + match: + metadata: + - name: callee + exact_match: trpc.test.helloworld + - name: func + prefix_match: /trpc.test.helloworld.Greeter/SayHello + route: + cluster: trpc.test.helloworld + request_mutation: + - key: version + value: v10 + - key: foo + value: bar + - key: foo1 + value: bar1 + statPrefix: trpc.test.helloworld + statPrefix: trpc.test.helloworld + access_log: + - name: envoy.access_loggers.file + typed_config: + '@type': type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + log_format: + text_format_source: + inline_string: | + [%START_TIME%] %REQ(X-META-PROTOCOL-APPLICATION-PROTOCOL)% %RESPONSE_CODE% %RESPONSE_CODE_DETAILS% %CONNECTION_TERMINATION_DETAILS% "%UPSTREAM_TRANSPORT_FAILURE_REASON%" %BYTES_RECEIVED% %BYTES_SENT% %DURATION% "%REQ(X-FORWARDED-FOR)%" "%REQ(X-REQUEST-ID)%" %UPSTREAM_CLUSTER% %UPSTREAM_LOCAL_ADDRESS% %DOWNSTREAM_LOCAL_ADDRESS% %DOWNSTREAM_REMOTE_ADDRESS% %ROUTE_NAME% + path: /dev/stdout + + clusters: + name: trpc.test.helloworld + type: STATIC + connect_timeout: 5s + load_assignment: + cluster_name: trpc.test.helloworld + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 8000