Spring-Kafka —— KafkaListener手动启动和停止

时间:2024-04-30 21:21:24

一、KafkaListener消费

    /**
     * Consumes a single warning record with manual offset acknowledgment.
     *
     * <p>The record is parsed into a {@code WarningInfo}. When parsing yields {@code null},
     * a DingTalk message built from {@code ConsumerConst.DING_TALK_MSG_1} (topic and raw value)
     * is sent; otherwise the record is passed to {@code warningBusinessHandle}.
     *
     * <p>If anything in that flow throws, the error is logged, the offset is NOT acknowledged,
     * {@code consumerListenerServiceImpl} is asked to stop the listener registered under
     * {@code ConsumerConst.LISTENER_ID_WARNING}, and a DingTalk message built from
     * {@code ConsumerConst.DING_TALK_MSG_2} is sent.
     *
     * @param record the consumed message record
     * @param ack    the acknowledgment handle used to commit the offset manually
     */
    @Override
    @KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, topics = {"${kafka.app.topic.warning}"}, containerFactory = "ackContainerFactory", groupId = "warning")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        LOG.info("###################预警ackListener接收到消息###################");

        final long startedAt = System.currentTimeMillis();
        // Flipped to false only when processing fails, so the offset stays uncommitted.
        boolean shouldAck = true;
        try {
            final WarningInfo parsed = parseConsumerRecord(record);
            if (parsed != null) {
                warningBusinessHandle.doHandle(record, parsed);
            } else {
                // Parsing produced nothing: report the topic and raw payload via DingTalk.
                dingTalkService.sendMessage(
                        MessageFormat.format(ConsumerConst.DING_TALK_MSG_1, record.topic(), record.value()));
            }
            // A dedicated BusinessException branch (log, then forward the record to a retry topic
            // via sendRetryTopic) was sketched here but is disabled; no such scenario exists yet.
        } catch (Exception e) {
            LOG.error("[" + record.topic() + "]消费发生运行时异常:" + e.getMessage(), e);
            shouldAck = false;
            // Stop this listener and then raise an alert for the affected topic.
            // NOTE(review): stopListener is invoked from within the listener method itself —
            // confirm a synchronous stop does not stall on the container's shutdown wait.
            consumerListenerServiceImpl.stopListener(ConsumerConst.LISTENER_ID_WARNING);
            dingTalkService.sendMessage(
                    MessageFormat.format(ConsumerConst.DING_TALK_MSG_2, record.topic()));
        } finally {
            if (shouldAck) {
                // Manually commit the offset.
                ack.acknowledge();
            }
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            LOG.info("###################预警ackListener处理完消息,耗时" + elapsedMillis + "ms ###################");
        }
    }

二、使用KafkaListenerEndpointRegistry实现启动和停止功能

下面方法参数中的listenerId值,必须与消费端@KafkaListener注解中指定的id值一致,例如:@KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, ...)
package com.macaupass.kafka.consumer.service.impl;

import com.macaupass.kafka.consumer.service.KafkaConsumerListenerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service; /**
* Kafka消费监听服务实现类.
*
* @author weixiong.cao
* @date 2019/7/2
*/
@Service
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {

    /** Logger for listener lifecycle operations. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerServiceImpl.class);

    /** Registry used to look up listener containers by their {@code @KafkaListener} id. */
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * Starts consumption for the listener registered under the given id.
     *
     * <p>The container is started when it is not running, and {@code resume()} is then
     * called in every case.
     *
     * @param listenerId the id declared on the {@code @KafkaListener} annotation
     * @throws IllegalArgumentException if no listener container is registered under {@code listenerId}
     */
    @Override
    public void startListener(String listenerId) {
        requireRegistered(listenerId);
        // Start the container only when it is not already running.
        if (!registry.getListenerContainer(listenerId).isRunning()) {
            registry.getListenerContainer(listenerId).start();
        }
        // The container is not running when the application boots; resume() means "resume",
        // not "start", so it is called in addition to (not instead of) start().
        registry.getListenerContainer(listenerId).resume();
        LOG.info(listenerId + "开启监听成功。");
    }

    /**
     * Stops consumption for the listener registered under the given id.
     *
     * @param listenerId the id declared on the {@code @KafkaListener} annotation
     * @throws IllegalArgumentException if no listener container is registered under {@code listenerId}
     */
    @Override
    public void stopListener(String listenerId) {
        requireRegistered(listenerId);
        registry.getListenerContainer(listenerId).stop();
        LOG.info(listenerId + "停止监听成功。");
    }

    /**
     * Fails fast with a descriptive error when the registry has no container for the id,
     * instead of letting callers hit a message-less {@link NullPointerException}.
     *
     * @param listenerId the listener id to look up
     * @throws IllegalArgumentException if the registry returns {@code null} for {@code listenerId}
     */
    private void requireRegistered(String listenerId) {
        if (registry.getListenerContainer(listenerId) == null) {
            throw new IllegalArgumentException("未找到ID为[" + listenerId + "]的监听容器");
        }
    }
}

三、Controller

