移动开发首页业界资讯移动应用平台技术专题 输入您要搜索的内容 基于Java Socket的自定义协议,实现Android与服务器的长连接(二)

时间:2023-03-09 06:59:46
移动开发首页业界资讯移动应用平台技术专题  输入您要搜索的内容 基于Java Socket的自定义协议,实现Android与服务器的长连接(二)

在阅读本文前需要对socket以及自定义协议有一个基本的了解,可以先查看上一篇文章《基于Java Socket的自定义协议,实现Android与服务器的长连接(一)》学习相关的基础知识点。

移动开发首页业界资讯移动应用平台技术专题  输入您要搜索的内容 基于Java Socket的自定义协议,实现Android与服务器的长连接(二)

一、协议定义

上一篇文章中,我们对socket编程和自定义协议做了一个简单的了解,本文将在此基础上加以深入,来实现Android和服务器之间的长连接,现定义协议如下:

  • 数据类协议(Data)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,0表示数据)
    • 业务类型(pattion,8bit,0表示push,其他暂未定)
    • 数据格式(dtype,8bit,0表示json,其他暂未定)
    • 消息id(msgId,32bit)
    • 正文数据(data)
  • 数据ack类协议(DataAck)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,1表示数据ack)
    • ack消息id(ackMsgId,32bit)
    • 预留信息(unused)
  • 心跳类协议(ping)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,2表示心跳)
    • 心跳id(pingId,32bit,client上报取奇数,即1,3,5...,server下发取偶数,即0,2,4...)
    • 预留信息(unused)
  • 心跳ack类协议(pingAck)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,3表示心跳ack)
    • ack心跳id(pingId,32bit,client上报取奇数,即1,3,5...,server下发取偶数,即0,2,4...)
    • 预留信息(unused)

二、协议实现

从上述的协议定义中,我们可以看出,四种协议有共同的3个要素,分别是:长度、版本号、数据类型,那么我们可以先抽象出一个基本的协议,如下:

1. BasicProtocol

  1. import android.util.Log;
  2. import com.shandiangou.sdgprotocol.lib.Config;
  3. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  4. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  5. import java.io.ByteArrayOutputStream;
  6. /**
  7. * Created by meishan on 16/12/1.
  8. * <p>
  9. * 协议类型: 0表示数据,1表示数据Ack,2表示ping,3表示pingAck
  10. */
  11. public abstract class BasicProtocol {
  12. // 长度均以字节(byte)为单位
  13. public static final int LENGTH_LEN = 4;       //记录整条数据长度数值的长度
  14. protected static final int VER_LEN = 1;       //协议的版本长度(其中前3位作为预留位,后5位作为版本号)
  15. protected static final int TYPE_LEN = 1;      //协议的数据类型长度
  16. private int reserved = 0;                     //预留信息
  17. private int version = Config.VERSION;         //版本号
  18. /**
  19. * 获取整条数据长度
  20. * 单位:字节(byte)
  21. *
  22. * @return
  23. */
  24. protected int getLength() {
  25. return LENGTH_LEN + VER_LEN + TYPE_LEN;
  26. }
  27. public int getReserved() {
  28. return reserved;
  29. }
  30. public void setReserved(int reserved) {
  31. this.reserved = reserved;
  32. }
  33. public int getVersion() {
  34. return version;
  35. }
  36. public void setVersion(int version) {
  37. this.version = version;
  38. }
  39. /**
  40. * 获取协议类型,由子类实现
  41. *
  42. * @return
  43. */
  44. public abstract int getProtocolType();
  45. /**
  46. * 由预留值和版本号计算完整版本号的byte[]值
  47. *
  48. * @return
  49. */
  50. private int getVer(byte r, byte v, int vLen) {
  51. int num = 0;
  52. int rLen = 8 - vLen;
  53. for (int i = 0; i < rLen; i++) {
  54. num += (((r >> (rLen - 1 - i)) & 0x1) << (7 - i));
  55. }
  56. return num + v;
  57. }
  58. /**
  59. * 拼接发送数据,此处拼接了协议版本、协议类型和数据长度,具体内容子类中再拼接
  60. * 按顺序拼接
  61. *
  62. * @return
  63. */
  64. public byte[] genContentData() {
  65. byte[] length = SocketUtil.int2ByteArrays(getLength());
  66. byte reserved = (byte) getReserved();
  67. byte version = (byte) getVersion();
  68. byte[] ver = {(byte) getVer(reserved, version, 5)};
  69. byte[] type = {(byte) getProtocolType()};
  70. ByteArrayOutputStream baos = new ByteArrayOutputStream(LENGTH_LEN + VER_LEN + TYPE_LEN);
  71. baos.write(length, 0, LENGTH_LEN);
  72. baos.write(ver, 0, VER_LEN);
  73. baos.write(type, 0, TYPE_LEN);
  74. return baos.toByteArray();
  75. }
  76. /**
  77. * 解析出整条数据长度
  78. *
  79. * @param data
  80. * @return
  81. */
  82. protected int parseLength(byte[] data) {
  83. return SocketUtil.byteArrayToInt(data, 0, LENGTH_LEN);
  84. }
  85. /**
  86. * 解析出预留位
  87. *
  88. * @param data
  89. * @return
  90. */
  91. protected int parseReserved(byte[] data) {
  92. byte r = data[LENGTH_LEN];//前4个字节(0,1,2,3)为数据长度的int值,与版本号组成一个字节
  93. return (r >> 5) & 0xFF;
  94. }
  95. /**
  96. * 解析出版本号
  97. *
  98. * @param data
  99. * @return
  100. */
  101. protected int parseVersion(byte[] data) {
  102. byte v = data[LENGTH_LEN]; //与预留位组成一个字节
  103. return ((v << 3) & 0xFF) >> 3;
  104. }
  105. /**
  106. * 解析出协议类型
  107. *
  108. * @param data
  109. * @return
  110. */
  111. public static int parseType(byte[] data) {
  112. byte t = data[LENGTH_LEN + VER_LEN];//前4个字节(0,1,2,3)为数据长度的int值,以及ver占一个字节
  113. return t & 0xFF;
  114. }
  115. /**
  116. * 解析接收数据,此处解析了协议版本、协议类型和数据长度,具体内容子类中再解析
  117. *
  118. * @param data
  119. * @return
  120. * @throws ProtocolException 协议版本不一致,抛出异常
  121. */
  122. public int parseContentData(byte[] data) throws ProtocolException {
  123. int reserved = parseReserved(data);
  124. int version = parseVersion(data);
  125. int protocolType = parseType(data);
  126. if (version != getVersion()) {
  127. throw new ProtocolException("input version is error: " + version);
  128. }
  129. return LENGTH_LEN + VER_LEN + TYPE_LEN;
  130. }
  131. @Override
  132. public String toString() {
  133. return "Version: " + getVersion() + ", Type: " + getProtocolType();
  134. }
  135. }

