regfunc.c 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. /********************************************************************************
  2. * @file name: regfunc.c
  3. * @Description: 注册函数,和 业务逻辑模块
  4. * @version: v1.0.0
  5. * @Author: bighead
  6. * @Date: 2020-11-15 11:05:28
  7. * @LastEditors: sunny.sun
  8. * @LastEditTime: 2020-11-16 09:33:46
  9. *******************************************************************************/
  10. #include "core.h"
  11. #include "regfunc.h"
  12. #include "handle.c"
  13. //------------------------------- 全局变量声明 ------------------------------
  14. // struct cmd_pro
  15. // {
  16. // char topic_name[256]; //频道名字
  17. // unsigned int cnt; //调用次数
  18. // void (*bc)(const char *topic, MQTTAsync_message *msg, MYSQL *_db, void *context); //处理函数
  19. // };
  20. // int do_A1(char *topicName, MQTTAsync_message *recvmsg, MYSQL *_db, void *context)
  21. static int ctr_table_size = sizeof(ctr_handle_tab) / sizeof(struct cmd_pro); //数组大小
  22. static int topicNumLocal = sizeof(ctr_handle_tab) / sizeof(struct cmd_pro); //配置文件中读取的Topic 数目
  23. static struct cmd_pro *handleTableLocal; //根据配置文件中的设定,启用向对应的topic_function
  24. static unsigned int msgcnt = 0; //全局消息计数器
  25. //------------------------------- 全局变量声明 ------------------------------
  26. /********************************************************************************
  27. * @Function:initTopicConf
  28. * @Brief: 从配置文件中加载Topic内容
  29. * @Param:
  30. * @Return: 无
  31. * @Date: 2020-11-15 17:22:17
  32. *******************************************************************************/
  33. int initTopicConf()
  34. {
  35. int i, ret, idx;
  36. char tmpstr[128] = {0};
  37. char resstr[128] = {0};
  38. int iDefault = 0;
  39. ret = atoi(GetInitKey(CONFIG_FILE, "TOPIC", "DEFAULT"));
  40. if ((ret == 1) || (ret == 0))
  41. iDefault = ret;
  42. elog(" 服务启用=1, 禁用=0, 当前设定的默认值=%d ++++\n", iDefault);
  43. ret = atoi(GetInitKey(CONFIG_FILE, "TOPIC", "NUM"));
  44. topicNumLocal = ret ? ret : topicNumLocal;
  45. if (topicNumLocal > ctr_table_size)
  46. {
  47. topicNumLocal = ctr_table_size;
  48. elog(" 配置的Topic数目超过 程序内置Topic数目, 默认截断处理 num=%d ++++ \n", topicNumLocal);
  49. }
  50. handleTableLocal = malloc(sizeof(struct cmd_pro) * topicNumLocal);
  51. if (handleTableLocal == NULL)
  52. exit(1);
  53. memset(handleTableLocal, 0, sizeof(struct cmd_pro) * topicNumLocal);
  54. // loading topic 配置
  55. idx = 0;
  56. for (i = 0; i < topicNumLocal; i++)
  57. {
  58. sprintf(tmpstr, "%d", i);
  59. strcpy(resstr, GetInitKey(CONFIG_FILE, "TOPIC", tmpstr));
  60. if (0 == strlen(resstr))
  61. {
  62. ret = iDefault;
  63. log("启用缺省配置 Topic[%d]=%d +++++ \n", i, ret);
  64. }
  65. else
  66. {
  67. ret = atoi(resstr);
  68. }
  69. if (ret == 1)
  70. {
  71. //表示启用对应的Topic 服务,进行相应的注册
  72. handleTableLocal[idx].bc = ctr_handle_tab[i].bc;
  73. handleTableLocal[idx].cnt = 0;
  74. strcpy(handleTableLocal[idx].topic_name, ctr_handle_tab[i].topic_name);
  75. log("从配置文件中启用 index=%d TopicName=%s bc=%p 服务启用 \n", i, ctr_handle_tab[i].topic_name, ctr_handle_tab[i].bc);
  76. ++idx;
  77. }
  78. else
  79. {
  80. log("从配置文件中禁用 index=%d TopicName=%s bc=%p 服务禁用+++\n", i, ctr_handle_tab[i].topic_name, ctr_handle_tab[i].bc);
  81. }
  82. }
  83. //修正实际的topic数目
  84. topicNumLocal = idx;
  85. log("\n\t\t 总计 启用 %d topic 服务 +++++++++ \n", topicNumLocal);
  86. //校验topic 配置, bc字段不能为空;
  87. for (i = 0; i < topicNumLocal; i++)
  88. {
  89. assert(handleTableLocal[i].bc);
  90. }
  91. return topicNumLocal;
  92. }
  93. /********************************************************************************
  94. * @Function:
  95. * @Brief: 控制类消息处理函数
  96. * @Param:
  97. * @Return: 无
  98. * @Date: 2020-11-15 17:22:17
  99. *******************************************************************************/
  100. int regTopicFromTable(void)
  101. {
  102. int idx = 0;
  103. struct cmd_pro *phandleTable = NULL;
  104. for (idx = 0; idx < topicNumLocal; idx++)
  105. {
  106. phandleTable = handleTableLocal + idx;
  107. if (phandleTable->bc)
  108. {
  109. my_subsribe_topic(phandleTable->topic_name, 0);
  110. sleep(2);
  111. }
  112. }
  113. return 0;
  114. }
  115. /********************************************************************************
  116. * @Function:
  117. * @Brief: 控制类消息处理函数
  118. * @Param:
  119. * @Return: 无
  120. * @Date: 2020-11-15 17:22:17
  121. *******************************************************************************/
  122. int decode_msg_handle( char *topic, MQTTAsync_message *msg)
  123. {
  124. static int idx = 0;
  125. static MYSQL *_mysqlcon = NULL;
  126. static struct cmd_pro *pHandle = NULL;
  127. char seg[] = "/";
  128. char mac[100];
  129. char update_sql[512] = {0};
  130. memset(mac,0x0,100);
  131. cJSON* cjson;
  132. cJSON*item;
  133. msgcnt++; //全局消息计数器 +1
  134. if (!_mysqlcon)
  135. {
  136. // 先申请内存再初始化
  137. _mysqlcon = malloc(sizeof(MYSQL));
  138. db_init(_mysqlcon);
  139. pHandle = handleTableLocal;
  140. }
  141. //log("decode msg,this topic=%s func=%p \n", topic, pHandle->bc);
  142. // Try cache and hit
  143. if (strcmp(topic, pHandle->topic_name) == 0)
  144. {
  145. pHandle->bc(topic, msg, _mysqlcon);
  146. pHandle->cnt++; //消息计数器 +1
  147. debug("hit idx=%d topic=%s func=%p decode msg\n", idx, topic, pHandle->bc);
  148. return 0;
  149. }
  150. //共享订阅在此处理
  151. if(strcmp(topic, "DT/EPD") == 0)
  152. {
  153. pHandle = handleTableLocal+0;
  154. pHandle->bc(topic, msg, _mysqlcon);
  155. pHandle->cnt++; //消息计数器 +1
  156. return 0;
  157. }
  158. else if(strcmp(topic, "DT/SDT") == 0)
  159. {
  160. pHandle = handleTableLocal+1;
  161. pHandle->bc(topic, msg, _mysqlcon);
  162. pHandle->cnt++; //消息计数器 +1
  163. return 0;
  164. }
  165. else if(strcmp(topic, "DT/COP") == 0)
  166. {
  167. pHandle = handleTableLocal+2;
  168. pHandle->bc(topic, msg, _mysqlcon);
  169. pHandle->cnt++; //消息计数器 +1
  170. return 0;
  171. }
  172. else if(strcmp(topic, "DT/PIG") == 0)
  173. {
  174. pHandle = handleTableLocal+3;
  175. pHandle->bc(topic, msg, _mysqlcon);
  176. pHandle->cnt++; //消息计数器 +1
  177. return 0;
  178. }
  179. // loop all of it
  180. for (idx = 0; idx < topicNumLocal; idx++)
  181. {
  182. pHandle = handleTableLocal + idx;
  183. if (strcmp(topic, pHandle->topic_name) == 0)
  184. {
  185. pHandle->bc(topic, msg, _mysqlcon);
  186. pHandle->cnt++; //消息计数器 +1
  187. debug("idx=%d topic=%s func=%p decode msg\n", idx, topic, pHandle->bc);
  188. return 0;
  189. }
  190. }
  191. // 系统订阅 $SYS/brokers/+/clients/+/disconnected 设备离线
  192. if(find_char(topic,seg) == 5 && strcmp(find_string(topic,5),"disconnected") == 0)
  193. {
  194. debug("topic:%s ,disconnected 触发 \n",topic);
  195. if(strlen(find_string(topic,4)) == 24)
  196. {
  197. char disconneted_reason[30] = {0};
  198. strcpy(mac,find_string(topic,4));
  199. const char *data = (const char*)msg->payload;
  200. cjson = cJSON_Parse(data);
  201. item = cJSON_GetObjectItem(cjson,"reason");
  202. memcpy(disconneted_reason,item->valuestring,strlen(item->valuestring));
  203. cJSON_Delete(cjson);
  204. sprintf(update_sql,"update dev_status set current_online = 0,update_time = now() where gateway_mac = '%s'",mac);
  205. excuteSql(_mysqlcon,update_sql);
  206. log("◇ 设备:%s 连接断开,reason:%s,disconnected \n",mac,disconneted_reason);
  207. }
  208. return 0;
  209. }
  210. else if(find_char(topic,seg) == 5 && strcmp(find_string(topic,5),"connected") == 0)
  211. {
  212. debug("topic:%s ,connected 触发 \n",topic);
  213. if(strlen(find_string(topic,4)) == 24)
  214. {
  215. char client_ip[20] = {0};
  216. int client_port = 0;
  217. strcpy(mac,find_string(topic,4));
  218. const char *data = (const char*)msg->payload;
  219. cjson = cJSON_Parse(data);
  220. item = cJSON_GetObjectItem(cjson,"ipaddress");
  221. memcpy(client_ip,item->valuestring,strlen(item->valuestring));
  222. item = cJSON_GetObjectItem(cjson,"sockport");
  223. client_port = item->valueint;
  224. cJSON_Delete(cjson);
  225. sprintf(update_sql,"update dev_status set ipaddress = '%s',sockport = %d,ipaddress_update_time = now() where device_mac = '%s'",client_ip,client_port,mac);
  226. excuteSql(_mysqlcon,update_sql);
  227. log("◆ 设备:%s 连接成功,ipaddress:%s,sockport:%d,connected \n",mac,client_ip,client_port);
  228. }
  229. return 0;
  230. }
  231. //进入这里,说明 该消息没有注册,无对应解码函数,临时打印. 可以考虑归集此类消息
  232. elog("WARNING: Recv.topic没有注册, Topic=%s\n", topic);
  233. return 0;
  234. }
  235. /********************************************************************************
  236. * @Function: print_stats
  237. * @Brief: 打印运行统计
  238. * @Param: msg:消息报文
  239. * @Return: 无
  240. * @Date: 2019-08-26 14:54:18
  241. *******************************************************************************/
  242. int print_stats()
  243. {
  244. int i;
  245. int sec;
  246. sec = timeGloble_g - timeBegin_g;
  247. log("__STATS Running.time.cost=%dh%dm%ds second.total=%d nowTime=%d\n", sec / 3600, sec / 60, sec % 60, sec, timeGloble_g);
  248. log("__STATS Rcv.msg.total=%d\n", msgcnt);
  249. for (i = 0; i < topicNumLocal; i++)
  250. {
  251. log("__STATS topic=%s num=%d func=%p\n", handleTableLocal[i].topic_name, handleTableLocal[i].cnt, handleTableLocal[i].bc);
  252. }
  253. return 0;
  254. }