基于 MQTT 的即时通信

时间:2021-10-15 13:55:26
MQTT 是由 IBM 开发的开源即时通信协议,基于 TCP 协议。

/**
* Created by 95 on 2016/5/18.
*/
public class MyMqttClient {
private boolean cleanSession = true;//表示是否清除session
private int timeout = 10;表示连接超时时间单位为秒
private int keepalive = 30;表示30秒发送一次心跳包,保持长连接
private static int qos = 2;表示发送质量
private static boolean retained = false;表示是否保持记录
public static MyMqttClient myMqttClient;
private Context context;
private final MqttAndroidClient mqttAndroidClient;
private final MqttConnectOptions connectOptions;
private MyMqttListener myPublishListener;

public static MyMqttClient getInstance() {
if (myMqttClient == null) {
myMqttClient = new MyMqttClient();
}
return myMqttClient;
}

public static MyMqttClient getMyMqttClient() {
return myMqttClient;
}

private MyMqttClient() {
String ClientId = MyApplication.getInstance().getCurrentAccount().getClientId();
Log.e(ApplicationParams.TAG, "clientid===" + ClientId);
context = MyApplication.getInstance().getContext();
mqttAndroidClient = new MqttAndroidClient(context, ApplicationParams.CONNECTMQTTURI, ClientId);创建mqtt客户端连接
connectOptions = new MqttConnectOptions();创建mqtt属性
SslUtility.newInstance(context);
connectOptions.setSocketFactory(SslUtility.getInstance().getSocketFactory(R.raw.kalamodo, ApplicationParams.SSLSECRET));
connectOptions.setCleanSession(cleanSession);
connectOptions.setConnectionTimeout(timeout);
connectOptions.setKeepAliveInterval(keepalive);

}

public static MyMqttClient CreateNewClient() {

return new MyMqttClient();

}

public void ClearMqtt() {
myMqttClient = null;
}

/**
* 连接Mqtt
*/
public void ConnectMqtt(String username, String pwd, MyLogginListener myLogginListener) {
try {

Log.e(ApplicationParams.TAG, "username==" + username + "pwd==" + pwd);
MyMqttListener myMqttListener = new MyMqttListener(context, MyMqttListener.Action.CONNECT);
mqttAndroidClient.setCallback(MyMqttCallback.getInstance());
connectOptions.setUserName(username);设置用户名或密码,如果服务器端不设置,那就不需要
connectOptions.setPassword(pwd.toCharArray());
myMqttListener.setOnLogginListener(myLogginListener);
mqttAndroidClient.connect(connectOptions, null, myMqttListener);
} catch (MqttException e) {
Log.e(ApplicationParams.TAG, "连接异常");
e.printStackTrace();
}
}

public void setOnAiConnectListener(AiConnecttion onAiConnectListener) {
MyMqttCallback.getInstance().setAiConnecttion(onAiConnectListener);
}

/**
* 关闭释放资源
*/
public void CloseMqtt() {
mqttAndroidClient.unregisterResources();
// mqttAndroidClient.close();
}

public void disconnect(IMqttActionListener iMqttActionListener) throws MqttException {
mqttAndroidClient.disconnect(null, iMqttActionListener);
}

/**
* 断开连接
*
* @param
*/
public void disconnect() {
try {
mqttAndroidClient.disconnect();
Log.e("Kavenir", "断开连接");
} catch (MqttException e) {
Log.v("kavenir", "client disconnect error");
}
}

/**
* 发布
*/
public void publish(String topic, byte[] payload, MyPublishListener PublishListener) {
try {
if (myPublishListener == null) {
myPublishListener = new MyMqttListener(context, MyMqttListener.Action.PUBLISH);
}
myPublishListener.setOnPublishListener(PublishListener);
Log.e(ApplicationParams.TAG, "设置publish监听");
mqttAndroidClient.registerResources(context);
mqttAndroidClient.publish(topic, payload, qos, retained, null, myPublishListener);
} catch (MqttException e) {
e.printStackTrace();
}
}

/**
* 订阅主题
*/
public void subscribe(String topic) {
try {
mqttAndroidClient.subscribe(topic, 2, null, new MyMqttListener(context, MyMqttListener.Action.SUBSCRIBE));
} catch (MqttException e) {
Log.v("kavenir", "client subscribe error");
}
}

/**
* 判断是否连接
*
* @return
*/
public boolean isConnected() {
if (mqttAndroidClient != null) {
// try {
Log.e(ApplicationParams.TAG, "isConnected===" + mqttAndroidClient.isConnected());
return mqttAndroidClient.isConnected();
// }catch (Exception e){
// return false;
// }
} else {
return false;
}
}
}

//接收消息的回调接口


/*******************************************************************************
* Copyright (c) 1999, 2014 IBM Corp.
* <p/>
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
* <p/>
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*/
package com.kalamodo.kavenir.mqtt;

