
How to use two Kerberos keytabs (for Kafka and Hadoop HDFS) from a Flink job on a Flink standalone cluster?


Based on the answer and comments on this very similar question, it appears there is no clean way to use two sets of Kerberos credentials in a single Flink job.

Promising approaches or workarounds:

  • Establishing a cross-realm trust between the two Kerberos realms
  • Co-installing Kafka and HDFS on the same platform
  • Using another tool to bridge the gap

An example of the last point:

You can use a tool such as NiFi or Streams Replication Manager to bring the data from the source Kafka cluster into the Kafka cluster that sits alongside your HDFS. NiFi is more modular, and it lets you configure the Kerberos credentials for each step. Afterwards you are in a single security context, which Flink can handle.

Full disclosure: I am an employee of Cloudera, a driving force behind NiFi, Kafka, HDFS, Streams Replication Manager and, since recently, Flink.


Three years after my initial post, our architecture has moved from a standalone bare-metal server to Docker containers on Mesos, but let me summarize the workaround (for Flink 1.8):

  • Place a krb5.conf with all realm definitions and domain-realm mappings in the container (for example under /etc/); see the sketch after the kinit commands below.

  • Place the Hadoop krb5.keytab in the container (for example under /kerberos/HADOOP_CLUSTER.ORG.EXAMPLE.COM/)

  • Configure Flink's security.kerberos.login.* properties in flink-conf.yaml

    • security.kerberos.login.use-ticket-cache: true
    • security.kerberos.login.principal: username@HADOOP_CLUSTER.ORG.EXAMPLE.COM
    • security.kerberos.login.contexts should not be configured. This ensures that Flink does not use Hadoop’s credentials for Kafka and Zookeeper.
  • Copy keytabs for Kafka into separate directories inside the container (for example under /kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/)

  • Periodically run a custom script to renew the ticket caches:

KINIT_COMMAND_1='kinit -kt /kerberos/HADOOP_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab username@HADOOP_CLUSTER.ORG.EXAMPLE.COM'
KINIT_COMMAND_2='kinit -kt /kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab username@KAFKA_CLUSTER.ORG.EXAMPLE.COM -c /tmp/krb5cc_kafka'
...
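
For reference, a minimal sketch of the krb5.conf these kinit commands rely on, with both realms defined and mapped. The KDC/admin-server hostnames are assumptions and must be replaced with the ones from your environment:

[libdefaults]
    default_realm = HADOOP_CLUSTER.ORG.EXAMPLE.COM

[realms]
    # KDC hostnames below are placeholders (assumptions), not from the original setup
    HADOOP_CLUSTER.ORG.EXAMPLE.COM = {
        kdc = kdc.hadoop-cluster.org.example.com
        admin_server = kdc.hadoop-cluster.org.example.com
    }
    KAFKA_CLUSTER.ORG.EXAMPLE.COM = {
        kdc = kdc.kafka-cluster.org.example.com
        admin_server = kdc.kafka-cluster.org.example.com
    }

[domain_realm]
    .hadoop-cluster.org.example.com = HADOOP_CLUSTER.ORG.EXAMPLE.COM
    .kafka-cluster.org.example.com = KAFKA_CLUSTER.ORG.EXAMPLE.COM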
  • Set the sasl.jaas.config property to the actual JAAS configuration string when instantiating each FlinkKafkaConsumer.
    • This bypasses the global JAAS configuration. If we set the JAAS configuration globally, we cannot use different Kafka clusters with different credentials, or mix unsecured and secured Kafka.
props.setProperty("sasl.jaas.config",
    "com.sun.security.auth.module.Krb5LoginModule required " +
    "refreshKrb5Config=true " +
    "useKeyTab=true " +
    "storeKey=true " +
    "debug=true " +
    "keyTab=\"/kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab\" " +
    "principal=\"username@KAFKA_CLUSTER.ORG.EXAMPLE.COM\";");
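
To show where that property ends up, here is a minimal, self-contained sketch of a job that reads from the Kerberos-secured Kafka and writes to HDFS. The topic name, bootstrap servers, group id, security.protocol value, output path and class name are illustrative assumptions, not part of the original setup; adjust them to your cluster.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TwoKeytabsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        // Broker address, protocol and service name are assumptions; adjust to your Kafka cluster.
        props.setProperty("bootstrap.servers", "broker1.kafka-cluster.org.example.com:9092");
        props.setProperty("group.id", "flink-two-keytabs-demo");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        // Per-consumer JAAS configuration pointing at the Kafka keytab, as shown above.
        props.setProperty("sasl.jaas.config",
            "com.sun.security.auth.module.Krb5LoginModule required " +
            "refreshKrb5Config=true " +
            "useKeyTab=true " +
            "storeKey=true " +
            "keyTab=\"/kerberos/KAFKA_CLUSTER.ORG.EXAMPLE.COM/krb5.keytab\" " +
            "principal=\"username@KAFKA_CLUSTER.ORG.EXAMPLE.COM\";");

        // The consumer authenticates to Kafka with the Kafka principal...
        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        DataStream<String> stream = env.addSource(consumer);

        // ...while writes to HDFS rely on the Hadoop ticket cache kept fresh by the kinit script.
        stream.writeAsText("hdfs:///tmp/two-keytabs-demo-output");

        env.execute("Two Kerberos keytabs demo");
    }
}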