/******************************************************************************** * @file name: regfunc.c * @Description: 注册函数,和 业务逻辑模块 * @version: v1.0.0 * @Author: bighead * @Date: 2020-11-15 11:05:28 * @LastEditors: sunny.sun * @LastEditTime: 2020-11-16 09:33:46 *******************************************************************************/ #include "core.h" #include "regfunc.h" #include "handle.c" //------------------------------- 全局变量声明 ------------------------------ // struct cmd_pro // { // char topic_name[256]; //频道名字 // unsigned int cnt; //调用次数 // void (*bc)(const char *topic, MQTTAsync_message *msg, MYSQL *_db, void *context); //处理函数 // }; // int do_A1(char *topicName, MQTTAsync_message *recvmsg, MYSQL *_db, void *context) static int ctr_table_size = sizeof(ctr_handle_tab) / sizeof(struct cmd_pro); //数组大小 static int topicNumLocal = sizeof(ctr_handle_tab) / sizeof(struct cmd_pro); //配置文件中读取的Topic 数目 static struct cmd_pro *handleTableLocal; //根据配置文件中的设定,启用向对应的topic_function static unsigned int msgcnt = 0; //全局消息计数器 //------------------------------- 全局变量声明 ------------------------------ /******************************************************************************** * @Function:initTopicConf * @Brief: 从配置文件中加载Topic内容 * @Param: * @Return: 无 * @Date: 2020-11-15 17:22:17 *******************************************************************************/ int initTopicConf() { int i, ret, idx; char tmpstr[128] = {0}; char resstr[128] = {0}; int iDefault = 0; ret = atoi(GetInitKey(CONFIG_FILE, "TOPIC", "DEFAULT")); if ((ret == 1) || (ret == 0)) iDefault = ret; elog(" 服务启用=1, 禁用=0, 当前设定的默认值=%d ++++\n", iDefault); ret = atoi(GetInitKey(CONFIG_FILE, "TOPIC", "NUM")); topicNumLocal = ret ? ret : topicNumLocal; if (topicNumLocal > ctr_table_size) { topicNumLocal = ctr_table_size; elog(" 配置的Topic数目超过 程序内置Topic数目, 默认截断处理 num=%d ++++ \n", topicNumLocal); } handleTableLocal = malloc(sizeof(struct cmd_pro) * topicNumLocal); if (handleTableLocal == NULL) exit(1); memset(handleTableLocal, 0, sizeof(struct cmd_pro) * topicNumLocal); // loading topic 配置 idx = 0; for (i = 0; i < topicNumLocal; i++) { sprintf(tmpstr, "%d", i); strcpy(resstr, GetInitKey(CONFIG_FILE, "TOPIC", tmpstr)); if (0 == strlen(resstr)) { ret = iDefault; log("启用缺省配置 Topic[%d]=%d +++++ \n", i, ret); } else { ret = atoi(resstr); } if (ret == 1) { //表示启用对应的Topic 服务,进行相应的注册 handleTableLocal[idx].bc = ctr_handle_tab[i].bc; handleTableLocal[idx].cnt = 0; strcpy(handleTableLocal[idx].topic_name, ctr_handle_tab[i].topic_name); log("从配置文件中启用 index=%d TopicName=%s bc=%p 服务启用 \n", i, ctr_handle_tab[i].topic_name, ctr_handle_tab[i].bc); ++idx; } else { log("从配置文件中禁用 index=%d TopicName=%s bc=%p 服务禁用+++\n", i, ctr_handle_tab[i].topic_name, ctr_handle_tab[i].bc); } } //修正实际的topic数目 topicNumLocal = idx; log("\n\t\t 总计 启用 %d topic 服务 +++++++++ \n", topicNumLocal); //校验topic 配置, bc字段不能为空; for (i = 0; i < topicNumLocal; i++) { assert(handleTableLocal[i].bc); } return topicNumLocal; } /******************************************************************************** * @Function: * @Brief: 控制类消息处理函数 * @Param: * @Return: 无 * @Date: 2020-11-15 17:22:17 *******************************************************************************/ int regTopicFromTable(void) { int idx = 0; struct cmd_pro *phandleTable = NULL; for (idx = 0; idx < topicNumLocal; idx++) { phandleTable = handleTableLocal + idx; if (phandleTable->bc) { my_subsribe_topic(phandleTable->topic_name, 0); sleep(2); } } return 0; } /******************************************************************************** * @Function: * @Brief: 控制类消息处理函数 * @Param: * @Return: 无 * @Date: 2020-11-15 17:22:17 *******************************************************************************/ int decode_msg_handle( char *topic, MQTTAsync_message *msg) { static int idx = 0; static MYSQL *_mysqlcon = NULL; static struct cmd_pro *pHandle = NULL; char seg[] = "/"; char mac[100]; char update_sql[512] = {0}; memset(mac,0x0,100); cJSON* cjson; cJSON*item; msgcnt++; //全局消息计数器 +1 if (!_mysqlcon) { // 先申请内存再初始化 _mysqlcon = malloc(sizeof(MYSQL)); db_init(_mysqlcon); pHandle = handleTableLocal; } //log("decode msg,this topic=%s func=%p \n", topic, pHandle->bc); // Try cache and hit if (strcmp(topic, pHandle->topic_name) == 0) { pHandle->bc(topic, msg, _mysqlcon); pHandle->cnt++; //消息计数器 +1 debug("hit idx=%d topic=%s func=%p decode msg\n", idx, topic, pHandle->bc); return 0; } //共享订阅在此处理 if(strcmp(topic, "DT/EPD") == 0) { pHandle = handleTableLocal+0; pHandle->bc(topic, msg, _mysqlcon); pHandle->cnt++; //消息计数器 +1 return 0; } else if(strcmp(topic, "DT/SDT") == 0) { pHandle = handleTableLocal+1; pHandle->bc(topic, msg, _mysqlcon); pHandle->cnt++; //消息计数器 +1 return 0; } else if(strcmp(topic, "DT/COP") == 0) { pHandle = handleTableLocal+2; pHandle->bc(topic, msg, _mysqlcon); pHandle->cnt++; //消息计数器 +1 return 0; } else if(strcmp(topic, "DT/PIG") == 0) { pHandle = handleTableLocal+3; pHandle->bc(topic, msg, _mysqlcon); pHandle->cnt++; //消息计数器 +1 return 0; } // loop all of it for (idx = 0; idx < topicNumLocal; idx++) { pHandle = handleTableLocal + idx; if (strcmp(topic, pHandle->topic_name) == 0) { pHandle->bc(topic, msg, _mysqlcon); pHandle->cnt++; //消息计数器 +1 debug("idx=%d topic=%s func=%p decode msg\n", idx, topic, pHandle->bc); return 0; } } // 系统订阅 $SYS/brokers/+/clients/+/disconnected 设备离线 if(find_char(topic,seg) == 5 && strcmp(find_string(topic,5),"disconnected") == 0) { debug("topic:%s ,disconnected 触发 \n",topic); if(strlen(find_string(topic,4)) == 24) { char disconneted_reason[30] = {0}; strcpy(mac,find_string(topic,4)); const char *data = (const char*)msg->payload; cjson = cJSON_Parse(data); item = cJSON_GetObjectItem(cjson,"reason"); memcpy(disconneted_reason,item->valuestring,strlen(item->valuestring)); cJSON_Delete(cjson); sprintf(update_sql,"update dev_status set current_online = 0,update_time = now() where gateway_mac = '%s'",mac); excuteSql(_mysqlcon,update_sql); log("◇ 设备:%s 连接断开,reason:%s,disconnected \n",mac,disconneted_reason); } return 0; } else if(find_char(topic,seg) == 5 && strcmp(find_string(topic,5),"connected") == 0) { debug("topic:%s ,connected 触发 \n",topic); if(strlen(find_string(topic,4)) == 24) { char client_ip[20] = {0}; int client_port = 0; strcpy(mac,find_string(topic,4)); const char *data = (const char*)msg->payload; cjson = cJSON_Parse(data); item = cJSON_GetObjectItem(cjson,"ipaddress"); memcpy(client_ip,item->valuestring,strlen(item->valuestring)); item = cJSON_GetObjectItem(cjson,"sockport"); client_port = item->valueint; cJSON_Delete(cjson); sprintf(update_sql,"update dev_status set ipaddress = '%s',sockport = %d,ipaddress_update_time = now() where device_mac = '%s'",client_ip,client_port,mac); excuteSql(_mysqlcon,update_sql); log("◆ 设备:%s 连接成功,ipaddress:%s,sockport:%d,connected \n",mac,client_ip,client_port); } return 0; } //进入这里,说明 该消息没有注册,无对应解码函数,临时打印. 可以考虑归集此类消息 elog("WARNING: Recv.topic没有注册, Topic=%s\n", topic); return 0; } /******************************************************************************** * @Function: print_stats * @Brief: 打印运行统计 * @Param: msg:消息报文 * @Return: 无 * @Date: 2019-08-26 14:54:18 *******************************************************************************/ int print_stats() { int i; int sec; sec = timeGloble_g - timeBegin_g; log("__STATS Running.time.cost=%dh%dm%ds second.total=%d nowTime=%d\n", sec / 3600, sec / 60, sec % 60, sec, timeGloble_g); log("__STATS Rcv.msg.total=%d\n", msgcnt); for (i = 0; i < topicNumLocal; i++) { log("__STATS topic=%s num=%d func=%p\n", handleTableLocal[i].topic_name, handleTableLocal[i].cnt, handleTableLocal[i].bc); } return 0; }