kafka 配置认证与授权

时间:2024-04-15 17:38:39

本例不使用kerberos做认证,使用用户名和密码的方式来进行认证

1、服务端配置

1.0 配置server.properties 添加如下配置

#配置 ACL 入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
#本例使用 SASL PLAINTEXT 
listeners=SASL_PLAINTEXT://hadoop4:9092 
security.inter.broker.protocol= SASL_PLAINTEXT 
sasl.mechanism.inter.broker.protocol=PLAIN 
sasl.enabled.mechanisms=PLAIN 
#设置本例中 admin 为超级用户
super.users=User:admin

1.1 创建服务端的jaas.conf文件,文件信息如下:

[hduser@hadoop4 config]$ cat jaas.conf 
KafkaServer { 
org.apache.kafka.common.security.plain.PlainLoginModule required 
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
};

1.2 修改启动脚本kafka-server-start.sh,

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data1/hadoop/kafka/config/jaas.conf  kafka.Kafka "$@"

其中:-Djava.security.auth.login.config=/data1/hadoop/kafka/config/jaas.conf 是新加的

2、生产者配置

2.1 生成jaas文件

[hduser@hadoop4 config]$ cat writer_jaas.conf 
KafkaClient { 
org.apache.kafka.common.security.plain.PlainLoginModule required 
username = "writer"
password="writer";
};

2.2 配置生产者启动脚本

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/hadoop/kafka/config/writer_jaas.conf  kafka.tools.ConsoleProducer "$@"

2.3 配置启动脚本

kafka-console-producer.sh --bootstrap-server 192.168.43.15:9092  --topic test2  --producer-property security.protocol=SASL_PLAINTEXT  --producer-property sasl.mechanism=PLAIN

可以发现,需要添加协议参数:

security.protocol: 表示开启安全协议,使用SASL,
sasl.mechanism: 协议机制,如果是使用Kerberos,那么就配置kerberos

如果继续执行上述的命令,可以发现还是失败,失败的原因是对于topic test2来说,没有授权。

2.4 授权
在设置具体的 ACL 规则之前,首先简单学习一下 Kafka ACL 的格式。根据官网 的介绍,
Kafka 一条 ACL 的格式为 "Principal P is [Allowed/Denied] Operation O From Host H On
Resource R",含义描述如下:

principal :表示 Kafka user
operation :表示 个具体的操作类型,如 WRITE、READ 、DESCRIBE 。完整的操
作列表详见 http://docs.confluent.io/current/kafka/authorization.html#overview
Host 表示连 Kafka 集群的 client IP 地址,如果是“*”则表示所有四。注意 ,当
Kafka 不支持主机名,只能指定 IP 地址。
Resource :表示一种 Kafka 资源类型 。当前共有 种类型 TOPIC CLUSTER GROUP
和 TRANSACTIONID

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test2

3、消费者

3.1 配置jaas文件

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required    
    username="reader"
    password="reader";
};

3.2 消费者启动脚本配置

exec $(dirname $0)/kafka-run-class.sh  -Djava.security.auth.login.config=/data1/hadoop/kafka/config/reader_jaas.conf kafka.tools.ConsoleConsumer "$@"

3.3 创建消费者配置文件

[hduser@hadoop4 ~]$ cat consumer.config 
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group

3.4 消费数据

  • 如果不指定consumer.config,将会出现下面的异常
[hduser@hadoop4 ~]$ kafka-console-consumer.sh  --bootstrap-server 192.168.43.15:9092  --from-beginning --topic test2
[2021-05-08 09:44:35,771] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:36,187] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:36,599] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:37,006] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
  • 接着指定consumer.config
[hduser@hadoop4 ~]$ kafka-console-consumer.sh  --bootstrap-server 192.168.43.15:9092  --from-beginning --topic test2 --consumer.config consumer.config 
[2021-05-08 09:46:10,044] WARN [Consumer clientId=consumer-test-group-1, groupId=test-group] Error while fetching metadata with correlation id 2 : {test2=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:46:10,045] ERROR [Consumer clientId=consumer-test-group-1, groupId=test-group] Topic authorization failed for topics [test2] (org.apache.kafka.clients.Metadata)
[2021-05-08 09:46:10,047] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test2]

可以发现跟生产者是一样的,没有权限访问topic test2

3.5 授权

[hduser@hadoop4 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic test2
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, patternType=LITERAL)`: 
 	(principal=User:reader, host=*, operation=READ, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, patternType=LITERAL)`: 
 	(principal=User:writer, host=*, operation=WRITE, permissionType=ALLOW)
	(principal=User:reader, host=*, operation=READ, permissionType=ALLOW)

3.6 重新消费
接着消费还是会发现没有对组test-group的操作权限

[hduser@hadoop4 ~]$ kafka-console-consumer.sh  --bootstrap-server 192.168.43.15:9092  --from-beginning --topic test2 --consumer.config consumer.config 
[2021-05-08 09:48:07,842] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
Processed a total of 0 messages

赋予权限

[hduser@hadoop4 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-group, patternType=LITERAL)`: 
 	(principal=User:reader, host=*, operation=READ, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-group, patternType=LITERAL)`: 
 	(principal=User:reader, host=*, operation=READ, permissionType=ALLOW)

生产者发送

[hduser@hadoop4 ~]$ kafka-console-producer.sh --bootstrap-server 192.168.43.15:9092  --topic test2  --producer-property security.protocol=SASL_PLAINTEXT  --producer-property sasl.mechanism=PLAIN
>hahaha
>wanm^H^H
>完美
>

消费者消费

[hduser@hadoop4 ~]$ kafka-console-consumer.sh  --bootstrap-server 192.168.43.15:9092  --from-beginning --topic test2 --consumer.config consumer.config 
hahaha
wanm
完美

4、管理员

使用admin用户查看用户的组信息
4.1 配置jaas.conf文件

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required 
    username="admin"
    password="admin";
};

4.2 配置脚本kafka-consumer-groups.sh

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/hadoop/kafka/config/admin_jaas.conf kafka.admin.ConsumerGroupCommand "$@"

4.3 配置安全协议属性

[hduser@hadoop4 ~]$ cat admin_sasl.config 
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

4.4 查看组信息

[hduser@hadoop4 ~]$ kafka-consumer-groups.sh --group test-group --describe --command-config admin_sasl.config --bootstrap-server 192.168.43.15:9092

Consumer group \'test-group\' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-group      test2           0          3               3               0               -               -               -
test-group      test            1          1001515         1001516         1               -               -               -
test-group      test            0          992785          992786          1               -               -               -
test-group      test            3          1000894         1000894         0               -               -               -
test-group      test            2          1000772         1000773         1               -               -               -
test-group      test            4          1004034         1004034         0               -               -               -

一般生产环境还是得使用Kerberos配合ranger+ldap。

借鉴kafka实战