/*******************************************************************************
* Copyright (c) 2012, 2020 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
#include "mqtta.h"
#include "regfunc.h"
/* MQTTAsync client handle: filled in by MQTTAsync_create() in init_mqtt_client()
 * and read by the send / subscribe / unsubscribe helpers in this file. */
static void *client_context;
/* Connect options shared by the initial connect in init_mqtt_client() and by the
 * reconnect loop in myReconnctMQTT(); both functions modify it in place. */
static MQTTAsync_connectOptions client_conn_opts = MQTTAsync_connectOptions_initializer;
/**
 * Connection-lost callback, registered via MQTTAsync_setCallbacks() in
 * init_mqtt_client().
 *
 * Prints the reason for the loss and then runs the reconnect loop.
 *
 * @param context  callback context; handed unchanged to myReconnctMQTT()
 * @param cause    textual reason for the disconnection; may be NULL
 */
void connlost(void *context, char *cause)
{
    /* Passing a NULL pointer to %s is undefined behaviour, so print a
     * placeholder instead when the library supplies no cause. */
    printf(" Connection lost cause: %s\n\tReconnecting .... \n", cause ? cause : "(null)");
    myReconnctMQTT(context);
}
/**
 * Failure callback: reports the failure code and runs the reconnect loop.
 *
 * Installed as onFailure for the connect options in init_mqtt_client() and for
 * publish requests in my_mqqta_sendmsg().
 *
 * @param context   callback context; handed unchanged to myReconnctMQTT()
 * @param response  failure details; when NULL the reported code is 0
 */
void onConFailure(void *context, MQTTAsync_failureData *response)
{
    int failure_code = 0;

    if (response)
    {
        failure_code = response->code;
    }

    printf("Connect failed, rc %d\n", failure_code);
    myReconnctMQTT(context);
}
/**
 * Success callback for a reconnect (installed as onSuccess by myReconnctMQTT()).
 * Re-registers the topics once the connection is back. Added 2020-12-29.
 *
 * Neither argument is used.
 */
void onConnect(void* context, MQTTAsync_successData* response)
{
    unuse(response);
    unuse(context);

    log("Successful connection\n");

    /* Register the topics again now that the connection is re-established. */
    regTopicFromTable();
}
/**
 * Retry MQTTAsync_connect() on the given client using the shared connect options.
 *
 * Sleeps 3 seconds before every attempt and stops as soon as an attempt returns
 * MQTTASYNC_SUCCESS or after 100 attempts, whichever comes first.
 *
 * @param contxt  the MQTTAsync client handle, passed as an opaque pointer
 */
void myReconnctMQTT(void *contxt)
{
    MQTTAsync_connectOptions *options = &client_conn_opts;
    MQTTAsync handle = (MQTTAsync)contxt;
    int attempts_left = 100;
    int status;

    options->keepAliveInterval = MQTT_KEEPALIVE_g;
    options->cleansession = 0;
    /* Added: re-subscribe once the reconnect succeeds. */
    options->onSuccess = onConnect;

    for (;;)
    {
        sleep(3);
        status = MQTTAsync_connect(handle, options);
        printf(" ret=%d Reconncting MQTT.Server \n", status);
        --attempts_left;

        if (status == MQTTASYNC_SUCCESS || attempts_left == 0)
        {
            break;
        }
    }
}
/**
 * Publish a message on the module-level MQTT client.
 *
 * onConFailure() is installed as the failure callback for the request.
 *
 * @param topicName  topic to publish to
 * @param pubmsg     message to send
 * @return MQTTASYNC_SUCCESS when the send request was accepted,
 *         EXIT_FAILURE when MQTTAsync_sendMessage() returned anything else
 */
int my_mqqta_sendmsg(char *topicName, MQTTAsync_message *pubmsg)
{
    MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
    MQTTAsync handle = (MQTTAsync)client_context;
    int status;

    resp.context = handle;
    resp.onFailure = onConFailure;

    status = MQTTAsync_sendMessage(handle, topicName, pubmsg, &resp);
    if (status == MQTTASYNC_SUCCESS)
    {
        debug("Message Send: topic:%s \n", topicName);
        return status;
    }

    printf("Failed to start sendMessage, return code %d\n", status);
    return (EXIT_FAILURE);
}
/**
 * Message-arrived callback, registered via MQTTAsync_setCallbacks().
 *
 * Note: this callback must return 1, and in that case it must free both the
 * topic and the message. Returning 0 signals an error and triggers redelivery
 * by the upper layer; nothing may be freed in that case.
 *
 * @param context    unused
 * @param topicName  topic of the received message; freed here
 * @param topicLen   unused
 * @param message    received message; freed here
 * @return always 1
 */
