Dawn's Blogs

分享技术 记录成长

0%

本项目完整地址 simple-redis

main.go 文件用于开启一个 simple-redis 服务器,其流程如下:

  • 首先加载配置文件
  • 接着加载日志记录模块
  • 最后开启 TCP 服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var configFilename string
var defaultConfigFileName = "config.yaml"

func main() {
flag.StringVar(&configFilename, "f", defaultConfigFileName, "the config file")
flag.Parse()

// 加载配置文件
config.SetupConfig(configFilename)

// 加载日志
logger.SetupLogger()

//
if err := tcp.ListenAndServeWithSignal(server.MakeHandler()); err != nil {
logger.Error(err)
}
}

TCP 服务器

tcp/setver.go 用于实现 TCP 服务器。

在 main 函数中,调用 ListenAndServeWithSignal 函数开启了一个 TCP 服务器。ListenAndServeWithSignal 的函数头如下:

1
2
// ListenAndServeWithSignal 服务器开启监听,并且使用 signal 作为结束信号
func ListenAndServeWithSignal(handler tcp.Handler) error

tcp.Handler 为 TCP 服务器上层应用接口,在本项目中代表一个 simple-redis 服务器

ListenAndServeWithSignal

ListenAndServeWithSignal 用于绑定端口,开启一个 TCP 服务器,它的流程如下:

  • 首先开启两个通道,一个用于接收系统信号(系统信号用于关闭),一个用于传递关闭信号
1
2
3
closeChan := make(chan struct{})   // 监听结束信号
signalChan := make(chan os.Signal) // 监听 signal
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  • 开启一个协程用于接收系统信号,当接收到关闭的系统信号时,向关闭管道中传入数据,表明要关闭服务器。
1
2
3
4
5
6
7
8
9
10
11
12
go func() {
for {
sig := <-signalChan
switch sig {
case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
closeChan <- struct{}{}
return
default:
continue
}
}
}()
  • 开始监听 TCP 端口。
1
2
3
4
5
address := fmt.Sprintf("%v:%v", config.Properties.Bind, config.Properties.Port)
listener, err := net.Listen("tcp", address)
if err != nil {
return err
}
  • 调用 ListenAndServe 开启真正的应用层服务
1
2
3
4
logger.Infoln("tcp server is listening at:", address)
ListenAndServe(listener, handler, closeChan)

return nil
阅读全文 »

Triple 协议接口定义如下,包括两个方法:有 grpcProtocol 和 tripleProtocol 实现了这个接口,其中 grpc 用于处理 gRPC 协议(包括 Unary 和 Streaming,Triple 协议底层的 Streaming 通信用 gRPC 实现)和 Triple 协议(仅仅支持 Unary,但是同时支持 HTTP 1 和 HTTP 2)。

  • NewHandler:创建服务端 Handler 的方法,输入为 Handler 的参数,输出为 protocolHandler。
  • NewClient:创建客户端 Client 的方法,输入为 Client 的参数,输出为 protocolClient。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// A Protocol defines the HTTP semantics to use when sending and receiving
// messages. It ties together codecs, compressors, and net/http to produce
// Senders and Receivers.
//
// For example, triple supports the gRPC protocol using this abstraction. Among
// many other things, the protocol implementation is responsible for
// translating timeouts from Go contexts to HTTP and vice versa. For gRPC, it
// converts timeouts to and from strings (for example, 10*time.Second <->
// "10S"), and puts those strings into the "Grpc-Timeout" HTTP header. Other
// protocols might encode durations differently, put them into a different HTTP
// header, or ignore them entirely.
//
// We don't have any short-term plans to export this interface; it's just here
// to separate the protocol-specific portions of triple from the
// protocol-agnostic plumbing.
type protocol interface {
NewHandler(*protocolHandlerParams) protocolHandler
NewClient(*protocolClientParams) (protocolClient, error)
}

Handler

Handler 在 Triple 协议总被定义为服务器。

Handler 配置

Handler 配置 protocolHandlerParams 定义如下:

  • Spec:定义了 handler spec,包括传输类型、请求 Path(Procedure)、是否为客户端、IdempotencyLevel。
  • Codecs:保存了这个 Handler 支持的编码类型。
  • CompressionPools:保存了这个 Handler 支持的压缩方法。
  • CompressMinBytes:当小于这个值时,不进行压缩,即进行压缩的最小长度
  • BufferPool:当接受请求或者发送响应时,进行编码和压缩时用的 buffer,这些 buffer 通过 BufferPool 进行复用(底层为 sync.Pool)。
  • ReadMaxBytes:最大请求读取长度。
  • SendMaxBytes:最大响应写入长度。
  • RequireTripleProtocolHeader:是否需要检测 Triple-Protocol-Version Header,如果可以通过这个 Header 快速判断是否为 Triple 请求
  • IdempotencyLevel:幂等等级。
