Dawn's Blogs

Sharing technology, recording growth

Triple Protocol (5): Full-Duplex Communication in the Triple Implementation

Full-Duplex Communication

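In the Triple protocol, a full-duplex call covers the whole exchange, from the client sending its request to the client receiving the server's response. It is modeled by duplexHTTPCall, whose most important fields are:

  • httpClient: the HTTP client used to send the HTTP request.
  • streamType: the call type, either Unary or Streaming.
  • requestBodyReader and requestBodyWriter: the request body is written through an io.Pipe, so both ends of the pipe are kept. Whatever is written to the writer is read back by the HTTP request's Body through the reader.
  • request and response: the HTTP request and response.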
// duplexHTTPCall is a full-duplex stream between the client and server. The
// request body is the stream from client to server, and the response body is
// the reverse.
//
// Be warned: we need to use some lesser-known APIs to do this with net/http.
type duplexHTTPCall struct {
	ctx              context.Context
	httpClient       HTTPClient
	streamType       StreamType
	validateResponse func(*http.Response) *Error

	// We'll use a pipe as the request body. We hand the read side of the pipe to
	// net/http, and we write to the write side (naturally). The two ends are
	// safe to use concurrently.
	requestBodyReader *io.PipeReader
	requestBodyWriter *io.PipeWriter

	sendRequestOnce sync.Once
	responseReady   chan struct{}
	request         *http.Request
	response        *http.Response

	errMu sync.Mutex
	err   error
}

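Initialization

A full-duplex call is initialized as follows. The request body is backed by io.Pipe: the user writes request content to the writer (possibly many times, as in client streaming), and the request Body reads that content from the reader while the request is being sent.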

func newDuplexHTTPCall(
	ctx context.Context,
	httpClient HTTPClient,
	url *url.URL,
	spec Spec,
	header http.Header,
) *duplexHTTPCall {
	// ensure we make a copy of the url before we pass along to the
	// Request. This ensures if a transport out of our control wants
	// to mutate the req.URL, we don't feel the effects of it.
	url = cloneURL(url)
	pipeReader, pipeWriter := io.Pipe()

	// todo(DMwangnima): remove cloneURL logic in WithContext
	// This is mirroring what http.NewRequestWithContext did, but
	// using an already parsed url.URL object, rather than a string
	// and parsing it again. This is a bit funny with HTTP/1.1
	// explicitly, but this is logic copied over from
	// NewRequestWithContext and doesn't affect the actual version
	// being transmitted.
	request := (&http.Request{
		Method:     http.MethodPost,
		URL:        url,
		Header:     header,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Body:       pipeReader,
		Host:       url.Host,
	}).WithContext(ctx)
	return &duplexHTTPCall{
		ctx:               ctx,
		httpClient:        httpClient,
		streamType:        spec.StreamType,
		requestBodyReader: pipeReader,
		requestBodyWriter: pipeWriter,
		request:           request,
		responseReady:     make(chan struct{}),
	}
}
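
To make the pipe-backed request body more concrete, here is a minimal, self-contained sketch. It is not the Triple code: streamUpload, the echo handler, and the chunk contents are all hypothetical, and it uses a plain HTTP/1.1 test server from net/http/httptest. It only illustrates the idea described above, that several writes to the pipe's writer end up in a single request body that net/http reads from the reader.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// streamUpload posts the given chunks through an io.Pipe and returns what the
// server received. The name and the echo handler are illustrative assumptions.
func streamUpload(chunks []string) (string, error) {
	// Hypothetical server: reads the whole request body and echoes it back.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		_, _ = w.Write(body)
	}))
	defer server.Close()

	// The read side becomes the request body; we keep the write side.
	pipeReader, pipeWriter := io.Pipe()
	request, err := http.NewRequest(http.MethodPost, server.URL, pipeReader)
	if err != nil {
		return "", err
	}

	// Each Write blocks until net/http has consumed the data from the reader,
	// so the writes run in their own goroutine while Do is in flight.
	go func() {
		for _, chunk := range chunks {
			if _, err := pipeWriter.Write([]byte(chunk)); err != nil {
				return
			}
		}
		// Closing the writer makes the reader return io.EOF, ending the body.
		_ = pipeWriter.Close()
	}()

	response, err := server.Client().Do(request)
	if err != nil {
		return "", err
	}
	defer response.Body.Close()
	echoed, err := io.ReadAll(response.Body)
	return string(echoed), err
}

func main() {
	echoed, err := streamUpload([]string{"hello, ", "triple"})
	fmt.Println(echoed, err)
}
```

Unlike this sketch, duplexHTTPCall keeps the writer open so the caller can keep sending messages for as long as the stream lives.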

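Writing the Request

The request Body is written by writing to requestBodyWriter, as shown below. Note that before anything is written, d.ensureRequestMade is called first to make sure the duplex stream has been initialized.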

// Write to the request body. Returns an error wrapping io.EOF after SetError
// is called.
func (d *duplexHTTPCall) Write(data []byte) (int, error) {
	// ensure stream has been initialized
	d.ensureRequestMade()
	// Before we send any data, check if the context has been canceled.
	if err := d.ctx.Err(); err != nil {
		d.SetError(err)
		return 0, wrapIfContextError(err)
	}
	// It's safe to write to this side of the pipe while net/http concurrently
	// reads from the other side.
	bytesWritten, err := d.requestBodyWriter.Write(data)
	if err != nil && errors.Is(err, io.ErrClosedPipe) {
		// Signal that the stream is closed with the more-typical io.EOF instead of
		// io.ErrClosedPipe. This makes it easier for protocol-specific wrappers to
		// match grpc-go's behavior.
		return bytesWritten, io.EOF
	}
	return bytesWritten, err
}
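
The error translation at the end of Write can be tried in isolation. The sketch below is a hypothetical wrapper (eofWriter and writeAfterReaderClosed are names invented here, not Triple APIs). It assumes the read side of the pipe has already been closed, which is the situation in which io.PipeWriter.Write reports io.ErrClosedPipe.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// eofWriter is a hypothetical wrapper that mirrors the error mapping in
// duplexHTTPCall.Write: io.ErrClosedPipe is reported as io.EOF.
type eofWriter struct {
	pipeWriter *io.PipeWriter
}

func (w *eofWriter) Write(data []byte) (int, error) {
	n, err := w.pipeWriter.Write(data)
	if errors.Is(err, io.ErrClosedPipe) {
		// Callers see the more familiar io.EOF for a closed stream.
		return n, io.EOF
	}
	return n, err
}

// writeAfterReaderClosed reports whether a write to a pipe whose read side
// is already closed surfaces as io.EOF.
func writeAfterReaderClosed() bool {
	pipeReader, pipeWriter := io.Pipe()
	_ = pipeReader.Close() // later writes now fail with io.ErrClosedPipe
	_, err := (&eofWriter{pipeWriter: pipeWriter}).Write([]byte("late message"))
	return errors.Is(err, io.EOF)
}

func main() {
	fmt.Println(writeAfterReaderClosed())
}
```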

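Ensuring the Duplex Stream Is Initialized

ensureRequestMade runs go d.makeRequest exactly once to issue the request. After httpClient.Do has sent the request and the server has answered, Do returns a response, which represents the response stream.

When makeRequest returns, responseReady is closed to signal that the response can be read. Because the close is deferred, this also happens on the error paths. Establishing the stream is asynchronous, so it never blocks Write from writing messages.

In the streaming communication model:

  • The client can write to the request Body many times, sending multiple messages.
  • Likewise, the server's response is received by reading the response Body many times, yielding multiple messages.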
func (d *duplexHTTPCall) ensureRequestMade() {
	d.sendRequestOnce.Do(func() {
		go d.makeRequest()
	})
}

func (d *duplexHTTPCall) makeRequest() {
	// This runs concurrently with Write and CloseWrite. Read and CloseRead wait
	// on d.responseReady, so we can't race with them.
	defer close(d.responseReady)

	// Once we send a message to the server, they send a message back and
	// establish the receive side of the stream.
	response, err := d.httpClient.Do(d.request) //nolint:bodyclose
	if err != nil {
		// stream knowledge
		err = wrapIfContextError(err)
		err = wrapIfLikelyH2CNotConfiguredError(d.request, err)
		err = wrapIfLikelyWithGRPCNotUsedError(err)
		err = wrapIfRSTError(err)
		if _, ok := asError(err); !ok {
			err = NewError(CodeUnavailable, err)
		}
		d.SetError(err)
		return
	}
	d.response = response
	if err := d.validateResponse(response); err != nil {
		d.SetError(err)
		return
	}
	if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
		// If we somehow dialed an HTTP/1.x server, fail with an explicit message
		// rather than returning a more cryptic error later on.
		d.SetError(errorf(
			CodeUnimplemented,
			"response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2",
			d.request.URL,
			response.ProtoMajor,
			response.ProtoMinor,
		))
	}
}
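
The start-once, signal-when-ready pattern used by ensureRequestMade and makeRequest can be sketched without any HTTP at all. Everything below (lazyCall, runLazyCall, the result string) is hypothetical and stands in for the real request; the point is only to show sync.Once guarding a background goroutine and a closed channel acting as the readiness signal.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyCall is a hypothetical stand-in for duplexHTTPCall.
type lazyCall struct {
	startOnce sync.Once
	ready     chan struct{} // closed when start has finished
	starts    int           // how many times the background work was launched
	result    string
}

func newLazyCall() *lazyCall {
	return &lazyCall{ready: make(chan struct{})}
}

// ensureStarted may be called any number of times, from any goroutine.
func (c *lazyCall) ensureStarted() {
	c.startOnce.Do(func() {
		c.starts++ // sync.Once runs this function at most once
		go c.start()
	})
}

// start plays the role of makeRequest: it runs in the background and closes
// ready when it is done, whether or not it succeeded.
func (c *lazyCall) start() {
	defer close(c.ready)
	c.result = "response headers received"
}

// waitResult blocks until start has finished, like BlockUntilResponseReady.
func (c *lazyCall) waitResult() string {
	<-c.ready
	return c.result
}

// runLazyCall calls ensureStarted from several goroutines and reports how
// often the work was launched and what it produced.
func runLazyCall() (int, string) {
	call := newLazyCall()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			call.ensureStarted()
		}()
	}
	wg.Wait()
	result := call.waitResult()
	return call.starts, result
}

func main() {
	starts, result := runLazyCall()
	fmt.Println(starts, result)
}
```

Closing the channel rather than sending on it means any number of readers can wait, and readers arriving after the close do not block at all.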

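Reading the Response

Reading the response Body is simple, and it can be read many times. The only requirement is to call BlockUntilResponseReady before reading, which blocks until the response stream is ready to be read.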

// Read from the response body. Returns the first error passed to SetError.
func (d *duplexHTTPCall) Read(data []byte) (int, error) {
	// Make sure the server-to-client stream has been initialized.
	// First, we wait until we've gotten the response headers and established the
	// server-to-client side of the stream.
	d.BlockUntilResponseReady()
	if err := d.getError(); err != nil {
		// The stream is already closed or corrupted.
		return 0, err
	}
	// Before we read, check if the context has been canceled.
	if err := d.ctx.Err(); err != nil {
		d.SetError(err)
		return 0, wrapIfContextError(err)
	}
	if d.response == nil {
		return 0, fmt.Errorf("nil response from %v", d.request.URL)
	}
	n, err := d.response.Body.Read(data)
	return n, wrapIfRSTError(err)
}

func (d *duplexHTTPCall) BlockUntilResponseReady() {
	<-d.responseReady
}
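
The ordering inside Read (wait for readiness, return a recorded error if there is one, only then touch the body) can be sketched with a small hypothetical type. gatedReader, readAllGated, and errStreamBroken are invented for this illustration, and a strings.Reader stands in for the real response body.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
)

// errStreamBroken is a hypothetical failure used by the example.
var errStreamBroken = errors.New("stream broken")

// gatedReader is a hypothetical reader whose body only becomes available
// once ready has been closed.
type gatedReader struct {
	ready chan struct{}
	mu    sync.Mutex
	err   error
	body  io.Reader
}

func (g *gatedReader) setError(err error) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.err == nil { // keep only the first error
		g.err = err
	}
}

func (g *gatedReader) getError() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.err
}

func (g *gatedReader) Read(data []byte) (int, error) {
	<-g.ready // block until the background setup has finished
	if err := g.getError(); err != nil {
		return 0, err
	}
	if g.body == nil {
		return 0, errors.New("nil body")
	}
	return g.body.Read(data)
}

// readAllGated simulates an asynchronous setup that either fails or provides
// a body, then reads everything through the gated reader.
func readAllGated(body string, failure error) (string, error) {
	g := &gatedReader{ready: make(chan struct{})}
	go func() {
		defer close(g.ready)
		if failure != nil {
			g.setError(failure)
			return
		}
		g.body = strings.NewReader(body)
	}()
	data, err := io.ReadAll(g)
	return string(data), err
}

func main() {
	fmt.Println(readAllGated("pong", nil))
	fmt.Println(readAllGated("", errStreamBroken))
}
```

Because the body field is assigned before the channel is closed and read only after the channel receive, no extra lock is needed around it; the error field still gets a mutex since, in the real type, SetError can also be called from the write path.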