{{- $replicaCount := int .Values.replicaCount -}} {{- $releaseNamespace := .Release.Namespace -}} {{- $clusterDomain := .Values.clusterDomain -}} {{- $fullname := include "kafka.fullname" . -}} {{- $clientProtocol := include "kafka.listenerType" ( dict "protocol" .Values.auth.clientProtocol ) -}} {{- $servicePort := int .Values.service.port -}} {{- $loadBalancerIPListLength := len .Values.externalAccess.service.loadBalancerIPs -}} {{- if and .Values.externalAccess.enabled (not .Values.externalAccess.autoDiscovery.enabled) (not (eq $replicaCount $loadBalancerIPListLength )) (eq .Values.externalAccess.service.type "LoadBalancer") }} ############################################################################### ### ERROR: You enabled external access to Kafka brokers without specifying ### ### the array of load balancer IPs for Kafka brokers. ### ############################################################################### This deployment will be incomplete until you configure the array of load balancer IPs for Kafka brokers. To complete your deployment follow the steps below: 1. Wait for the load balancer IPs (it may take a few minutes for them to be available): . kubectl get svc --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka,pod" -w . 2. Obtain the load balancer IPs and upgrade your chart: . {{- range $i, $e := until $replicaCount }} LOAD_BALANCER_IP_{{ add $i 1 }}="$(kubectl get svc --namespace {{ $releaseNamespace }} {{ $fullname }}-{{ $i }}-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}')" {{- end }} . 3. Upgrade you chart: . helm upgrade {{ .Release.Name }} bitnami/{{ .Chart.Name }} \ --set replicaCount={{ $replicaCount }} \ --set externalAccess.enabled=true \ {{- range $i, $e := until $replicaCount }} --set externalAccess.service.loadBalancerIPs[{{ $i }}]=$LOAD_BALANCER_IP_{{ add $i 1 }} \ {{- end }} --set externalAccess.service.type=LoadBalancer . {{- else }} {{- if and (or (eq .Values.service.type "LoadBalancer") .Values.externalAccess.enabled) (eq .Values.auth.clientProtocol "plaintext") }} --------------------------------------------------------------------------------------------- WARNING By specifying "serviceType=LoadBalancer" and not configuring the authentication you have most likely exposed the Kafka service externally without any authentication mechanism. For security reasons, we strongly suggest that you switch to "ClusterIP" or "NodePort". As alternative, you can also configure the Kafka authentication. --------------------------------------------------------------------------------------------- {{- end }} ** Please be patient while the chart is being deployed ** Kafka can be accessed by consumers via port {{ $servicePort }} on the following DNS name from within your cluster: {{ $fullname }}.{{ $releaseNamespace }}.svc.{{ $clusterDomain }} Each Kafka broker can be accessed by producers via port {{ $servicePort }} on the following DNS name(s) from within your cluster: {{- range $i, $e := until $replicaCount }} {{ $fullname }}-{{ $i }}.{{ $fullname }}-headless.{{ $releaseNamespace }}.svc.{{ $clusterDomain }} {{- end }} {{- if (include "kafka.client.saslAuthentication" .) }} You need to configure your Kafka client to access using SASL authentication. To do so, you need to create the 'kafka_jaas.conf' and 'client.properties' configuration files with the content below: - kafka_jaas.conf: KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="{{ .Values.auth.jaas.clientUser }}" password="$(kubectl get secret {{ $fullname }}-jaas -n {{ $releaseNamespace }} -o jsonpath='{.data.client-password}' | base64 --decode)"; }; - client.properties: {{- if eq .Values.auth.clientProtocol "sasl_tls" }} ssl.truststore.location=/tmp/kafka.truststore.jks {{- if .Values.auth.jksPassword }} ssl.truststore.password={{ .Values.auth.jksPassword }} {{- end }} {{- end }} security.protocol={{ $clientProtocol }} sasl.mechanism=PLAIN {{- end }} To create a pod that you can use as a Kafka client run the following commands: kubectl run {{ $fullname }}-client --restart='Never' --image {{ template "kafka.image" . }} --namespace {{ $releaseNamespace }} --command -- sleep infinity {{- if (include "kafka.client.saslAuthentication" .) }} kubectl cp --namespace {{ $releaseNamespace }} /path/to/client.properties {{ $fullname }}-client:/tmp/client.properties kubectl cp --namespace {{ $releaseNamespace }} /path/to/kafka_jaas.conf {{ $fullname }}-client:/tmp/kafka_jaas.conf {{- if eq .Values.auth.clientProtocol "sasl_tls" }} kubectl cp --namespace {{ $releaseNamespace }} ./kafka.truststore.jks kafka-client:/tmp/kafka.truststore.jks {{- end }} {{- end }} kubectl exec --tty -i {{ $fullname }}-client --namespace {{ $releaseNamespace }} -- bash {{- if (include "kafka.client.saslAuthentication" .) }} export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_jaas.conf" {{- end }} PRODUCER: kafka-console-producer.sh \ {{ if (include "kafka.client.saslAuthentication" .) }}--producer.config /tmp/client.properties \{{ end }} --broker-list {{ range $i, $e := until $replicaCount }}{{ $fullname }}-{{ $i }}.{{ $fullname }}-headless.{{ $releaseNamespace }}.svc.{{ $clusterDomain }}:{{ $servicePort }},{{ end }} \ --topic test CONSUMER: kafka-console-consumer.sh \ {{ if (include "kafka.client.saslAuthentication" .) }}--consumer.config /tmp/client.properties \{{ end }} --bootstrap-server {{ $fullname }}.{{ $releaseNamespace }}.svc.{{ $clusterDomain }}:{{ .Values.service.port }} \ --topic test \ --from-beginning {{- if .Values.externalAccess.enabled }} To connect to your Kafka server from outside the cluster, follow the instructions below: {{- if eq "NodePort" .Values.externalAccess.service.type }} {{- if .Values.externalAccess.service.domain }} Kafka brokers domain: Use your provided hostname to reach Kafka brokers, {{ .Values.externalAccess.service.domain }} {{- else }} Kafka brokers domain: You can get the external node IP from the Kafka configuration file with the following commands (Check the EXTERNAL listener) 1. Obtain the pod name: kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka" 2. Obtain pod configuration: kubectl exec -it KAFKA_POD -- cat /opt/bitnami/kafka/config/server.properties | grep advertised.listeners {{- end }} Kafka brokers port: You will have a different node port for each Kafka broker. You can get the list of configured node ports using the command below: echo "$(kubectl get svc --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].spec.ports[0].nodePort}' | tr ' ' '\n')" {{- else if contains "LoadBalancer" .Values.externalAccess.service.type }} NOTE: It may take a few minutes for the LoadBalancer IPs to be available. Watch the status with: 'kubectl get svc --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka,pod" -w' Kafka Brokers domain: You will have a different external IP for each Kafka broker. You can get the list of external IPs using the command below: echo "$(kubectl get svc --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ template "kafka.name" . }},app.kubernetes.io/instance={{ .Release.Name }},app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].status.loadBalancer.ingress[0].ip}' | tr ' ' '\n')" Kafka Brokers port: {{ .Values.externalAccess.service.port }} {{- end }} {{- end }} {{- end }} {{- include "kafka.checkRollingTags" . }} {{- include "kafka.validateValues" . }}