阅读全文 »

全双工通信

Triple 协议中的全双工通信请求指的是从客户端发出请求,到从服务端收到响应的全双工通信。全双工通信请求 duplexHTTPCall 重要的字段如下:

  • httpClient:HTTP 客户端,用于发出 HTTP 请求。
  • streamType:通信类型,包括 Unary 和 Streaming。
  • requestBodyReader 和 requestBodyWriter:由于写入请求 Body 是通过 io.Pipe 实现的,所以需要记录 writer 和 reader,在写入 writer 后,HTTP 请求的 Body 通过 Reader 读取写入的内容。
  • request 和 response:HTTP 请求和响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// duplexHTTPCall is a full-duplex stream between the client and server. The
// request body is the stream from client to server, and the response body is
// the reverse.
//
// Be warned: we need to use some lesser-known APIs to do this with net/http.
type duplexHTTPCall struct {
ctx context.Context
httpClient HTTPClient
streamType StreamType
validateResponse func(*http.Response) *Error

// We'll use a pipe as the request body. We hand the read side of the pipe to
// net/http, and we write to the write side (naturally). The two ends are
// safe to use concurrently.
requestBodyReader *io.PipeReader
requestBodyWriter *io.PipeWriter

sendRequestOnce sync.Once
responseReady chan struct{}
request *http.Request
response *http.Response

errMu sync.Mutex
err error
}

初始化

全双工通信的初始化如下,请求通过 io.Pipe 实现,用户(可以多次,即 client streaming )向 writer 中写入请求内容,请求 Body 读取请求内容并发送请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func newDuplexHTTPCall(
ctx context.Context,
httpClient HTTPClient,
url *url.URL,
spec Spec,
header http.Header,
) *duplexHTTPCall {
// ensure we make a copy of the url before we pass along to the
// Request. This ensures if a transport out of our control wants
// to mutate the req.URL, we don't feel the effects of it.
url = cloneURL(url)
pipeReader, pipeWriter := io.Pipe()

// todo(DMwangnima): remove cloneURL logic in WithContext
// This is mirroring what http.NewRequestContext did, but
// using an already parsed url.URL object, rather than a string
// and parsing it again. This is a bit funny with HTTP/1.1
// explicitly, but this is logic copied over from
// NewRequestContext and doesn't effect the actual version
// being transmitted.
request := (&http.Request{
Method: http.MethodPost,
URL: url,
Header: header,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Body: pipeReader,
Host: url.Host,
}).WithContext(ctx)
return &duplexHTTPCall{
ctx: ctx,
httpClient: httpClient,
streamType: spec.StreamType,
requestBodyReader: pipeReader,
requestBodyWriter: pipeWriter,
request: request,
responseReady: make(chan struct{}),
}
}

写请求

写请求 Body 如下,通过向 requestBodyWriter 写入实现。但是可以注意到,在写入之前,首先调用了 d.ensureRequestMade 确保双工的通信流已经被初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Write to the request body. Returns an error wrapping io.EOF after SetError
// is called.
func (d *duplexHTTPCall) Write(data []byte) (int, error) {
// ensure stream has been initialized
d.ensureRequestMade()
// Before we send any data, check if the context has been canceled.
if err := d.ctx.Err(); err != nil {
d.SetError(err)
return 0, wrapIfContextError(err)
}
// It's safe to write to this side of the pipe while net/http concurrently
// reads from the other side.
bytesWritten, err := d.requestBodyWriter.Write(data)
if err != nil && errors.Is(err, io.ErrClosedPipe) {
// Signal that the stream is closed with the more-typical io.EOF instead of
// io.ErrClosedPipe. This makes it easier for protocol-specific wrappers to
// match grpc-go's behavior.
return bytesWritten, io.EOF
}
return bytesWritten, err
}

确保双工流被初始化

在 ensureRequestMade 方法中,只会执行一次 go d.makeRequest 用于创建一个请求。在调用 httpClient.Do 方法发出一个 message 后,客户端返回一个 response,用于表示响应流。

在返回 response 后,关闭 responseReady 表示可以读取响应了。这个建立流的过程是异步的,这样不会阻塞 Write 写入 message 方法。