package com.macaupass.kafka.consumer.controller;

import com.macaupass.kafka.consumer.service.impl.KafkaConsumerListenerServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody; import java.util.HashMap;
import java.util.Map; /**
* Kafka消费监听Controller.
*
* @author weixiong.cao
* @date 2019/7/2
*/
@Controller
@RequestMapping(value = "/listener")
public class KafkaConsumerListenerController {

    /** Logger for this controller. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerController.class);

    /** Value of {@code respCode} when the requested operation completed without an exception. */
    private static final String CODE_OK = "0000";

    /** Value of {@code respCode} when the requested operation threw an exception. */
    private static final String CODE_FAILED = "0001";

    /** Listener service that performs the actual start/stop. */
    @Autowired
    private KafkaConsumerListenerServiceImpl kafkaConsumerListenerService;

    /**
     * Starts the listener with the given id.
     *
     * @param listenerId the listener id
     * @return a map holding {@code respCode} and {@code respMsg}
     */
    @RequestMapping("/start")
    @ResponseBody
    public Map<String, String> startListener(@RequestParam(required=false) String listenerId) {
        if (LOG.isInfoEnabled()) {
            LOG.info("开启监听...listenerId=" + listenerId);
        }
        try {
            kafkaConsumerListenerService.startListener(listenerId);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            return reply(CODE_FAILED, "启动失败:" + e.getMessage());
        }
        return reply(CODE_OK, "启动成功。");
    }

    /**
     * Stops the listener with the given id.
     *
     * @param listenerId the listener id
     * @return a map holding {@code respCode} and {@code respMsg}
     */
    @RequestMapping("/stop")
    @ResponseBody
    public Map<String, String> stopListener(@RequestParam(required=false) String listenerId) {
        if (LOG.isInfoEnabled()) {
            LOG.info("停止监听...listenerId=" + listenerId);
        }
        try {
            kafkaConsumerListenerService.stopListener(listenerId);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            return reply(CODE_FAILED, "停止失败:" + e.getMessage());
        }
        return reply(CODE_OK, "停止成功。");
    }

    /**
     * Page entry point.
     *
     * @return the view name {@code kafka/listener}
     */
    @RequestMapping("/index")
    public String index() {
        return "kafka/listener";
    }

    /**
     * Builds the response body shared by the start and stop endpoints.
     *
     * @param code    value stored under {@code respCode}
     * @param message value stored under {@code respMsg}
     * @return a new mutable map containing both entries
     */
    private static Map<String, String> reply(String code, String message) {
        Map<String, String> body = new HashMap<>();
        body.put("respCode", code);
        body.put("respMsg", message);
        return body;
    }
}

四、JSP界面

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html>
<head>
<title>消费监听管理</title>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<link rel="stylesheet" href="../css/common.css">
<script src="../jquery/jquery-3.3.1.min.js"></script>
<script type="text/javascript">
var contextPath = "<%=request.getContextPath() %>";

/**
 * Requests the start endpoint for the given listener.
 *
 * @param listenerId listener id; URL-encoded before being placed in the query string
 */
function startListener(listenerId) {
    // Encode the id so characters such as '&', '#', '+' or spaces cannot corrupt the query string.
    ajaxPostByJson(contextPath + "/listener/start?listenerId=" + encodeURIComponent(listenerId));
}

/**
 * Requests the stop endpoint for the given listener.
 *
 * @param listenerId listener id; URL-encoded before being placed in the query string
 */
function stopListener(listenerId) {
    ajaxPostByJson(contextPath + "/listener/stop?listenerId=" + encodeURIComponent(listenerId));
}

/**
 * Sends a POST request expecting a JSON response and shows the outcome in an alert:
 * the response's respMsg on success, or the raw response text on error.
 *
 * @param url request URL
 */
function ajaxPostByJson(url) {
    $.ajax({
        type: "POST",
        url: url,
        dataType: "json",
        contentType: 'application/json;charset=utf-8',
        success: function (respData) {
            alert(respData.respMsg);
        },
        error: function (res) {
            alert("系統異常:" + res.responseText);
        }
    });
}
</script>
</head>
<body text=#000000 bgColor="#ffffff" leftMargin=0 topMargin=4>
<div id="main">
<div id="head">
<dl class="alipay_link">
<a target="_blank" href=""><span>&nbsp;</span></a>
</dl>
<span class="title">Kafka消费手动管理</span>
</div>
<div class="cashier-nav">
</div>
<form name=query method=post>
<div id="body" style="clear:left">
<dl class="content">
<dd>
<span class="new-btn-login-sp">
<button class="new-btn-login" type="button" style="text-align:center;" onclick="startListener('listenerIdWarning')">开启【预警】消费</button>
</span>
<span class="new-btn-login-sp">
<button class="new-btn-login" type="button" style="text-align:center;" onclick="stopListener('listenerIdWarning')">停止【预警】消费</button>
</span>
</dd>
</dl>
</div>
</form>
</div>
</body>
</html>

五、功能界面

Spring-Kafka —— KafkaListener手动启动和停止