Kafka(五)——消费者流程分析(C++)

时间:2024-04-17 15:07:53
消费者客户端的基本处理流程分为以下五步:

  • 配置消费者客户端;
  • 订阅主题和分区;
  • 拉取消息;
  • 处理消息;
  • 提交消费位移。

配置消费者客户端

int CKafkaConsumer::Create()
{
	std::string errorStr;
	RdKafka::Conf::ConfResult errorCode;

	do 
	{
		// 1、创建配置对象
		// 1.1、构造 consumer conf 对象
		m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
		if(nullptr == m_config)
		{
            printf("Create RdKafka Conf failed.\n");
			break;
		}

		// 必要参数1:指定 broker 地址列表
		errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 必要参数2:设置消费者组 id
		errorCode = m_config->set("group.id", m_groupID, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(group.id) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置事件回调
		m_event_cb = new ConsumerEventCb;
		errorCode = m_config->set("event_cb", m_event_cb, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(event_cb) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置消费者组再平衡回调
		m_rebalance_cb = new ConsumerRebalanceCb;
		errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(rebalance_cb) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件
		errorCode = m_config->set("enable.partition.eof", "false", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 每次最大拉取的数据大小
		errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置分区分配策略:range、roundrobin、cooperative-sticky
		errorCode = m_config->set("partition.assignment.strategy", "range", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 心跳探活超时时间---1s
		errorCode = m_config->set("session.timeout.ms", "6000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 心跳保活间隔
		errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 1.2、创建 topic conf 对象
		m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
		if (nullptr == m_topicConfig) 
		{
            printf("Create RdKafka Topic Conf failed.\n");
			break;
		}

		// 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
		errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 默认 topic 配置,用于自动订阅 topics
		errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(default_topic_conf) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 2、创建 Consumer 对象
		m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
		if (nullptr == m_consumer) 
		{
            printf("Create KafkaConsumer failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

        printf("Created consumer success, consumerName:%s.\n",
                  m_consumer->name().c_str());
		return 0;
	} while (0);

	Destroy();
	return -1;
}

订阅主题和分区

// Topics to subscribe to.
std::vector<std::string> topicsVec{"zd_test_topic_one", "zd_test_topic_two"};

// subscribe() reports its outcome through the returned error code; anything
// other than ERR_NO_ERROR is logged and aborts the enclosing function.
RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicsVec);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
    const std::string reason = RdKafka::err2str(errorCode);
    printf("Subscribe failed, errorStr:%s\n", reason.c_str());
    return;
}

拉取消息

// Poll loop; it can be run on a dedicated thread.
// Runs until m_running becomes false.
while (m_running)
{
    // Wait up to 1000 ms for the next message or event.
    RdKafka::Message *msg = m_consumer->consume(1000);
    if (nullptr == msg)
    {
        continue;
    }

    // Hand the result (message, timeout or error) to the handler.
    ConsumeMsg_(msg, nullptr);

    // Commit only after an actual message was received. Timeouts and error
    // events consume nothing, so committing after them is a wasted request.
    if (RdKafka::ERR_NO_ERROR == msg->err())
    {
        m_consumer->commitAsync();
    }

    // This loop deletes every message object returned by consume().
    delete msg;
}

处理消息

/**
 * Handles one result object returned by the consumer's consume() call.
 *
 * A successfully received message is printed (topic, partition, key,
 * payload); a poll timeout is ignored; any other error is logged.
 *
 * NOTE(review): the class qualifier here is KafkaConsumer while Create() is a
 * member of CKafkaConsumer — this looks like a typo in one of the two;
 * confirm against the class declaration.
 *
 * @param msg     Result object to inspect; a null pointer is ignored.
 * @param opaque  Not used by this function.
 */
void KafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque)
{
	(void)opaque;

	if (nullptr == msg)
	{
		return;
	}

	switch (msg->err())
	{
	case RdKafka::ERR__TIMED_OUT: // poll timed out, nothing to do
		break;
	case RdKafka::ERR_NO_ERROR:   // a message arrived
	{
		// key() yields a null pointer when the message carries no key.
		const std::string *key = msg->key();
		// The payload is a length-delimited buffer and is not guaranteed to
		// be NUL-terminated, so it is printed with an explicit length.
		const char *payload = static_cast<const char *>(msg->payload());
		const int payloadLen = (nullptr != payload) ? static_cast<int>(msg->len()) : 0;

		printf("Message in, topic:%s, partition:[%d], key:%s, payload:%.*s\n",
			msg->topic_name().c_str(),
			msg->partition(),
			(nullptr != key) ? key->c_str() : "",
			payloadLen,
			(nullptr != payload) ? payload : "");

		// Message processing goes here.
		break;
	}
	default:
		// Report every other error instead of dropping it silently.
		printf("Consume failed, errorStr:%s\n", msg->errstr().c_str());
		break;
	}
}

提交消费位移

// Request a commit of the consumed offsets. Per the librdkafka documentation,
// commitAsync() returns without waiting for the broker's reply.
m_consumer->commitAsync();