123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- /*******************************************************************************
- * Copyright (c) 2012, 2020 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v2.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * https://www.eclipse.org/legal/epl-2.0/
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Ian Craggs - initial contribution
- *******************************************************************************/
- #include "mqtta.h"
- #include "regfunc.h"
- static void *client_context;
- static MQTTAsync_connectOptions client_conn_opts = MQTTAsync_connectOptions_initializer;
- void connlost(void *context, char *cause)
- {
- printf(" Connection lost cause: %s\n\tReconnecting .... \n", cause);
- myReconnctMQTT(context);
- }
- void onConFailure(void *context, MQTTAsync_failureData *response)
- {
- printf("Connect failed, rc %d\n", response ? response->code : 0);
- myReconnctMQTT(context);
- }
- //重连之后订阅 20201229新增
- void onConnect(void* context, MQTTAsync_successData* response)
- {
- unuse(context);
- unuse(response);
- log("Successful connection\n");
- // 注册Topic
- regTopicFromTable();
- }
- void myReconnctMQTT(void *contxt)
- {
- MQTTAsync client = (MQTTAsync)contxt;
- MQTTAsync_connectOptions *conn_opts = &client_conn_opts;
- int rc;
- int count = 100;
- conn_opts->keepAliveInterval = MQTT_KEEPALIVE_g;
- conn_opts->cleansession = 0;
- //新增重连订阅
- conn_opts->onSuccess = onConnect;
- do
- {
- sleep(3);
- rc = MQTTAsync_connect(client, conn_opts);
- printf(" ret=%d Reconncting MQTT.Server \n", rc);
- --count;
- } while ((rc != MQTTASYNC_SUCCESS) && (0 != count));
- }
- int my_mqqta_sendmsg(char *topicName, MQTTAsync_message *pubmsg)
- {
- MQTTAsync client = (MQTTAsync)client_context;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- int rc;
- opts.onFailure = onConFailure;
- opts.context = client;
- if ((rc = MQTTAsync_sendMessage(client, topicName, pubmsg, &opts)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to start sendMessage, return code %d\n", rc);
- return (EXIT_FAILURE);
- }
- debug("Message Send: topic:%s \n", topicName);
- return rc;
- }
- // 注意该回调函数必须返回 1 (此时必须 free (topic和message)); 如果ret=0, 表示错误,会触发上层重传处理,此时不能进行 free操作
- int my_mqqta_recvmsg(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
- {
- UNUSED(context);
- UNUSED(topicName);
- UNUSED(topicLen);
- //debug("Message arrived: topic: %s topic.len=%d payload.len=%d contxt=%p\n", topicName, topicLen, message->payloadlen,context);
- // 解码消息,实际的业务逻辑函数
- decode_msg_handle(topicName, message);
- MQTTAsync_freeMessage(&message);
- MQTTAsync_free(topicName);
- return 1;
- }
- // 订阅topic
- int my_subsribe_topic(char *topic, int qos)
- {
- MQTTAsync client = (MQTTAsync)client_context;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- int rc;
- log("Subscribing to topic %s for client %s using QoS%d \n", topic, MQTT_CLIENTID_g, qos);
- //opts.onFailure = onSubscribeFailure;
- opts.context = client;
- //LIBMQTT_API int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response);
- rc = MQTTAsync_subscribe(client, topic, qos, &opts);
- log("topic:%s,rc :%d \n",topic,rc);
- return rc;
- }
- int my_unsubsribe_topic(char *topic, int qos)
- {
- UNUSED(qos);
- MQTTAsync client = (MQTTAsync)client_context;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- int rc;
- log("unSubscribing to topic %s for client %s using QoS%d\n", topic, MQTT_CLIENTID_g, qos);
- //opts.onFailure = onUnSubscribeFailure;
- opts.context = client;
- //LIBMQTT_API int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_responseOptions* response);
- rc = MQTTAsync_unsubscribe(client, topic, &opts);
- return rc;
- }
- int init_mqtt_client()
- {
- MQTTAsync client;
- MQTTAsync_connectOptions *conn_opts = &client_conn_opts;
- int rc;
- if ((rc = MQTTAsync_create(&client_context, MQTT_SVR_IP_g, MQTT_CLIENTID_g, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to create client object, return code %d\n", rc);
- exit(EXIT_FAILURE);
- }
- client = client_context;
- if ((rc = MQTTAsync_setCallbacks(client, client, connlost, my_mqqta_recvmsg, NULL)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to set callback, return code %d\n", rc);
- exit(EXIT_FAILURE);
- }
- conn_opts->keepAliveInterval = MQTT_KEEPALIVE_g;
- conn_opts->cleansession = 1;
- //conn_opts->onSuccess = regTopicFromTable;
- conn_opts->onFailure = onConFailure;
- conn_opts->context = client;
- //conn_opts->minRetryInterval = 1;
- // conn_opts->maxRetryInterval = 64;
- //注意是指针赋值 , 小心一点
- conn_opts->username = MQTT_USERNAME_g;
- conn_opts->password = MQTT_PWD_g;
- if ((rc = MQTTAsync_connect(client, conn_opts)) != MQTTASYNC_SUCCESS)
- {
- printf("* <b>1</b>: Connection refused: Unacceptable protocol version<br>\
- * <b>2</b>: Connection refused: Identifier rejected<br>\
- * <b>3</b>: Connection refused: Server unavailable<br>\
- * <b>4</b>: Connection refused: Bad user name or password<br>\
- * <b>5</b>: Connection refused: Not authorized<br>\n");
- printf("Failed to start connect, return code %d\n", rc);
- exit(EXIT_FAILURE);
- }
- sleep(2);
- assert(MQTTASYNC_TRUE == MQTTAsync_isConnected(client));
- //MQTTAsync_destroy(&client);
- return rc;
- }
|