blob: ef6022486cb7d45374d240164bb9fe1afb8221fc [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "fmt"
5
6 "github.com/rcrowley/go-metrics"
7)
8
9// Encoder is the interface that wraps the basic Encode method.
10// Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
11type encoder interface {
12 encode(pe packetEncoder) error
13}
14
15type encoderWithHeader interface {
16 encoder
17 headerVersion() int16
18}
19
20// Encode takes an Encoder and turns it into bytes while potentially recording metrics.
21func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
22 if e == nil {
23 return nil, nil
24 }
25
26 var prepEnc prepEncoder
27 var realEnc realEncoder
28
29 err := e.encode(prepareFlexibleEncoder(&prepEnc, e))
30 if err != nil {
31 return nil, err
32 }
33
34 if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
35 return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
36 }
37
38 realEnc.raw = make([]byte, prepEnc.length)
39 realEnc.registry = metricRegistry
40 err = e.encode(prepareFlexibleEncoder(&realEnc, e))
41 if err != nil {
42 return nil, err
43 }
44
45 return realEnc.raw, nil
46}
47
48// decoder is the interface that wraps the basic Decode method.
49// Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
50type decoder interface {
51 decode(pd packetDecoder) error
52}
53
54type versionedDecoder interface {
55 decode(pd packetDecoder, version int16) error
56}
57
// flexibleVersion is implemented by messages that can report whether they use
// the flexible encoding. prepareFlexibleDecoder consults isFlexibleVersion
// with an explicit version, while prepareFlexibleEncoder consults isFlexible,
// which takes no version argument.
type flexibleVersion interface {
	isFlexibleVersion(ver int16) bool
	isFlexible() bool
}
62
63// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
64// interpreted using Kafka's encoding rules.
65func decode(buf []byte, in decoder, metricRegistry metrics.Registry) error {
66 if buf == nil {
67 return nil
68 }
69 helper := realDecoder{
70 raw: buf,
71 registry: metricRegistry,
72 }
73 err := in.decode(&helper)
74 if err != nil {
75 return err
76 }
77
78 if helper.off != len(buf) {
79 return PacketDecodingError{fmt.Sprintf("invalid length: buf=%d decoded=%d %#v", len(buf), helper.off, in)}
80 }
81
82 return nil
83}
84
85func versionedDecode(buf []byte, in versionedDecoder, version int16, metricRegistry metrics.Registry) error {
86 if buf == nil {
87 return nil
88 }
89
90 helper := prepareFlexibleDecoder(&realDecoder{
91 raw: buf,
92 registry: metricRegistry,
93 }, in, version)
94 err := in.decode(helper, version)
95 if err != nil {
96 return err
97 }
98
99 if remaining := helper.remaining(); remaining != 0 {
100 return PacketDecodingError{
101 Info: fmt.Sprintf("invalid length len=%d remaining=%d", len(buf), remaining),
102 }
103 }
104
105 return nil
106}
107
108func prepareFlexibleDecoder(pd *realDecoder, in versionedDecoder, version int16) packetDecoder {
109 if flexibleDecoder, ok := in.(flexibleVersion); ok && flexibleDecoder.isFlexibleVersion(version) {
110 return &realFlexibleDecoder{pd}
111 }
112 return pd
113}
114
115func prepareFlexibleEncoder(pe packetEncoder, req encoder) packetEncoder {
116 if flexibleEncoder, ok := req.(flexibleVersion); ok && flexibleEncoder.isFlexible() {
117 switch e := pe.(type) {
118 case *prepEncoder:
119 return &prepFlexibleEncoder{e}
120 case *realEncoder:
121 return &realFlexibleEncoder{e}
122 default:
123 return pe
124 }
125 }
126 return pe
127}
128
129func downgradeFlexibleDecoder(pd packetDecoder) packetDecoder {
130 if f, ok := pd.(*realFlexibleDecoder); ok {
131 return f.realDecoder
132 }
133 return pd
134}