القائمة الرئيسية

الصفحات

ماهو MQTT و كيفية "استخدام MQTT" في جافا

What is MQTT and How to "Use MQTT" in Java،MqttCallback،ماهو MQTT و كيفية استخدام MQTT في جافا،ماهو MQTT و كيفية "استخدام MQTT" في جافا،MQTT ،Dependency،ماهو MQTT و كيفية "استخدام MQTT" في جافا،How to Use MQTT in Java،كيفية استخدام MQTT في جافا،كيفية استخدام MQTT،جافا،What is MQTT and How to "Use MQTT" in Java،




ماهو MQTT و كيفية "استخدام MQTT" في جافا



توضح هذه المقالة كيفية استخدام MQTT في مشروع Java لتحقيق وظائف
الاتصال والاشتراك وإلغاء الاشتراك ونشر واستلام الرسائل.

MQTT هو بروتوكول مراسلة قياسي من OASIS لإنترنت الأشياء (IoT). 
إنه مصمم كوسيلة نقل رسائل نشر / اشتراك خفيفة الوزن للغاية ، وهو 
مثالي لتوصيل الأجهزة البعيدة ببصمة رمز صغيرة وعرض نطاق ترددي للشبكة ضئيل.
 تستخدم MQTT اليوم في مجموعة متنوعة من الصناعات ، مثل السيارات
 والتصنيع والاتصالات والنفط والغاز ، إلخ.
تقدم هذه المقالة كيفية استخدام MQTT في مشروع Java لتحقيق وظائف الاتصال
 والاشتراك وإلغاء الاشتراك ونشر واستلام الرسائل بين العميل والوسيط.


كيفية استخدام "MQTT" في جافا



1- أضف التبعية "Dependency"

بيئة التطوير لهذه المقالة هي:
- أداة البناء: Maven
- IDE: IntelliJ IDEA
- جافا: JDK 1.8.0
سنستخدم Eclipse Paho Java Client كعميل ، وهو أكثر
 مكتبة عميل MQTT استخدامًا في لغة Java.

أضف التبعيات التالية إلى pom.xml الملف:

<dependencies>
   <dependency>
       <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
       <version>1.2.5</version>
   </dependency>
</dependencies>



2- قم بإنشاء اتصال MQTT

* وسيط MQTT
ستستخدم هذه المقالة وسيط MQTT العام الذي تم إنشاؤه بناءً على 
EMQX Cloud . معلومات الوصول إلى الخادم هي كما يلي:
- الوسيط: broker.emqx.io
- منفذ TCP: 1883
- منفذ SSL / TLS: 8883

*الاتصال
قم بتعيين معلمات الاتصال الأساسية لـ MQTT. اسم المستخدم وكلمة المرور اختياريان.

String broker = "tcp://broker.emqx.io:1883";
// TLS/SSL
// String broker = "ssl://broker.emqx.io:8883";
String username = "emqx";
String password = "public";
String clientid = "publish_client";

ثم قم بإنشاء عميل MQTT واتصل بالوسيط.

MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);




التعليمات:

- MqttClient: 
يوفر MqttClient مجموعة من الطرق التي تحظر وتعيد التحكم إلى 
برنامج التطبيق بمجرد اكتمال إجراء MQTT.
- MqttClientPersistance: 
يمثل مخزن بيانات دائم يستخدم لتخزين الرسائل الصادرة والواردة أثناء الطيران ،
 مما يتيح التسليم إلى QoS المحدد.
- MqttConnectOptions: 
يحتوي على مجموعة الخيارات التي تتحكم في كيفية اتصال العميل بالخادم. فيما يلي بعض الطرق الشائعة:
- setUserName:
 يضبط اسم المستخدم المراد استخدامه للاتصال.
- setPassword:
 لتعيين كلمة المرور لاستخدامها في الاتصال.
- setCleanSession:
 يحدد ما إذا كان يجب على العميل والخادم تذكر الحالة عبر عمليات إعادة التشغيل وإعادة الاتصال.
- setKeepAliveInterval:
 يضبط الفاصل الزمني "البقاء على قيد الحياة". 
- setConnectionTimeout:
 يضبط قيمة مهلة الاتصال. 
- setAutomaticReconnect: 
تعيين ما إذا كان العميل سيحاول تلقائيًا إعادة الاتصال بالخادم في حالة فقد الاتصال.



3-  الاتصال بـ TLS / SSL

إذا كنت تريد استخدام شهادة موقعة ذاتيًا لاتصالات TLS / SSL ، 
فأضف bcpkix-jdk15on إلى pom.xml الملف:

<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
<dependency>
   <groupId>org.bouncycastle</groupId>
   <artifactId>bcpkix-jdk15on</artifactId>
   <version>1.70</version>
</dependency>







ثم قم بإنشاء SSLUtils.javaالملف بالرمز التالي:

package io.emqx.mqtt;

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

