基于 Socket 简单实现类 JMS 的消息队列(生产者-消费者模型)

时间:2022-08-18 17:38:49

本文基于 Socket 通信和 Lock 锁机制(ReentrantLock 与 Condition),初步实现一个类似 JMS 的异步消息队列。
设计分成三个部分,分别是消息队列管理类(Buffer)、服务端类(Server)和客户端类(Client)。

Buffer(信息队列管理类)

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Singleton bounded FIFO message queue for the producer/consumer model.
 *
 * <p>{@link #produce(String)} blocks while the queue is full and
 * {@link #consume()} blocks while it is empty. All access to the queue and
 * to the capacity goes through a single {@link ReentrantLock}, so every
 * public method may be called from any thread.
 */
public class Buffer {
    /** Pending messages in FIFO order; only touched while {@link #mutex} is held. */
    private final Queue<Object> queue = new LinkedList<Object>();

    /** Maximum number of queued messages before producers block; guarded by {@link #mutex}. */
    private int capacity = 2;

    private final Lock mutex = new ReentrantLock();

    /** Signalled whenever room may have become available for producers. */
    private final Condition notFull = mutex.newCondition();

    /** Signalled whenever a message has become available for consumers. */
    private final Condition notEmpty = mutex.newCondition();

    private Buffer() {
    }

    /**
     * Returns the single shared buffer instance.
     *
     * @return the process-wide {@code Buffer}
     */
    public static Buffer getIntance() {
        return QueueBuffer.instance;
    }

    /** Holder class: the instance is created when this class is first initialized. */
    static class QueueBuffer {
        private static final Buffer instance = new Buffer();
    }

    /**
     * Changes the maximum number of messages the queue may hold.
     *
     * <p>Producers blocked on the previous limit are woken so they can
     * re-check against the new one.
     *
     * @param size the new capacity; must be positive
     * @throws IllegalArgumentException if {@code size} is zero or negative,
     *         because such a limit would make every producer block forever
     */
    public void setInitSize(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        mutex.lock();
        try {
            capacity = size;
            notFull.signalAll();
        } finally {
            mutex.unlock();
        }
    }

    /**
     * Appends a message to the queue, waiting while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the message is
     * NOT enqueued, the thread's interrupt flag is restored and the method
     * returns normally.
     *
     * @param msg the message to enqueue
     */
    public void produce(String msg) {
        mutex.lock();
        try {
            // Loop, not "if": a woken producer must re-check the size under the lock.
            while (queue.size() >= capacity) {
                System.out.println("queue wait to consume");
                notFull.await();
            }
            queue.offer(msg);
            notEmpty.signalAll();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("produce interrupted, message dropped: " + msg);
        } finally {
            mutex.unlock();
        }
    }

    /**
     * Removes and returns the oldest message, waiting while the queue is empty.
     *
     * @return the head of the queue, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt flag is restored)
     */
    public Object consume() {
        mutex.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println("queue wait to produce");
                notEmpty.await();
            }
            Object head = queue.poll();
            notFull.signalAll();
            return head;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        } finally {
            mutex.unlock();
        }
    }

    /**
     * Returns the number of messages currently queued.
     *
     * @return the current queue size, read under the lock because the
     *         underlying {@link LinkedList} is not thread-safe
     */
    public int getQueueSize() {
        mutex.lock();
        try {
            return queue.size();
        } finally {
            mutex.unlock();
        }
    }
}

Server (服务端类)

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.lang3.StringUtils;

/**
 * Per-connection handler thread of the queue server.
 *
 * <p>Each accepted connection is served by one {@code Server} thread that
 * reads a single line, executes the command it contains against the shared
 * {@link Buffer} and writes back a one-line reply. Recognized commands are
 * matched by substring, in this order: {@code add}, {@code poll},
 * {@code size}.
 */
public class Server extends Thread {

    /** Length of the "add" keyword plus the one separator character that follows it. */
    private static final int ADD_PREFIX_LENGTH = 4;

    private final Socket socket;

    /**
     * @param socket the accepted client connection this thread will serve
     */
    public Server(Socket socket) {
        this.socket = socket;
    }

