生产者与消费者以及ActiveMQ

时间:2024-04-04 00:03:38

生产者与消费者以及ActiveMQ

一、 多线程实现生产者与消费者

1.1 生产者与消费者头文件

#pragma once

#include <iostream>

#include <mutex>

#include <thread>

#include <condition_variable>

/// Shared state for a two-thread producer/consumer demo.
///
/// addThread() increments a counter and printThread() prints it; the two
/// take turns by waiting on a pair of condition variables guarded by one mutex.
class ProAndConClass
{
public:
    ProAndConClass();
    ~ProAndConClass();

    /// Consumer entry point: prints the counter each time a new value is flagged.
    void printThread();

    /// Producer entry point: increments the counter until it reaches `num`.
    void addThread(int num);

    /// Controls whether printThread() keeps running; the caller sets it to
    /// true before starting the threads.
    bool g_flag{false};

private:
    /// Waited on by the producer, notified by the consumer.
    std::condition_variable g_cond_add_enable;

    /// Waited on by the consumer, notified by the producer.
    std::condition_variable g_cond_print_enable;

    /// Guards g_value and g_print_able.
    std::mutex g_mutex;

    /// The counter that is produced and printed.
    int g_value{0};

    /// True while a freshly produced value has not been printed yet.
    bool g_print_able{false};
};

1.2 实现cpp文件

#include "ProduceAndConsume.h"

//说明:条件变量、互斥量和计数器作为类成员即可,对象创建在堆上或栈上都可以。每个线程有自己的栈,但线程可以通过指针或引用访问其他线程栈上的对象,只要该对象在所有线程 join 之前没有被销毁。下面被注释掉的全局变量写法只是另一种可选方案,并非必须。

//std::condition_variable g_cond_add_enable; //计算条件变量

//std::condition_variable g_cond_print_enable;//打印条件

//std::mutex g_mutex;

//int g_value = 0;

//bool g_print_able = false; //是否可以打印

// Nothing to set up: every member has an in-class initializer or a default constructor.
ProAndConClass::ProAndConClass() = default;

// Nothing to release: the class holds no manually managed resources.
ProAndConClass::~ProAndConClass() = default;

void ProAndConClass::addThread(int numThread)

{

std::cout << "add thread begin" << std::endl;

while (g_value < numThread)

{

std::unique_lock<std::mutex>my_lock(g_mutex);

g_cond_add_enable.wait(my_lock,

[=] {

return !g_print_able;

});

g_value++;

g_print_able = true;

std::cout << "++add thread" << g_value << std::endl;

g_cond_print_enable.notify_one(); //增加打印

}

//g_flag = false;

std::cout << "add thread leave"<<std::endl;

}

void ProAndConClass::printThread()

{

std::cout << "print thread begin" << std::endl;

while (g_flag)

{

std::unique_lock<std::mutex> my_lock(g_mutex);

g_cond_print_enable.wait(my_lock,

[=] {

return g_print_able;

});

g_print_able = false;

std::cout << "-- print thread" << g_value << std::endl;

g_cond_add_enable.notify_one();//通知增加线程

}

std::cout << "print thread leave" << std::endl;

}

1.3 主函数main

/// Runs the producer/consumer demo: starts one producer thread (counting up
/// to 10) and one consumer thread on the same ProAndConClass object, then
/// waits for both to finish.
void testProAndCon()
{
    // A stack object is sufficient and safe here: both threads are joined
    // before this function returns, so the object outlives every access made
    // through the pointer handed to the threads. This also avoids the
    // unchecked new(std::nothrow) result and the missing delete.
    ProAndConClass proandcon;

    proandcon.g_flag = true;

    // std::thread can be started from a free function, a member function
    // (used here, with the object pointer as first argument) or a function object.
    std::thread thread_add(&ProAndConClass::addThread, &proandcon, 10); // producer

    std::thread thread_print(&ProAndConClass::printThread, &proandcon); // consumer

    // No sleep is needed before joining: join() itself blocks until the
    // thread function has returned.
    if (thread_add.joinable())
    {
        std::cout << "join add thread" << std::endl;
        thread_add.join();
    }

    if (thread_print.joinable())
    {
        std::cout << "join print thread" << std::endl;
        thread_print.join();
    }
}