import android.app.Activity;
import android.content.Context;
import android.content.DialogInterface;
import android.content.Intent;
import android.content.SharedPreferences;
import android.location.Location;
import android.location.LocationManager;
import android.net.Uri;
import android.os.Bundle;
import android.os.Message;
import android.os.Vibrator;
import android.telephony.SmsManager;
import android.text.TextUtils;
import android.util.Log;
import android.widget.Toast;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.kalamodo.kavenir.MyApplication;
import com.kalamodo.kavenir.activity.LoginActivity;
import com.kalamodo.kavenir.activity.ShowResultWebViewActivity;
import com.kalamodo.kavenir.bluetooth.BlueToothRecoder;
import com.kalamodo.kavenir.bluetooth.BlueToothUtils;
import com.kalamodo.kavenir.db.DbOperate;
import com.kalamodo.kavenir.db.FriendDao;
import com.kalamodo.kavenir.db.TopicDao;
import com.kalamodo.kavenir.domain.db.Friend;
import com.kalamodo.kavenir.domain.db.Messager;
import com.kalamodo.kavenir.domain.db.MuiltMediaItemBean;
import com.kalamodo.kavenir.domain.db.MulitMediaList;
import com.kalamodo.kavenir.listener.MyLocationListener;
import com.kalamodo.kavenir.receiver.PushReceiver;
import com.kalamodo.kavenir.thread.ThreadManager;
import com.kalamodo.kavenir.utils.MqttAgreement;
import com.kalamodo.kavenir.utils.common.ApplicationParams;
import com.kalamodo.kavenir.utils.common.EventBusConstant;
import com.kalamodo.kavenir.utils.common.Json_user;
import com.kalamodo.kavenir.utils.common.SpUtils;
import com.kalamodo.kavenir.utils.common.Utils;
import com.kalamodo.kavenir.utils.voicetrans.TxtSwitchUtils;
import com.lidroid.xutils.HttpUtils;
import com.lidroid.xutils.exception.HttpException;
import com.lidroid.xutils.http.RequestParams;
import com.lidroid.xutils.http.ResponseInfo;
import com.lidroid.xutils.http.callback.RequestCallBack;
import com.lidroid.xutils.http.client.HttpRequest;

import org.apache.http.entity.StringEntity;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import de.greenrobot.event.EventBus;

