apiVersion: v1 kind: ConfigMap metadata: name: {{ template "kafka.fullname" . }}-scripts labels: {{- include "common.labels.standard" . | nindent 4 }} {{- if .Values.commonLabels }} {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }} {{- end }} {{- if .Values.commonAnnotations }} annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }} {{- end }} data: {{- $fullname := include "kafka.fullname" . }} {{- $releaseNamespace := .Release.Namespace }} {{- $clusterDomain := .Values.clusterDomain }} {{- $interBrokerPort := .Values.service.internalPort }} {{- $clientPort := .Values.service.port }} {{- $jksTruststoreSecret := coalesce .Values.auth.tls.jksTruststoreSecret .Values.auth.jksTruststoreSecret -}} {{- $jksTruststore := coalesce .Values.auth.tls.jksTruststore .Values.auth.jksTruststore -}} {{- $jksKeystoreSAN := coalesce .Values.auth.tls.jksKeystoreSAN .Values.auth.jksKeystoreSAN -}} {{- if .Values.externalAccess.autoDiscovery.enabled }} auto-discovery.sh: |- #!/bin/bash SVC_NAME="${MY_POD_NAME}-external" {{- if eq .Values.externalAccess.service.type "LoadBalancer" }} # Auxiliary functions retry_while() { local -r cmd="${1:?cmd is missing}" local -r retries="${2:-12}" local -r sleep_time="${3:-5}" local return_value=1 read -r -a command <<< "$cmd" for ((i = 1 ; i <= retries ; i+=1 )); do "${command[@]}" && return_value=0 && break sleep "$sleep_time" done return $return_value } k8s_svc_lb_ip() { local namespace=${1:?namespace is missing} local service=${2:?service is missing} local service_ip=$(kubectl get svc "$service" -n "$namespace" -o jsonpath="{.status.loadBalancer.ingress[0].ip}") local service_hostname=$(kubectl get svc "$service" -n "$namespace" -o jsonpath="{.status.loadBalancer.ingress[0].hostname}") if [[ -n ${service_ip} ]]; then echo "${service_ip}" else echo "${service_hostname}" fi } k8s_svc_lb_ip_ready() { local namespace=${1:?namespace is missing} local service=${2:?service is missing} [[ -n "$(k8s_svc_lb_ip "$namespace" "$service")" ]] } # Wait until LoadBalancer IP is ready retry_while "k8s_svc_lb_ip_ready {{ $releaseNamespace }} $SVC_NAME" || exit 1 # Obtain LoadBalancer external IP k8s_svc_lb_ip "{{ $releaseNamespace }}" "$SVC_NAME" | tee "$SHARED_FILE" {{- else if eq .Values.externalAccess.service.type "NodePort" }} k8s_svc_node_port() { local namespace=${1:?namespace is missing} local service=${2:?service is missing} local index=${3:-0} local node_port="$(kubectl get svc "$service" -n "$namespace" -o jsonpath="{.spec.ports[$index].nodePort}")" echo "$node_port" } k8s_svc_node_port "{{ $releaseNamespace }}" "$SVC_NAME" | tee "$SHARED_FILE" {{- end }} {{- end }} setup.sh: |- #!/bin/bash ID="${MY_POD_NAME#"{{ $fullname }}-"}" if [[ -f "/bitnami/kafka/data/meta.properties" ]]; then export KAFKA_CFG_BROKER_ID="$(grep "broker.id" /bitnami/kafka/data/meta.properties | awk -F '=' '{print $2}')" else export KAFKA_CFG_BROKER_ID="$((ID + {{ .Values.minBrokerId }}))" fi {{- if .Values.externalAccess.enabled }} # Configure external ip and port {{- if eq .Values.externalAccess.service.type "LoadBalancer" }} {{- if .Values.externalAccess.autoDiscovery.enabled }} export EXTERNAL_ACCESS_IP="$(<${SHARED_FILE})" {{- else }} export EXTERNAL_ACCESS_IP=$(echo '{{ .Values.externalAccess.service.loadBalancerIPs }}' | tr -d '[]' | cut -d ' ' -f "$(($ID + 1))") {{- end }} export EXTERNAL_ACCESS_PORT={{ .Values.externalAccess.service.port }} {{- else if eq .Values.externalAccess.service.type "NodePort" }} {{- if or .Values.externalAccess.service.useHostIPs .Values.externalAccess.autoDiscovery.enabled }} export EXTERNAL_ACCESS_IP="${HOST_IP}" {{- else if .Values.externalAccess.service.domain }} export EXTERNAL_ACCESS_IP={{ .Values.externalAccess.service.domain }} {{- else }} export EXTERNAL_ACCESS_IP=$(curl -s https://ipinfo.io/ip) {{- end }} {{- if .Values.externalAccess.autoDiscovery.enabled }} export EXTERNAL_ACCESS_PORT="$(<${SHARED_FILE})" {{- else }} export EXTERNAL_ACCESS_PORT=$(echo '{{ .Values.externalAccess.service.nodePorts }}' | tr -d '[]' | cut -d ' ' -f "$(($ID + 1))") {{- end }} {{- end }} # Configure Kafka advertised listeners {{- if .Values.advertisedListeners }} export KAFKA_CFG_ADVERTISED_LISTENERS={{ join "," .Values.advertisedListeners }} {{- else }} export KAFKA_CFG_ADVERTISED_LISTENERS="INTERNAL://${MY_POD_NAME}.{{ $fullname }}-headless.{{ $releaseNamespace }}.svc.{{ $clusterDomain }}:{{ $interBrokerPort }},CLIENT://${MY_POD_NAME}.{{ $fullname }}-headless.{{ $releaseNamespace }}.svc.{{ $clusterDomain }}:{{ $clientPort }},EXTERNAL://${EXTERNAL_ACCESS_IP}:${EXTERNAL_ACCESS_PORT}" {{- end }} {{- end }} {{- if (include "kafka.tlsEncryption" .) }} mkdir -p /opt/bitnami/kafka/config/certs {{- if eq .Values.auth.tls.type "jks" }} JKS_TRUSTSTORE={{ printf "/%s/%s" (ternary "certs" "truststore" (empty $jksTruststoreSecret)) (default "kafka.truststore.jks" $jksTruststore) | quote }} JKS_KEYSTORE={{ printf "/certs/%s" (default "kafka-${ID}.keystore.jks" $jksKeystoreSAN) | quote }} if [[ -f "$JKS_TRUSTSTORE" ]] && [[ -f "$JKS_KEYSTORE" ]]; then cp "$JKS_TRUSTSTORE" "/opt/bitnami/kafka/config/certs/kafka.truststore.jks" cp "$JKS_KEYSTORE" "/opt/bitnami/kafka/config/certs/kafka.keystore.jks" else echo "Couldn't find the expected Java Key Stores (JKS) files! They are mandatory when encryption via TLS is enabled." exit 1 fi {{- else if eq .Values.auth.tls.type "pem" }} PEM_CA="/certs/kafka.truststore.pem" PEM_CERT="/certs/kafka-${ID}.keystore.pem" PEM_KEY="/certs/kafka-${ID}.keystore.key" if [[ -f "$PEM_CA" ]] && [[ -f "$PEM_CERT" ]] && [[ -f "$PEM_KEY" ]]; then cp "$PEM_CA" "/opt/bitnami/kafka/config/certs/kafka.truststore.pem" cp "$PEM_CERT" "/opt/bitnami/kafka/config/certs/kafka.keystore.pem" # Ensure the key used PEM format with PKCS#8 openssl pkcs8 -topk8 -nocrypt -in "$PEM_KEY" > "/opt/bitnami/kafka/config/certs/kafka.keystore.key" else echo "Couldn't find the expected PEM files! They are mandatory when encryption via TLS is enabled." exit 1 fi {{- end }} {{- end }} exec /entrypoint.sh /run.sh