blob: 93ea9218bdaa7fba493d19fe27eaf074c3f99da1 [file] [log] [blame]
Abhay Kumar40252eb2025-10-13 13:25:53 +00001package sarama
2
3import (
4 "encoding/binary"
5 "errors"
6 "fmt"
7 "io"
8 "math"
9 "net"
10 "strings"
11 "time"
12
13 "github.com/jcmturner/gofork/encoding/asn1"
14 "github.com/jcmturner/gokrb5/v8/asn1tools"
15 "github.com/jcmturner/gokrb5/v8/gssapi"
16 "github.com/jcmturner/gokrb5/v8/iana/chksumtype"
17 "github.com/jcmturner/gokrb5/v8/iana/keyusage"
18 "github.com/jcmturner/gokrb5/v8/messages"
19 "github.com/jcmturner/gokrb5/v8/types"
20)
21
// Values written into the tokens built in this file.
const (
	// TOK_ID_KRB_AP_REQ is written as a big-endian uint16 in front of the
	// marshaled AP_REQ by createKrb5Token.
	TOK_ID_KRB_AP_REQ = 0x0100
	// GSS_API_GENERIC_TAG is the first byte of the header produced by
	// appendGSSAPIHeader.
	GSS_API_GENERIC_TAG = 0x60
)

// Authentication kinds. NOTE(review): presumably the accepted values of
// GSSAPIConfig.AuthType — confirm against the Kerberos client constructor.
const (
	KRB5_USER_AUTH = iota + 1
	KRB5_KEYTAB_AUTH
	KRB5_CCACHE_AUTH
)

