/*******************************************************************************
 * Copyright (c) 2012, 2020 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial contribution
 *******************************************************************************/

#include "mqtta.h"
#include "regfunc.h"

/* Client handle filled in by MQTTAsync_create() in init_mqtt_client(). */
static void *client_context;

/*
 * Connect options shared by the initial connect (init_mqtt_client) and by
 * every reconnect attempt (myReconnctMQTT). Fields set by one of them stay
 * set for the other because this is a single file-scope object.
 */
static MQTTAsync_connectOptions client_conn_opts = MQTTAsync_connectOptions_initializer;

/**
 * Connection-lost callback registered through MQTTAsync_setCallbacks().
 *
 * Logs the cause and starts the blocking reconnect loop.
 *
 * @param context  value registered with MQTTAsync_setCallbacks(); in this file
 *                 it is the client handle.
 * @param cause    textual reason for the loss; may be NULL.
 */
void connlost(void *context, char *cause)
{
    /* Passing NULL to %s is undefined behaviour, so substitute a placeholder. */
    printf(" Connection lost cause: %s\n\tReconnecting .... \n", cause ? cause : "unknown");
    myReconnctMQTT(context);
}

/**
 * Failure callback used both for connect attempts and for message sends
 * (see init_mqtt_client and my_mqqta_sendmsg).
 *
 * Logs the failure code (0 when no failure data is supplied) and starts the
 * blocking reconnect loop.
 *
 * @param context   the client handle, as stored in the options' context field.
 * @param response  failure details; may be NULL.
 */
void onConFailure(void *context, MQTTAsync_failureData *response)
{
    printf("Connect failed, rc %d\n", response ? response->code : 0);
    myReconnctMQTT(context);
}

/**
 * Success callback for reconnects: re-subscribes after the connection is
 * re-established (added 2020-12-29).
 *
 * @param context   unused.
 * @param response  unused.
 */
void onConnect(void *context, MQTTAsync_successData *response)
{
    UNUSED(context);
    UNUSED(response);

    log("Successful connection\n");

    /* Register (subscribe) the topics from the table. */
    regTopicFromTable();
}

/**
 * Repeatedly tries to start a connection to the MQTT server.
 *
 * Sleeps 3 seconds before each attempt and stops as soon as
 * MQTTAsync_connect() returns MQTTASYNC_SUCCESS or after 100 attempts.
 * This blocks the calling thread for the whole duration.
 *
 * Note that reconnects use cleansession = 0, whereas the initial connect in
 * init_mqtt_client() uses cleansession = 1.
 *
 * @param contxt  the client handle (MQTTAsync) to reconnect.
 */
void myReconnctMQTT(void *contxt)
{
    MQTTAsync client = (MQTTAsync)contxt;
    MQTTAsync_connectOptions *conn_opts = &client_conn_opts;
    const int max_attempts = 100;
    const unsigned int retry_delay_sec = 3;
    int attempts_left = max_attempts;
    int rc;

    conn_opts->keepAliveInterval = MQTT_KEEPALIVE_g;
    conn_opts->cleansession = 0;
    /* Added: re-subscribe once the reconnect succeeds. */
    conn_opts->onSuccess = onConnect;

    do {
        sleep(retry_delay_sec);
        rc = MQTTAsync_connect(client, conn_opts);
        printf(" ret=%d Reconncting MQTT.Server \n", rc);
        --attempts_left;
    } while ((rc != MQTTASYNC_SUCCESS) && (attempts_left != 0));

    /* Report exhaustion explicitly instead of returning silently. */
    if (rc != MQTTASYNC_SUCCESS) {
        printf("Giving up on MQTT reconnect after %d attempts, last rc %d\n", max_attempts, rc);
    }
}

/**
 * Starts an asynchronous publish of @p pubmsg on @p topicName.
 *
 * onConFailure is installed as the failure callback, so a failed delivery
 * triggers the reconnect loop.
 *
 * @param topicName  topic to publish on.
 * @param pubmsg     message to publish.
 * @return MQTTASYNC_SUCCESS when the send was started, EXIT_FAILURE otherwise.
 */
int my_mqqta_sendmsg(char *topicName, MQTTAsync_message *pubmsg)
{
    MQTTAsync client = (MQTTAsync)client_context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    int rc;

    opts.onFailure = onConFailure;
    opts.context = client;

    rc = MQTTAsync_sendMessage(client, topicName, pubmsg, &opts);
    if (rc != MQTTASYNC_SUCCESS) {
        printf("Failed to start sendMessage, return code %d\n", rc);
        return (EXIT_FAILURE);
    }

    debug("Message Send: topic:%s \n", topicName);
    return rc;
}

