多线程socket编程示例

时间:2024-04-15 23:04:04

工程:

多线程socket编程示例

代码:

package com.my.socket.business;

/**
* 业务实现类
*
* @author ZY
*
*/
public class CoreMisBusinessImpl implements IBusiness { @Override
public String doBusiness(String requestJsonMsg) {
System.out.println("收到数据[" + requestJsonMsg + "]"); // TODO 业务处理
System.out.println("已处理数据"); // 返回业务数据(Json形式)
String resJson = "{\"markId\":1,\"ID\":\"02\",\"goods_id\":\"1\",\"markContent\":\"成功\",\"userNickname\":\"hello\"}"; System.out.println("已返回数据"); return resJson;
}
}
package com.my.socket.business;

public interface IBusiness {

    public String doBusiness(String requestJsonMsg);

}
package com.my.socket.common;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket; /**
* socket发送端
*
* @author ZY
*
*/
public class Client { /**
* 发送socket请求,返回服务器处理结果字符串
*
* @param ip
* @param port
* @param timeout
* @param requestJsonMsg
* @return
* @throws IOException
*/
public static String sendSocketRequest(String ip, int port, int timeout, String requestJsonMsg) throws IOException {
String res = null;
Socket socket = null;
BufferedReader br = null;
BufferedWriter out = null;
try {
socket = new Socket(ip, port);
socket.setSoTimeout(timeout);
System.out.println("现在客户端发起了一个socket请求,客户端[ip=" + socket.getLocalAddress() + ",port=" + port + "],服务端[ip="
+ ip + ",port=" + port + "]"); // 发送消息
requestJsonMsg = requestJsonMsg + Constant.LINESEPARATOR;
out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), Constant.CHARCODE_UTF8));
out.write(requestJsonMsg);
out.flush();
// 接收服务器的反馈
br = new BufferedReader(new InputStreamReader(socket.getInputStream(), Constant.CHARCODE_UTF8));
res = br.readLine(); } catch (IOException e) {
e.printStackTrace();
throw e;
} finally {
try {
if (socket != null)
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
if (br != null)
br.close();
} catch (IOException e) {
e.printStackTrace();
}
if (out != null) {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} return res;
} public static void main(String[] args) throws IOException { String ip = "10.5.109.184";
int port = 8088;
int timeout = 1000 * 60 * 5;
String requestJsonMsg = "{\"markId\":1,\"ID\":\"02\",\"goods_id\":\"1\",\"markContent\":\"测试\",\"userNickname\":\"hello\"}"; String res = sendSocketRequest(ip, port, timeout, requestJsonMsg); System.out.println("res=" + res);
}
}
package com.my.socket.common;

public class Constant {

    /**
* 编码方式
*/
public static final String CHARCODE_UTF8 = "utf-8"; /**
* 文件换行符
*/
public static final String LINESEPARATOR = System.getProperty("line.separator"); }
package com.my.socket.common;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket; import com.my.socket.business.IBusiness; class Handler implements Runnable { private Socket socket; private IBusiness business; public Handler(Socket socket,IBusiness business) {
this.socket = socket;
this.business = business;
} private BufferedWriter getWriter(Socket socket) throws IOException {
return new BufferedWriter(new OutputStreamWriter(socket
.getOutputStream(), Constant.CHARCODE_UTF8));
} private BufferedReader getReader(Socket socket) throws IOException {
InputStream socketIn = socket.getInputStream();
return new BufferedReader(new InputStreamReader(socketIn,
Constant.CHARCODE_UTF8));
} public void run() {
BufferedReader br = null;
BufferedWriter out = null;
try {
br = getReader(socket);
out = getWriter(socket);
String requestJsonMsg = null;
while ((requestJsonMsg = br.readLine()) != null) {
// 业务处理:接收到请求消息,处理后,返回消息
String responseJsonMsg = business.doBusiness(requestJsonMsg);
responseJsonMsg = responseJsonMsg + Constant.LINESEPARATOR;
out.write(responseJsonMsg);
out.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null)
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
if (br != null)
br.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
if (out != null)
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package com.my.socket.common;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import com.my.socket.business.CoreMisBusinessImpl;
import com.my.socket.business.IBusiness; /**
* 数据同步服务:可用于前置和核心mis,根据实例化参数配置<br>
* 注意:本实例只有一个
*
* @author zhangyi
*
*/
public class MultiThreadServer extends Thread {
private static MultiThreadServer server;
private ServerSocket serverSocket;
private ExecutorService executorService;
private final int POOL_SIZE = 10;
private int port;
// 业务business
private IBusiness business; public int getPort() {
return port;
} public void setPort(int port) {
this.port = port;
} public IBusiness getBusiness() {
return business;
} public void setBusiness(IBusiness business) {
this.business = business;
} private MultiThreadServer() {
} public static MultiThreadServer getInstance() {
if (null == server) {
server = new MultiThreadServer();
}
return server;
} private void init(int port) throws IOException {
serverSocket = new ServerSocket(port);
executorService = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors()
* POOL_SIZE);
System.out.println("socket同步服务已启动...");
} private void startExecute() {
while (true) {
Socket socket = null;
try {
socket = serverSocket.accept();
executorService.execute(new Handler(socket, business)); } catch (Exception e) {
e.printStackTrace();
}
}
} /**
* 入口方法<br>
* 一个核心mis只有一个ip和端口
*
*/
@Override
public void run() { try {
server.init(port);
server.startExecute();
} catch (IOException e) {
e.printStackTrace();
} } public static void main(String[] args) {
// 必须是单例的
MultiThreadServer server = MultiThreadServer.getInstance(); IBusiness coreMisBusiness = new CoreMisBusinessImpl(); int port = 8088; server.setBusiness(coreMisBusiness);
server.setPort(port); Thread th = new Thread(server);
th.start();
System.out.println("服务已启动...");
}
}