// Handshake states stored in GSSAPIKerberosAuth.step.
const (
	// GSS_API_INITIAL: the next token to produce is the AP_REQ.
	GSS_API_INITIAL = iota + 1
	// GSS_API_VERIFY: the next step verifies the broker's wrap token and
	// answers it.
	GSS_API_VERIFY
	// GSS_API_FINISH: the final token has been produced.
	GSS_API_FINISH
)
32
// GSSAPIConfig holds the Kerberos/GSSAPI settings. In this file the whole
// struct is handed to GSSAPIKerberosAuth.NewKerberosClientFunc, and BuildSpn
// is consulted when the service principal name is built; the remaining fields
// are not read here, so their notes below are assumptions to be confirmed
// against the Kerberos client constructor.
type GSSAPIConfig struct {
	// AuthType presumably selects one of KRB5_USER_AUTH, KRB5_KEYTAB_AUTH or
	// KRB5_CCACHE_AUTH — TODO confirm.
	AuthType int
	// KeyTabPath is presumably the keytab file location — TODO confirm.
	KeyTabPath string
	// CCachePath is presumably the credential cache location — TODO confirm.
	CCachePath string
	// KerberosConfigPath is presumably the krb5 configuration file
	// location — TODO confirm.
	KerberosConfigPath string
	// ServiceName is presumably the Kafka service's primary name; note that
	// spn() reads the service name from the broker configuration rather than
	// from this struct directly.
	ServiceName string
	// Username is presumably the client principal's name — TODO confirm.
	Username string
	// Password is presumably used for password-based login — TODO confirm.
	Password string
	// Realm is presumably the Kerberos realm of the client — TODO confirm.
	Realm string
	// NOTE(review): the name suggests this disables PA-FX-FAST
	// pre-authentication negotiation — confirm.
	DisablePAFXFAST bool
	// BuildSpn, when non-nil, is called with the service name and the broker
	// host to produce the service principal name; when nil the name is
	// formatted as "<service>/<host>".
	BuildSpn BuildSpnFunc
}
45
// GSSAPIKerberosAuth carries the state of one Kerberos GSSAPI handshake with
// a broker.
type GSSAPIKerberosAuth struct {
	// Config is passed to NewKerberosClientFunc and supplies the optional
	// BuildSpn hook.
	Config *GSSAPIConfig
	// ticket is the service ticket stored by Login.
	ticket messages.Ticket
	// encKey is the key returned alongside the service ticket; it is used to
	// build the AP_REQ and to verify and create wrap tokens.
	encKey types.EncryptionKey
	// NewKerberosClientFunc creates the Kerberos client; Authorize and
	// AuthorizeV2 call it without a nil check, so it must be set.
	NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
	// step is the current handshake state (one of the GSS_API_* constants);
	// Login resets it to GSS_API_INITIAL and initSecContext advances it.
	step int
}
53
// KerberosClient is the set of Kerberos client operations this file relies on
// during the handshake.
type KerberosClient interface {
	// Login is invoked once, before a service ticket is requested.
	Login() error
	// GetServiceTicket returns the ticket and key for the given service
	// principal name.
	GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
	// Domain supplies the first argument given to types.NewAuthenticator
	// when the AP_REQ is built.
	Domain() string
	// CName supplies the principal name placed in the authenticator.
	CName() types.PrincipalName
	// Destroy is deferred by Authorize and AuthorizeV2 once the client has
	// been created.
	Destroy()
}
61
// BuildSpnFunc builds a service principal name from a service name and a
// broker host. It is the type of the optional GSSAPIConfig.BuildSpn hook.
type BuildSpnFunc func(serviceName, host string) string
63
64// writePackage appends length in big endian before the payload, and sends it to kafka
65func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
66 length := uint64(len(payload))
67 size := length + 4 // 4 byte length header + payload
68 if size > math.MaxInt32 {
69 return 0, errors.New("payload too large, will overflow int32")
70 }
71 finalPackage := make([]byte, size)
72 copy(finalPackage[4:], payload)
73 binary.BigEndian.PutUint32(finalPackage, uint32(length))
74 bytes, err := broker.conn.Write(finalPackage)
75 if err != nil {
76 return bytes, err
77 }
78 return bytes, nil
79}
80
81// readPackage reads payload length (4 bytes) and then reads the payload into []byte
82func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
83 bytesRead := 0
84 lengthInBytes := make([]byte, 4)
85 bytes, err := io.ReadFull(broker.conn, lengthInBytes)
86 if err != nil {
87 return nil, bytesRead, err
88 }
89 bytesRead += bytes
90 payloadLength := binary.BigEndian.Uint32(lengthInBytes)
91 payloadBytes := make([]byte, payloadLength) // buffer for read..
92 bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
93 if err != nil {
94 return payloadBytes, bytesRead, err
95 }
96 bytesRead += bytes
97 return payloadBytes, bytesRead, nil
98}
99
100func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
101 a := make([]byte, 24)
102 flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
103 binary.LittleEndian.PutUint32(a[:4], 16)
104 for _, i := range flags {
105 f := binary.LittleEndian.Uint32(a[20:24])
106 f |= uint32(i)
107 binary.LittleEndian.PutUint32(a[20:24], f)
108 }
109 return a
110}
111
112// Construct Kerberos AP_REQ package, conforming to RFC-4120
113// https://tools.ietf.org/html/rfc4120#page-84
114func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
115 domain string,
116 cname types.PrincipalName,
117 ticket messages.Ticket,
118 sessionKey types.EncryptionKey,
119) ([]byte, error) {
120 auth, err := types.NewAuthenticator(domain, cname)
121 if err != nil {
122 return nil, err
123 }
124 auth.Cksum = types.Checksum{
125 CksumType: chksumtype.GSSAPI,
126 Checksum: krbAuth.newAuthenticatorChecksum(),
127 }
128 APReq, err := messages.NewAPReq(
129 ticket,
130 sessionKey,
131 auth,
132 )
133 if err != nil {
134 return nil, err
135 }
136 aprBytes := make([]byte, 2)
137 binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
138 tb, err := APReq.Marshal()
139 if err != nil {
140 return nil, err
141 }
142 aprBytes = append(aprBytes, tb...)
143 return aprBytes, nil
144}
145
146// Append the GSS-API header to the payload, conforming to RFC-2743
147// Section 3.1, Mechanism-Independent Token Format
148//
149// https://tools.ietf.org/html/rfc2743#page-81
150//
151// GSSAPIHeader + <specific mechanism payload>
152func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
153 oidBytes, err := asn1.Marshal(gssapi.OIDKRB5.OID())
154 if err != nil {
155 return nil, err
156 }
157 tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
158 GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
159 GSSHeader = append(GSSHeader, oidBytes...)
160 GSSPackage := append(GSSHeader, payload...)
161 return GSSPackage, nil
162}
163
164func (krbAuth *GSSAPIKerberosAuth) initSecContext(
165 client KerberosClient,
166 bytes []byte,
167) ([]byte, error) {
168 switch krbAuth.step {
169 case GSS_API_INITIAL:
170 aprBytes, err := krbAuth.createKrb5Token(
171 client.Domain(),
172 client.CName(),
173 krbAuth.ticket,
174 krbAuth.encKey)
175 if err != nil {
176 return nil, err
177 }
178 krbAuth.step = GSS_API_VERIFY
179 return krbAuth.appendGSSAPIHeader(aprBytes)
180 case GSS_API_VERIFY:
181 wrapTokenReq := gssapi.WrapToken{}
182 if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
183 return nil, err
184 }
185 // Validate response.
186 isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
187 if !isValid {
188 return nil, err
189 }
190
191 wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
192 if err != nil {
193 return nil, err
194 }
195 krbAuth.step = GSS_API_FINISH
196 return wrapTokenResponse.Marshal()
197 }
198 return nil, nil
199}
200
201func (krbAuth *GSSAPIKerberosAuth) spn(broker *Broker) string {
202 host, _, _ := net.SplitHostPort(broker.addr)
203 var spn string
204 if krbAuth.Config.BuildSpn != nil {
205 spn = krbAuth.Config.BuildSpn(broker.conf.Net.SASL.GSSAPI.ServiceName, host)
206 } else {
207 spn = fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
208 }
209 return spn
210}
211
212// Login will use the given KerberosClient to login and get a ticket for the given spn.
213func (krbAuth *GSSAPIKerberosAuth) Login(
214 client KerberosClient,
215 spn string,
216) (*messages.Ticket, error) {
217 if err := client.Login(); err != nil {
218 Logger.Printf("Kerberos client login error: %s", err)
219 return nil, err
220 }
221
222 ticket, encKey, err := client.GetServiceTicket(spn)
223 if err != nil {
224 Logger.Printf("Kerberos service ticket error for %s: %s", spn, err)
225 return nil, err
226 }
227 krbAuth.ticket = ticket
228 krbAuth.encKey = encKey
229 krbAuth.step = GSS_API_INITIAL
230
231 return &ticket, nil
232}
233
234// Authorize performs the kerberos auth handshake for authorization
235func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
236 client, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
237 if err != nil {
238 Logger.Printf("Kerberos client initialization error: %s", err)
239 return err
240 }
241 defer client.Destroy()
242
243 ticket, err := krbAuth.Login(client, krbAuth.spn(broker))
244 if err != nil {
245 return err
246 }
247
248 principal := strings.Join(ticket.SName.NameString, "/") + "@" + ticket.Realm
249 var receivedBytes []byte
250
251 for {
252 packBytes, err := krbAuth.initSecContext(client, receivedBytes)
253 if err != nil {
254 Logger.Printf("Kerberos init error as %s: %s", principal, err)
255 return err
256 }
257
258 requestTime := time.Now()
259 bytesWritten, err := krbAuth.writePackage(broker, packBytes)
260 if err != nil {
261 Logger.Printf("Kerberos write error as %s: %s", principal, err)
262 return err
263 }
264 broker.updateOutgoingCommunicationMetrics(bytesWritten)
265
266 switch krbAuth.step {
267 case GSS_API_VERIFY:
268 var bytesRead int
269 receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
270 requestLatency := time.Since(requestTime)
271 broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
272 if err != nil {
273 Logger.Printf("Kerberos read error as %s: %s", principal, err)
274 return err
275 }
276 case GSS_API_FINISH:
277 return nil
278 }
279 }
280}
281
282// AuthorizeV2 performs the SASL v2 GSSAPI authentication with the Kafka broker.
283func (krbAuth *GSSAPIKerberosAuth) AuthorizeV2(
284 broker *Broker,
285 authSendReceiver func(authBytes []byte) (*SaslAuthenticateResponse, error),
286) error {
287 client, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
288 if err != nil {
289 Logger.Printf("Kerberos client initialization error: %s", err)
290 return err
291 }
292 defer client.Destroy()
293
294 ticket, err := krbAuth.Login(client, krbAuth.spn(broker))
295 if err != nil {
296 return err
297 }
298
299 principal := strings.Join(ticket.SName.NameString, "/") + "@" + ticket.Realm
300 var receivedBytes []byte
301
302 for {
303 token, err := krbAuth.initSecContext(client, receivedBytes)
304 if err != nil {
305 Logger.Printf("SASL Kerberos init error as %s: %s", principal, err)
306 return err
307 }
308
309 authResponse, err := authSendReceiver(token)
310 if err != nil {
311 Logger.Printf("SASL Kerberos authenticate error as %s: %s", principal, err)
312 return err
313 }
314
315 receivedBytes = authResponse.SaslAuthBytes
316
317 if krbAuth.step == GSS_API_FINISH {
318 return nil
319 }
320 }
321}