由于 streaming 通信模式:

  • 客户端可以通过 request Body 多次写入,发送多个 message
  • 同样的,服务器的响应通过多次读取 response Body,接收多个 message。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (d *duplexHTTPCall) ensureRequestMade() {
d.sendRequestOnce.Do(func() {
go d.makeRequest()
})
}

func (d *duplexHTTPCall) makeRequest() {
// This runs concurrently with Write and CloseWrite. Read and CloseRead wait
// on d.responseReady, so we can't race with them.
defer close(d.responseReady)

// Once we send a message to the server, they send a message back and
// establish the receive side of the stream.
response, err := d.httpClient.Do(d.request) //nolint:bodyclose
if err != nil {
// stream knowledge
err = wrapIfContextError(err)
err = wrapIfLikelyH2CNotConfiguredError(d.request, err)
err = wrapIfLikelyWithGRPCNotUsedError(err)
err = wrapIfRSTError(err)
if _, ok := asError(err); !ok {
err = NewError(CodeUnavailable, err)
}
d.SetError(err)
return
}
d.response = response
if err := d.validateResponse(response); err != nil {
d.SetError(err)
return
}
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
// If we somehow dialed an HTTP/1.x server, fail with an explicit message
// rather than returning a more cryptic error later on.
d.SetError(errorf(
CodeUnimplemented,
"response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2",
d.request.URL,
response.ProtoMajor,
response.ProtoMinor,
))
}
}

读响应

读响应 Body 很简单(可以多次读取),只是在读之前需要调用 BlockUntilResponseReady 阻塞,确保响应流已经准备好,可以进行读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Read from the response body. Returns the first error passed to SetError.
func (d *duplexHTTPCall) Read(data []byte) (int, error) {
// For sure that server-to-client stream has been initialized
// First, we wait until we've gotten the response headers and established the
// server-to-client side of the stream.
d.BlockUntilResponseReady()
if err := d.getError(); err != nil {
// The stream is already closed or corrupted.
return 0, err
}
// Before we read, check if the context has been canceled.
if err := d.ctx.Err(); err != nil {
d.SetError(err)
return 0, wrapIfContextError(err)
}
if d.response == nil {
return 0, fmt.Errorf("nil response from %v", d.request.URL)
}
n, err := d.response.Body.Read(data)
return n, wrapIfRSTError(err)
}

func (d *duplexHTTPCall) BlockUntilResponseReady() {
<-d.responseReady
}

从本节开始,来阅读 Triple 源码,地址为:https://github.com/apache/dubbo-go/tree/main/protocol/triple/triple_protocol

编码

Triple 支持 protobuf、json 多种编码方法,这些编码方法都实现了 Codec 接口:

  • Name 方法:返回编码名称,如 proto 和 json。
  • Marshal 方法:用于序列化。
  • Unmarshal 方法:用于反序列化。
阅读全文 »

gRPC 是谷歌开源的一套 RPC 协议框架,基本上也是最为广泛使用的 RPC 协议。理解 gRPC 就要从两方面去理解:数据编码(序列化)和数据传输。

数据编码(序列化)

数据编码顾名思义就是在将请求的内存对像转化成可以传输的字节流发给服务端,并将收到的字节流再转化成内存对像。gRPC 默认使用 Protobuf 进行序列化,也支持 Json 格式的序列化。

Json

Json 序列化的优点:

  • 可读性高。

缺点:

  • 编码低效,是非二进制编码。
  • 信息冗余,对于同一个 Json 对象,需要传输 key。

Protobuf

Protobuf 序列化的优点:

  • 编码高效,是二进制编码,且会进行压缩。
  • 信息不存在冗余,给每一个字段一个编号,传输时只需要传输这个编号即可。

缺点:

  • 人类不可读性。

Protobuf 需要 IDL 描述接口,使用 IDL 进行强约束有好处也有坏处

好处就是对字段有了约束,只需要传输编号,避免信息冗余。坏处就是需要相关的工具链(protoc),为 IDL 生成代码。

数据传输

gRPC 使用的是 HTTP/2 协议,可以简单的认为一个 gRPC 请求就是一个 HTTP 请求,这个 HTTP 请求用的是 POST 方法。

请求 Request

一个 gRPC 的请求报文如下,包括:

  • HTTP Header
  • HTTP Body:又分为 Length-Prefixed Message 和 Protobuf 消息。

image2022-1-27_15-20-18

HTTP Header

一个 gRPC 定义包含三个部分,包名、服务名和接口名。在发起 HTTP 请求时,Path 路径如下:

1
/${包名}.${服务名}/${接口名}