public class SSLUtils {
   public static SSLSocketFactory getSocketFactory(final String caCrtFile,
                                                   final String crtFile, final String keyFile, final String password)
           throws Exception {
       Security.addProvider(new BouncyCastleProvider());

       // load CA certificate
       X509Certificate caCert = null;

       FileInputStream fis = new FileInputStream(caCrtFile);
       BufferedInputStream bis = new BufferedInputStream(fis);
       CertificateFactory cf = CertificateFactory.getInstance("X.509");

       while (bis.available() > 0) {
           caCert = (X509Certificate) cf.generateCertificate(bis);
      }

       // load client certificate
       bis = new BufferedInputStream(new FileInputStream(crtFile));
       X509Certificate cert = null;
       while (bis.available() > 0) {
           cert = (X509Certificate) cf.generateCertificate(bis);
      }

       // load client private key
       PEMParser pemParser = new PEMParser(new FileReader(keyFile));
       Object object = pemParser.readObject();
       JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
       KeyPair key = converter.getKeyPair((PEMKeyPair) object);
       pemParser.close();

       // CA certificate is used to authenticate server
       KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
       caKs.load(null, null);
       caKs.setCertificateEntry("ca-certificate", caCert);
       TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
       tmf.init(caKs);

       // client key and certificates are sent to server so it can authenticate
       KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
       ks.load(null, null);
       ks.setCertificateEntry("certificate", cert);
       ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
               new java.security.cert.Certificate[]{cert});
       KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
              .getDefaultAlgorithm());
       kmf.init(ks, password.toCharArray());

       // finally, create SSL socket factory
       SSLContext context = SSLContext.getInstance("TLSv1.2");
       context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

       return context.getSocketFactory();
  }
}






حدد options ما يلي:


String broker = "ssl://broker.emqx.io:8883";
// Set socket factory
String caFilePath = "/cacert.pem";
String clientCrtFilePath = "/client.pem";
String clientKeyFilePath = "/client.key";
SSLSocketFactory socketFactory = getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "");
options.setSocketFactory(socketFactory);


4- نشر رسائل MQTT

قم بإنشاء فصل PublishSampleدراسي يقوم بنشر Hello MQTT رسالة إلى الموضوع mqtt/test:


package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PublishSample {

   public static void main(String[] args) {

       String broker = "tcp://broker.emqx.io:1883";
       String topic = "mqtt/test";
       String username = "emqx";
       String password = "public";
       String clientid = "publish_client";
       String content = "Hello MQTT";
       int qos = 0;

       try {
           MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
           MqttConnectOptions options = new MqttConnectOptions();
           options.setUserName(username);
           options.setPassword(password.toCharArray());
           options.setConnectionTimeout(60);
      options.setKeepAliveInterval(60);
           // connect
           client.connect(options);
           // create message and setup QoS
           MqttMessage message = new MqttMessage(content.getBytes());
           message.setQos(qos);
           // publish message
           client.publish(topic, message);
           System.out.println("Message published");
           System.out.println("topic: " + topic);
           System.out.println("message content: " + content);
           // disconnect
           client.disconnect();
           // close client
           client.close();
      } catch (MqttException e) {
           throw new RuntimeException(e);
      }
  }
}






5- الإشتراك

قم بإنشاء فصل SubscribeSample دراسي يشترك في الموضوع mqtt/test:


package io.emqx.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SubscribeSample {
   public static void main(String[] args) {
       String broker = "tcp://broker.emqx.io:1883";
       String topic = "mqtt/test";
       String username = "emqx";
       String password = "public";
       String clientid = "subscribe_client";
       int qos = 0;

       try {
           MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
           // connect options
           MqttConnectOptions options = new MqttConnectOptions();
           options.setUserName(username);
           options.setPassword(password.toCharArray());
           options.setConnectionTimeout(60);
      options.setKeepAliveInterval(60);
           // setup callback
           client.setCallback(new MqttCallback() {

               public void connectionLost(Throwable cause) {
                   System.out.println("connectionLost: " + cause.getMessage());
              }

               public void messageArrived(String topic, MqttMessage message) {
                   System.out.println("topic: " + topic);
                   System.out.println("Qos: " + message.getQos());
                   System.out.println("message content: " + new String(message.getPayload()));

              }

               public void deliveryComplete(IMqttDeliveryToken token) {
                   System.out.println("deliveryComplete---------" + token.isComplete());
              }

          });
           client.connect(options);
           client.subscribe(topic, qos);
      } catch (Exception e) {
           e.printStackTrace();
      }
  }
}


6- الرد MqttCallback:

- ConnectionLost (Throwable reason):
 يتم استدعاء هذه الطريقة عند فقد الاتصال بالخادم.
- وصول الرسالة (موضوع سلسلة ، رسالة MqttMessage):
 يتم استدعاء هذه الطريقة عند وصول رسالة من الخادم.
- deliveryComplete (رمز IMqttDeliveryToken المميز):
 يتم الاتصال به عند اكتمال تسليم رسالة واستلام جميع إقرارات الاستلام.

بعد ذلك ، قم بتشغيل SubscribeSample للاشتراك في mqtt/test الموضوع. 
ثم قم بتشغيل PublishSample لنشر الرسالة حول mqtt/test الموضوع. 
سنرى أن الناشر ينشر الرسالة بنجاح ويستلمها المشترك.
الكود الكامل متاح على جيثب github .


 
جدول المحتويات