上述涉及到的Config类和SocketUtil类如下:

  1. /**
  2. * Created by meishan on 16/12/2.
  3. */
  4. public class Config {
  5. public static final int VERSION = 1;                 //协议版本号
  6. public static final String ADDRESS = "10.17.64.237"; //服务器地址
  7. public static final int PORT = 9013;                 //服务器端口号
  8. }
  1. import java.io.BufferedInputStream;
  2. import java.io.BufferedOutputStream;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.io.OutputStream;
  6. import java.nio.ByteBuffer;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. /**
  10. * Created by meishan on 16/12/1.
  11. */
  12. public class SocketUtil {
  13. private static Map<Integer, String> msgImp = new HashMap<>();
  14. static {
  15. msgImp.put(DataProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.DataProtocol");       //0
  16. msgImp.put(DataAckProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.DataAckProtocol"); //1
  17. msgImp.put(PingProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.PingProtocol");       //2
  18. msgImp.put(PingAckProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.PingAckProtocol"); //3
  19. }
  20. /**
  21. * 解析数据内容
  22. *
  23. * @param data
  24. * @return
  25. */
  26. public static BasicProtocol parseContentMsg(byte[] data) {
  27. int protocolType = BasicProtocol.parseType(data);
  28. String className = msgImp.get(protocolType);
  29. BasicProtocol basicProtocol;
  30. try {
  31. basicProtocol = (BasicProtocol) Class.forName(className).newInstance();
  32. basicProtocol.parseContentData(data);
  33. } catch (Exception e) {
  34. basicProtocol = null;
  35. e.printStackTrace();
  36. }
  37. return basicProtocol;
  38. }
  39. /**
  40. * 读数据
  41. *
  42. * @param inputStream
  43. * @return
  44. * @throws SocketExceptions
  45. */
  46. public static BasicProtocol readFromStream(InputStream inputStream) {
  47. BasicProtocol protocol;
  48. BufferedInputStream bis;
  49. //header中保存的是整个数据的长度值,4个字节表示。在下述write2Stream方法中,会先写入header
  50. byte[] header = new byte[BasicProtocol.LENGTH_LEN];
  51. try {
  52. bis = new BufferedInputStream(inputStream);
  53. int temp;
  54. int len = 0;
  55. while (len < header.length) {
  56. temp = bis.read(header, len, header.length - len);
  57. if (temp > 0) {
  58. len += temp;
  59. } else if (temp == -1) {
  60. bis.close();
  61. return null;
  62. }
  63. }
  64. len = 0;
  65. int length = byteArrayToInt(header);//数据的长度值
  66. byte[] content = new byte[length];
  67. while (len < length) {
  68. temp = bis.read(content, len, length - len);
  69. if (temp > 0) {
  70. len += temp;
  71. }
  72. }
  73. protocol = parseContentMsg(content);
  74. } catch (IOException e) {
  75. e.printStackTrace();
  76. return null;
  77. }
  78. return protocol;
  79. }
  80. /**
  81. * 写数据
  82. *
  83. * @param protocol
  84. * @param outputStream
  85. */
  86. public static void write2Stream(BasicProtocol protocol, OutputStream outputStream) {
  87. BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
  88. byte[] buffData = protocol.genContentData();
  89. byte[] header = int2ByteArrays(buffData.length);
  90. try {
  91. bufferedOutputStream.write(header);
  92. bufferedOutputStream.write(buffData);
  93. bufferedOutputStream.flush();
  94. } catch (IOException e) {
  95. e.printStackTrace();
  96. }
  97. }
  98. /**
  99. * 关闭输入流
  100. *
  101. * @param is
  102. */
  103. public static void closeInputStream(InputStream is) {
  104. try {
  105. if (is != null) {
  106. is.close();
  107. }
  108. } catch (IOException e) {
  109. e.printStackTrace();
  110. }
  111. }
  112. /**
  113. * 关闭输出流
  114. *
  115. * @param os
  116. */
  117. public static void closeOutputStream(OutputStream os) {
  118. try {
  119. if (os != null) {
  120. os.close();
  121. }
  122. } catch (IOException e) {
  123. e.printStackTrace();
  124. }
  125. }
  126. public static byte[] int2ByteArrays(int i) {
  127. byte[] result = new byte[4];
  128. result[0] = (byte) ((i >> 24) & 0xFF);
  129. result[1] = (byte) ((i >> 16) & 0xFF);
  130. result[2] = (byte) ((i >> 8) & 0xFF);
  131. result[3] = (byte) (i & 0xFF);
  132. return result;
  133. }
  134. public static int byteArrayToInt(byte[] b) {
  135. int intValue = 0;
  136. for (int i = 0; i < b.length; i++) {
  137. intValue += (b[i] & 0xFF) << (8 * (3 - i)); //int占4个字节(0,1,2,3)
  138. }
  139. return intValue;
  140. }
  141. public static int byteArrayToInt(byte[] b, int byteOffset, int byteCount) {
  142. int intValue = 0;
  143. for (int i = byteOffset; i < (byteOffset + byteCount); i++) {
  144. intValue += (b[i] & 0xFF) << (8 * (3 - (i - byteOffset)));
  145. }
  146. return intValue;
  147. }
  148. public static int bytes2Int(byte[] b, int byteOffset) {
  149. ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE);
  150. byteBuffer.put(b, byteOffset, 4); //占4个字节
  151. byteBuffer.flip();
  152. return byteBuffer.getInt();
  153. }
  154. }

接下来我们实现具体的协议。

2. DataProtocol

  1. import android.util.Log;
  2. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  3. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  4. import java.io.ByteArrayOutputStream;
  5. import java.io.Serializable;
  6. import java.io.UnsupportedEncodingException;
  7. /**
  8. * Created by meishan on 16/12/1.
  9. */
  10. public class DataProtocol extends BasicProtocol implements Serializable {
  11. public static final int PROTOCOL_TYPE = 0;
  12. private static final int PATTION_LEN = 1;
  13. private static final int DTYPE_LEN = 1;
  14. private static final int MSGID_LEN = 4;
  15. private int pattion;
  16. private int dtype;
  17. private int msgId;
  18. private String data;
  19. @Override
  20. public int getLength() {
  21. return super.getLength() + PATTION_LEN + DTYPE_LEN + MSGID_LEN + data.getBytes().length;
  22. }
  23. @Override
  24. public int getProtocolType() {
  25. return PROTOCOL_TYPE;
  26. }
  27. public int getPattion() {
  28. return pattion;
  29. }
  30. public void setPattion(int pattion) {
  31. this.pattion = pattion;
  32. }
  33. public int getDtype() {
  34. return dtype;
  35. }
  36. public void setDtype(int dtype) {
  37. this.dtype = dtype;
  38. }
  39. public void setMsgId(int msgId) {
  40. this.msgId = msgId;
  41. }
  42. public int getMsgId() {
  43. return msgId;
  44. }
  45. public String getData() {
  46. return data;
  47. }
  48. public void setData(String data) {
  49. this.data = data;
  50. }
  51. /**
  52. * 拼接发送数据
  53. *
  54. * @return
  55. */
  56. @Override
  57. public byte[] genContentData() {
  58. byte[] base = super.genContentData();
  59. byte[] pattion = {(byte) this.pattion};
  60. byte[] dtype = {(byte) this.dtype};
  61. byte[] msgid = SocketUtil.int2ByteArrays(this.msgId);
  62. byte[] data = this.data.getBytes();
  63. ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
  64. baos.write(base, 0, base.length);          //协议版本+数据类型+数据长度+消息id
  65. baos.write(pattion, 0, PATTION_LEN);       //业务类型
  66. baos.write(dtype, 0, DTYPE_LEN);           //业务数据格式
  67. baos.write(msgid, 0, MSGID_LEN);           //消息id
  68. baos.write(data, 0, data.length);          //业务数据
  69. return baos.toByteArray();
  70. }
  71. /**
  72. * 解析接收数据,按顺序解析
  73. *
  74. * @param data
  75. * @return
  76. * @throws ProtocolException
  77. */
  78. @Override
  79. public int parseContentData(byte[] data) throws ProtocolException {
  80. int pos = super.parseContentData(data);
  81. //解析pattion
  82. pattion = data[pos] & 0xFF;
  83. pos += PATTION_LEN;
  84. //解析dtype
  85. dtype = data[pos] & 0xFF;
  86. pos += DTYPE_LEN;
  87. //解析msgId
  88. msgId = SocketUtil.byteArrayToInt(data, pos, MSGID_LEN);
  89. pos += MSGID_LEN;
  90. //解析data
  91. try {
  92. this.data = new String(data, pos, data.length - pos, "utf-8");
  93. } catch (UnsupportedEncodingException e) {
  94. e.printStackTrace();
  95. }
  96. return pos;
  97. }
  98. @Override
  99. public String toString() {
  100. return "data: " + data;
  101. }
  102. }

3. DataAckProtocol

  1. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.UnsupportedEncodingException;
  5. /**
  6. * Created by meishan on 16/12/1.
  7. */
  8. public class DataAckProtocol extends BasicProtocol {
  9. public static final int PROTOCOL_TYPE = 1;
  10. private static final int ACKMSGID_LEN = 4;
  11. private int ackMsgId;
  12. private String unused;
  13. @Override
  14. public int getLength() {
  15. return super.getLength() + ACKMSGID_LEN + unused.getBytes().length;
  16. }
  17. @Override
  18. public int getProtocolType() {
  19. return PROTOCOL_TYPE;
  20. }
  21. public int getAckMsgId() {
  22. return ackMsgId;
  23. }
  24. public void setAckMsgId(int ackMsgId) {
  25. this.ackMsgId = ackMsgId;
  26. }
  27. public String getUnused() {
  28. return unused;
  29. }
  30. public void setUnused(String unused) {
  31. this.unused = unused;
  32. }
  33. /**
  34. * 拼接发送数据
  35. *
  36. * @return
  37. */
  38. @Override
  39. public byte[] genContentData() {
  40. byte[] base = super.genContentData();
  41. byte[] ackMsgId = SocketUtil.int2ByteArrays(this.ackMsgId);
  42. byte[] unused = this.unused.getBytes();
  43. ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
  44. baos.write(base, 0, base.length);              //协议版本+数据类型+数据长度+消息id
  45. baos.write(ackMsgId, 0, ACKMSGID_LEN);         //消息id
  46. baos.write(unused, 0, unused.length);          //unused
  47. return baos.toByteArray();
  48. }
  49. @Override
  50. public int parseContentData(byte[] data) throws ProtocolException {
  51. int pos = super.parseContentData(data);
  52. //解析ackMsgId
  53. ackMsgId = SocketUtil.byteArrayToInt(data, pos, ACKMSGID_LEN);
  54. pos += ACKMSGID_LEN;
  55. //解析unused
  56. try {
  57. unused = new String(data, pos, data.length - pos, "utf-8");
  58. } catch (UnsupportedEncodingException e) {
  59. e.printStackTrace();
  60. }
  61. return pos;
  62. }
  63. }

4. PingProtocol

  1. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.UnsupportedEncodingException;
  5. /**
  6. * Created by meishan on 16/12/1.
  7. */
  8. public class PingProtocol extends BasicProtocol {
  9. public static final int PROTOCOL_TYPE = 2;
  10. private static final int PINGID_LEN = 4;
  11. private int pingId;
  12. private String unused;
  13. @Override
  14. public int getLength() {
  15. return super.getLength() + PINGID_LEN + unused.getBytes().length;
  16. }
  17. @Override
  18. public int getProtocolType() {
  19. return PROTOCOL_TYPE;
  20. }
  21. public int getPingId() {
  22. return pingId;
  23. }
  24. public void setPingId(int pingId) {
  25. this.pingId = pingId;
  26. }
  27. public String getUnused() {
  28. return unused;
  29. }
  30. public void setUnused(String unused) {
  31. this.unused = unused;
  32. }
  33. /**
  34. * 拼接发送数据
  35. *
  36. * @return
  37. */
  38. @Override
  39. public byte[] genContentData() {
  40. byte[] base = super.genContentData();
  41. byte[] pingId = SocketUtil.int2ByteArrays(this.pingId);
  42. byte[] unused = this.unused.getBytes();
  43. ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
  44. baos.write(base, 0, base.length);          //协议版本+数据类型+数据长度+消息id
  45. baos.write(pingId, 0, PINGID_LEN);         //消息id
  46. baos.write(unused, 0, unused.length);            //unused
  47. return baos.toByteArray();
  48. }
  49. @Override
  50. public int parseContentData(byte[] data) throws ProtocolException {
  51. int pos = super.parseContentData(data);
  52. //解析pingId
  53. pingId = SocketUtil.byteArrayToInt(data, pos, PINGID_LEN);
  54. pos += PINGID_LEN;
  55. try {
  56. unused = new String(data, pos, data.length - pos, "utf-8");
  57. } catch (UnsupportedEncodingException e) {
  58. e.printStackTrace();
  59. }
  60. return pos;
  61. }
  62. }

5. PingAckProtocol

  1. import com.shandiangou.sdgprotocol.lib.ProtocolException;
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.UnsupportedEncodingException;
  5. /**
  6. * Created by meishan on 16/12/1.
  7. */
  8. public class PingAckProtocol extends BasicProtocol {
  9. public static final int PROTOCOL_TYPE = 3;
  10. private static final int ACKPINGID_LEN = 4;
  11. private int ackPingId;
  12. private String unused;
  13. @Override
  14. public int getLength() {
  15. return super.getLength() + ACKPINGID_LEN + unused.getBytes().length;
  16. }
  17. @Override
  18. public int getProtocolType() {
  19. return PROTOCOL_TYPE;
  20. }
  21. public int getAckPingId() {
  22. return ackPingId;
  23. }
  24. public void setAckPingId(int ackPingId) {
  25. this.ackPingId = ackPingId;
  26. }
  27. public String getUnused() {
  28. return unused;
  29. }
  30. public void setUnused(String unused) {
  31. this.unused = unused;
  32. }
  33. /**
  34. * 拼接发送数据
  35. *
  36. * @return
  37. */
  38. @Override
  39. public byte[] genContentData() {
  40. byte[] base = super.genContentData();
  41. byte[] ackPingId = SocketUtil.int2ByteArrays(this.ackPingId);
  42. byte[] unused = this.unused.getBytes();
  43. ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
  44. baos.write(base, 0, base.length);                //协议版本+数据类型+数据长度+消息id
  45. baos.write(ackPingId, 0, ACKPINGID_LEN);         //消息id
  46. baos.write(unused, 0, unused.length);            //unused
  47. return baos.toByteArray();
  48. }
  49. @Override
  50. public int parseContentData(byte[] data) throws ProtocolException {
  51. int pos = super.parseContentData(data);
  52. //解析ackPingId
  53. ackPingId = SocketUtil.byteArrayToInt(data, pos, ACKPINGID_LEN);
  54. pos += ACKPINGID_LEN;
  55. //解析unused
  56. try {
  57. unused = new String(data, pos, data.length - pos, "utf-8");
  58. } catch (UnsupportedEncodingException e) {
  59. e.printStackTrace();
  60. }
  61. return pos;
  62. }
  63. }

三、任务调度

上述已经给出了四种协议的实现,接下来我们将使用它们来实现app和服务端之间的通信,这里我们把数据的发送、接收和心跳分别用一个线程去实现,具体如下:

1. 客户端

  1. import android.os.Handler;
  2. import android.os.Looper;
  3. import android.os.Message;
  4. import android.util.Log;
  5. import com.shandiangou.sdgprotocol.lib.protocol.BasicProtocol;
  6. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
  7. import com.shandiangou.sdgprotocol.lib.protocol.PingProtocol;
  8. import java.io.IOException;
  9. import java.io.InputStream;
  10. import java.io.OutputStream;
  11. import java.net.ConnectException;
  12. import java.net.Socket;
  13. import java.util.concurrent.ConcurrentLinkedQueue;
  14. import javax.net.SocketFactory;
  15. /**
  16. * 写数据采用死循环,没有数据时wait,有新消息时notify
  17. * <p>
  18. * Created by meishan on 16/12/1.
  19. */
  20. public class ClientRequestTask implements Runnable {
  21. private static final int SUCCESS = 100;
  22. private static final int FAILED = -1;
  23. private boolean isLongConnection = true;
  24. private Handler mHandler;
  25. private SendTask mSendTask;
  26. private ReciveTask mReciveTask;
  27. private HeartBeatTask mHeartBeatTask;
  28. private Socket mSocket;
  29. private boolean isSocketAvailable;
  30. private boolean closeSendTask;
  31. protected volatile ConcurrentLinkedQueue<BasicProtocol> dataQueue = new ConcurrentLinkedQueue<>();
  32. public ClientRequestTask(RequestCallBack requestCallBacks) {
  33. mHandler = new MyHandler(requestCallBacks);
  34. }
  35. @Override
  36. public void run() {
  37. try {
  38. try {
  39. mSocket = SocketFactory.getDefault().createSocket(Config.ADDRESS, Config.PORT);
  40. //                mSocket.setSoTimeout(10);
  41. } catch (ConnectException e) {
  42. failedMessage(-1, "服务器连接异常,请检查网络");
  43. return;
  44. }
  45. isSocketAvailable = true;
  46. //开启接收线程
  47. mReciveTask = new ReciveTask();
  48. mReciveTask.inputStream = mSocket.getInputStream();
  49. mReciveTask.start();
  50. //开启发送线程
  51. mSendTask = new SendTask();
  52. mSendTask.outputStream = mSocket.getOutputStream();
  53. mSendTask.start();
  54. //开启心跳线程
  55. if (isLongConnection) {
  56. mHeartBeatTask = new HeartBeatTask();
  57. mHeartBeatTask.outputStream = mSocket.getOutputStream();
  58. mHeartBeatTask.start();
  59. }
  60. } catch (IOException e) {
  61. failedMessage(-1, "网络发生异常,请稍后重试");
  62. e.printStackTrace();
  63. }
  64. }
  65. public void addRequest(DataProtocol data) {
  66. dataQueue.add(data);
  67. toNotifyAll(dataQueue);//有新增待发送数据,则唤醒发送线程
  68. }
  69. public synchronized void stop() {
  70. //关闭接收线程
  71. closeReciveTask();
  72. //关闭发送线程
  73. closeSendTask = true;
  74. toNotifyAll(dataQueue);
  75. //关闭心跳线程
  76. closeHeartBeatTask();
  77. //关闭socket
  78. closeSocket();
  79. //清除数据
  80. clearData();
  81. failedMessage(-1, "断开连接");
  82. }
  83. /**
  84. * 关闭接收线程
  85. */
  86. private void closeReciveTask() {
  87. if (mReciveTask != null) {
  88. mReciveTask.interrupt();
  89. mReciveTask.isCancle = true;
  90. if (mReciveTask.inputStream != null) {
  91. try {
  92. if (isSocketAvailable && !mSocket.isClosed() && mSocket.isConnected()) {
  93. mSocket.shutdownInput();//解决java.net.SocketException问题,需要先shutdownInput
  94. }
  95. } catch (IOException e) {
  96. e.printStackTrace();
  97. }
  98. SocketUtil.closeInputStream(mReciveTask.inputStream);
  99. mReciveTask.inputStream = null;
  100. }
  101. mReciveTask = null;
  102. }
  103. }
  104. /**
  105. * 关闭发送线程
  106. */
  107. private void closeSendTask() {
  108. if (mSendTask != null) {
  109. mSendTask.isCancle = true;
  110. mSendTask.interrupt();
  111. if (mSendTask.outputStream != null) {
  112. synchronized (mSendTask.outputStream) {//防止写数据时停止,写完再停
  113. SocketUtil.closeOutputStream(mSendTask.outputStream);
  114. mSendTask.outputStream = null;
  115. }
  116. }
  117. mSendTask = null;
  118. }
  119. }
  120. /**
  121. * 关闭心跳线程
  122. */
  123. private void closeHeartBeatTask() {
  124. if (mHeartBeatTask != null) {
  125. mHeartBeatTask.isCancle = true;
  126. if (mHeartBeatTask.outputStream != null) {
  127. SocketUtil.closeOutputStream(mHeartBeatTask.outputStream);
  128. mHeartBeatTask.outputStream = null;
  129. }
  130. mHeartBeatTask = null;
  131. }
  132. }
  133. /**
  134. * 关闭socket
  135. */
  136. private void closeSocket() {
  137. if (mSocket != null) {
  138. try {
  139. mSocket.close();
  140. isSocketAvailable = false;
  141. } catch (IOException e) {
  142. e.printStackTrace();
  143. }
  144. }
  145. }
  146. /**
  147. * 清除数据
  148. */
  149. private void clearData() {
  150. dataQueue.clear();
  151. isLongConnection = false;
  152. }
  153. private void toWait(Object o) {
  154. synchronized (o) {
  155. try {
  156. o.wait();
  157. } catch (InterruptedException e) {
  158. e.printStackTrace();
  159. }
  160. }
  161. }
  162. /**
  163. * notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后
  164. *
  165. * @param o
  166. */
  167. protected void toNotifyAll(Object o) {
  168. synchronized (o) {
  169. o.notifyAll();
  170. }
  171. }
  172. private void failedMessage(int code, String msg) {
  173. Message message = mHandler.obtainMessage(FAILED);
  174. message.what = FAILED;
  175. message.arg1 = code;
  176. message.obj = msg;
  177. mHandler.sendMessage(message);
  178. }
  179. private void successMessage(BasicProtocol protocol) {
  180. Message message = mHandler.obtainMessage(SUCCESS);
  181. message.what = SUCCESS;
  182. message.obj = protocol;
  183. mHandler.sendMessage(message);
  184. }
  185. private boolean isConnected() {
  186. if (mSocket.isClosed() || !mSocket.isConnected()) {
  187. ClientRequestTask.this.stop();
  188. return false;
  189. }
  190. return true;
  191. }
  192. /**
  193. * 服务器返回处理,主线程运行
  194. */
  195. public class MyHandler extends Handler {
  196. private RequestCallBack mRequestCallBack;
  197. public MyHandler(RequestCallBack callBack) {
  198. super(Looper.getMainLooper());
  199. this.mRequestCallBack = callBack;
  200. }
  201. @Override
  202. public void handleMessage(Message msg) {
  203. super.handleMessage(msg);
  204. switch (msg.what) {
  205. case SUCCESS:
  206. mRequestCallBack.onSuccess((BasicProtocol) msg.obj);
  207. break;
  208. case FAILED:
  209. mRequestCallBack.onFailed(msg.arg1, (String) msg.obj);
  210. break;
  211. default:
  212. break;
  213. }
  214. }
  215. }
  216. /**
  217. * 数据接收线程
  218. */
  219. public class ReciveTask extends Thread {
  220. private boolean isCancle = false;
  221. private InputStream inputStream;
  222. @Override
  223. public void run() {
  224. while (!isCancle) {
  225. if (!isConnected()) {
  226. break;
  227. }
  228. if (inputStream != null) {
  229. BasicProtocol reciverData = SocketUtil.readFromStream(inputStream);
  230. if (reciverData != null) {
  231. if (reciverData.getProtocolType() == 1 || reciverData.getProtocolType() == 3) {
  232. successMessage(reciverData);
  233. }
  234. } else {
  235. break;
  236. }
  237. }
  238. }
  239. SocketUtil.closeInputStream(inputStream);//循环结束则退出输入流
  240. }
  241. }
  242. /**
  243. * 数据发送线程
  244. * 当没有发送数据时让线程等待
  245. */
  246. public class SendTask extends Thread {
  247. private boolean isCancle = false;
  248. private OutputStream outputStream;
  249. @Override
  250. public void run() {
  251. while (!isCancle) {
  252. if (!isConnected()) {
  253. break;
  254. }
  255. BasicProtocol dataContent = dataQueue.poll();
  256. if (dataContent == null) {
  257. toWait(dataQueue);//没有发送数据则等待
  258. if (closeSendTask) {
  259. closeSendTask();//notify()调用后,并不是马上就释放对象锁的,所以在此处中断发送线程
  260. }
  261. } else if (outputStream != null) {
  262. synchronized (outputStream) {
  263. SocketUtil.write2Stream(dataContent, outputStream);
  264. }
  265. }
  266. }
  267. SocketUtil.closeOutputStream(outputStream);//循环结束则退出输出流
  268. }
  269. }
  270. /**
  271. * 心跳实现,频率5秒
  272. * Created by meishan on 16/12/1.
  273. */
  274. public class HeartBeatTask extends Thread {
  275. private static final int REPEATTIME = 5000;
  276. private boolean isCancle = false;
  277. private OutputStream outputStream;
  278. private int pingId;
  279. @Override
  280. public void run() {
  281. pingId = 1;
  282. while (!isCancle) {
  283. if (!isConnected()) {
  284. break;
  285. }
  286. try {
  287. mSocket.sendUrgentData(0xFF);
  288. } catch (IOException e) {
  289. isSocketAvailable = false;
  290. ClientRequestTask.this.stop();
  291. break;
  292. }
  293. if (outputStream != null) {
  294. PingProtocol pingProtocol = new PingProtocol();
  295. pingProtocol.setPingId(pingId);
  296. pingProtocol.setUnused("ping...");
  297. SocketUtil.write2Stream(pingProtocol, outputStream);
  298. pingId = pingId + 2;
  299. }
  300. try {
  301. Thread.sleep(REPEATTIME);
  302. } catch (InterruptedException e) {
  303. e.printStackTrace();
  304. }
  305. }
  306. SocketUtil.closeOutputStream(outputStream);
  307. }
  308. }
  309. }

其中涉及到的RequestCallBack接口如下:

  1. /**
  2. * Created by meishan on 16/12/1.
  3. */
  4. public interface RequestCallBack {
  5. void onSuccess(BasicProtocol msg);
  6. void onFailed(int errorCode, String msg);
  7. }

2. 服务端

  1. import java.io.DataInputStream;
  2. import java.io.DataOutputStream;
  3. import java.net.Socket;
  4. import java.util.Iterator;
  5. import java.util.concurrent.ConcurrentHashMap;
  6. import java.util.concurrent.ConcurrentLinkedQueue;
  7. /**
  8. * Created by meishan on 16/12/1.
  9. */
  10. public class ServerResponseTask implements Runnable {
  11. private ReciveTask reciveTask;
  12. private SendTask sendTask;
  13. private Socket socket;
  14. private ResponseCallback tBack;
  15. private volatile ConcurrentLinkedQueue<BasicProtocol> dataQueue = new ConcurrentLinkedQueue<>();
  16. private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();
  17. private String userIP;
  18. public String getUserIP() {
  19. return userIP;
  20. }
  21. public ServerResponseTask(Socket socket, ResponseCallback tBack) {
  22. this.socket = socket;
  23. this.tBack = tBack;
  24. this.userIP = socket.getInetAddress().getHostAddress();
  25. System.out.println("用户IP地址:" + userIP);
  26. }
  27. @Override
  28. public void run() {
  29. try {
  30. //开启接收线程
  31. reciveTask = new ReciveTask();
  32. reciveTask.inputStream = new DataInputStream(socket.getInputStream());
  33. reciveTask.start();
  34. //开启发送线程
  35. sendTask = new SendTask();
  36. sendTask.outputStream = new DataOutputStream(socket.getOutputStream());
  37. sendTask.start();
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. public void stop() {
  43. if (reciveTask != null) {
  44. reciveTask.isCancle = true;
  45. reciveTask.interrupt();
  46. if (reciveTask.inputStream != null) {
  47. SocketUtil.closeInputStream(reciveTask.inputStream);
  48. reciveTask.inputStream = null;
  49. }
  50. reciveTask = null;
  51. }
  52. if (sendTask != null) {
  53. sendTask.isCancle = true;
  54. sendTask.interrupt();
  55. if (sendTask.outputStream != null) {
  56. synchronized (sendTask.outputStream) {//防止写数据时停止,写完再停
  57. sendTask.outputStream = null;
  58. }
  59. }
  60. sendTask = null;
  61. }
  62. }
  63. public void addMessage(BasicProtocol data) {
  64. if (!isConnected()) {
  65. return;
  66. }
  67. dataQueue.offer(data);
  68. toNotifyAll(dataQueue);//有新增待发送数据,则唤醒发送线程
  69. }
  70. public Socket getConnectdClient(String clientID) {
  71. return onLineClient.get(clientID);
  72. }
  73. /**
  74. * 打印已经链接的客户端
  75. */
  76. public static void printAllClient() {
  77. if (onLineClient == null) {
  78. return;
  79. }
  80. Iterator<String> inter = onLineClient.keySet().iterator();
  81. while (inter.hasNext()) {
  82. System.out.println("client:" + inter.next());
  83. }
  84. }
  85. public void toWaitAll(Object o) {
  86. synchronized (o) {
  87. try {
  88. o.wait();
  89. } catch (InterruptedException e) {
  90. e.printStackTrace();
  91. }
  92. }
  93. }
  94. public void toNotifyAll(Object obj) {
  95. synchronized (obj) {
  96. obj.notifyAll();
  97. }
  98. }
  99. private boolean isConnected() {
  100. if (socket.isClosed() || !socket.isConnected()) {
  101. onLineClient.remove(userIP);
  102. ServerResponseTask.this.stop();
  103. System.out.println("socket closed...");
  104. return false;
  105. }
  106. return true;
  107. }
  108. public class ReciveTask extends Thread {
  109. private DataInputStream inputStream;
  110. private boolean isCancle;
  111. @Override
  112. public void run() {
  113. while (!isCancle) {
  114. if (!isConnected()) {
  115. isCancle = true;
  116. break;
  117. }
  118. BasicProtocol clientData = SocketUtil.readFromStream(inputStream);
  119. if (clientData != null) {
  120. if (clientData.getProtocolType() == 0) {
  121. System.out.println("dtype: " + ((DataProtocol) clientData).getDtype() + ", pattion: " + ((DataProtocol) clientData).getPattion() + ", msgId: " + ((DataProtocol) clientData).getMsgId() + ", data: " + ((DataProtocol) clientData).getData());
  122. DataAckProtocol dataAck = new DataAckProtocol();
  123. dataAck.setUnused("收到消息:" + ((DataProtocol) clientData).getData());
  124. dataQueue.offer(dataAck);
  125. toNotifyAll(dataQueue); //唤醒发送线程
  126. tBack.targetIsOnline(userIP);
  127. } else if (clientData.getProtocolType() == 2) {
  128. System.out.println("pingId: " + ((PingProtocol) clientData).getPingId());
  129. PingAckProtocol pingAck = new PingAckProtocol();
  130. pingAck.setUnused("收到心跳");
  131. dataQueue.offer(pingAck);
  132. toNotifyAll(dataQueue); //唤醒发送线程
  133. tBack.targetIsOnline(userIP);
  134. }
  135. } else {
  136. System.out.println("client is offline...");
  137. break;
  138. }
  139. }
  140. SocketUtil.closeInputStream(inputStream);
  141. }
  142. }
  143. public class SendTask extends Thread {
  144. private DataOutputStream outputStream;
  145. private boolean isCancle;
  146. @Override
  147. public void run() {
  148. while (!isCancle) {
  149. if (!isConnected()) {
  150. isCancle = true;
  151. break;
  152. }
  153. BasicProtocol procotol = dataQueue.poll();
  154. if (procotol == null) {
  155. toWaitAll(dataQueue);
  156. } else if (outputStream != null) {
  157. synchronized (outputStream) {
  158. SocketUtil.write2Stream(procotol, outputStream);
  159. }
  160. }
  161. }
  162. SocketUtil.closeOutputStream(outputStream);
  163. }
  164. }

其中涉及到的ResponseCallback接口如下:

  1. /**
  2. * Created by meishan on 16/12/1.
  3. */
  4. public interface ResponseCallback {
  5. void targetIsOffline(DataProtocol reciveMsg);
  6. void targetIsOnline(String clientIp);
  7. }

上述代码中处理了几种情况下的异常,比如,建立连接后,服务端停止运行,此时客户端的输入流还在阻塞状态,怎么保证客户端不抛出异常,这些处理可以结合SocketUtil类来看。

四、调用封装

1. 客户端

import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;

  1. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
  2. /**
  3. * Created by meishan on 16/12/1.
  4. */
  5. public class ConnectionClient {
  6. private boolean isClosed;
  7. private ClientRequestTask mClientRequestTask;
  8. public ConnectionClient(RequestCallBack requestCallBack) {
  9. mClientRequestTask = new ClientRequestTask(requestCallBack);
  10. new Thread(mClientRequestTask).start();
  11. }
  12. public void addNewRequest(DataProtocol data) {
  13. if (mClientRequestTask != null && !isClosed)
  14. mClientRequestTask.addRequest(data);
  15. }
  16. public void closeConnect() {
  17. isClosed = true;
  18. mClientRequestTask.stop();
  19. }
  20. }

2. 服务端

  1. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
  2. import java.io.IOException;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. /**
  8. * Created by meishan on 16/12/1.
  9. */
  10. public class ConnectionServer {
  11. private static boolean isStart = true;
  12. private static ServerResponseTask serverResponseTask;
  13. public ConnectionServer() {
  14. }
  15. public static void main(String[] args) {
  16. ServerSocket serverSocket = null;
  17. ExecutorService executorService = Executors.newCachedThreadPool();
  18. try {
  19. serverSocket = new ServerSocket(Config.PORT);
  20. while (isStart) {
  21. Socket socket = serverSocket.accept();
  22. serverResponseTask = new ServerResponseTask(socket,
  23. new ResponseCallback() {
  24. @Override
  25. public void targetIsOffline(DataProtocol reciveMsg) {// 对方不在线
  26. if (reciveMsg != null) {
  27. System.out.println(reciveMsg.getData());
  28. }
  29. }
  30. @Override
  31. public void targetIsOnline(String clientIp) {
  32. System.out.println(clientIp + " is onLine");
  33. System.out.println("-----------------------------------------");
  34. }
  35. });
  36. if (socket.isConnected()) {
  37. executorService.execute(serverResponseTask);
  38. }
  39. }
  40. serverSocket.close();
  41. } catch (IOException e) {
  42. e.printStackTrace();
  43. } finally {
  44. if (serverSocket != null) {
  45. try {
  46. isStart = false;
  47. serverSocket.close();
  48. if (serverSocket != null)
  49. serverResponseTask.stop();
  50. } catch (IOException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. }
  55. }
  56. }

总结

实现自定义协议的关键在于协议的拼装和解析,上述已给出了关键的代码,如果需要查看完整的代码以及demo,可以下载源码

注意:先运行服务端demo的main函数,再查看本机的ip地址,然后修改客户端(android)代码中Config.java里面的ip地址,当然,要确保android手机和服务端在同一个局域里面,最后再打开客户端。