Dawn's Blogs

Sharing technology, recording growth


Triple Protocol (4): Encoding and Compression in the Triple Implementation

Starting with this section, we read the Triple source code, which lives at: https://github.com/apache/dubbo-go/tree/main/protocol/triple/triple_protocol

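Encoding

Triple supports several encodings, such as protobuf and JSON. Every encoding implements the Codec interface:

  • Name method: returns the name of the encoding, such as proto or json.
  • Marshal method: serializes a message to bytes.
  • Unmarshal method: deserializes bytes into a message.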
// Codec marshals structs (typically generated from a schema) to and from bytes.
type Codec interface {
	// Name returns the name of the Codec.
	//
	// This may be used as part of the Content-Type within HTTP. For example,
	// with gRPC this is the content subtype, so "application/grpc+proto" will
	// map to the Codec with name "proto".
	//
	// Names must not be empty.
	Name() string
	// Marshal marshals the given message.
	//
	// Marshal may expect a specific type of message, and will error if this type
	// is not given.
	Marshal(interface{}) ([]byte, error)
	// Unmarshal unmarshals the given message.
	//
	// Unmarshal may expect a specific type of message, and will error if this
	// type is not given.
	Unmarshal([]byte, interface{}) error
}
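To make the contract concrete, here is a minimal sketch of a custom implementation. The type plainJSONCodec, its name "plainjson", and the greeting message are hypothetical and are not part of Triple; the sketch only assumes the interface quoted above and the standard library's encoding/json.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec mirrors the interface quoted above.
type Codec interface {
	Name() string
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
}

// plainJSONCodec is a hypothetical codec (not part of Triple) that serializes
// arbitrary Go values with the standard library's encoding/json.
type plainJSONCodec struct{}

// Compile-time check that *plainJSONCodec satisfies Codec.
var _ Codec = (*plainJSONCodec)(nil)

func (c *plainJSONCodec) Name() string { return "plainjson" }

func (c *plainJSONCodec) Marshal(message interface{}) ([]byte, error) {
	return json.Marshal(message)
}

func (c *plainJSONCodec) Unmarshal(data []byte, message interface{}) error {
	return json.Unmarshal(data, message)
}

// greeting is a hypothetical message type used only for this demo.
type greeting struct {
	Name string `json:"name"`
}

// roundTrip marshals in, then unmarshals the bytes into a fresh greeting.
func roundTrip(c Codec, in greeting) (greeting, error) {
	var out greeting
	data, err := c.Marshal(&in)
	if err != nil {
		return out, err
	}
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(&plainJSONCodec{}, greeting{Name: "triple"})
	fmt.Println(out.Name, err)
}
```

Because callers only see the Codec interface, a codec like this could in principle be swapped for the proto or json codecs without touching the calling code.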

Codec interface implementations

proto encoding

The protobuf codec is implemented as follows:

type protoBinaryCodec struct{}

var _ Codec = (*protoBinaryCodec)(nil)

func (c *protoBinaryCodec) Name() string { return codecNameProto }

func (c *protoBinaryCodec) Marshal(message interface{}) ([]byte, error) {
	protoMessage, ok := message.(proto.Message)
	if !ok {
		return nil, errNotProto(message)
	}
	return proto.Marshal(protoMessage)
}

func (c *protoBinaryCodec) Unmarshal(data []byte, message interface{}) error {
	protoMessage, ok := message.(proto.Message)
	if !ok {
		return errNotProto(message)
	}
	return proto.Unmarshal(data, protoMessage)
}
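Both methods start with the same guard: assert that the argument is a proto.Message and fail early otherwise. The sketch below shows that pattern in isolation. To stay free of third-party modules it uses a hypothetical wireMessage interface in place of proto.Message, and errNotWire in place of errNotProto (the real helper is not shown in this article, so its wording here is an assumption).

```go
package main

import "fmt"

// wireMessage is a hypothetical stand-in for proto.Message so that this
// sketch needs no third-party modules.
type wireMessage interface {
	Encode() []byte
}

// ping is a hypothetical message type that satisfies wireMessage.
type ping struct{ id byte }

func (p ping) Encode() []byte { return []byte{p.id} }

// errNotWire plays the role of errNotProto; the error text is an assumption.
func errNotWire(message interface{}) error {
	return fmt.Errorf("%T doesn't implement wireMessage", message)
}

// marshal has the same shape as protoBinaryCodec.Marshal: assert the expected
// interface first, and return a descriptive error if the assertion fails.
func marshal(message interface{}) ([]byte, error) {
	m, ok := message.(wireMessage)
	if !ok {
		return nil, errNotWire(message)
	}
	return m.Encode(), nil
}

func main() {
	data, err := marshal(ping{id: 7})
	fmt.Println(data, err)

	// A plain string does not satisfy wireMessage, so marshal reports an error.
	_, err = marshal("not a message")
	fmt.Println(err)
}
```

The guard is what the Codec documentation means by "will error if this type is not given": the interface accepts interface{}, so the type check has to happen at run time.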

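JSON encoding

The JSON codec is implemented as follows. It is built on protojson, so it still requires the message to be a proto.Message, and it rejects a zero-length payload before unmarshaling: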

type protoJSONCodec struct {
	name string
}

var _ Codec = (*protoJSONCodec)(nil)

func (c *protoJSONCodec) Name() string { return c.name }

func (c *protoJSONCodec) Marshal(message interface{}) ([]byte, error) {
	protoMessage, ok := message.(proto.Message)
	if !ok {
		return nil, errNotProto(message)
	}
	var options protojson.MarshalOptions
	return options.Marshal(protoMessage)
}

func (c *protoJSONCodec) Unmarshal(binary []byte, message interface{}) error {
	protoMessage, ok := message.(proto.Message)
	if !ok {
		return errNotProto(message)
	}
	if len(binary) == 0 {
		return errors.New("zero-length payload is not a valid JSON object")
	}
	var options protojson.UnmarshalOptions
	return options.Unmarshal(binary, protoMessage)
}
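The zero-length check is specific to JSON: in the protobuf binary format an empty byte slice is a valid encoding of a message with no fields set, whereas an empty byte slice is not valid JSON. A minimal sketch of the same idea using only the standard library's encoding/json (not protojson); the helper name checkJSONPayload is hypothetical:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// checkJSONPayload is a hypothetical helper. It repeats the zero-length guard
// from protoJSONCodec.Unmarshal and then checks syntax with encoding/json.
func checkJSONPayload(payload []byte) error {
	if len(payload) == 0 {
		return errors.New("zero-length payload is not a valid JSON object")
	}
	if !json.Valid(payload) {
		return errors.New("payload is not valid JSON")
	}
	return nil
}

func main() {
	fmt.Println(checkJSONPayload(nil))                // empty payload: error
	fmt.Println(checkJSONPayload([]byte("{}")))       // empty object: accepted
	fmt.Println(checkJSONPayload([]byte(`{"name":`))) // truncated: error
}
```

So the JSON counterpart of an "empty" message is the two-byte payload {}, not zero bytes.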

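Codec dictionary

To bring the codecs together, Triple implements a read-only dictionary that maps codec names to Codec implementations: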

// readOnlyCodecs is a read-only interface to a map of named codecs.
type readOnlyCodecs interface {
	// Get gets the Codec with the given name.
	Get(string) Codec
	// Protobuf gets the user-supplied protobuf codec, falling back to the default
	// implementation if necessary.
	//
	// This is helpful in the gRPC protocol, where the wire protocol requires
	// marshaling protobuf structs to binary even if the RPC procedures were
	// generated from a different IDL.
	Protobuf() Codec
	// Names returns a copy of the registered codec names. The returned slice is
	// safe for the caller to mutate.
	Names() []string
}

func newReadOnlyCodecs(nameToCodec map[string]Codec) readOnlyCodecs {
	return &codecMap{
		nameToCodec: nameToCodec,
	}
}

type codecMap struct {
	nameToCodec map[string]Codec
}

func (m *codecMap) Get(name string) Codec {
	return m.nameToCodec[name]
}

func (m *codecMap) Protobuf() Codec {
	if pb, ok := m.nameToCodec[codecNameProto]; ok {
		return pb
	}
	return &protoBinaryCodec{}
}

func (m *codecMap) Names() []string {
	names := make([]string, 0, len(m.nameToCodec))
	for name := range m.nameToCodec {
		names = append(names, name)
	}
	return names
}
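The following sketch shows how such a dictionary might be used to pick a codec from a Content-Type. It is self-contained and simplified: Codec is trimmed to its Name method, namedCodec is a placeholder, and codecNameFromContentType and sortedNames are hypothetical helpers that do not appear in the Triple source.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Codec is trimmed to Name for brevity; the full interface also declares
// Marshal and Unmarshal.
type Codec interface {
	Name() string
}

// namedCodec is a hypothetical placeholder codec that only carries a name.
type namedCodec string

func (n namedCodec) Name() string { return string(n) }

type codecMap struct {
	nameToCodec map[string]Codec
}

func (m *codecMap) Get(name string) Codec { return m.nameToCodec[name] }

// sortedNames copies the registered names and sorts them, because Go map
// iteration order is not deterministic.
func (m *codecMap) sortedNames() []string {
	names := make([]string, 0, len(m.nameToCodec))
	for name := range m.nameToCodec {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// codecNameFromContentType is a hypothetical helper: it returns the part
// after the last "+", so "application/grpc+proto" yields "proto".
func codecNameFromContentType(contentType string) string {
	if i := strings.LastIndex(contentType, "+"); i >= 0 {
		return contentType[i+1:]
	}
	return ""
}

func main() {
	codecs := &codecMap{nameToCodec: map[string]Codec{
		"proto": namedCodec("proto"),
		"json":  namedCodec("json"),
	}}

	name := codecNameFromContentType("application/grpc+proto")
	if codec := codecs.Get(name); codec != nil {
		fmt.Println("selected codec:", codec.Name())
	}

	// A name that was never registered yields a nil Codec.
	fmt.Println(codecs.Get("xml") == nil)
	fmt.Println(codecs.sortedNames())
}
```

Note that Get returns a nil Codec for an unknown name, so callers need a nil check, and that Names as quoted above returns the names in map iteration order, which can differ between calls.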

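Compression

The Triple protocol supports gzip compression, which is applied to the data after it has been encoded (as protobuf or JSON). To abstract compression and decompression, Triple defines two separate interfaces.

Compression and decompression interfaces

Triple defines the Decompressor and Compressor interfaces for decompression and compression respectively.

The Decompressor interface defines the decompression methods. It embeds io.Reader: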

// A Decompressor is a reusable wrapper that decompresses an underlying data
// source. The standard library's [*gzip.Reader] implements Decompressor.
type Decompressor interface {
	io.Reader

	// Close closes the Decompressor, but not the underlying data source. It may
	// return an error if the Decompressor wasn't read to EOF.
	Close() error

	// Reset discards the Decompressor's internal state, if any, and prepares it
	// to read from a new source of compressed data.
	Reset(io.Reader) error
}

The Compressor interface defines the compression methods. It embeds io.Writer (not io.Reader):

// A Compressor is a reusable wrapper that compresses data written to an
// underlying sink. The standard library's [*gzip.Writer] implements Compressor.
type Compressor interface {
	io.Writer

	// Close flushes any buffered data to the underlying sink, then closes the
	// Compressor. It must not close the underlying sink.
	Close() error

	// Reset discards the Compressor's internal state, if any, and prepares it to
	// write compressed data to a new sink.
	Reset(io.Writer)
}

Built on these two interfaces, compressionPool reuses compressor and decompressor objects by keeping them in two sync.Pool instances.

type compressionPool struct {
	decompressors sync.Pool
	compressors   sync.Pool
}

func newCompressionPool(
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) *compressionPool {
	if newDecompressor == nil && newCompressor == nil {
		return nil
	}
	return &compressionPool{
		decompressors: sync.Pool{
			New: func() interface{} { return newDecompressor() },
		},
		compressors: sync.Pool{
			New: func() interface{} { return newCompressor() },
		},
	}
}
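The listing above only shows how the pools are constructed. As a rough sketch of how a pooled compressor could be borrowed and returned, the example below uses a sync.Pool of *gzip.Writer directly. The names gzipWriters, compressWithPool, and gunzip are hypothetical, and this is not the code Triple uses; it only illustrates the get, Reset, use, put cycle that such a pool enables.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"sync"
)

// gzipWriters is a hypothetical pool of reusable *gzip.Writer values.
var gzipWriters = sync.Pool{
	New: func() interface{} { return gzip.NewWriter(io.Discard) },
}

// compressWithPool borrows a writer, resets it onto a fresh buffer,
// compresses payload, and hands the writer back to the pool.
func compressWithPool(payload []byte) ([]byte, error) {
	w, ok := gzipWriters.Get().(*gzip.Writer)
	if !ok {
		return nil, errors.New("pool returned an unexpected type")
	}
	var sink bytes.Buffer
	w.Reset(&sink)
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	// Drop the reference to sink before the writer goes back into the pool.
	w.Reset(io.Discard)
	gzipWriters.Put(w)
	return sink.Bytes(), nil
}

// gunzip is a small helper used to verify the result.
func gunzip(compressed []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	compressed, err := compressWithPool([]byte("pooled payload"))
	if err != nil {
		panic(err)
	}
	plain, err := gunzip(compressed)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plain))
}
```

Pooling matters here because gzip writers allocate sizeable internal buffers, so reusing them across messages avoids repeating that allocation for every request.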

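Compression dictionary

As with the Codec interface, Triple also defines a dictionary for compression and decompression, which records the compression methods it supports: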

// readOnlyCompressionPools is a read-only interface to a map of named
// compressionPools.
type readOnlyCompressionPools interface {
	Get(string) *compressionPool
	Contains(string) bool
	// Wordy, but clarifies how this is different from readOnlyCodecs.Names().
	CommaSeparatedNames() string
}

func newReadOnlyCompressionPools(
	nameToPool map[string]*compressionPool,
	reversedNames []string,
) readOnlyCompressionPools {
	// Client and handler configs keep compression names in registration order,
	// but we want the last registered to be the most preferred.
	names := make([]string, 0, len(reversedNames))
	seen := make(map[string]struct{}, len(reversedNames))
	for i := len(reversedNames) - 1; i >= 0; i-- {
		name := reversedNames[i]
		if _, ok := seen[name]; ok {
			continue
		}
		seen[name] = struct{}{}
		names = append(names, name)
	}
	return &namedCompressionPools{
		nameToPool:          nameToPool,
		commaSeparatedNames: strings.Join(names, ","),
	}
}

type namedCompressionPools struct {
	nameToPool          map[string]*compressionPool
	commaSeparatedNames string
}
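The loop in newReadOnlyCompressionPools walks the registration order backwards and drops duplicates, so the most recently registered name comes first. The sketch below isolates that logic in a hypothetical function preferredNames; the compression names other than gzip are made up for the demo.

```go
package main

import (
	"fmt"
	"strings"
)

// preferredNames repeats the loop from newReadOnlyCompressionPools: iterate
// over the registration order from last to first and keep only the first
// occurrence of each name.
func preferredNames(registered []string) string {
	names := make([]string, 0, len(registered))
	seen := make(map[string]struct{}, len(registered))
	for i := len(registered) - 1; i >= 0; i-- {
		name := registered[i]
		if _, ok := seen[name]; ok {
			continue
		}
		seen[name] = struct{}{}
		names = append(names, name)
	}
	return strings.Join(names, ",")
}

func main() {
	// "gzip" is registered twice; only its later registration counts.
	fmt.Println(preferredNames([]string{"gzip", "zstd", "gzip", "br"}))
	// prints "br,gzip,zstd"
}
```

Precomputing the comma-separated string once at construction time means it does not have to be rebuilt on every call to CommaSeparatedNames.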