kafkasasl的简单介绍

本篇文章给大家谈谈kafkasasl,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Flink消费集成kerberos认证的纳物kafka集群时,需要做一些配置才可以正常执行。

    Flink版本:1.8;kafka版本:2.0.1;Flink模式:Standalone

    //指示是否从 Kerberos ticket 缓存中读取

  洞迟液  security.kerberos.login.use-ticket-cache: false1

   //Kerberos 密钥表文件的绝对路径

    security.kerberos.login.keytab: /data/home/keytab/flink.keytab

   //认证主体名称

    security.kerberos.login.principal: flink@data.com

    //Kerberos登陆contexts

    security.kerberos.login.contexts: Client,KafkaClient

  val properties: Properties =new Properties()

  properties.setProperty("bootstrap.servers","broker:9092")

  properties.setProperty("group.id","testKafka")

  properties.setProperty("security.protocol","SASL_PLAINTEXT")

  properties.setProperty("sasl.mechanism","GSSAPI")

  properties.setProperty("sasl.kerberos.service.name","kafka")

  consumer =new   FlinkKafkaConsumer[String]("flink",new SimpleStringSchema(), properties)

    参数说明 :security.protocol 

    运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应Fusion Insight Kafka集群的21005/21007/21008/21009端口。 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,并在conf/flink-conf.yaml中配旦陵置security.kerberos.login相关配置项。如果配置了SSL,则必须配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。

Kafka身份认证与权限控制配置

编辑原有配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/server.properties

listeners=SASL_PLAINTEXT://192.168.43.209:9092

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

allow.everyone.if.no.acl.found=true

super.users=User:root

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf

KafkaServer{

       org.apache.kafka.common.security.plain.PlainLoginModule required

        username="kafka"

        password="kafkapswd"

        user_ kafkaa(用户名)="kafkaapswd"(密码)

        user_ kafkab(用户名冲租)=" kafkabpswd"(密码)

user_ kafkac(用户名)=" kafkacpswd"(密码)

user_ kafkad(用户名)=" kafkadpswd"(密码);

};

修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh

if ["x$KAFKA_OPTS" ]; then

    export KAFKA_OPTS="-Djava.security.auth.login.config=/空判拦home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"

fi

修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-run-class.sh

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/斗胡home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf'

if ["x$DAEMON_MODE" = "xtrue" ]; then

  nohup $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS  $KAFKA_JMX_OPTS$KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" "$CONSOLE_OUTPUT_FILE" 21 /dev/null

else

  exec $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH$KAFKA_OPTS "$@"

fi

创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf

KafkaClient{

       org.apache.kafka.common.security.plain.PlainLoginModule required

        username=" kafkaa"

        password=" kafkaapswd";

};

修改执行文件

vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh

vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-producer.sh

if ["x$KAFKA_OPTS" ]; then

    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"

fi

运行jar包的服务器的指定路径下必须有kafka_ client_ jaas.conf文件

在程序中添加如下配置

System.setProperty("java.security.auth.login.config","xx/kafka_client_jaas.conf");

props.put("security.protocol","SASL_PLAINTEXT");

props.put("sasl.mechanism","PLAIN");

问题描述:发布消息、订阅消息时,出现如下错误,WARN [Consumer clientId=consumer-1, groupId=console-consumer-20752]Error while fetching metadata with correlation id 2 :{test2=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

解决方法:各客户端的用户名设置为相同,多个客户端同时管理会产生冲突。

[img]

KAFKA SASL

适用于kafka_2.11-1.1.1版本

第1步

将kafka_client_jaas.conf/kafka_server_jaas.conf/kafka_zoo_jaas.conf三个文件放入kafka的config文件夹中,文件中配置用户,admin用户必须配置。

kafka_client_jaas.conf内容如下

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin";

};

kafka_server_jaas.conf内容如哗腊下

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin"

user_admin="admin"

user_test="test#2018";

};

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin";

};

Client {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin";

};

kafka_zoo_jaas.conf内容如下

ZKServer{

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin"

user_admin="admin";

};

第2步

修改kafka的bin文件夹中乱誉滑的zookeeper-server-start.sh,

添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/application/kafka_2.11-1.1.1/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer"

第3步

修改kafka的bin文件夹中的kafka-server-start.sh,

添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/application/kafka_2.11-1.1.1/config/kafka_server_jaas.conf"

第4步

修改kafka的bin文件夹中的kafka-console-producer.sh

添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/application/kafka_2.11-1.1.1/config/kafka_client_jaas.conf"

第5步

修改kafka的bin文件夹中的kafka-console-consumer.sh

添加:

export KAFKA_OPTS="虚腊 -Djava.security.auth.login.config=/data/application/kafka_2.11-1.1.1/config/kafka_client_jaas.conf"

第6步

修改kafka的config文件夹中的consumer.properties

添加:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

第7步

修改kafka的config文件夹中的producer.properties

添加:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

第8步

修改kafka的config文件夹中的zookeeper.properties

添加:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

第9步

修改kafka的config文件夹中的server.properties

修改:

listeners=SASL_PLAINTEXT://192.168.1.115:9092

添加:

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

super.users=User:admin

delete.topic.enable=true

auto.create.topics.enable=false

第10步

启动zookeeper服务

执行

sh bin/zookeeper-server-start.sh config/zookeeper.properties

第11步

启动kafka服务

执行

sh bin/kafka-server-start.sh config/server.properties

第12步

查看topic列表

执行

sh bin/kafka-topics.sh --list --zookeeper localhost:2181

第13步

创建新的topic

执行

sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 16 --topic test

第14步

给admin用户授权

执行

sh bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:admin --group=* --topic=*

第15步

给用户test授予某个topic的读写的权限

执行

sh bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operationRead --operationWrite --topic test --group=*

说明:

控制读写:–operationRead–operationWrite

控制消费组:不控制组 --group=*,指定消费组 --grouptest-comsumer-group

第16步

移除权限

执行

sh bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:test --allow-host 192.168.1.101 --operationRead --operationWrite --topic test

第17步

列出topic为test的所有权限账户

执行

sh bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic test

第18步

测试启动消费者

执行

sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.115:9092 --topic test --from-beginning --consumer.config config/consumer.properties

第19步

测试启动生产者

执行

sh bin/kafka-console-producer.sh --broker-list 192.168.1.115:9092 --topic test --producer.config config/producer.properties

第20步

启用kafka-manager,需要使用最新版本1.3.3.21,

链接: 

提取码:vreq

第21步

将my-jaas.conf移到kafa-manager的conf文件夹中

my-jaas.conf内容如下:

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin";

};

Client {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin";

};

第22步

修改kafa-manager的conf文件夹中的consumer.properties,

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

第23步

修改kafa-manager的conf文件夹中的application.conf

kafka-manager.consumer.properties.file=(上面修改的consumer.properties)

第24步

启动kafka-manager

bin/kafka-manager -Djava.security.auth.login.config=conf/my-jaas.conf -Dconfig.file=conf/application.conf

关于kafkasasl和的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表