Dawn's Blogs

分享技术 记录成长

0%

Triple协议 (6) Triple实现之协议接口定义

Triple 协议接口定义如下,包括两个方法:有 grpcProtocol 和 tripleProtocol 实现了这个接口,其中 grpc 用于处理 gRPC 协议(包括 Unary 和 Streaming,Triple 协议底层的 Streaming 通信用 gRPC 实现)和 Triple 协议(仅仅支持 Unary,但是同时支持 HTTP 1 和 HTTP 2)。

  • NewHandler:创建服务端 Handler 的方法,输入为 Handler 的参数,输出为 protocolHandler。
  • NewClient:创建客户端 Client 的方法,输入为 Client 的参数,输出为 protocolClient。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// A Protocol defines the HTTP semantics to use when sending and receiving
// messages. It ties together codecs, compressors, and net/http to produce
// Senders and Receivers.
//
// For example, triple supports the gRPC protocol using this abstraction. Among
// many other things, the protocol implementation is responsible for
// translating timeouts from Go contexts to HTTP and vice versa. For gRPC, it
// converts timeouts to and from strings (for example, 10*time.Second <->
// "10S"), and puts those strings into the "Grpc-Timeout" HTTP header. Other
// protocols might encode durations differently, put them into a different HTTP
// header, or ignore them entirely.
//
// We don't have any short-term plans to export this interface; it's just here
// to separate the protocol-specific portions of triple from the
// protocol-agnostic plumbing.
type protocol interface {
NewHandler(*protocolHandlerParams) protocolHandler
NewClient(*protocolClientParams) (protocolClient, error)
}

Handler

Handler 在 Triple 协议总被定义为服务器。

Handler 配置

Handler 配置 protocolHandlerParams 定义如下:

  • Spec:定义了 handler spec,包括传输类型、请求 Path(Procedure)、是否为客户端、IdempotencyLevel。
  • Codecs:保存了这个 Handler 支持的编码类型。
  • CompressionPools:保存了这个 Handler 支持的压缩方法。
  • CompressMinBytes:当小于这个值时,不进行压缩,即进行压缩的最小长度
  • BufferPool:当接受请求或者发送响应时,进行编码和压缩时用的 buffer,这些 buffer 通过 BufferPool 进行复用(底层为 sync.Pool)。
  • ReadMaxBytes:最大请求读取长度。
  • SendMaxBytes:最大响应写入长度。
  • RequireTripleProtocolHeader:是否需要检测 Triple-Protocol-Version Header,如果可以通过这个 Header 快速判断是否为 Triple 请求
  • IdempotencyLevel:幂等等级。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// HandlerParams are the arguments provided to a Protocol's NewHandler
// method, bundled into a struct to allow backward-compatible argument
// additions. Protocol implementations should take care to use the supplied
// Spec rather than constructing their own, since new fields may have been
// added.
type protocolHandlerParams struct {
Spec Spec
Codecs readOnlyCodecs
CompressionPools readOnlyCompressionPools
CompressMinBytes int
BufferPool *bufferPool
ReadMaxBytes int
SendMaxBytes int
RequireTripleProtocolHeader bool
IdempotencyLevel IdempotencyLevel
}

Handler 定义

Handler 的抽象定义如下,在 Triple 实现时,grpcHandler(用于 streaming 通信和 unary gRPC 通信)和 tripleHandler(仅用于实现 triple unary 通信)实现了这个接口。定义的方法如下:

  • Methods:返回可以处理的 HTTP 方法。
  • ContentTypes:返回可以处理的 Content-Type 类型。
  • SetTimeout:通过读取请求中的 timeout Header(Triple-Timeout-Ms,Grpc-Timeout),来设置处理的超时时间,返回带超时时间的 context。
  • CanHandlePayload:返回是否可以处理 HTTP 请求。
  • NewConn:Handler 的核心函数,用与创建一会与客户端的连接,可以读取 request,写入 response。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Handler is the server side of a protocol. HTTP handlers typically support