    /**
     * Reads one request line from the connection, replies, and closes the
     * connection. The socket is closed even if handling fails.
     */
    @Override
    public void run() {
        try (Socket client = socket;
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(client.getInputStream()))) {
            handle(client, reader.readLine());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes the request and sends the reply, then half-closes the output
     * side so the client's read loop sees end of stream.
     *
     * @param socket the client connection to reply on
     * @param msg    the request line; may be {@code null} or blank
     * @throws IOException if the reply cannot be written
     */
    private void handle(Socket socket, String msg) throws IOException {
        PrintWriter pw = new PrintWriter(socket.getOutputStream());
        pw.write(buildReply(msg));
        pw.flush();
        socket.shutdownOutput();
    }

    /**
     * Runs the command contained in {@code msg} against the shared buffer
     * and returns the text to send back to the client.
     */
    private String buildReply(String msg) {
        if (StringUtils.isBlank(msg)) {
            return "server:blank message";
        }
        if (msg.contains("add")) {
            int payloadStart = msg.indexOf("add") + ADD_PREFIX_LENGTH;
            // A bare "add" leaves no room for a payload; substring would throw.
            if (payloadStart > msg.length()) {
                return "server:add requires a message";
            }
            String payload = msg.substring(payloadStart);
            Buffer.getIntance().produce(payload);
            return "server:add " + payload + " to queue successfully";
        }
        if (msg.contains("poll")) {
            String consumeMsg = (String) Buffer.getIntance().consume();
            return "server:remove " + consumeMsg + " from queue successfully";
        }
        if (msg.contains("size")) {
            return "server:size is " + Buffer.getIntance().getQueueSize();
        }
        return "server:no such command";
    }

    /**
     * Listens on port 6666 and starts one handler thread per accepted
     * connection. Runs until the process is terminated or accepting fails.
     *
     * @param args unused
     * @throws IOException if the port cannot be bound or accepting fails
     */
    public static void main(String[] args) throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(6666)) {
            while (true) {
                new Server(serverSocket.accept()).start();
            }
        }
    }
}

Client (客户端类)

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;


/**
 * Interactive console client for the queue server.
 *
 * <p>Every line typed on standard input is sent to the server over its own
 * short-lived connection, and the server's reply is printed to standard
 * output.
 */
public class Client {
    /**
     * Single reader over standard input, shared by all requests. Creating a
     * new Scanner per request would discard whatever the previous one had
     * already buffered. It is intentionally never closed, since closing it
     * would close {@code System.in}.
     */
    private final Scanner scanner = new Scanner(System.in);

    private final String serverIP;

    private final int port;

    /**
     * @param serverIP host name or IP address of the server
     * @param port     TCP port of the server
     * @throws UnknownHostException declared for caller compatibility; no
     *         connection is attempted here
     * @throws IOException declared for caller compatibility; no connection
     *         is attempted here
     */
    public Client(String serverIP, int port) throws UnknownHostException, IOException {
        this.serverIP = serverIP;
        this.port = port;
    }

    /**
     * Sends one request per input line until standard input is exhausted.
     *
     * @throws IOException if connecting to or talking to the server fails
     */
    public void run() throws IOException {
        while (scanner.hasNextLine()) {
            input();
        }
    }

    /**
     * Reads one line from standard input, sends it to the server and prints
     * every line of the reply. Does nothing if standard input is at end of
     * stream.
     *
     * <p>The connection is opened only after the line has been read, so no
     * server thread is held idle while the user is typing, and it is always
     * closed before this method returns.
     *
     * @throws IOException if connecting to or talking to the server fails
     */
    public void input() throws IOException {
        if (!scanner.hasNextLine()) {
            return;
        }
        String servermsg = scanner.nextLine();
        try (Socket connection = new Socket(serverIP, port)) {
            PrintWriter pw = new PrintWriter(connection.getOutputStream());
            pw.write(servermsg);
            pw.flush();
            // Half-close: the request carries no line terminator, so end of
            // stream is what lets the server's readLine() return.
            connection.shutdownOutput();

            BufferedReader br = new BufferedReader(
                    new InputStreamReader(connection.getInputStream()));
            String msg;
            while ((msg = br.readLine()) != null) {
                System.out.println(msg);
            }
        }
    }

    /**
     * Starts a client against 127.0.0.1:6666.
     *
     * @param args unused
     * @throws UnknownHostException if the server address cannot be resolved
     * @throws IOException if connecting to or talking to the server fails
     */
    public static void main(String[] args) throws UnknownHostException, IOException {
        Client c = new Client("127.0.0.1", 6666);
        c.run();
    }

}