int my_mqqta_recvmsg(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    UNUSED(topicLen);
    UNUSED(topicName); /* kept from the original although topicName is used below */
    UNUSED(context);

    /* Decode the message; this is where the actual business logic runs. */
    decode_msg_handle(topicName, message);

    /* Returning 1 below, so ownership of both buffers ends here. */
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);

    return 1;
}
/**
 * Subscribe the module-level MQTT client to a topic.
 *
 * No success or failure callback is installed for the request.
 *
 * @param topic  topic filter to subscribe to
 * @param qos    requested quality of service
 * @return the value returned by MQTTAsync_subscribe()
 */
int my_subsribe_topic(char *topic, int qos)
{
    MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
    MQTTAsync handle = (MQTTAsync)client_context;
    int status;

    log("Subscribing to topic %s for client %s using QoS%d \n", topic, MQTT_CLIENTID_g, qos);

    resp.context = handle;
    status = MQTTAsync_subscribe(handle, topic, qos, &resp);

    log("topic:%s,rc :%d \n", topic, status);
    return status;
}
/**
 * Unsubscribe the module-level MQTT client from a topic.
 *
 * No success or failure callback is installed for the request.
 *
 * @param topic  topic filter to unsubscribe from
 * @param qos    only appears in the log line; not passed to the library
 * @return the value returned by MQTTAsync_unsubscribe()
 */
int my_unsubsribe_topic(char *topic, int qos)
{
    MQTTAsync_responseOptions resp = MQTTAsync_responseOptions_initializer;
    MQTTAsync handle = (MQTTAsync)client_context;

    UNUSED(qos);

    log("unSubscribing to topic %s for client %s using QoS%d\n", topic, MQTT_CLIENTID_g, qos);

    resp.context = handle;
    return MQTTAsync_unsubscribe(handle, topic, &resp);
}
/**
 * Create the module-level MQTT client, register its callbacks and start the
 * first connection.
 *
 * The created handle is stored in client_context. The process exits with
 * EXIT_FAILURE when the client cannot be created, the callbacks cannot be set,
 * or the connect request is rejected by MQTTAsync_connect().
 *
 * @return the value returned by MQTTAsync_connect() (MQTTASYNC_SUCCESS on the
 *         only path that reaches the return statement)
 */
int init_mqtt_client()
{
    MQTTAsync client;
    MQTTAsync_connectOptions *conn_opts = &client_conn_opts;
    int rc;

    if ((rc = MQTTAsync_create(&client_context, MQTT_SVR_IP_g, MQTT_CLIENTID_g, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to create client object, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
    client = client_context;

    /* The client handle doubles as the callback context, so connlost() can
     * pass it straight to myReconnctMQTT(). */
    if ((rc = MQTTAsync_setCallbacks(client, client, connlost, my_mqqta_recvmsg, NULL)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to set callback, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    conn_opts->keepAliveInterval = MQTT_KEEPALIVE_g;
    /* The first connect uses a clean session; myReconnctMQTT() later sets this to 0. */
    conn_opts->cleansession = 1;
    conn_opts->onFailure = onConFailure;
    conn_opts->context = client;
    /* Careful: these are pointer assignments, the strings are not copied here. */
    conn_opts->username = MQTT_USERNAME_g;
    conn_opts->password = MQTT_PWD_g;

    if ((rc = MQTTAsync_connect(client, conn_opts)) != MQTTASYNC_SUCCESS)
    {
        /* Built from adjacent string literals with explicit "\n": a raw line
         * break inside a string literal is not valid C. */
        printf("* 1: Connection refused: Unacceptable protocol version\n"
               "* 2: Connection refused: Identifier rejected\n"
               "* 3: Connection refused: Server unavailable\n"
               "* 4: Connection refused: Bad user name or password\n"
               "* 5: Connection refused: Not authorized\n"
               "\n");
        printf("Failed to start connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    /* Give the asynchronous connect two seconds to complete before checking. */
    sleep(2);
    /* NOTE(review): this check disappears when NDEBUG is defined, and aborts the
     * process if the connection takes longer than the sleep above - confirm
     * that this is the intended policy. */
    assert(MQTTASYNC_TRUE == MQTTAsync_isConnected(client));

    return rc;
}