gRPC 协议规定Content-Type 的取值为 application/grpc(默认,使用 protobuf 编码)、application/grpc+proto(使用 protobuf 编码)、application/json(使用 json 编码)。

HTTP Body

HTTP Body 并不会直接存放 Protobuf 消息,而是先添加 5 个字节的 Length-Prefixed Message 头部,其中用 4 个字节明确 Protobuf 消息的长度,1 字节表示消息是否被压缩过。

为什么要多此一举呢?这是因为,gRPC 支持流式消息,即在 HTTP/2 的 1 条 Stream 中,通过 DATA 帧发送多个 gRPC 消息,而 Length-Prefixed Message 就可以将不同的消息分离开。

因为 Length-Prefixed Message 的五个字节,导致 gRPC 只能是二进制协议,即使是 json 进行编码,curl 和浏览器(web)都不能原生的支持 gRPC,只能用专用的工具!

返回 Response

一个 gRPC 响应报文如下,包括:

  • HTTP 头部:包括 Header 和 Tailer。
  • HTTP Body:又分为 Length-Prefixed Message 和 Protobuf 消息。

image2022-1-27_15-24-18

其中 HTTP 头部被分为 Header 和 Tailer。Tailer 中的 grpc-status 和 grpc-message 是在 DATA 帧的最后,这样就允许服务器在发送完消息后再给出错误码。

gRPC 中以 grpc-status 和 grpc-message 表示自己的返回状态和消息。

HTTP 2 中的 HEADER 帧和 DATA 帧是独立的,对于一个 gRPC 响应可以先发一个 HEADER 帧告知 HTTP 状态,再发送 DATA 帧传输 gRPC 消息,最后发送一个 HEADER 帧传输 grpc-status。

一般都是先发 HEADER 再发 DATA,为什么 gRPC 需要在发完 DATA 之后才发 grpc-status 头呢

因为式传输,在所有的流式消息没有传输完成之前,服务端也不知道要传什么 grpc-status。

gRPC 中的 Tailer 设计

Tailer 介绍

H1.1 中的 Tailer

HTTP 协议在返回数据的时候通常是先发送 Header 信息,再发送 Body 数据。但 Trailers 是一类特殊的 Header,它们是在 Body 传输结束后才发送给客户端的。因为发送顺序不同,所以,在 HTTP/1.1 中 Trailers 只能跟 chunked 传输编码配合使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
Trailer: MD5

7\r\n
Mozilla\r\n
9\r\n
Developer\r\n
7\r\n
Network\r\n
0\r\n
MD5: 68b329da9893e34099c7d8ad5cb9c940\r\n
\r\n

Trailer 头里面表示数据传输结束后还有额外的 Header,Header 的名字为 MD5。可以指定多个,以逗号分隔。所以在分段数据之后又额外发送了所有数据的 MD5 值用作校验。因为 Body 的内容是动态生成的,不可能事先得到它的 MD5 值。只能是一边传输一边计算,等传完了也就计算好了,然后使用 Trailer 头「补发」给客户端。

H2 中的 Tailer

到了 HTTP/2 时代,因为有了帧的概念,所以 Header 和 Body 可以并发传输,不再有先发 Header 再传 Body 的限制。因此,在 HTTP/2 中,Trailers 不需要依赖 chunked 传输编码,所有的响应都能发送 Trailers 信息

gRPC 中的 Tailer

gRPC 中为什么用到了 Tailer?原因就是流传输,在流式接口中因为一个帧可能携带多条数据,所以无法事先确定数据的长度(所以不可能使用 Content-Length 头)

长度不确定,为什么不使用 chunked 传输?

假如客户端跟服务器之前有一个代理,代理收到响应之后开始将数据转发给客户端。首先就是把 Header 部分发送给客户端,于是调用方确定本次的状态码为 200,成功了。然后逐段转发数据部分,如果代理在转发完第一段数据后服务端异常退出了,那代理需要给客户端发送什么信号呢?

因为状态码已经发出去,所以没办法把 200 改成 5xx 了。也不能直接发送 0\r\n 来结束 chunked 传输,这样客户端没有办法得知服务器已经异常退出的信息。唯一能做的就是直接关闭对应的底层连接,但这样会因为客户端创建新的连接而消耗额外的资源。

所以需要找一种尽量复用底层连接的条件下通知客户端服务器出错的办法,最终 gRPC 团队决定使用 Trailers 来传输,Tailer 中携带 grpc-status 来反应响应的状态。

问题

Tailer 导致了不支持浏览器(Chrome 认为 Tailer 会导致安全问题),而 Length-Prefixed Message 同样导致无法直接用 curl + json 的方式来调试 gRPC 接口,必须使用专门的工具。

