作者:wildma,連結:https://www.jianshu.com/p/73436a5cf855
1 前言年初做了一款Android TV 應用,用到了MQTT。主要實現的是類似一些景區利用大屏幕實時顯示景點人數,超過人數就不允許進入。即利用閘機設備監控到進景區的遊客,然後通過MQTT將消息發送給大屏幕,最後大屏幕實時顯示景區人數,並響應一個消息通知閘機設備已經收到了它發過來的消息(確保消息到達)。這篇文章會模擬真實的使用流程進行講解,即閘機發布消息——伺服器(代理)收到消息轉發給大屏幕——大屏幕收到消息後響應回去(發布消息)——伺服器收到消息轉發給閘機設備。
2 關於MQTT2.1 簡介MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議。它是一種發布/訂閱,極其簡單和輕量級的消息傳遞協議,專為受限設備和低帶寬,高延遲或不可靠的網絡而設計。它的設計思想是輕巧、開放、簡單、規範,易於實現。這些特點使得它對很多場景來說都是很好的選擇,特別是對於受限的環境如機器與機器的通信(M2M)以及物聯網環境。相對於XMPP,MQTT更加輕量級,並且佔用的寬帶低。
2.2 特點MQTT協議有以下特點:
使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程式耦合。qos為0:「至多一次」,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久後還會有第二次發送。qos為1:「至少一次」,確保消息到達,但消息重複可能會發生。這一級別可用於如下情況,你需要獲得每一條消息,並且消息重複發送對你的使用場景無影響。qos為2:「只有一次」,確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重複或丟失會導致不正確的結果。小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以降低網絡流量。使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。2.3 MQTT體系結構該體系結構圖是結合文章開頭說的例子畫出來的,能很好的描述MQTT在實際運用中的三種身份。即進景區入口配置一臺閘機設備作為發布者(Publisher),當閘機設備監控到有遊客進入的時候會發布一個帶主題(Topic)的消息(例如主題為「tourist_enter」)給伺服器(MQTT-Broker),當伺服器接收到發布過來的消息後,會進行基於主題的過濾,將消息轉發給訂閱了該主題的訂閱者。而景區大屏幕作為訂閱者(Subscriber),訂閱的主題也是「tourist_enter」,這樣就能接收到伺服器轉發過來的消息,收到消息後在大屏幕上實時顯示當前景區人數即可。
該結構圖中的閘機設備和大屏幕都是客戶端,都可以進行發布和訂閱。例如大屏幕收到消息後也可以發布一個消息通知閘機設備已經收到了它發過來的消息。
3 MQTT伺服器搭建想要使用MQTT,首先需要搭建一個MQTT的伺服器(在公司一般是後臺人員負責搭建)。一般前端人員為了方便測試都會先使用第三方提供的伺服器,官方推薦了很多種伺服器,我這裡選用的是Apollo(屬於Apache ActiveMQ)。
1. 下載、解壓點擊下載地址,選擇最適合你的作業系統的版本進行下載,我這裡用的是Windows,進行如下選擇:
下載後進行解壓,我這裡解壓到D盤根目錄下(D:\apache-apollo-1.7.1)。
2. 創建伺服器實例命令行進入解壓文件的bin目錄下(例如:cd D:\apache-apollo-1.7.1\bin),然後輸入apollo create mybroker(其中mybroker為自定義的伺服器名稱)創建伺服器實例。具體如下圖:
之後會在bin目錄下生成mybroker文件夾,其中mybroker文件夾下的etc\apollo.xml文件下是配置伺服器信息的文件,etc\users.properties文件包含連接MQTT伺服器時用到的用戶名和密碼,注意這裡只能修改密碼(發現很多博客在沒有驗證的情況下就說用戶名和密碼都在這裡修改),如果要修改用戶名需要到etc\groups.properties文件下去修改。etc\groups.properties文件下的用戶名與etc\users.properties文件下的密碼是一一對應的,如下表示一個組中配置了兩個用戶分別是admin與wildma,然後這兩個用戶名對應的密碼分別是password與123456
3. 開啟伺服器進入mybroker文件夾下的bin目錄下,輸入apollo-broker.cmd run開啟伺服器。看到如下界面表示開啟成功。
4. 驗證是否安裝成功最後在瀏覽器輸入http://127.0.0.1:61680/,能成功打開界面就表示安裝成功了。可以用上面配置的兩個用戶名進行登錄。
4 調試MQTT的客戶端——mqttfx 的使用為了方便調試MQTT,我這裡選用mqttfx作為閘機設備客戶端。具體使用如下:
1. 下載點擊下載地址,選擇最適合你的作業系統的版本進行下載。如下圖:
2. 安裝下載後一路點擊下一步即可安裝成功,安裝成功後打開軟體界面。如下圖:
3. 配置點擊上圖中的設置,添加一個新的配置文件。分別填寫配置文件名稱、伺服器地址(由於伺服器就是本機,所以這裡用本機的IP位址即可,ipconfig/all可獲取IP位址)、埠號(開啟伺服器後會顯示接受連接的地址:Accepting connections at: tcp://0.0.0.0:61613,用這裡的埠號61613即可,見上文中「開啟伺服器」後的圖片)、用戶名、密碼,點擊OK即可。如下圖:
4. 訂閱消息選擇剛剛添加的配置文件「閘機設備」,點擊"Connect"連接伺服器。點擊「Subscribe」,設置一個Topic(例如tourist_enter),點擊Topic右側的「Subscribe」進行消息訂閱。如下圖:
5. 發布消息點擊「Publish」,輸入剛剛訂閱的Topic (tourist_enter),輸入需要發布的消息內容(tourist enter),點擊Topic右側的「Publish」進行消息發布。如下圖:
再返回訂閱界面就能看到剛剛發布的消息,如下圖:
5 Android中MQTT的使用Android中使用MQTT需要使用到Paho Android Service庫,Paho Android Service是一個用Java編寫的MQTT客戶端庫。GitHub地址:https://github.com/eclipse/paho.mqtt.android
5.1 集成在module的build.gradle文件中添加依賴
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}
dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}}
在 AndroidManifest.xml 添加限權
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />在 AndroidManifest.xml 註冊Service (MyMqttService為自己寫的服務,下文會講到)
<service android:name="org.eclipse.paho.android.service.MqttService" /> <!--MqttService-->
<service android:name="com.dongyk.service.MyMqttService"/> <!--MyMqttService-->
5.2 具體代碼Android中使用MQTT最主要的就是以下幾個方法:
connect:連接MQTT伺服器,這裡主要講3個參數的方法,如下:@Override
public IMqttToken connect(MqttConnectOptions options, Object userContext,
IMqttActionListener callback) throws MqttException {
//...
}
參數options:用來攜帶連接伺服器的一系列參數,例如用戶名、密碼等。參數userContext:可選對象,用於向回調傳遞上下文。一般傳null即可。參數callback:用來監聽MQTT是否連接成功的回調publish:發布消息,這裡使用四個參數的方法,如下:@Override
public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
boolean retained) throws MqttException, MqttPersistenceException {
//...
}
參數retained:是否在伺服器保留斷開連接後的最後一條消息subscribe:訂閱消息,這裡主要講2個參數的方法,如下:@Override
public IMqttToken subscribe(String topic, int qos) throws MqttException,
MqttSecurityException {
//...
}
5.2.2 MQTT服務——MyMqttService下面寫一個 Service 來實現MQTT在Android運用中的connect、publish、subscribe
package com.wildma.mqttandroidclient;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.Handler;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.support.annotation.RequiresApi;
import android.util.Log;
import android.widget.Toast;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* Author wildma
* Github https://github.com/wildma
* CreateDate 2018/11/08
* Desc ${MQTT服務}
*/
public class MyMqttService extends Service {
public final String TAG = MyMqttService.class.getSimpleName();
private static MqttAndroidClient mqttAndroidClient;
private MqttConnectOptions mMqttConnectOptions;
public String HOST = "tcp://192.168.0.102:61613";//伺服器地址(協議+地址+埠號)
public String USERNAME = "admin";//用戶名
public String PASSWORD = "password";//密碼
public static String PUBLISH_TOPIC = "tourist_enter";//發布主題
public static String RESPONSE_TOPIC = "message_arrived";//響應主題
@RequiresApi(api = 26)
public String CLIENTID = Build.VERSION.SDK_INT >= Build.VERSION_CODES.O
? Build.getSerial() : Build.SERIAL;//客戶端ID,一般以客戶端唯一標識符表示,這裡用設備序列號表示
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
init();
return super.onStartCommand(intent, flags, startId);
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
/**
* 開啟服務
*/
public static void startService(Context mContext) {
mContext.startService(new Intent(mContext, MyMqttService.class));
}
/**
* 發布 (模擬其他客戶端發布消息)
*
* @param message 消息
*/
public static void publish(String message) {
String topic = PUBLISH_TOPIC;
Integer qos = 2;
Boolean retained = false;
try {
//參數分別為:主題、消息的字節數組、服務質量、是否在伺服器保留斷開連接後的最後一條消息
mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 響應 (收到其他客戶端的消息後,響應給對方告知消息已到達或者消息有問題等)
*
* @param message 消息
*/
public void response(String message) {
String topic = RESPONSE_TOPIC;
Integer qos = 2;
Boolean retained = false;
try {
//參數分別為:主題、消息的字節數組、服務質量、是否在伺服器保留斷開連接後的最後一條消息
mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 初始化
*/
private void init() {
String serverURI = HOST; //伺服器地址(協議+地址+埠號)
mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);
mqttAndroidClient.setCallback(mqttCallback); //設置監聽訂閱消息的回調
mMqttConnectOptions = new MqttConnectOptions();
mMqttConnectOptions.setCleanSession(true); //設置是否清除緩存
mMqttConnectOptions.setConnectionTimeout(10); //設置超時時間,單位:秒
mMqttConnectOptions.setKeepAliveInterval(20); //設置心跳包發送間隔,單位:秒
mMqttConnectOptions.setUserName(USERNAME); //設置用戶名
mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //設置密碼
// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + CLIENTID + "\"}";
String topic = PUBLISH_TOPIC;
Integer qos = 2;
Boolean retained = false;
if ((!message.equals("")) || (!topic.equals(""))) {
// 最後的遺囑
try {
mMqttConnectOptions.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (Exception e) {
Log.i(TAG, "Exception Occured", e);
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
}
if (doConnect) {
doClientConnection();
}
}
/**
* 連接MQTT伺服器
*/
private void doClientConnection() {
if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
try {
mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 判斷網絡是否連接
*/
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i(TAG, "當前網絡名稱:" + name);
return true;
} else {
Log.i(TAG, "沒有可用網絡");
/*沒有可用網絡的時候,延遲3秒再嘗試重連*/
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
doClientConnection();
}
}, 3000);
return false;
}
}
//MQTT是否連接成功的監聽
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "連接成功 ");
try {
mqttAndroidClient.subscribe(PUBLISH_TOPIC, 2);//訂閱主題,參數:主題、服務質量
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
Log.i(TAG, "連接失敗 ");
doClientConnection();//連接失敗,重連(可關閉伺服器進行模擬)
}
};
//訂閱主題的回調
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到消息: " + new String(message.getPayload()));
//收到消息,這裡彈出Toast表示。如果需要更新UI,可以使用廣播或者EventBus進行發送
Toast.makeText(getApplicationContext(), "messageArrived: " + new String(message.getPayload()), Toast.LENGTH_LONG).show();
//收到其他客戶端的消息後,響應給對方告知消息已到達或者消息有問題等
response("message arrived");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void connectionLost(Throwable arg0) {
Log.i(TAG, "連接斷開 ");
doClientConnection();//連接斷開,重連
}
};
@Override
public void onDestroy() {
try {
mqttAndroidClient.disconnect(); //斷開連接
} catch (MqttException e) {
e.printStackTrace();
}
super.onDestroy();
}
}該 MyMqttService 類的大概邏輯就是開啟服務後,調用init()方法初始化各個參數,包括伺服器地址、用戶名、密碼等等,然後調用doClientConnection()方法連接MQTT伺服器,iMqttActionListener用來監聽MQTT是否連接成功,連接成功則訂閱主題。mqttCallback為訂閱主題的回調,收到消息後會執行該回調中的messageArrived()方法,拿到消息後進行UI更新,並調用response()方法響應給對方告知消息已到達或者消息有問題等。
5.2.3 開啟服務在MainActivity中開啟服務,這裡為了方便不做UI更新,所以就一行開啟服務的代碼,如下:
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
MyMqttService.startService(this); //開啟服務
}
}
6 模擬真實場景還是以文章開頭說的例子來講,現在拿mqttfx客戶端作為閘機設備,上面的Android代碼運行後作為大屏幕。
將大屏幕與伺服器連接即將大屏幕APK運行到Android TV上,沒有TV可以用Android手機代替。記得代碼中的發布主題設置為「tourist_enter」,響應主題設置為「message_arrived」。將閘機設備與伺服器連接選擇閘機設備——點擊連接——發布主題設置為「tourist_enter」,如下圖:切換到Subscribe界面——響應主題設置為「message_arrived」——點擊Subscribe按鈕進行訂閱,如下圖:
大屏幕收到消息這時候大屏幕收到伺服器轉發過來的消息,就會在大屏幕上顯示進場人數,並響應給對方告知消息已到達。代碼中為了簡單就彈個Toast表示,具體顯示就不貼圖了。
閘機設備收到消息這時候mqttfx切換到Subscribe界面就可以看到大屏幕響應回來的消息,如下:
如上流程就是大概模擬我在開發中用到的MQTT使用流程,當然我的真實項目並沒有那麼簡單,還包括各種數據和UI交互顯示。希望模擬這種真實的使用流程進行講解能讓各位更好的理解MQTT的使用,有不足的請指出。
項目地址:MqttAndroidClient:https://github.com/wildma/MqttAndroidClient
參考資料:
MQTT 101 – How to Get Started with the lightweight IoT ProtocolMQTT Client Library Encyclopedia – Paho Android ServiceAndroid APP必備高級功能,消息推送之MQTT