1.4 生产者消费者小结

    使用两个线程,一个生产、一个消费。每个线程都有自己的栈,因此两个线程共享的数据必须放在它们都能访问到、并且生命周期覆盖整个线程运行期的对象中(例如堆上的对象、全局变量,或者在线程 join 之前不会被销毁的栈对象)。

二、基于生产者消费者的MQ

2.1 代码调用过程

//Mq也分为生产者和消费者两个类

class
HelloWorldProducer : public Runnable {

private:

Connection* connection;

Session* session;

Destination* destination;

MessageProducer* producer;

int numMessages;

bool useTopic;

bool sessionTransacted;

std::string brokerURI;

private:

HelloWorldProducer(const
HelloWorldProducer&);

HelloWorldProducer& operator=(const
HelloWorldProducer&);

public:

HelloWorldProducer(const std::string&
brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted =
false) :

connection(NULL),

session(NULL),

destination(NULL),

producer(NULL),

numMessages(numMessages),

useTopic(useTopic),

sessionTransacted(sessionTransacted),

brokerURI(brokerURI) {

}

virtual ~HelloWorldProducer(){

cleanup();

}

void close() {

this->cleanup();

}

virtual void run() {

try {

//1 Create a ConnectionFactory

auto_ptr<ConnectionFactory>
connectionFactory(

ConnectionFactory::createCMSConnectionFactory(brokerURI));

// 2Create a Connection

connection =
connectionFactory->createConnection();

connection->start();

// 3Create a Session

if (this->sessionTransacted) {

session =
connection->createSession(Session::SESSION_TRANSACTED);

} else {

session = connection->createSession(Session::AUTO_ACKNOWLEDGE);

}

// 4Create the destination (Topic or Queue)

if (useTopic) {

destination =
session->createTopic("TEST.FOO");

} else {

destination =
session->createQueue("TEST.FOO");

}

// 5Create a MessageProducer from the Session to the Topic
or Queue

producer =
session->createProducer(destination);

producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

//6 Create the Thread Id String

string threadIdStr =
Long::toString(Thread::currentThread()->getId());

//7 Create a messages

string text = (string) "Hello
world! from thread " + threadIdStr;

for (int ix = 0; ix <
numMessages; ++ix) {

std::auto_ptr<TextMessage>
message(session->createTextMessage(text));

message->setIntProperty("Integer", ix);

printf("Sent message #%d
from thread %s\n", ix + 1, threadIdStr.c_str());

producer->send(message.get());

}

} catch (CMSException& e) {

e.printStackTrace();

}

}

private:

void cleanup() {

if (connection != NULL) {

try {

connection->close();

} catch (cms::CMSException& ex)
{

ex.printStackTrace();

}

}

// Destroy resources.

try {

delete destination;

destination = NULL;

delete producer;

producer = NULL;

delete session;

session = NULL;

delete connection;

connection = NULL;

} catch (CMSException& e) {

e.printStackTrace();

}

}

};

//消费者类

class
HelloWorldConsumer : public ExceptionListener,

public
MessageListener,

public Runnable {

private:

CountDownLatch latch;

CountDownLatch doneLatch;

Connection* connection;

Session* session;

Destination* destination;

MessageConsumer* consumer;

long waitMillis;

bool useTopic;

bool sessionTransacted;

std::string brokerURI;

private:

HelloWorldConsumer(const
HelloWorldConsumer&);

HelloWorldConsumer& operator=(const
HelloWorldConsumer&);

public:

HelloWorldConsumer(const std::string&
brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted =
false, int waitMillis = 30000) :

latch(1),

doneLatch(numMessages),

connection(NULL),

session(NULL),

destination(NULL),

consumer(NULL),

waitMillis(waitMillis),

useTopic(useTopic),

sessionTransacted(sessionTransacted),

brokerURI(brokerURI) {

}

virtual ~HelloWorldConsumer() {

cleanup();

}

void close() {

this->cleanup();

}

void waitUntilReady() {

latch.await();

}

virtual void run() {

try {

//1 Create a ConnectionFactory

auto_ptr<ConnectionFactory>
connectionFactory(

ConnectionFactory::createCMSConnectionFactory(brokerURI));

// 2Create a Connection

connection =
connectionFactory->createConnection();

connection->start();

connection->setExceptionListener(this);

//3 Create a Session

if (this->sessionTransacted ==
true) {

session =
connection->createSession(Session::SESSION_TRANSACTED);

} else {

session =
connection->createSession(Session::AUTO_ACKNOWLEDGE);

}

//4 Create the destination (Topic or Queue)

if (useTopic) {

destination =
session->createTopic("TEST.FOO");

} else {

destination =
session->createQueue("TEST.FOO");

}

//5 Create a MessageConsumer from the Session to the Topic
or Queue

consumer =
session->createConsumer(destination);

consumer->setMessageListener(this);

std::cout.flush();

std::cerr.flush();

//6 Indicate we are ready for messages.

latch.countDown();

// 7Wait while asynchronous messages come in.

doneLatch.await(waitMillis);

} catch (CMSException& e) {

// Indicate we are ready for
messages.

latch.countDown();

e.printStackTrace();

}

}

//8 Called from the consumer since this class is a registered
MessageListener.

virtual void onMessage(const Message*
message) {

static int count = 0;

try {

count++;

const TextMessage* textMessage =
dynamic_cast<const TextMessage*> (message);

string text = "";

if (textMessage != NULL) {

text =
textMessage->getText();

} else {

text = "NOT A
TEXTMESSAGE!";

}

printf("Message #%d Received:
%s\n", count, text.c_str());

} catch (CMSException& e) {

e.printStackTrace();

}

// Commit all messages.

if (this->sessionTransacted) {

session->commit();

}

// No matter what, tag the count down
latch until done.

doneLatch.countDown();

}

// If something bad happens you see it here
as this class is also been

// registered as an ExceptionListener with
the connection.

virtual void onException(const
CMSException& ex AMQCPP_UNUSED) {

printf("CMS Exception
occurred.  Shutting down
client.\n");

ex.printStackTrace();

exit(1);

}

private:

void cleanup() {

if (connection != NULL) {

try {

connection->close();

} catch (cms::CMSException& ex)
{

ex.printStackTrace();

}

}

// Destroy resources.

try {

delete destination;

destination = NULL;

delete consumer;

consumer = NULL;

delete session;

session = NULL;

delete connection;

connection = NULL;

} catch (CMSException& e) {

e.printStackTrace();

}

}

};

//main

int main(int
argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

activemq::library::ActiveMQCPP::initializeLibrary();

{

std::cout <<
"=====================================================\n";

std::cout << "Starting the
example:" << std::endl;

std::cout << "-----------------------------------------------------\n";

std::string brokerURI =

"failover:(tcp://localhost:61616"

//        "?wireFormat=openwire"

//       
"&transport.useInactivityMonitor=false"

//       
"&connection.alwaysSyncSend=true"

//        "&connection.useAsyncSend=true"

//       
"?transport.commandTracingEnabled=true"

//       
"&transport.tcpTracingEnabled=true"

//       
"&wireFormat.tightEncodingEnabled=true"

")";

bool useTopics = true;

bool sessionTransacted = false;

int numMessages = 2000;

long long startTime =
System::currentTimeMillis();

HelloWorldProducer  producer(brokerURI, numMessages, useTopics);

HelloWorldConsumer consumer(brokerURI,
numMessages, useTopics, sessionTransacted);

// Start the consumer thread.

Thread consumerThread(&consumer);

consumerThread.start();

// Wait for the consumer to indicate that
its ready to go.

consumer.waitUntilReady();

// Start the producer thread.

Thread producerThread(&producer);

producerThread.start();

// Wait for the threads to complete.

producerThread.join();

consumerThread.join();

long long endTime =
System::currentTimeMillis();

double totalTime = (double)(endTime -
startTime) / 1000.0;

consumer.close();

producer.close();

std::cout << "Time to completion
= " << totalTime << " seconds." << std::endl;

std::cout <<
"-----------------------------------------------------\n";

std::cout << "Finished with the
example." << std::endl;

std::cout << "=====================================================\n";

}

activemq::library::ActiveMQCPP::shutdownLibrary();

}

// END SNIPPET: demo