mqtta.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2020 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. *******************************************************************************/
  16. #include "mqtta.h"
  17. #include "regfunc.h"
  18. static void *client_context;
  19. static MQTTAsync_connectOptions client_conn_opts = MQTTAsync_connectOptions_initializer;
  20. void connlost(void *context, char *cause)
  21. {
  22. printf(" Connection lost cause: %s\n\tReconnecting .... \n", cause);
  23. myReconnctMQTT(context);
  24. }
  25. void onConFailure(void *context, MQTTAsync_failureData *response)
  26. {
  27. printf("Connect failed, rc %d\n", response ? response->code : 0);
  28. myReconnctMQTT(context);
  29. }
  30. //重连之后订阅 20201229新增
  31. void onConnect(void* context, MQTTAsync_successData* response)
  32. {
  33. unuse(context);
  34. unuse(response);
  35. log("Successful connection\n");
  36. // 注册Topic
  37. regTopicFromTable();
  38. }
  39. void myReconnctMQTT(void *contxt)
  40. {
  41. MQTTAsync client = (MQTTAsync)contxt;
  42. MQTTAsync_connectOptions *conn_opts = &client_conn_opts;
  43. int rc;
  44. int count = 100;
  45. conn_opts->keepAliveInterval = MQTT_KEEPALIVE_g;
  46. conn_opts->cleansession = 0;
  47. //新增重连订阅
  48. conn_opts->onSuccess = onConnect;
  49. do
  50. {
  51. sleep(3);
  52. rc = MQTTAsync_connect(client, conn_opts);
  53. printf(" ret=%d Reconncting MQTT.Server \n", rc);
  54. --count;
  55. } while ((rc != MQTTASYNC_SUCCESS) && (0 != count));
  56. }
  57. int my_mqqta_sendmsg(char *topicName, MQTTAsync_message *pubmsg)
  58. {
  59. MQTTAsync client = (MQTTAsync)client_context;
  60. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  61. int rc;
  62. opts.onFailure = onConFailure;
  63. opts.context = client;
  64. if ((rc = MQTTAsync_sendMessage(client, topicName, pubmsg, &opts)) != MQTTASYNC_SUCCESS)
  65. {
  66. printf("Failed to start sendMessage, return code %d\n", rc);
  67. return (EXIT_FAILURE);
  68. }
  69. debug("Message Send: topic:%s \n", topicName);
  70. return rc;
  71. }
  72. // 注意该回调函数必须返回 1 (此时必须 free (topic和message)); 如果ret=0, 表示错误,会触发上层重传处理,此时不能进行 free操作
  73. int my_mqqta_recvmsg(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
  74. {
  75. UNUSED(context);
  76. UNUSED(topicName);
  77. UNUSED(topicLen);
  78. //debug("Message arrived: topic: %s topic.len=%d payload.len=%d contxt=%p\n", topicName, topicLen, message->payloadlen,context);
  79. // 解码消息,实际的业务逻辑函数
  80. decode_msg_handle(topicName, message);
  81. MQTTAsync_freeMessage(&message);
  82. MQTTAsync_free(topicName);
  83. return 1;
  84. }
  85. // 订阅topic
  86. int my_subsribe_topic(char *topic, int qos)
  87. {
  88. MQTTAsync client = (MQTTAsync)client_context;
  89. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  90. int rc;
  91. log("Subscribing to topic %s for client %s using QoS%d \n", topic, MQTT_CLIENTID_g, qos);
  92. //opts.onFailure = onSubscribeFailure;
  93. opts.context = client;
  94. //LIBMQTT_API int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response);
  95. rc = MQTTAsync_subscribe(client, topic, qos, &opts);
  96. log("topic:%s,rc :%d \n",topic,rc);
  97. return rc;
  98. }
  99. int my_unsubsribe_topic(char *topic, int qos)
  100. {
  101. UNUSED(qos);
  102. MQTTAsync client = (MQTTAsync)client_context;
  103. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  104. int rc;
  105. log("unSubscribing to topic %s for client %s using QoS%d\n", topic, MQTT_CLIENTID_g, qos);
  106. //opts.onFailure = onUnSubscribeFailure;
  107. opts.context = client;
  108. //LIBMQTT_API int MQTTAsync_unsubscribe(MQTTAsync handle, const char* topic, MQTTAsync_responseOptions* response);
  109. rc = MQTTAsync_unsubscribe(client, topic, &opts);
  110. return rc;
  111. }
  112. int init_mqtt_client()
  113. {
  114. MQTTAsync client;
  115. MQTTAsync_connectOptions *conn_opts = &client_conn_opts;
  116. int rc;
  117. if ((rc = MQTTAsync_create(&client_context, MQTT_SVR_IP_g, MQTT_CLIENTID_g, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
  118. {
  119. printf("Failed to create client object, return code %d\n", rc);
  120. exit(EXIT_FAILURE);
  121. }
  122. client = client_context;
  123. if ((rc = MQTTAsync_setCallbacks(client, client, connlost, my_mqqta_recvmsg, NULL)) != MQTTASYNC_SUCCESS)
  124. {
  125. printf("Failed to set callback, return code %d\n", rc);
  126. exit(EXIT_FAILURE);
  127. }
  128. conn_opts->keepAliveInterval = MQTT_KEEPALIVE_g;
  129. conn_opts->cleansession = 1;
  130. //conn_opts->onSuccess = regTopicFromTable;
  131. conn_opts->onFailure = onConFailure;
  132. conn_opts->context = client;
  133. //conn_opts->minRetryInterval = 1;
  134. // conn_opts->maxRetryInterval = 64;
  135. //注意是指针赋值 , 小心一点
  136. conn_opts->username = MQTT_USERNAME_g;
  137. conn_opts->password = MQTT_PWD_g;
  138. if ((rc = MQTTAsync_connect(client, conn_opts)) != MQTTASYNC_SUCCESS)
  139. {
  140. printf("* <b>1</b>: Connection refused: Unacceptable protocol version<br>\
  141. * <b>2</b>: Connection refused: Identifier rejected<br>\
  142. * <b>3</b>: Connection refused: Server unavailable<br>\
  143. * <b>4</b>: Connection refused: Bad user name or password<br>\
  144. * <b>5</b>: Connection refused: Not authorized<br>\n");
  145. printf("Failed to start connect, return code %d\n", rc);
  146. exit(EXIT_FAILURE);
  147. }
  148. sleep(2);
  149. assert(MQTTASYNC_TRUE == MQTTAsync_isConnected(client));
  150. //MQTTAsync_destroy(&client);
  151. return rc;
  152. }