// multiple protocols, codecs, and compressors.
type protocolHandler interface {
// Methods is the set of HTTP methods the protocol can handle.
Methods() map[string]struct{}

// ContentTypes is the set of HTTP Content-Types that the protocol can
// handle.
ContentTypes() map[string]struct{}

// SetTimeout runs before NewStream. Implementations may inspect the HTTP
// request, parse any timeout set by the client, and return a modified
// context and cancellation function.
//
// If the client didn't send a timeout, SetTimeout should return the
// request's context, a nil cancellation function, and a nil error.
SetTimeout(*http.Request) (context.Context, context.CancelFunc, error)

// CanHandlePayload returns true if the protocol can handle an HTTP request.
// This is called after the request method is validated, so we only need to
// be concerned with the content type/payload specifically.
CanHandlePayload(*http.Request, string) bool

// NewConn constructs a HandlerConn for the message exchange.
NewConn(http.ResponseWriter, *http.Request) (handlerConnCloser, bool)
}

Client

Client 在 Triple 协议中被定义为客户端。

Client 配置

Client 配置 protocolClientParams 定义如下:

  • CompressionName:此次请求使用的压缩方法名。
  • CompressionPools:客户端支持的压缩方法。
  • Codec:客户端支持的编码方法。
  • CompressMinBytes:当小于这个值时,不进行压缩,即进行压缩的最小长度
  • HTTPClient:HTTPClient,发出 HTTP 请求的客户端,即 http.Client。
  • URL:请求的 URL。
  • BufferPool:当接受请求或者发送响应时,进行编码和压缩时用的 buffer,这些 buffer 通过 BufferPool 进行复用(底层为 sync.Pool)。
  • ReadMaxBytes:最大响应读取长度。
  • SendMaxBytes:最大请求写入长度。
  • Protobuf:protobuf 编码方式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ClientParams are the arguments provided to a Protocol's NewClient method,
// bundled into a struct to allow backward-compatible argument additions.
// Protocol implementations should take care to use the supplied Spec rather
// than constructing their own, since new fields may have been added.
type protocolClientParams struct {
CompressionName string
CompressionPools readOnlyCompressionPools
Codec Codec
CompressMinBytes int
HTTPClient HTTPClient
URL *url.URL
BufferPool *bufferPool
ReadMaxBytes int
SendMaxBytes int
EnableGet bool
GetURLMaxBytes int
GetUseFallback bool
// The gRPC family of protocols always needs access to a Protobuf codec to
// marshal and unmarshal errors.
Protobuf Codec
}

Client 定义

Client 的抽象定义如下,同样的,在 Triple 实现时,grpcClient(用于 streaming 通信和 unary gRPC 通信)和 tripleClient(仅用于实现 triple unary 通信)实现了这个接口。定义的方法如下:

  • Peer:返回 RPC 请求的 Server 信息,包括地址,协议,url query 参数。
  • WriteRequestHander:向 HTTP Header 中写入协议相关的 Header(Triple 或者 gRPC 特有的 Header)。
  • NewConn:Client 的核心方法,用于与服务器之间创建一条连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Client is the client side of a protocol. HTTP clients typically use a single
// protocol, codec, and compressor to send requests.
type protocolClient interface {
// Peer describes the server for the RPC.
Peer() Peer

// WriteRequestHeader writes any protocol-specific request headers.
WriteRequestHeader(StreamType, http.Header)

// NewConn constructs a StreamingClientConn for the message exchange.
//
// Implementations should assume that the supplied HTTP headers have already
// been populated by WriteRequestHeader. When constructing a stream for a
// unary call, implementations may assume that the Sender's Send and Close
// methods return before the Receiver's Receive or Close methods are called.
NewConn(context.Context, Spec, http.Header) StreamingClientConn
}

连接的抽象