/**
 * Message-arrived callback registered through MQTTAsync_setCallbacks().
 *
 * Note: this callback must return 1, and in that case it must free both the
 * topic and the message. Returning 0 signals an error and triggers redelivery
 * by the upper layer; in that case nothing may be freed.
 *
 * @param context    unused.
 * @param topicName  topic the message arrived on; freed here.
 * @param topicLen   unused.
 * @param message    the received message; freed here.
 * @return always 1.
 */
int my_mqqta_recvmsg(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    UNUSED(context);
    UNUSED(topicLen);

    //debug("Message arrived: topic: %s topic.len=%d payload.len=%d contxt=%p\n", topicName, topicLen, message->payloadlen,context);

    /* Decode the message; this is the actual business-logic entry point. */
    decode_msg_handle(topicName, message);

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}

/**
 * Subscribes the client to @p topic with the requested QoS.
 *
 * @param topic  topic filter to subscribe to.
 * @param qos    requested quality of service.
 * @return the return code of MQTTAsync_subscribe().
 */
int my_subsribe_topic(char *topic, int qos)
{
    MQTTAsync client = (MQTTAsync)client_context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    int rc;

    log("Subscribing to topic %s for client %s using QoS%d \n", topic, MQTT_CLIENTID_g, qos);

    //opts.onFailure = onSubscribeFailure;
    opts.context = client;

    //LIBMQTT_API int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response);
    rc = MQTTAsync_subscribe(client, topic, qos, &opts);
    log("topic:%s,rc :%d \n",topic,rc);
    return rc;
}

/**
 * Unsubscribes the client from @p topic.
 *
 * @param topic  topic filter to unsubscribe from.
 * @param qos    only used for logging.
 * @return the return code of MQTTAsync_unsubscribe().
 */
int my_unsubsribe_topic(char *topic, int qos)
{
    /* qos is referenced only by log() below; presumably log() can compile
     * to nothing in some builds, hence the explicit marker - TODO confirm. */
    UNUSED(qos);

    MQTTAsync client = (MQTTAsync)client_context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    int rc;

    log("unSubscribing to topic %s for client %s using QoS%d\n", topic, MQTT_CLIENTID_g, qos);

    //opts.onFailure = onUnSubscribeFailure;
    opts.context = client;

    //LIBMQTT_API int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_responseOptions* response);
    rc = MQTTAsync_unsubscribe(client, topic, &opts);
    return rc;
}

/**
 * Creates the MQTT client, installs the callbacks and starts the initial
 * connection.
 *
 * Any failure to create the client, set the callbacks or start the connect
 * terminates the process with EXIT_FAILURE. After starting the connect the
 * function waits 2 seconds and asserts that the client is connected.
 *
 * @return the return code of MQTTAsync_connect() (MQTTASYNC_SUCCESS when the
 *         function returns at all).
 */
int init_mqtt_client()
{
    MQTTAsync client;
    MQTTAsync_connectOptions *conn_opts = &client_conn_opts;
    int rc;

    rc = MQTTAsync_create(&client_context, MQTT_SVR_IP_g, MQTT_CLIENTID_g,
                          MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTASYNC_SUCCESS) {
        printf("Failed to create client object, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
    client = client_context;

    /* The client handle doubles as the callback context. */
    rc = MQTTAsync_setCallbacks(client, client, connlost, my_mqqta_recvmsg, NULL);
    if (rc != MQTTASYNC_SUCCESS) {
        printf("Failed to set callback, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    conn_opts->keepAliveInterval = MQTT_KEEPALIVE_g;
    conn_opts->cleansession = 1;
    //conn_opts->onSuccess = regTopicFromTable;
    conn_opts->onFailure = onConFailure;
    conn_opts->context = client;
    //conn_opts->minRetryInterval = 1;
    // conn_opts->maxRetryInterval = 64;

    /* Note: these are pointer assignments (nothing is copied) - be careful
     * that the credential strings outlive the connect options. */
    conn_opts->username = MQTT_USERNAME_g;
    conn_opts->password = MQTT_PWD_g;

    rc = MQTTAsync_connect(client, conn_opts);
    if (rc != MQTTASYNC_SUCCESS) {
        /* Adjacent literals are concatenated by the compiler; a string
         * literal must not contain a raw line break. */
        printf("* 1: Connection refused: Unacceptable protocol version\n"
               " * 2: Connection refused: Identifier rejected\n"
               " * 3: Connection refused: Server unavailable\n"
               " * 4: Connection refused: Bad user name or password\n"
               " * 5: Connection refused: Not authorized\n"
               "\n");
        printf("Failed to start connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    /* The connect is asynchronous: allow it some time, then check the state.
     * NOTE(review): the check disappears when NDEBUG is defined, and aborts
     * the process if the broker needs longer than 2 seconds. */
    sleep(2);
    assert(MQTTASYNC_TRUE == MQTTAsync_isConnected(client));

    //MQTTAsync_destroy(&client);
    return rc;
}