然有了消息前缀,那完全可以把 Trailers 的职能转移到消息前缀里,比如可以设置一个特殊的前缀来传输 grpc-status 等字段。如果当初这样做的话,那么现在也就可以直接在浏览器调用 gRPC 接口了,可以惜木已成舟。

gRPC 仅支持 HTTP 2 协议,而 Triple 协议不仅仅支持 HTTP 2,还支持 HTTP 1,下面介绍 HTTP 2 协议。

为什么提出 HTTP 2

既然已经出现了 HTTP 1.0、 HTTP 1.1,为什么要提出 HTTP 2,这就不得不说一下 HTTP 1.X 的缺陷。

HTTP 1.X 的缺陷

HTTP 1.X 主要有两个缺陷:

  • Header 巨大。
  • 并发能力差。

Header 巨大

每一个 HTTP 1.X 请求和响应都会携带沉重的 Header,这些 Header 严重拖慢传输速度。

并发能力差

尽管从 HTTP 1.0 到 HTTP 1.1 已经做了很多优化,但远远不够:

  • HTTP 1.0:刚开始是非持久连接,一个 TCP 连接只会发出一个 HTTP 请求。后来提出持久连接,但是只能等待上一个请求的响应回来了,才能发出下一个请求;并且持久连接与压缩是冲突的。
  • HTTP 1.1:为了解决持久连接和压缩冲突的问题,提出了分块传输。又提出了 pipeline 模式,允许请求方一口气并发多个请求,但是响应需要按照请求的顺序排列,所以普及率不高。

但是访问一个页面需要发出很多请求(css、js、img 等),如果让请求一个个串行执行,那页面的渲染会变得极慢。于是只能同时创建多个 TCP 连接,实现并发下载数据,快速渲染出页面。这会给浏览器造成较大的资源消耗,电脑会变卡。很多浏览器为了兼顾下载速度和资源消耗,会对同一个域名限制并发的 TCP 连接数量,如 Chrome 是 6 个左右,剩下的请求则需要排队。

为了避开这个数量限制,可以将图片、css、js 等资源放在不同域名下(或者二级域名),避开排队导致的渲染延迟。快速下载的目标实现了,但这会消耗更多的资源,背后都是高昂的带宽、CDN 成本。

HTTP 2 的提出

HTTP 2 的提出就是为了低成本、快速传输,主要做了以下几点:

  • HTTP 2 未改变 HTTP 协议的语义,只是在传输上做了优化。
  • 引入帧、流的概念,在一个域名下只有一个 TCP 连接,借助帧、流可以实现多路复用,降低资源消耗。
  • 引入头部压缩,降低 Header 的空间,提高传输速度。
阅读全文 »

Triple 协议简介

Triple 协议是 Dubbo3 设计的基于 HTTP 的 RPC 通信协议规范,它完全兼容 gRPC 协议,支持 Unary、Streaming 流式等通信模型,可同时运行在 HTTP/1 和 HTTP/2 之上。

Triple 包含两个部分:

  1. 构建一套自定义的精简 HTTP RPC 子协议,支持 HTTP/1 和 HTTP/2 为传输协议,仅支持 Unary 通信

  2. 构建基于 gRPC 的扩展子协议(与 gRPC 兼容),仅支持 HTTP/2 实现,支持 Unary 和 Streaming 通信

目标

Triple 协议完全兼容 gRPC 协议,为什么 Dubbo 还要通过 Triple 重新实现一遍?目标有如下两点:

  • 在协议设计上,Triple 是一个基于 HTTP 传输层协议的 RPC 协议,它同时可运行在 HTTP/1、HTTP/2 之上
  • 完全兼容基于 HTTP/2 的 gRPC 协议,因此 Dubbo Triple 协议实现可以 100% 与 gRPC 体系互调互通。
  • 仅依赖标准的、被广泛使用的 HTTP 特性,以便在实现层面可以直接依赖官方的标准 HTTP 网络库

协议规范

Triple 包含两个部分:

  1. 构建一套自定义的精简 HTTP RPC 子协议,支持 HTTP/1 和 HTTP/2 为传输协议,仅支持 Unary 通信

  2. 构建基于 gRPC 的扩展子协议(与 gRPC 兼容),仅支持 HTTP/2 实现,支持 Unary 和 Streaming 通信

HTTP RPC 协议

Triple HTTP RPC 同时支持 HTTP/1、HTTP/2 作为底层传输层协议,在实现上对应支持的 content-type 类型为 application/json、application/proto。

阅读全文 »