blob: 5bc211294426edecd206a5e111f5b0fc78711242 [file] [log] [blame]
Abhay Kumara2ae5992025-11-10 14:02:24 +00001package sarama
2
3import "errors"
4
5type MessageBlock struct {
6 Offset int64
7 Msg *Message
8}
9
10// Messages convenience helper which returns either all the
11// messages that are wrapped in this block
12func (msb *MessageBlock) Messages() []*MessageBlock {
13 if msb.Msg.Set != nil {
14 return msb.Msg.Set.Messages
15 }
16 return []*MessageBlock{msb}
17}
18
19func (msb *MessageBlock) encode(pe packetEncoder) error {
20 pe.putInt64(msb.Offset)
21 pe.push(&lengthField{})
22 err := msb.Msg.encode(pe)
23 if err != nil {
24 return err
25 }
26 return pe.pop()
27}
28
29func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
30 if msb.Offset, err = pd.getInt64(); err != nil {
31 return err
32 }
33
34 lengthDecoder := acquireLengthField()
35 defer releaseLengthField(lengthDecoder)
36
37 if err = pd.push(lengthDecoder); err != nil {
38 return err
39 }
40
41 msb.Msg = new(Message)
42 if err = msb.Msg.decode(pd); err != nil {
43 return err
44 }
45
46 if err = pd.pop(); err != nil {
47 return err
48 }
49
50 return nil
51}
52
53type MessageSet struct {
54 PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
55 OverflowMessage bool // whether the set on the wire contained an overflow message
56 Messages []*MessageBlock
57}
58
59func (ms *MessageSet) encode(pe packetEncoder) error {
60 for i := range ms.Messages {
61 err := ms.Messages[i].encode(pe)
62 if err != nil {
63 return err
64 }
65 }
66 return nil
67}
68
69func (ms *MessageSet) decode(pd packetDecoder) (err error) {
70 ms.Messages = nil
71
72 for pd.remaining() > 0 {
73 magic, err := magicValue(pd)
74 if err != nil {
75 if errors.Is(err, ErrInsufficientData) {
76 ms.PartialTrailingMessage = true
77 return nil
78 }
79 return err
80 }
81
82 if magic > 1 {
83 return nil
84 }
85
86 msb := new(MessageBlock)
87 err = msb.decode(pd)
88 if err == nil {
89 ms.Messages = append(ms.Messages, msb)
90 } else if errors.Is(err, ErrInsufficientData) {
91 // As an optimization the server is allowed to return a partial message at the
92 // end of the message set. Clients should handle this case. So we just ignore such things.
93 if msb.Offset == -1 {
94 // This is an overflow message caused by chunked down conversion
95 ms.OverflowMessage = true
96 } else {
97 ms.PartialTrailingMessage = true
98 }
99 return nil
100 } else {
101 return err
102 }
103 }
104
105 return nil
106}
107
108func (ms *MessageSet) addMessage(msg *Message) {
109 block := new(MessageBlock)
110 block.Msg = msg
111 ms.Messages = append(ms.Messages, block)
112}