123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- /********************************************************************************
- * @file name: regfunc.c
- * @Description: 注册函数,和 业务逻辑模块
- * @version: v1.0.0
- * @Author: bighead
- * @Date: 2020-11-15 11:05:28
- * @LastEditors: sunny.sun
- * @LastEditTime: 2020-11-16 09:33:46
- *******************************************************************************/
- #include "core.h"
- #include "regfunc.h"
- #include "handle.c"
- //------------------------------- 全局变量声明 ------------------------------
- // struct cmd_pro
- // {
- // char topic_name[256]; //频道名字
- // unsigned int cnt; //调用次数
- // void (*bc)(const char *topic, MQTTAsync_message *msg, MYSQL *_db, void *context); //处理函数
- // };
- // int do_A1(char *topicName, MQTTAsync_message *recvmsg, MYSQL *_db, void *context)
- static int ctr_table_size = sizeof(ctr_handle_tab) / sizeof(struct cmd_pro); //数组大小
- static int topicNumLocal = sizeof(ctr_handle_tab) / sizeof(struct cmd_pro); //配置文件中读取的Topic 数目
- static struct cmd_pro *handleTableLocal; //根据配置文件中的设定,启用向对应的topic_function
- static unsigned int msgcnt = 0; //全局消息计数器
- //------------------------------- 全局变量声明 ------------------------------
- /********************************************************************************
- * @Function:initTopicConf
- * @Brief: 从配置文件中加载Topic内容
- * @Param:
- * @Return: 无
- * @Date: 2020-11-15 17:22:17
- *******************************************************************************/
- int initTopicConf()
- {
- int i, ret, idx;
- char tmpstr[128] = {0};
- char resstr[128] = {0};
- int iDefault = 0;
- ret = atoi(GetInitKey(CONFIG_FILE, "TOPIC", "DEFAULT"));
- if ((ret == 1) || (ret == 0))
- iDefault = ret;
- elog(" 服务启用=1, 禁用=0, 当前设定的默认值=%d ++++\n", iDefault);
- ret = atoi(GetInitKey(CONFIG_FILE, "TOPIC", "NUM"));
- topicNumLocal = ret ? ret : topicNumLocal;
- if (topicNumLocal > ctr_table_size)
- {
- topicNumLocal = ctr_table_size;
- elog(" 配置的Topic数目超过 程序内置Topic数目, 默认截断处理 num=%d ++++ \n", topicNumLocal);
- }
- handleTableLocal = malloc(sizeof(struct cmd_pro) * topicNumLocal);
- if (handleTableLocal == NULL)
- exit(1);
- memset(handleTableLocal, 0, sizeof(struct cmd_pro) * topicNumLocal);
- // loading topic 配置
- idx = 0;
- for (i = 0; i < topicNumLocal; i++)
- {
- sprintf(tmpstr, "%d", i);
- strcpy(resstr, GetInitKey(CONFIG_FILE, "TOPIC", tmpstr));
- if (0 == strlen(resstr))
- {
- ret = iDefault;
- log("启用缺省配置 Topic[%d]=%d +++++ \n", i, ret);
- }
- else
- {
- ret = atoi(resstr);
- }
- if (ret == 1)
- {
- //表示启用对应的Topic 服务,进行相应的注册
- handleTableLocal[idx].bc = ctr_handle_tab[i].bc;
- handleTableLocal[idx].cnt = 0;
- strcpy(handleTableLocal[idx].topic_name, ctr_handle_tab[i].topic_name);
- log("从配置文件中启用 index=%d TopicName=%s bc=%p 服务启用 \n", i, ctr_handle_tab[i].topic_name, ctr_handle_tab[i].bc);
- ++idx;
- }
- else
- {
- log("从配置文件中禁用 index=%d TopicName=%s bc=%p 服务禁用+++\n", i, ctr_handle_tab[i].topic_name, ctr_handle_tab[i].bc);
- }
- }
- //修正实际的topic数目
- topicNumLocal = idx;
- log("\n\t\t 总计 启用 %d topic 服务 +++++++++ \n", topicNumLocal);
- //校验topic 配置, bc字段不能为空;
- for (i = 0; i < topicNumLocal; i++)
- {
- assert(handleTableLocal[i].bc);
- }
- return topicNumLocal;
- }
- /********************************************************************************
- * @Function:
- * @Brief: 控制类消息处理函数
- * @Param:
- * @Return: 无
- * @Date: 2020-11-15 17:22:17
- *******************************************************************************/
- int regTopicFromTable(void)
- {
- int idx = 0;
- struct cmd_pro *phandleTable = NULL;
- for (idx = 0; idx < topicNumLocal; idx++)
- {
- phandleTable = handleTableLocal + idx;
- if (phandleTable->bc)
- {
- my_subsribe_topic(phandleTable->topic_name, 0);
- sleep(2);
- }
- }
- return 0;
- }
- /********************************************************************************
- * @Function:
- * @Brief: 控制类消息处理函数
- * @Param:
- * @Return: 无
- * @Date: 2020-11-15 17:22:17
- *******************************************************************************/
- int decode_msg_handle( char *topic, MQTTAsync_message *msg)
- {
- static int idx = 0;
- static MYSQL *_mysqlcon = NULL;
- static struct cmd_pro *pHandle = NULL;
- char seg[] = "/";
- char mac[100];
- char update_sql[512] = {0};
- memset(mac,0x0,100);
- cJSON* cjson;
- cJSON*item;
- msgcnt++; //全局消息计数器 +1
- if (!_mysqlcon)
- {
- // 先申请内存再初始化
- _mysqlcon = malloc(sizeof(MYSQL));
- db_init(_mysqlcon);
- pHandle = handleTableLocal;
- }
- //log("decode msg,this topic=%s func=%p \n", topic, pHandle->bc);
- // Try cache and hit
- if (strcmp(topic, pHandle->topic_name) == 0)
- {
- pHandle->bc(topic, msg, _mysqlcon);
- pHandle->cnt++; //消息计数器 +1
- debug("hit idx=%d topic=%s func=%p decode msg\n", idx, topic, pHandle->bc);
- return 0;
- }
- //共享订阅在此处理
- if(strcmp(topic, "DT/EPD") == 0)
- {
- pHandle = handleTableLocal+0;
- pHandle->bc(topic, msg, _mysqlcon);
- pHandle->cnt++; //消息计数器 +1
- return 0;
- }
- else if(strcmp(topic, "DT/SDT") == 0)
- {
- pHandle = handleTableLocal+1;
- pHandle->bc(topic, msg, _mysqlcon);
- pHandle->cnt++; //消息计数器 +1
- return 0;
- }
- else if(strcmp(topic, "DT/COP") == 0)
- {
- pHandle = handleTableLocal+2;
- pHandle->bc(topic, msg, _mysqlcon);
- pHandle->cnt++; //消息计数器 +1
- return 0;
- }
- else if(strcmp(topic, "DT/PIG") == 0)
- {
- pHandle = handleTableLocal+3;
- pHandle->bc(topic, msg, _mysqlcon);
- pHandle->cnt++; //消息计数器 +1
- return 0;
- }
- // loop all of it
- for (idx = 0; idx < topicNumLocal; idx++)
- {
- pHandle = handleTableLocal + idx;
- if (strcmp(topic, pHandle->topic_name) == 0)
- {
- pHandle->bc(topic, msg, _mysqlcon);
- pHandle->cnt++; //消息计数器 +1
- debug("idx=%d topic=%s func=%p decode msg\n", idx, topic, pHandle->bc);
- return 0;
- }
-
- }
- // 系统订阅 $SYS/brokers/+/clients/+/disconnected 设备离线
- if(find_char(topic,seg) == 5 && strcmp(find_string(topic,5),"disconnected") == 0)
- {
- debug("topic:%s ,disconnected 触发 \n",topic);
- if(strlen(find_string(topic,4)) == 24)
- {
- char disconneted_reason[30] = {0};
- strcpy(mac,find_string(topic,4));
- const char *data = (const char*)msg->payload;
- cjson = cJSON_Parse(data);
- item = cJSON_GetObjectItem(cjson,"reason");
- memcpy(disconneted_reason,item->valuestring,strlen(item->valuestring));
- cJSON_Delete(cjson);
- sprintf(update_sql,"update dev_status set current_online = 0,update_time = now() where gateway_mac = '%s'",mac);
- excuteSql(_mysqlcon,update_sql);
- log("◇ 设备:%s 连接断开,reason:%s,disconnected \n",mac,disconneted_reason);
- }
- return 0;
- }
- else if(find_char(topic,seg) == 5 && strcmp(find_string(topic,5),"connected") == 0)
- {
- debug("topic:%s ,connected 触发 \n",topic);
- if(strlen(find_string(topic,4)) == 24)
- {
- char client_ip[20] = {0};
- int client_port = 0;
- strcpy(mac,find_string(topic,4));
- const char *data = (const char*)msg->payload;
- cjson = cJSON_Parse(data);
- item = cJSON_GetObjectItem(cjson,"ipaddress");
- memcpy(client_ip,item->valuestring,strlen(item->valuestring));
- item = cJSON_GetObjectItem(cjson,"sockport");
- client_port = item->valueint;
- cJSON_Delete(cjson);
- sprintf(update_sql,"update dev_status set ipaddress = '%s',sockport = %d,ipaddress_update_time = now() where device_mac = '%s'",client_ip,client_port,mac);
- excuteSql(_mysqlcon,update_sql);
- log("◆ 设备:%s 连接成功,ipaddress:%s,sockport:%d,connected \n",mac,client_ip,client_port);
- }
- return 0;
- }
- //进入这里,说明 该消息没有注册,无对应解码函数,临时打印. 可以考虑归集此类消息
- elog("WARNING: Recv.topic没有注册, Topic=%s\n", topic);
- return 0;
- }
- /********************************************************************************
- * @Function: print_stats
- * @Brief: 打印运行统计
- * @Param: msg:消息报文
- * @Return: 无
- * @Date: 2019-08-26 14:54:18
- *******************************************************************************/
- int print_stats()
- {
- int i;
- int sec;
- sec = timeGloble_g - timeBegin_g;
- log("__STATS Running.time.cost=%dh%dm%ds second.total=%d nowTime=%d\n", sec / 3600, sec / 60, sec % 60, sec, timeGloble_g);
- log("__STATS Rcv.msg.total=%d\n", msgcnt);
- for (i = 0; i < topicNumLocal; i++)
- {
- log("__STATS topic=%s num=%d func=%p\n", handleTableLocal[i].topic_name, handleTableLocal[i].cnt, handleTableLocal[i].bc);
- }
- return 0;
- }
|