在 Handler 和 Client 接口中,都有 NewConn 方法,返回一个连接。在 Triple 中,客户端和服务器之间的连接也进行了抽象。并且分别以客户端和服务器的视角,分别进行了定义。

  • StreamingHandlerConn:以服务器的视角,描述了服务器所接受的客户端的连接。
    • 类似于 http.ResponseWriter,在第一次向客户端响应信息时,写入 Response Header,后续的更改实际上是没有用的。
    • 当客户端完成发送数据时,Receive 返回一个包装 io.EOF 的错误。
    • Triple-Grpc- 开头的头部和尾部是 gRPC 和 Triple 协议保留的,应用可以读取它们但不能写入它们。
    • 保证所有返回的错误都是 Triple 协议定义的标准错误。
  • StreamingClientConn:以客户端的视角,描述了客户端与服务器之间的连接。
    • 在第一次向服务器发送请求时,写入 Request Header,后续的更改实际上是没有用的。
    • 当服务器完成发送数据时,StreamingClientConn 的 Receive 方法将返回一个错误,该错误包装了 io.EOF。
    • Triple-Grpc- 开头的头部和尾部是 gRPC 和 Triple 协议保留的,应用可以读取它们但不能写入它们。
    • 保证所有返回的错误都是 Triple 协议定义的标准错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// StreamingHandlerConn is the server's view of a bidirectional message
// exchange. Interceptors for streaming RPCs may wrap StreamingHandlerConns.
//
// Like the standard library's [http.ResponseWriter], StreamingHandlerConns write
// response headers to the network with the first call to Send. Any subsequent
// mutations are effectively no-ops. Handlers may mutate response trailers at
// any time before returning. When the client has finished sending data,
// Receive returns an error wrapping [io.EOF]. Handlers should check for this
// using the standard library's [errors.Is].
//
// Headers and trailers beginning with "Triple-" and "Grpc-" are reserved for
// use by the gRPC and Triple protocols: applications may read them but
// shouldn't write them.
//
// StreamingHandlerConn implementations provided by this module guarantee that
// all returned errors can be cast to [*Error] using the standard library's
// [errors.As].
//
// StreamingHandlerConn implementations do not need to be safe for concurrent use.
type StreamingHandlerConn interface {
Spec() Spec
Peer() Peer

Receive(interface{}) error
RequestHeader() http.Header

Send(interface{}) error
ResponseHeader() http.Header
ResponseTrailer() http.Header
}

// StreamingClientConn is the client's view of a bidirectional message exchange.
// Interceptors for streaming RPCs may wrap StreamingClientConn.
//
// StreamingClientConn write request headers to the network with the first
// call to Send. Any subsequent mutations are effectively no-ops. When the
// server is done sending data, the StreamingClientConn's Receive method
// returns an error wrapping [io.EOF]. Clients should check for this using the
// standard library's [errors.Is] or [IsEnded]. If the server encounters an error
// during processing, subsequent calls to the StreamingClientConn's Send method
// will return an error wrapping [io.EOF]; clients may then call Receive to
// unmarshal the error.
//
// Headers and trailers beginning with "Triple-" and "Grpc-" are reserved for
// use by the gRPC and Triple protocols: applications may read them but
// shouldn't write them.
//
// StreamingClientConn implementations provided by this module guarantee that
// all returned errors can be cast to [*Error] using the standard library's
// [errors.As].
//
// In order to support bidirectional streaming RPCs, all StreamingClientConn
// implementations must support limited concurrent use. See the comments on
// each group of methods for details.
type StreamingClientConn interface {
// Spec and Peer must be safe to call concurrently with all other methods.
Spec() Spec
Peer() Peer

// Send, RequestHeader, and CloseRequest may race with each other, but must
// be safe to call concurrently with all other methods.
Send(interface{}) error
RequestHeader() http.Header
CloseRequest() error

// Receive, ResponseHeader, ResponseTrailer, and CloseResponse may race with
// each other, but must be safe to call concurrently with all other methods.
Receive(interface{}) error
ResponseHeader() http.Header
ResponseTrailer() http.Header
CloseResponse() error
}