利用生产者消费者模型和MQ模型写一个自己的日志系统-并发设计里一定会用到的手段

时间:2023-03-09 05:27:05
利用生产者消费者模型和MQ模型写一个自己的日志系统-并发设计里一定会用到的手段

一:前言

  写这个程序主要是用来理解生产者消费者模型,以及通过这个Demo来理解Redis的单线程取原子任务是怎么实现的和巩固一下并发相关的知识;这个虽然是个Demo,但是只要稍加改下Appender部分也是可以用于项目中的,假设项目里确实不需要log4j/logback之类的日志组件的时候;

二:实现方式

1.利用LinkedList作为MQ(还可以用jdk自带的LinkedBlockingQueue,不过这个Demo主要是为了更好的理解原理因此写的比较底层);

2.利用一个Daemon线程作为消费者从MQ里实时获取日志对象/日志记录,并将它提交给线程池,由线程池再遍历所有的appender并调用它们的通知方法,这个地方还可以根据场景进行效率优化,如将循环遍历appender改为将每个appender都再此提交到线程池实现异步通知观察者;

3.为生产者提供log方法作为生产日志记录的接口,无论是生产日志对象还是消费日志对象在操作队列时都需要对队列加锁,因为个人用的是非并发包里的;

4.消费者在获取之前会先判断MQ里是否有数据,有则获取并提交给线程池处理,否则wait;

5.生产者生产了日志对象后通过notify通知消费者去取,因为只有一个消费者,而生产者是不会wait的因此只需要notify而不用notifyAll

6.。。剩下的就看代码来说明吧;

三:代码

1.MyLogger类的实现

package me.silentdoer.mqlogger.log;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.DEBUG;
import static me.silentdoer.mqlogger.log.MyLogger.LogLevel.ERROR; /**
* @author silentdoer
* @version 1.0
* @description 这里只是做一个简单的logger实现,不提供Appender之类的功能,主要是用来学习生产者和消费者及MQ的实现原理
* @date 4/26/18 6:07 PM
*/
public class MyLogger{
private LogLevel loggerLevel = DEBUG;
private String charset = "UTF-8"; // 暂且没用,但是当需要序列化时是可能用到的;
// TODO 也可以直接用LinkedQueue,然后手动通过ReentrantLock来实现并发时的数据安全(synchronized也可)
//private BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<LogRecord>(); // 可以理解为支持并发的LinkedList
// TODO 想了一下既然是要学习原理干脆就实现的更底层一点
private final Queue<LogRecord> records = new LinkedList<LogRecord>();
// TODO 用于记录生产了多少条日志,可供外部获取
private AtomicLong produceCount = new AtomicLong(0);
// TODO 用于记录消费了多少条日志
private AtomicLong consumeCount = new AtomicLong(0);
// TODO 日志记录的Consumer
private Thread consumer = new LogDaemon(); public MyLogger(){
consumer.setDaemon(true);
consumer.start();
} /**
* 对外提供的接口,即log方法就是生产者用于生产日志数据的接口
* @param msg
* @param level
*/
public void log(String msg, LogLevel level){
Date curr = generateCurrDate();
log(new LogRecord(level, msg, curr));
} /**
* 对外提供的接口,即log方法就是生产者用于生产日志数据的接口
* @param msg
*/
public void log(String msg){
Date curr = generateCurrDate();
log(new LogRecord(this.loggerLevel, msg, curr));
} /**
* 给生产者(即调用log的方法都可以理解为生产者在生产日志对象)提供用于生产日志记录的接口
* @param record
*/
public void log(LogRecord record){
// ReentrantLock可以替代synchronized,不过当前场景下synchronized已经足够
synchronized (this.records){ // TODO 如果用的是LinkedBlockingQueue是不需要这个的
this.records.offer(record);
this.produceCount.incrementAndGet();
this.records.notify(); // TODO 只有一个线程会records.wait(),因此notify()足够
}
} // TODO 类似Redis的那个单线程,用于读取命令对象,而这里则是用于读取LogRecord并通过appender将数据写到相应位置
private class LogDaemon extends Thread{
private volatile boolean valid = true;
// 充当appenders的角色
private List<Writer> appenders = null;
private ExecutorService threadPool = new ThreadPoolExecutor(1, 3
, 180000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024)); @Override
public void run() {
while(this.valid){
// TODO 根据最少知道原则,在这里不要去想整体里是否存在打断此线程的地方,你就认为此线程是可能被外界打断的即可,因此需要做一定处理
try {
synchronized (MyLogger.this.records) {
if (MyLogger.this.records.size() <= 0) {
MyLogger.this.records.wait();
}
final LogRecord firstRecord = MyLogger.this.records.poll();
MyLogger.this.consumeCount.incrementAndGet();
//threadPool.submit()
threadPool.execute(() -> MyLogger.this.notifyAppender(this.appenders, firstRecord));
}
}catch (InterruptedException ex){
this.valid = false;
ex.printStackTrace();
}catch (Throwable t){
t.printStackTrace();
}
}
}
} private void notifyAppender(final List<Writer> appenders, final LogRecord record) {
if(appenders == null){
PrintWriter writer = new PrintWriter(record.level == ERROR ? System.err : System.out);
writer.append(record.toString());
writer.flush();
}else{
// TODO 这种是同步的方式,如果是异步的方式可以将每个appender的执行都由一个Runnable对象包装,然后submit给线程池(或者中间加个中间件)
for(Writer writer : appenders){
try {
writer.append(record.toString());
}catch (IOException ex){
ex.printStackTrace();
}
}
}
} /**
* 用于产生当前时间的模块,防止因为并发而导致LogRecord的timestamp根实际情况不符
*/
private Lock currDateLock = new ReentrantLock(); // 直接用synchronized亦可
private Date generateCurrDate(){
currDateLock.lock();
Date result = new Date();
currDateLock.unlock();
return result;
} // 生产者生产的数据对象
public static class LogRecord{
private LogLevel level;
private String msg;
private Date timestamp;
private static final SimpleDateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
private SimpleDateFormat dateFormat = DEFAULT_DATE_FORMAT; /*public LogRecord(){
this(INFO, "");
}*/ public LogRecord(LogLevel level, String msg){
this(level, msg, new Date()); // 还是最好由外界设置timestamp,否则高并发下会比较不准
} // TODO 最好用这个,不然高并发下timestamp容易出现顺序不准确的情况。
public LogRecord(LogLevel level, String msg, Date timestamp){
this.level = level;
this.msg = msg;
this.timestamp = timestamp;
} @Override
public String toString(){
return String.format("[Level:%s, Datetime:%s] : %s\n", level, dateFormat.format(timestamp), msg);
} public LogLevel getLevel() {
return level;
} public String getMsg() {
return msg;
} public void setDateFormat(SimpleDateFormat dateFormat) {
this.dateFormat = dateFormat;
} public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
} public enum LogLevel{ // TODO 内部enum默认就是static
INFO,
DEBUG,
ERROR
} public LogLevel getLoggerLevel() {
return loggerLevel;
} public void setLoggerLevel(LogLevel loggerLevel) {
this.loggerLevel = loggerLevel;
} public String getCharset() {
return charset;
} public void setCharset(String charset) {
this.charset = charset;
} public AtomicLong getProduceCount() {
return produceCount;
} public AtomicLong getConsumeCount() {
return consumeCount;
}
}