/**
* Handles call backs from the MQTT Client
*/
public class MyMqttCallback implements MqttCallback {
/** Listener installed through setAiConnecttion. */
private AiConnecttion aiConnecttion;
/** Context obtained from MyApplication in the constructor. */
private final Context context;
// The next three fields are not referenced in the visible part of this class.
SharedPreferences callback_sp;
FriendDao friendDao;
TopicDao topicDao;
/** Intent whose action is PushReceiver.KEY_FROM. */
private Intent messagerintent = new Intent(PushReceiver.KEY_FROM);
/** Intent whose action is PushReceiver.MESSAGER_ACTION. */
private Intent messager = new Intent(PushReceiver.MESSAGER_ACTION);

/**
 * Shows the "failed to read contacts" toast for any message it receives.
 * Bound explicitly to the main looper so that creating this object on a
 * thread without a Looper does not throw, and the toast is shown from the
 * main thread.
 */
private android.os.Handler handler = new android.os.Handler(android.os.Looper.getMainLooper()) {
    @Override
    public void handleMessage(Message msg) {
        Utils.showToast(context, "小主人,获取您的联系人失败,可能未获取权限哟");

        super.handleMessage(msg);
    }
};

/** Private constructor: stores the context supplied by the MyApplication singleton. */
private MyMqttCallback() {
    this.context = MyApplication.getInstance().getContext();
}

/** Shared instance; assigned by getInstance() on its first call. */
public static MyMqttCallback myMqttCallback;

/**
 * Returns the shared callback, creating it on first use.
 * No synchronization is performed.
 *
 * @return the shared callback instance
 */
public static MyMqttCallback getInstance() {
    if (myMqttCallback != null) {
        return myMqttCallback;
    }
    myMqttCallback = new MyMqttCallback();
    return myMqttCallback;
}

/**
 * Stores the listener in this callback, replacing any previous one.
 *
 * @param connecttion listener to keep; may replace an earlier value
 */
public void setAiConnecttion(AiConnecttion connecttion) {
    this.aiConnecttion = connecttion;
}

/**
 * Invoked when the connection is lost. A null cause is logged as a
 * deliberate disconnect; in every case the shared client's resources
 * are released.
 *
 * @param throwable cause of the loss, or null
 */
@Override
public void connectionLost(Throwable throwable) {
    final boolean disconnectedOnPurpose = (throwable == null);
    if (disconnectedOnPurpose) {
        Log.e("kavenir", "主动断开连接");
    }
    // Both outcomes release the client's resources.
    MyMqttClient.getInstance().CloseMqtt();
}


@Override
public void messageArrived(String Topic, MqttMessage mqttMessage) throws Exception {
//topic 为对方和己方共同的房间号,
byte[] mqttMessage.getPaload();//获取消息内容的消息体,格式为byte[]数组        }
package com.kalamodo.kavenir.mqtt;import android.content.Context;import android.net.wifi.p2p.WifiP2pManager.ActionListener;import android.util.Log;import org.eclipse.paho.client.mqttv3.IMqttActionListener;import org.eclipse.paho.client.mqttv3.IMqttToken;import java.sql.Connection;/** * This Class handles receiving information from the {@link } * and updating the {@link Connection} associated with the action 这个类是用来监听登录成功,或者发布成功等等监听的类 */public class MyMqttListener implements IMqttActionListener {private MyLogginListener logginListener;private Context context;private MyPublishListener myPublishListener;private DisConnectionListener disConnectionListener;private static MyMqttListener myMqttListener;/** * Actions that can be performed Asynchronously <strong>and</strong> * associated with a {@link ActionListener} object *  */public enum Action {/** Connect Action **/CONNECT,/** Disconnect Action **/DISCONNECT,/** Subscribe Action **/SUBSCRIBE,/** Publish Action **/PUBLISH}/** * The {@link Action} that is associated with this instance of * <code>ActionListener</code> **/private Action action;/** The arguments passed to be used for formatting strings **/// private String[] additionalArgs;/** Handle of the {@link Connection} this action was being executed on **/// private String clientHandle;/** {@link Context} for performing various operations **//** * Creates a generic action listener for actions performed form any activity *  * @param context *            The application context * @param action *            The action that is being performed * @param clientHandle *            The handle for the client which the action is being performed *            on * @param additionalArgs *            Used for as arguments for string formating *//** * 登录监听 *  * @param */public void setOnLogginListener(MyLogginListener logginListener) {this.logginListener = logginListener;}/** * 发布监听 * @param onPublishListener */public void setOnPublishListener(MyPublishListener 
onPublishListener){this.myPublishListener=onPublishListener;}/** * 断开监听 * @param onDisConnectionListener */public MyMqttListener setOnDisConnectionListener(DisConnectionListener onDisConnectionListener){disConnectionListener=onDisConnectionListener;return  myMqttListener;}public static MyMqttListener getMyMqttListener(Context context, Action action){myMqttListener = new MyMqttListener( context,action);return myMqttListener;}public MyMqttListener(Context context, Action action) {this.context = context;this.action = action;}/** * The action associated with this listener has been successful. *  * @param asyncActionToken *            This argument is not used */@Overridepublic void onSuccess(IMqttToken asyncActionToken) {switch (action) {case CONNECT:connect(asyncActionToken);break;case DISCONNECT:disconnect();break;case SUBSCRIBE:subscribe();break;case PUBLISH:publish();break;}}/** * A publish action has been successfully completed, update connection * object associated with the client this action belongs to, then notify the * user of success */private void publish() {if(myPublishListener!=null){myPublishListener.Success();}}/** * A subscribe action has been successfully completed, update the connection * object associated with the client this action belongs to and then notify * the user of success */private void subscribe() {Log.v("kavenir", "subscribe");}/** * A disconnection action has been successfully completed, update the * connection object associated with the client this action belongs to and * then notify the user of success. */private void disconnect() {Log.e("kavenir", "Client disConnected");if(disConnectionListener!=null){disConnectionListener.DisConnectionSuccess();}}/** * A connection action has been successfully completed, update the * connection object associated with the client this action belongs to and * then notify the user of success. 
*/private void connect(IMqttToken asyncActionToken) {Log.v("kavenir", "Client Connected");if (logginListener != null) {logginListener.Sucess(asyncActionToken);}}/** * The action associated with the object was a failure *  * @param token *            This argument is not used * @param exception *            The exception which indicates why the action failed */@Overridepublic void onFailure(IMqttToken token, Throwable exception) {switch (action) {case CONNECT:connect(exception);break;case DISCONNECT:disconnect(exception);break;case SUBSCRIBE:subscribe(exception);break;case PUBLISH:publish(exception);break;}}/** * A publish action was unsuccessful, notify user and update client history *  * @param exception *            This argument is not used */private void publish(Throwable exception) {Log.v("kavenir", "publish Failed - an error occured");if(myPublishListener!=null){myPublishListener.Filure();}}/** * A subscribe action was unsuccessful, notify user and update client * history *  * @param exception *            This argument is not used */private void subscribe(Throwable exception) {Log.v("kavenir", "subscribe Failed - an error occured");}/** * A disconnect action was unsuccessful, notify user and update client * history *  * @param exception *            This argument is not used */private void disconnect(Throwable exception) {Log.v("kavenir", "Disconnect Failed - an error occured");if(disConnectionListener!=null){disConnectionListener.DisConnectionFailure();}}/** * A connect action was unsuccessful, notify the user and update client * history *  * @param *            This argument is not used */private void connect(Throwable exception) {if(logginListener!=null){logginListener.Failure(exception);}Log.e("kavenir", "Client failed to connect");}}