123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- {{- $replicaCount := int .Values.replicaCount -}}
- {{- $releaseNamespace := .Release.Namespace -}}
- {{- $clusterDomain := .Values.clusterDomain -}}
- {{- $fullname := include "kafka.fullname" . -}}
- {{- $clientProtocol := include "kafka.listenerType" (dict "protocol" .Values.auth.clientProtocol) -}}
- {{- $saslMechanisms := coalesce .Values.auth.sasl.mechanisms .Values.auth.saslMechanisms -}}
- {{- $tlsEndpointIdentificationAlgorithm := default "" (coalesce .Values.auth.tls.endpointIdentificationAlgorithm .Values.auth.tlsEndpointIdentificationAlgorithm) -}}
- {{- $tlsPassword := coalesce .Values.auth.tls.password .Values.auth.jksPassword -}}
- {{- $servicePort := int .Values.service.port -}}
- {{- $loadBalancerIPListLength := len .Values.externalAccess.service.loadBalancerIPs -}}
- {{- if and .Values.externalAccess.enabled (not .Values.externalAccess.autoDiscovery.enabled) (not (eq $replicaCount $loadBalancerIPListLength )) (eq .Values.externalAccess.service.type "LoadBalancer") }}
- ###############################################################################
- ### ERROR: You enabled external access to Kafka brokers without specifying ###
- ### the array of load balancer IPs for Kafka brokers. ###
- ###############################################################################
- This deployment will be incomplete until you configure the array of load balancer
- IPs for Kafka brokers. To complete your deployment follow the steps below:
- 1. Wait for the load balancer IPs (it may take a few minutes for them to be available):
- kubectl get svc --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka,pod" -w
- 2. Obtain the load balancer IPs and upgrade your chart:
- {{- range $i, $e := until $replicaCount }}
- LOAD_BALANCER_IP_{{ add $i 1 }}="$(kubectl get svc --namespace {{ $releaseNamespace }} {{ $fullname }}-{{ $i }}-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
- {{- end }}
- 3. Upgrade you chart:
- helm upgrade --namespace {{ .Release.Namespace }} {{ .Release.Name }} bitnami/{{ .Chart.Name }} \
- --set replicaCount={{ $replicaCount }} \
- --set externalAccess.enabled=true \
- {{- range $i, $e := until $replicaCount }}
- --set externalAccess.service.loadBalancerIPs[{{ $i }}]=$LOAD_BALANCER_IP_{{ add $i 1 }} \
- {{- end }}
- --set externalAccess.service.type=LoadBalancer
- {{- else }}
- {{- if and (or (eq .Values.service.type "LoadBalancer") .Values.externalAccess.enabled) (eq $clientProtocol "PLAINTEXT") }}
- ---------------------------------------------------------------------------------------------
- WARNING
- By specifying "serviceType=LoadBalancer" and not configuring the authentication
- you have most likely exposed the Kafka service externally without any
- authentication mechanism.
- For security reasons, we strongly suggest that you switch to "ClusterIP" or
- "NodePort". As alternative, you can also configure the Kafka authentication.
- ---------------------------------------------------------------------------------------------
- {{- end }}
- ** Please be patient while the chart is being deployed **
- Kafka can be accessed by consumers via port {{ $servicePort }} on the following DNS name from within your cluster:
- {{ $fullname }}.{{ $releaseNamespace }}.svc.{{ $clusterDomain }}
- Each Kafka broker can be accessed by producers via port {{ $servicePort }} on the following DNS name(s) from within your cluster:
- {{- $brokerList := list }}
- {{- range $e, $i := until $replicaCount }}
- {{- $brokerList = append $brokerList (printf "%s-%d.%s-headless.%s.svc.%s:%d" $fullname $i $fullname $releaseNamespace $clusterDomain $servicePort) }}
- {{- end }}
- {{ join "\n" $brokerList | nindent 4 }}
- {{- if (include "kafka.client.saslAuthentication" .) }}
- You need to configure your Kafka client to access using SASL authentication. To do so, you need to create the 'kafka_jaas.conf' and 'client.properties' configuration files with the content below:
- - kafka_jaas.conf:
- KafkaClient {
- {{- if $saslMechanisms | regexFind "scram" }}
- org.apache.kafka.common.security.scram.ScramLoginModule required
- {{- else }}
- org.apache.kafka.common.security.plain.PlainLoginModule required
- {{- end }}
- username="{{ index (coalesce .Values.auth.sasl.jaas.clientUsers .Values.auth.jaas.clientUsers) 0 }}"
- password="$(kubectl get secret {{ $fullname }}-jaas --namespace {{ $releaseNamespace }} -o jsonpath='{.data.client-passwords}' | base64 --decode | cut -d , -f 1)";
- };
- - client.properties:
- security.protocol={{ $clientProtocol }}
- {{- if $saslMechanisms | regexFind "scram-sha-256" }}
- sasl.mechanism=SCRAM-SHA-256
- {{- else if $saslMechanisms | regexFind "scram-sha-512" }}
- sasl.mechanism=SCRAM-SHA-512
- {{- else }}
- sasl.mechanism=PLAIN
- {{- end }}
- {{- if eq $clientProtocol "SASL_SSL" }}
- ssl.truststore.type={{ upper .Values.auth.tls.type }}
- {{- if eq .Values.auth.tls.type "jks" }}
- ssl.truststore.location=/tmp/kafka.truststore.jks
- {{- if not (empty $tlsPassword) }}
- ssl.truststore.password={{ $tlsPassword }}
- {{- end }}
- {{- else if eq .Values.auth.tls.type "pem" }}
- ssl.truststore.certificates=-----BEGIN CERTIFICATE----- \
- ... \
- -----END CERTIFICATE-----
- {{- end }}
- {{- if eq $tlsEndpointIdentificationAlgorithm "" }}
- ssl.endpoint.identification.algorithm=
- {{- end }}
- {{- end }}
- {{- else if (include "kafka.client.tlsEncryption" .) }}
- You need to configure your Kafka client to access using TLS authentication. To do so, you need to create the 'client.properties' configuration file with the content below:
- security.protocol={{ $clientProtocol }}
- ssl.truststore.type={{ upper .Values.auth.tls.type }}
- {{- if eq .Values.auth.tls.type "jks" }}
- ssl.truststore.location=/tmp/kafka.truststore.{{ .Values.auth.tls.type }}
- {{- if not (empty $tlsPassword) }}
- ssl.truststore.password={{ $tlsPassword }}
- {{- end }}
- {{- else if eq .Values.auth.tls.type "pem" }}
- ssl.truststore.certificates=-----BEGIN CERTIFICATE----- \
- ... \
- -----END CERTIFICATE-----
- {{- end }}
- {{- if eq .Values.auth.clientProtocol "mtls" }}
- ssl.keystore.type={{ upper .Values.auth.tls.type }}
- {{- if eq .Values.auth.tls.type "jks" }}
- ssl.keystore.location=/tmp/client.keystore.jks
- {{- if not (empty $tlsPassword) }}
- ssl.keystore.password={{ $tlsPassword }}
- {{- end }}
- {{- else if eq .Values.auth.tls.type "pem" }}
- ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE----- \
- ... \
- -----END CERTIFICATE-----
- ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY----- \
- ... \
- -----END ENCRYPTED PRIVATE KEY-----
- {{- end }}
- {{- end }}
- {{- if eq $tlsEndpointIdentificationAlgorithm "" }}
- ssl.endpoint.identification.algorithm=
- {{- end }}
- {{- end }}
- To create a pod that you can use as a Kafka client run the following commands:
- kubectl run {{ $fullname }}-client --restart='Never' --image {{ template "kafka.image" . }} --namespace {{ $releaseNamespace }} --command -- sleep infinity
- {{- if or (include "kafka.client.saslAuthentication" .) (include "kafka.client.tlsEncryption" .) }}
- kubectl cp --namespace {{ $releaseNamespace }} /path/to/client.properties {{ $fullname }}-client:/tmp/client.properties
- {{- end }}
- {{- if (include "kafka.client.saslAuthentication" .) }}
- kubectl cp --namespace {{ $releaseNamespace }} /path/to/kafka_jaas.conf {{ $fullname }}-client:/tmp/kafka_jaas.conf
- {{- end }}
- {{- if and (include "kafka.client.tlsEncryption" .) (eq .Values.auth.tls.type "jks") }}
- kubectl cp --namespace {{ $releaseNamespace }} ./kafka.truststore.jks {{ $fullname }}-client:/tmp/kafka.truststore.jks
- {{- if eq .Values.auth.clientProtocol "mtls" }}
- kubectl cp --namespace {{ $releaseNamespace }} ./client.keystore.jks {{ $fullname }}-client:/tmp/client.keystore.jks
- {{- end }}
- {{- end }}
- kubectl exec --tty -i {{ $fullname }}-client --namespace {{ $releaseNamespace }} -- bash
- {{- if (include "kafka.client.saslAuthentication" .) }}
- export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_jaas.conf"
- {{- end }}
- PRODUCER:
- kafka-console-producer.sh \
- {{ if or (include "kafka.client.saslAuthentication" .) (include "kafka.client.tlsEncryption" .) }}--producer.config /tmp/client.properties \{{ end }}
- --broker-list {{ join "," $brokerList }} \
- --topic test
- CONSUMER:
- kafka-console-consumer.sh \
- {{ if or (include "kafka.client.saslAuthentication" .) (include "kafka.client.tlsEncryption" .) }}--consumer.config /tmp/client.properties \{{ end }}
- --bootstrap-server {{ $fullname }}.{{ $releaseNamespace }}.svc.{{ $clusterDomain }}:{{ .Values.service.port }} \
- --topic test \
- --from-beginning
- {{- if .Values.externalAccess.enabled }}
- To connect to your Kafka server from outside the cluster, follow the instructions below:
- {{- if eq "NodePort" .Values.externalAccess.service.type }}
- {{- if .Values.externalAccess.service.domain }}
- Kafka brokers domain: Use your provided hostname to reach Kafka brokers, {{ .Values.externalAccess.service.domain }}
- {{- else }}
- Kafka brokers domain: You can get the external node IP from the Kafka configuration file with the following commands (Check the EXTERNAL listener)
- 1. Obtain the pod name:
- kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka"
- 2. Obtain pod configuration:
- kubectl exec -it KAFKA_POD -- cat /opt/bitnami/kafka/config/server.properties | grep advertised.listeners
- {{- end }}
- Kafka brokers port: You will have a different node port for each Kafka broker. You can get the list of configured node ports using the command below:
- echo "$(kubectl get svc --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].spec.ports[0].nodePort}' | tr ' ' '\n')"
- {{- else if contains "LoadBalancer" .Values.externalAccess.service.type }}
- NOTE: It may take a few minutes for the LoadBalancer IPs to be available.
- Watch the status with: 'kubectl get svc --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka,pod" -w'
- Kafka Brokers domain: You will have a different external IP for each Kafka broker. You can get the list of external IPs using the command below:
- echo "$(kubectl get svc --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].status.loadBalancer.ingress[0].ip}' | tr ' ' '\n')"
- Kafka Brokers port: {{ .Values.externalAccess.service.port }}
- {{- end }}
- {{- end }}
- {{- end }}
- {{- include "common.warnings.rollingTag" .Values.image }}
- {{- include "common.warnings.rollingTag" .Values.externalAccess.autoDiscovery.image }}
- {{- include "common.warnings.rollingTag" .Values.metrics.kafka.image }}
- {{- include "common.warnings.rollingTag" .Values.metrics.jmx.image }}
- {{- include "kafka.validateValues" . }}
|