2.测试用例1

package me.silentdoer.mqlogger;

import me.silentdoer.mqlogger.log.MyLogger;

import java.util.Scanner;

/**
* @author silentdoer
* @version 1.0
* @description the description
* @date 4/26/18 10:13 PM
*/
public class Entrance {
private static MyLogger logger = new MyLogger(); public static void main(String[] args){
//logger.setLoggerLevel(MyLogger.LogLevel.ERROR);
Scanner scanner = new Scanner(System.in);
String line;
while(!(line = scanner.nextLine()).equals("exit")){
if(line.equals(""))
continue;
logger.log(line);
System.out.println(String.format("共生产了%s条日志。", logger.getConsumeCount()));
try {
Thread.sleep(500);
}catch (InterruptedException ex){ }
System.out.println(String.format("共消费了%s条日志。", logger.getProduceCount()));
}
}
}

3.测试用例2

package me.silentdoer.mqlogger;

import me.silentdoer.mqlogger.log.MyLogger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; /**
* @author silentdoer
* @version 1.0
* @description the description
* @date 4/26/18 10:32 PM
*/
public class Entrance2 {
private static MyLogger logger = new MyLogger(); public static void main(String[] args){
logger.setLoggerLevel(MyLogger.LogLevel.ERROR);
ExecutorService threadPool = Executors.newCachedThreadPool();
for(int i=0;i<10;i++){
final int index = i + 1;
threadPool.execute(() -> {
logger.log(String.format("生产的第%s条记录。", index));
System.out.println(String.format("共生产了%s条记录。", index));
});
try {
Thread.sleep(100);
}catch (InterruptedException ex){ }
}
try {
Thread.sleep(3000);
System.out.println(String.format("共%s条记录被消费。", logger.getConsumeCount()));
}catch (InterruptedException ex){ }
//threadPool.shutdown();
//threadPool.shutdownNow();
}
}

四:补充

  如果想实现像BlockingQueue一样能够控制MQ的元素个数范围,则可以通过ReentrantLock的Confition来实现,即通过lock创建两个Condition对象,一个用来描述是否MQ中元素达到上限的情况,一个用于描述MQ中元素降到下限的情况;

无论是达到上限或降到下限都会通过相应的condition对象来阻塞对应的生产者或消费者的生产/消费过程从而实现MQ元素个数的可控性;