// duplexHTTPCall is a full-duplex stream between the client and server. The // request body is the stream from client to server, and the response body is // the reverse. // // Be warned: we need to use some lesser-known APIs to do this with net/http. type duplexHTTPCall struct { ctx context.Context httpClient HTTPClient streamType StreamType validateResponse func(*http.Response) *Error
// We'll use a pipe as the request body. We hand the read side of the pipe to // net/http, and we write to the write side (naturally). The two ends are // safe to use concurrently. requestBodyReader *io.PipeReader requestBodyWriter *io.PipeWriter
funcnewDuplexHTTPCall( ctx context.Context, httpClient HTTPClient, url *url.URL, spec Spec, header http.Header, ) *duplexHTTPCall { // ensure we make a copy of the url before we pass along to the // Request. This ensures if a transport out of our control wants // to mutate the req.URL, we don't feel the effects of it. url = cloneURL(url) pipeReader, pipeWriter := io.Pipe()
// todo(DMwangnima): remove cloneURL logic in WithContext // This is mirroring what http.NewRequestContext did, but // using an already parsed url.URL object, rather than a string // and parsing it again. This is a bit funny with HTTP/1.1 // explicitly, but this is logic copied over from // NewRequestContext and doesn't effect the actual version // being transmitted. request := (&http.Request{ Method: http.MethodPost, URL: url, Header: header, Proto: "HTTP/1.1", ProtoMajor: 1, ProtoMinor: 1, Body: pipeReader, Host: url.Host, }).WithContext(ctx) return &duplexHTTPCall{ ctx: ctx, httpClient: httpClient, streamType: spec.StreamType, requestBodyReader: pipeReader, requestBodyWriter: pipeWriter, request: request, responseReady: make(chanstruct{}), } }
写请求
写请求 Body 如下,通过向 requestBodyWriter 写入实现。但是可以注意到,在写入之前,首先调用了 d.ensureRequestMade 确保双工的通信流已经被初始化。
// Write to the request body. Returns an error wrapping io.EOF after SetError // is called. func(d *duplexHTTPCall)Write(data []byte)(int, error) { // ensure stream has been initialized d.ensureRequestMade() // Before we send any data, check if the context has been canceled. if err := d.ctx.Err(); err != nil { d.SetError(err) return0, wrapIfContextError(err) } // It's safe to write to this side of the pipe while net/http concurrently // reads from the other side. bytesWritten, err := d.requestBodyWriter.Write(data) if err != nil && errors.Is(err, io.ErrClosedPipe) { // Signal that the stream is closed with the more-typical io.EOF instead of // io.ErrClosedPipe. This makes it easier for protocol-specific wrappers to // match grpc-go's behavior. return bytesWritten, io.EOF } return bytesWritten, err }
func(d *duplexHTTPCall)ensureRequestMade() { d.sendRequestOnce.Do(func() { go d.makeRequest() }) }
func(d *duplexHTTPCall)makeRequest() { // This runs concurrently with Write and CloseWrite. Read and CloseRead wait // on d.responseReady, so we can't race with them. deferclose(d.responseReady)
// Once we send a message to the server, they send a message back and // establish the receive side of the stream. response, err := d.httpClient.Do(d.request) //nolint:bodyclose if err != nil { // stream knowledge err = wrapIfContextError(err) err = wrapIfLikelyH2CNotConfiguredError(d.request, err) err = wrapIfLikelyWithGRPCNotUsedError(err) err = wrapIfRSTError(err) if _, ok := asError(err); !ok { err = NewError(CodeUnavailable, err) } d.SetError(err) return } d.response = response if err := d.validateResponse(response); err != nil { d.SetError(err) return } if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 { // If we somehow dialed an HTTP/1.x server, fail with an explicit message // rather than returning a more cryptic error later on. d.SetError(errorf( CodeUnimplemented, "response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2", d.request.URL, response.ProtoMajor, response.ProtoMinor, )) } }
读响应
读响应 Body 很简单(可以多次读取),只是在读之前需要调用 BlockUntilResponseReady 阻塞,确保响应流已经准备好,可以进行读取。
// Read from the response body. Returns the first error passed to SetError. func(d *duplexHTTPCall)Read(data []byte)(int, error) { // For sure that server-to-client stream has been initialized // First, we wait until we've gotten the response headers and established the // server-to-client side of the stream. d.BlockUntilResponseReady() if err := d.getError(); err != nil { // The stream is already closed or corrupted. return0, err } // Before we read, check if the context has been canceled. if err := d.ctx.Err(); err != nil { d.SetError(err) return0, wrapIfContextError(err) } if d.response == nil { return0, fmt.Errorf("nil response from %v", d.request.URL) } n, err := d.response.Body.Read(data) return n, wrapIfRSTError(err) }