test8.c 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2020 IBM Corp. and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. *******************************************************************************/
  16. /**
  17. * @file
  18. * Tests for the Paho MQTT Async C client
  19. */
  20. #include "MQTTAsync.h"
  21. #include <string.h>
  22. #include <stdlib.h>
  23. #if !defined(_WINDOWS)
  24. #include <sys/time.h>
  25. #include <sys/socket.h>
  26. #include <unistd.h>
  27. #include <errno.h>
  28. #else
  29. #include <windows.h>
  30. #endif
  31. #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
  32. void usage(void)
  33. {
  34. printf("help!!\n");
  35. exit(EXIT_FAILURE);
  36. }
  37. struct Options
  38. {
  39. char* connection; /**< connection to system under test. */
  40. int verbose;
  41. int test_no;
  42. int size; /**< size of big message */
  43. } options =
  44. {
  45. "tcp://localhost:1883",
  46. 0,
  47. -1,
  48. 5000000,
  49. };
  50. void getopts(int argc, char** argv)
  51. {
  52. int count = 1;
  53. while (count < argc)
  54. {
  55. if (strcmp(argv[count], "--test_no") == 0)
  56. {
  57. if (++count < argc)
  58. options.test_no = atoi(argv[count]);
  59. else
  60. usage();
  61. }
  62. else if (strcmp(argv[count], "--size") == 0)
  63. {
  64. if (++count < argc)
  65. options.size = atoi(argv[count]);
  66. else
  67. usage();
  68. }
  69. else if (strcmp(argv[count], "--connection") == 0)
  70. {
  71. if (++count < argc)
  72. options.connection = argv[count];
  73. else
  74. usage();
  75. }
  76. else if (strcmp(argv[count], "--verbose") == 0)
  77. options.verbose = 1;
  78. count++;
  79. }
  80. }
/* Log levels accepted by MyLog(). */
#define LOGA_DEBUG 0	/* printed only when options.verbose is non-zero */
#define LOGA_INFO 1
  83. #include <stdarg.h>
  84. #include <time.h>
  85. #include <sys/timeb.h>
  86. void MyLog(int LOGA_level, char* format, ...)
  87. {
  88. static char msg_buf[256];
  89. va_list args;
  90. #if defined(_WIN32) || defined(_WINDOWS)
  91. struct timeb ts;
  92. #else
  93. struct timeval ts;
  94. #endif
  95. struct tm timeinfo;
  96. if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
  97. return;
  98. #if defined(_WIN32) || defined(_WINDOWS)
  99. ftime(&ts);
  100. localtime_s(&timeinfo, &ts.time);
  101. #else
  102. gettimeofday(&ts, NULL);
  103. localtime_r(&ts.tv_sec, &timeinfo);
  104. #endif
  105. strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
  106. #if defined(_WIN32) || defined(_WINDOWS)
  107. sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
  108. #else
  109. sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000);
  110. #endif
  111. va_start(args, format);
  112. vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
  113. va_end(args);
  114. printf("%s\n", msg_buf);
  115. fflush(stdout);
  116. }
  117. #if defined(_WIN32) || defined(_WINDOWS)
  118. #define mqsleep(A) Sleep(1000*A)
  119. #define START_TIME_TYPE DWORD
  120. static DWORD start_time = 0;
  121. START_TIME_TYPE start_clock(void)
  122. {
  123. return GetTickCount();
  124. }
  125. #elif defined(AIX)
  126. #define mqsleep sleep
  127. #define START_TIME_TYPE struct timespec
  128. START_TIME_TYPE start_clock(void)
  129. {
  130. static struct timespec start;
  131. clock_gettime(CLOCK_REALTIME, &start);
  132. return start;
  133. }
  134. #else
  135. #define mqsleep sleep
  136. #define START_TIME_TYPE struct timeval
  137. /* TODO - unused - remove? static struct timeval start_time; */
  138. START_TIME_TYPE start_clock(void)
  139. {
  140. struct timeval start_time;
  141. gettimeofday(&start_time, NULL);
  142. return start_time;
  143. }
  144. #endif
  145. #if defined(_WIN32)
  146. long elapsed(START_TIME_TYPE start_time)
  147. {
  148. return GetTickCount() - start_time;
  149. }
  150. #elif defined(AIX)
  151. #define assert(a)
  152. long elapsed(struct timespec start)
  153. {
  154. struct timespec now, res;
  155. clock_gettime(CLOCK_REALTIME, &now);
  156. ntimersub(now, start, res);
  157. return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
  158. }
  159. #else
  160. long elapsed(START_TIME_TYPE start_time)
  161. {
  162. struct timeval now, res;
  163. gettimeofday(&now, NULL);
  164. timersub(&now, &start_time, &res);
  165. return (res.tv_sec)*1000 + (res.tv_usec)/1000;
  166. }
  167. #endif
  168. START_TIME_TYPE global_start_time;
  169. #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
  170. #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
  171. int tests = 0;
  172. int failures = 0;
  173. void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
  174. {
  175. ++tests;
  176. if (!value)
  177. {
  178. va_list args;
  179. ++failures;
  180. printf("Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
  181. va_start(args, format);
  182. vprintf(format, args);
  183. va_end(args);
  184. }
  185. else
  186. MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
  187. }
/* Completion indicator: set to 1 (or, in the test3 callbacks, incremented)
 * by the callbacks and polled by the test functions. */
volatile int test_finished = 0;
/* Topic used by the test1 and test4 callbacks. */
char* test_topic = "async test topic";
  190. void test1_onDisconnect(void* context, MQTTAsync_successData* response)
  191. {
  192. MQTTAsync c = (MQTTAsync)context;
  193. MyLog(LOGA_DEBUG, "In onDisconnect callback %p", c);
  194. test_finished = 1;
  195. }
  196. void test1_onUnsubscribe(void* context, MQTTAsync_successData* response)
  197. {
  198. MQTTAsync c = (MQTTAsync)context;
  199. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  200. int rc;
  201. MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback %p", c);
  202. opts.onSuccess = test1_onDisconnect;
  203. opts.context = c;
  204. rc = MQTTAsync_disconnect(c, &opts);
  205. assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  206. }
  207. int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  208. {
  209. MQTTAsync c = (MQTTAsync)context;
  210. static int message_count = 0;
  211. int rc;
  212. MyLog(LOGA_DEBUG, "In messageArrived callback %p", c);
  213. if (++message_count == 1)
  214. {
  215. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  216. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  217. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  218. pubmsg.payloadlen = 11;
  219. pubmsg.qos = 2;
  220. pubmsg.retained = 0;
  221. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  222. }
  223. else
  224. {
  225. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  226. opts.onSuccess = test1_onUnsubscribe;
  227. opts.context = c;
  228. rc = MQTTAsync_unsubscribe(c, test_topic, &opts);
  229. assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  230. }
  231. MQTTAsync_freeMessage(&message);
  232. MQTTAsync_free(topicName);
  233. return 1;
  234. }
  235. void test1_onSubscribe(void* context, MQTTAsync_successData* response)
  236. {
  237. MQTTAsync c = (MQTTAsync)context;
  238. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  239. int rc;
  240. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p granted qos %d", c, response->alt.qos);
  241. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  242. pubmsg.payloadlen = 11;
  243. pubmsg.qos = 2;
  244. pubmsg.retained = 0;
  245. rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
  246. }
  247. void test1_onConnect(void* context, MQTTAsync_successData* response)
  248. {
  249. MQTTAsync c = (MQTTAsync)context;
  250. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  251. int rc;
  252. MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
  253. opts.onSuccess = test1_onSubscribe;
  254. opts.context = c;
  255. rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
  256. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  257. if (rc != MQTTASYNC_SUCCESS)
  258. test_finished = 1;
  259. }
  260. void test1_onConnectFailure(void* context, MQTTAsync_failureData* response)
  261. {
  262. MQTTAsync c = (MQTTAsync)context;
  263. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  264. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  265. test_finished = 1;
  266. }
  267. /*********************************************************************
  268. Test1: Basic connect, subscribe send and receive.
  269. *********************************************************************/
  270. int test1(struct Options options)
  271. {
  272. int subsqos = 2;
  273. MQTTAsync c;
  274. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  275. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  276. int rc = 0;
  277. char* test_topic = "C client test1";
  278. char* serverURIs[2] = {"tcp://localhost:1882", options.connection};
  279. failures = 0;
  280. MyLog(LOGA_INFO, "Starting test 1 - asynchronous connect");
  281. rc = MQTTAsync_create(&c, options.connection, "async_8_test1",
  282. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  283. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  284. if (rc != MQTTASYNC_SUCCESS)
  285. {
  286. MQTTAsync_destroy(&c);
  287. goto exit;
  288. }
  289. rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
  290. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  291. opts.keepAliveInterval = 20;
  292. opts.cleansession = 1;
  293. opts.username = "testuser";
  294. opts.password = "testpassword";
  295. opts.will = &wopts;
  296. opts.will->message = "will message";
  297. opts.will->qos = 1;
  298. opts.will->retained = 0;
  299. opts.will->topicName = "will topic";
  300. opts.will = NULL;
  301. opts.onSuccess = test1_onConnect;
  302. opts.onFailure = test1_onConnectFailure;
  303. opts.context = c;
  304. opts.serverURIcount = 2;
  305. opts.serverURIs = serverURIs;
  306. MyLog(LOGA_DEBUG, "Connecting");
  307. rc = MQTTAsync_connect(c, &opts);
  308. rc = 0;
  309. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  310. if (rc != MQTTASYNC_SUCCESS)
  311. goto exit;
  312. while (!test_finished)
  313. #if defined(_WIN32)
  314. Sleep(100);
  315. #else
  316. usleep(10000L);
  317. #endif
  318. MQTTAsync_destroy(&c);
  319. exit:
  320. MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
  321. (failures == 0) ? "passed" : "failed", tests, failures);
  322. return failures;
  323. }
  324. int test2_onFailure_called = 0;
  325. void test2_onFailure(void* context, MQTTAsync_failureData* response)
  326. {
  327. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  328. test2_onFailure_called++;
  329. test_finished = 1;
  330. }
  331. void test2_onConnect(void* context, MQTTAsync_successData* response)
  332. {
  333. MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p\n", context);
  334. assert("Connect should not succeed", 0, "connect success callback was called", 0);
  335. test_finished = 1;
  336. }
  337. /*********************************************************************
  338. Test2: connect timeout
  339. *********************************************************************/
  340. int test2(struct Options options)
  341. {
  342. int subsqos = 2;
  343. MQTTAsync c;
  344. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  345. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  346. int rc = 0;
  347. char* test_topic = "C client test2";
  348. test_finished = 0;
  349. MyLog(LOGA_INFO, "Starting test 2 - connect timeout");
  350. rc = MQTTAsync_create(&c, "tcp://9.20.96.160:66", "connect timeout",
  351. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  352. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  353. if (rc != MQTTASYNC_SUCCESS)
  354. {
  355. MQTTAsync_destroy(&c);
  356. goto exit;
  357. }
  358. rc = MQTTAsync_setCallbacks(c, c, NULL, test1_messageArrived, NULL);
  359. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  360. opts.connectTimeout = 5;
  361. opts.keepAliveInterval = 20;
  362. opts.cleansession = 1;
  363. opts.username = "testuser";
  364. opts.password = "testpassword";
  365. opts.will = &wopts;
  366. opts.will->message = "will message";
  367. opts.will->qos = 1;
  368. opts.will->retained = 0;
  369. opts.will->topicName = "will topic";
  370. opts.will = NULL;
  371. opts.onSuccess = test2_onConnect;
  372. opts.onFailure = test2_onFailure;
  373. opts.context = c;
  374. MyLog(LOGA_DEBUG, "Connecting");
  375. rc = MQTTAsync_connect(c, &opts);
  376. rc = 0;
  377. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  378. if (rc != MQTTASYNC_SUCCESS)
  379. goto exit;
  380. while (!test_finished)
  381. #if defined(_WIN32)
  382. Sleep(100);
  383. #else
  384. usleep(10000L);
  385. #endif
  386. MQTTAsync_destroy(&c);
  387. exit:
  388. assert("Connect onFailure should be called once", test2_onFailure_called == 1,
  389. "connect onFailure was called %d times", test2_onFailure_called);
  390. MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
  391. (failures == 0) ? "passed" : "failed", tests, failures);
  392. return failures;
  393. }
/* State of one client in test3; a pointer to it is passed to the test3
 * callbacks as their context. */
typedef struct
{
	MQTTAsync c;		/* client handle filled in by MQTTAsync_create */
	int index;		/* position of this entry in test3's clientdata array */
	char clientid[24];	/* client identifier, formatted by test3 */
	char test_topic[100];	/* topic this client subscribes and publishes to */
	int message_count;	/* messages received so far, counted in test3_messageArrived */
} client_data;
  402. void test3_onDisconnect(void* context, MQTTAsync_successData* response)
  403. {
  404. client_data* cd = (client_data*)context;
  405. MyLog(LOGA_DEBUG, "In onDisconnect callback for client \"%s\"", cd->clientid);
  406. test_finished++;
  407. }
  408. void test3_onPublish(void* context, MQTTAsync_successData* response)
  409. {
  410. client_data* cd = (client_data*)context;
  411. MyLog(LOGA_DEBUG, "In QoS 0 onPublish callback for client \"%s\"", cd->clientid);
  412. }
  413. void test3_onUnsubscribe(void* context, MQTTAsync_successData* response)
  414. {
  415. client_data* cd = (client_data*)context;
  416. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  417. int rc;
  418. MyLog(LOGA_DEBUG, "In onUnsubscribe onSuccess callback \"%s\"", cd->clientid);
  419. opts.onSuccess = test3_onDisconnect;
  420. opts.context = cd;
  421. rc = MQTTAsync_disconnect(cd->c, &opts);
  422. assert("Disconnect successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  423. }
  424. int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  425. {
  426. client_data* cd = (client_data*)context;
  427. int rc;
  428. MyLog(LOGA_DEBUG, "In messageArrived callback \"%s\" message count ", cd->clientid);
  429. if (++cd->message_count == 1)
  430. {
  431. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  432. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  433. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  434. pubmsg.payloadlen = 25;
  435. pubmsg.qos = 1;
  436. pubmsg.retained = 0;
  437. rc = MQTTAsync_sendMessage(cd->c, cd->test_topic, &pubmsg, &opts);
  438. assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  439. }
  440. else if (cd->message_count == 2)
  441. {
  442. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  443. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  444. pubmsg.payload = "a QoS 0 message that we can shorten to the extent that we need to payload up to 11";
  445. pubmsg.payloadlen = 29;
  446. pubmsg.qos = 0;
  447. pubmsg.retained = 0;
  448. opts.context = cd;
  449. opts.onSuccess = test3_onPublish;
  450. rc = MQTTAsync_sendMessage(cd->c, cd->test_topic, &pubmsg, &opts);
  451. assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  452. }
  453. else
  454. {
  455. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  456. opts.onSuccess = test3_onUnsubscribe;
  457. opts.context = cd;
  458. rc = MQTTAsync_unsubscribe(cd->c, cd->test_topic, &opts);
  459. assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  460. }
  461. MQTTAsync_freeMessage(&message);
  462. MQTTAsync_free(topicName);
  463. return 1;
  464. }
  465. void test3_onSubscribe(void* context, MQTTAsync_successData* response)
  466. {
  467. client_data* cd = (client_data*)context;
  468. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  469. int rc;
  470. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback \"%s\"", cd->clientid);
  471. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  472. pubmsg.payloadlen = 11;
  473. pubmsg.qos = 2;
  474. pubmsg.retained = 0;
  475. rc = MQTTAsync_send(cd->c, cd->test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
  476. assert("Good rc from publish", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  477. }
  478. void test3_onConnect(void* context, MQTTAsync_successData* response)
  479. {
  480. client_data* cd = (client_data*)context;
  481. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  482. int rc;
  483. MyLog(LOGA_DEBUG, "In connect onSuccess callback, \"%s\"", cd->clientid);
  484. opts.onSuccess = test3_onSubscribe;
  485. opts.context = cd;
  486. rc = MQTTAsync_subscribe(cd->c, cd->test_topic, 2, &opts);
  487. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  488. if (rc != MQTTASYNC_SUCCESS)
  489. test_finished++;
  490. }
  491. void test3_onFailure(void* context, MQTTAsync_failureData* response)
  492. {
  493. client_data* cd = (client_data*)context;
  494. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  495. assert("Should have connected", 0, "failed to connect", NULL);
  496. MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\" rc %d\n", cd->clientid, response->code);
  497. if (response->message)
  498. MyLog(LOGA_DEBUG, "In connect onFailure callback, \"%s\"\n", response->message);
  499. test_finished++;
  500. }
  501. /*********************************************************************
  502. Test3: More than one client object - simultaneous working.
  503. *********************************************************************/
  504. int test3(struct Options options)
  505. {
  506. #define TEST3_CLIENTS 10
  507. int num_clients = TEST3_CLIENTS;
  508. int subsqos = 2;
  509. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  510. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  511. int rc = 0;
  512. int i;
  513. client_data clientdata[TEST3_CLIENTS];
  514. test_finished = 0;
  515. MyLog(LOGA_INFO, "Starting test 3 - multiple connections");
  516. for (i = 0; i < num_clients; ++i)
  517. {
  518. sprintf(clientdata[i].clientid, "async_test3_num_%d", i);
  519. sprintf(clientdata[i].test_topic, "async test3 topic num %d", i);
  520. clientdata[i].index = i;
  521. clientdata[i].message_count = 0;
  522. rc = MQTTAsync_create(&(clientdata[i].c), options.connection, clientdata[i].clientid,
  523. MQTTCLIENT_PERSISTENCE_NONE, NULL);
  524. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  525. rc = MQTTAsync_setCallbacks(clientdata[i].c, &clientdata[i], NULL, test3_messageArrived, NULL);
  526. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  527. opts.keepAliveInterval = 20;
  528. opts.cleansession = 1;
  529. opts.username = "testuser";
  530. opts.password = "testpassword";
  531. opts.will = &wopts;
  532. opts.will->message = "will message";
  533. opts.will->qos = 1;
  534. opts.will->retained = 0;
  535. opts.will->topicName = "will topic";
  536. opts.onSuccess = test3_onConnect;
  537. opts.onFailure = test3_onFailure;
  538. opts.context = &clientdata[i];
  539. MyLog(LOGA_DEBUG, "Connecting");
  540. rc = MQTTAsync_connect(clientdata[i].c, &opts);
  541. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  542. }
  543. while (test_finished < num_clients)
  544. {
  545. MyLog(LOGA_DEBUG, "num_clients %d test_finished %d\n", num_clients, test_finished);
  546. #if defined(_WIN32)
  547. Sleep(100);
  548. #else
  549. usleep(10000L);
  550. #endif
  551. }
  552. MyLog(LOGA_DEBUG, "TEST3: destroying clients");
  553. for (i = 0; i < num_clients; ++i)
  554. MQTTAsync_destroy(&clientdata[i].c);
  555. /*exit:*/
  556. MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
  557. (failures == 0) ? "passed" : "failed", tests, failures);
  558. return failures;
  559. }
  560. void* test4_payload = NULL;
  561. int test4_payloadlen = 0;
  562. void test4_onPublish(void* context, MQTTAsync_successData* response)
  563. {
  564. MQTTAsync c = (MQTTAsync)context;
  565. MyLog(LOGA_DEBUG, "In publish onSuccess callback, context %p", context);
  566. }
  567. int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  568. {
  569. MQTTAsync c = (MQTTAsync)context;
  570. static int message_count = 0;
  571. int rc, i;
  572. MyLog(LOGA_DEBUG, "In messageArrived callback %p", c);
  573. assert("Message size correct", message->payloadlen == test4_payloadlen,
  574. "message size was %d", message->payloadlen);
  575. for (i = 0; i < options.size; ++i)
  576. {
  577. if (((char*)test4_payload)[i] != ((char*)message->payload)[i])
  578. {
  579. assert("Message contents correct", ((char*)test4_payload)[i] != ((char*)message->payload)[i],
  580. "message content was %c", ((char*)message->payload)[i]);
  581. break;
  582. }
  583. }
  584. if (++message_count == 1)
  585. {
  586. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  587. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  588. pubmsg.payload = test4_payload;
  589. pubmsg.payloadlen = test4_payloadlen;
  590. pubmsg.qos = 1;
  591. pubmsg.retained = 0;
  592. opts.onSuccess = test4_onPublish;
  593. opts.context = c;
  594. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  595. }
  596. else if (message_count == 2)
  597. {
  598. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  599. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  600. pubmsg.payload = test4_payload;
  601. pubmsg.payloadlen = test4_payloadlen;
  602. pubmsg.qos = 0;
  603. pubmsg.retained = 0;
  604. opts.onSuccess = test4_onPublish;
  605. opts.context = c;
  606. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  607. }
  608. else
  609. {
  610. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  611. opts.onSuccess = test1_onUnsubscribe;
  612. opts.context = c;
  613. rc = MQTTAsync_unsubscribe(c, test_topic, &opts);
  614. assert("Unsubscribe successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  615. }
  616. MQTTAsync_freeMessage(&message);
  617. MQTTAsync_free(topicName);
  618. return 1;
  619. }
  620. void test4_onSubscribe(void* context, MQTTAsync_successData* response)
  621. {
  622. MQTTAsync c = (MQTTAsync)context;
  623. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  624. int rc, i;
  625. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback %p", c);
  626. pubmsg.payload = test4_payload = malloc(options.size);
  627. pubmsg.payloadlen = test4_payloadlen = options.size;
  628. srand(33);
  629. for (i = 0; i < options.size; ++i)
  630. ((char*)pubmsg.payload)[i] = rand() % 256;
  631. pubmsg.qos = 2;
  632. pubmsg.retained = 0;
  633. rc = MQTTAsync_send(c, test_topic, pubmsg.payloadlen, pubmsg.payload, pubmsg.qos, pubmsg.retained, NULL);
  634. assert("Send successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  635. }
  636. void test4_onConnect(void* context, MQTTAsync_successData* response)
  637. {
  638. MQTTAsync c = (MQTTAsync)context;
  639. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  640. int rc;
  641. MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
  642. opts.onSuccess = test4_onSubscribe;
  643. opts.context = c;
  644. rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
  645. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  646. if (rc != MQTTASYNC_SUCCESS)
  647. test_finished = 1;
  648. }
  649. /*********************************************************************
  650. Test4: Send and receive big messages
  651. *********************************************************************/
  652. int test4(struct Options options)
  653. {
  654. int subsqos = 2;
  655. MQTTAsync c;
  656. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  657. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  658. int rc = 0;
  659. char* test_topic = "C client test4";
  660. test_finished = failures = 0;
  661. MyLog(LOGA_INFO, "Starting test 4 - big messages");
  662. rc = MQTTAsync_create(&c, options.connection, "async_test_4",
  663. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  664. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  665. if (rc != MQTTASYNC_SUCCESS)
  666. {
  667. MQTTAsync_destroy(&c);
  668. goto exit;
  669. }
  670. rc = MQTTAsync_setCallbacks(c, c, NULL, test4_messageArrived, NULL);
  671. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  672. opts.keepAliveInterval = 20;
  673. opts.cleansession = 1;
  674. opts.username = "testuser";
  675. opts.password = "testpassword";
  676. opts.will = &wopts;
  677. opts.will->message = "will message";
  678. opts.will->qos = 1;
  679. opts.will->retained = 0;
  680. opts.will->topicName = "will topic";
  681. opts.will = NULL;
  682. opts.onSuccess = test4_onConnect;
  683. opts.onFailure = NULL;
  684. opts.context = c;
  685. MyLog(LOGA_DEBUG, "Connecting");
  686. rc = MQTTAsync_connect(c, &opts);
  687. rc = 0;
  688. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  689. if (rc != MQTTASYNC_SUCCESS)
  690. goto exit;
  691. while (!test_finished)
  692. #if defined(_WIN32)
  693. Sleep(100);
  694. #else
  695. usleep(1000L);
  696. #endif
  697. MQTTAsync_destroy(&c);
  698. exit:
  699. MyLog(LOGA_INFO, "TEST4: test %s. %d tests run, %d failures.",
  700. (failures == 0) ? "passed" : "failed", tests, failures);
  701. return failures;
  702. }
  703. int test5_onConnect_called = 0;
  704. int test5_onFailure_called = 0;
  705. void test5_onConnect(void* context, MQTTAsync_successData* response)
  706. {
  707. MQTTAsync c = (MQTTAsync)context;
  708. MyLog(LOGA_DEBUG, "In connect onSuccess callback, context %p", context);
  709. test5_onConnect_called++;
  710. test_finished = 1;
  711. }
  712. void test5_onConnectFailure(void* context, MQTTAsync_failureData* response)
  713. {
  714. MQTTAsync c = (MQTTAsync)context;
  715. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  716. test5_onFailure_called++;
  717. test_finished = 1;
  718. }
  719. /*********************************************************************
  720. Test5a: All HA connections out of service.
  721. *********************************************************************/
  722. int test5a(struct Options options)
  723. {
  724. MQTTAsync c;
  725. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  726. int rc = 0;
  727. char* test_topic = "C client test5a";
  728. char* serverURIs[3] = {"tcp://localhost:1880", "tcp://localhost:1881", "tcp://localhost:1882"};
  729. failures = 0;
  730. MyLog(LOGA_INFO, "Starting test 5a - All HA connections out of service");
  731. rc = MQTTAsync_create(&c, "rubbish", "all_ha_down",
  732. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  733. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  734. if (rc != MQTTASYNC_SUCCESS)
  735. {
  736. MQTTAsync_destroy(&c);
  737. goto exit;
  738. }
  739. opts.keepAliveInterval = 20;
  740. opts.cleansession = 1;
  741. opts.username = "testuser";
  742. opts.password = "testpassword";
  743. opts.onSuccess = test5_onConnect;
  744. opts.onFailure = test5_onConnectFailure;
  745. opts.context = c;
  746. opts.serverURIcount = 3;
  747. opts.serverURIs = serverURIs;
  748. MyLog(LOGA_DEBUG, "Connecting");
  749. rc = MQTTAsync_connect(c, &opts);
  750. rc = 0;
  751. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  752. if (rc != MQTTASYNC_SUCCESS)
  753. goto exit;
  754. while (!test_finished)
  755. #if defined(_WIN32)
  756. Sleep(100);
  757. #else
  758. usleep(10000L);
  759. #endif
  760. MQTTAsync_destroy(&c);
  761. exit:
  762. assert("Connect onFailure should be called once", test5_onFailure_called == 1,
  763. "connect onFailure was called %d times", test5_onFailure_called);
  764. MyLog(LOGA_INFO, "TEST5a: test %s. %d tests run, %d failures.",
  765. (failures == 0) ? "passed" : "failed", tests, failures);
  766. return failures;
  767. }
  768. /*********************************************************************
  769. Test5b: All HA connections out of service except the last one.
  770. *********************************************************************/
  771. int test5b(struct Options options)
  772. {
  773. MQTTAsync c;
  774. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  775. int rc = 0;
  776. char* test_topic = "C client test5b";
  777. char* serverURIs[3] = {"tcp://localhost:1880", "tcp://localhost:1881", options.connection};
  778. failures = 0;
  779. MyLog(LOGA_INFO, "Starting test 5b - All HA connections out of service except the last one");
  780. rc = MQTTAsync_create(&c, "rubbish", "all_ha_down_except_last_one",
  781. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  782. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  783. if (rc != MQTTASYNC_SUCCESS)
  784. {
  785. MQTTAsync_destroy(&c);
  786. goto exit;
  787. }
  788. opts.keepAliveInterval = 20;
  789. opts.cleansession = 1;
  790. opts.username = "testuser";
  791. opts.password = "testpassword";
  792. opts.onSuccess = test5_onConnect;
  793. opts.onFailure = test5_onConnectFailure;
  794. opts.context = c;
  795. opts.serverURIcount = 3;
  796. opts.serverURIs = serverURIs;
  797. MyLog(LOGA_DEBUG, "Connecting");
  798. rc = MQTTAsync_connect(c, &opts);
  799. rc = 0;
  800. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  801. if (rc != MQTTASYNC_SUCCESS)
  802. goto exit;
  803. while (!test_finished)
  804. #if defined(_WIN32)
  805. Sleep(100);
  806. #else
  807. usleep(10000L);
  808. #endif
  809. MQTTAsync_destroy(&c);
  810. exit:
  811. assert("Connect onConnect should be called once", test5_onConnect_called == 1,
  812. "connect onConnect was called %d times", test5_onConnect_called);
  813. MyLog(LOGA_INFO, "TEST5b: test %s. %d tests run, %d failures.",
  814. (failures == 0) ? "passed" : "failed", tests, failures);
  815. return failures;
  816. }
  817. /*********************************************************************
  818. Test5c: All HA connections out of service except the first one.
  819. *********************************************************************/
  820. int test5c(struct Options options)
  821. {
  822. MQTTAsync c;
  823. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  824. int rc = 0;
  825. char* test_topic = "C client test5c";
  826. char* serverURIs[3] = {options.connection, "tcp://localhost:1881", "tcp://localhost:1882"};
  827. failures = 0;
  828. MyLog(LOGA_INFO, "Starting test 5c - All HA connections out of service except the first one");
  829. rc = MQTTAsync_create(&c, "rubbish", "all_ha_down_except_first_one",
  830. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  831. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  832. if (rc != MQTTASYNC_SUCCESS)
  833. {
  834. MQTTAsync_destroy(&c);
  835. goto exit;
  836. }
  837. opts.keepAliveInterval = 20;
  838. opts.cleansession = 1;
  839. opts.username = "testuser";
  840. opts.password = "testpassword";
  841. opts.onSuccess = test5_onConnect;
  842. opts.onFailure = test5_onConnectFailure;
  843. opts.context = c;
  844. opts.serverURIcount = 3;
  845. opts.serverURIs = serverURIs;
  846. MyLog(LOGA_DEBUG, "Connecting");
  847. rc = MQTTAsync_connect(c, &opts);
  848. rc = 0;
  849. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  850. if (rc != MQTTASYNC_SUCCESS)
  851. goto exit;
  852. while (!test_finished)
  853. #if defined(_WIN32)
  854. Sleep(100);
  855. #else
  856. usleep(10000L);
  857. #endif
  858. MQTTAsync_destroy(&c);
  859. exit:
  860. assert("Connect onConnect should be called once", test5_onConnect_called == 1,
  861. "connect onConnect was called %d times", test5_onConnect_called);
  862. MyLog(LOGA_INFO, "TEST5c: test %s. %d tests run, %d failures.",
  863. (failures == 0) ? "passed" : "failed", tests, failures);
  864. return failures;
  865. }
  866. void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  867. {
  868. if (strstr(message, "onnect") && !strstr(message, "isconnect"))
  869. printf("Trace : %d, %s\n", level, message);
  870. }
  871. int main(int argc, char** argv)
  872. {
  873. int rc = 0;
  874. int (*tests[])() = {NULL, test1, test2, test3, test4, test5a, test5b, test5c}; /* indexed starting from 1 */
  875. MQTTAsync_nameValue* info;
  876. getopts(argc, argv);
  877. MQTTAsync_setTraceCallback(trace_callback);
  878. info = MQTTAsync_getVersionInfo();
  879. while (info->name)
  880. {
  881. MyLog(LOGA_INFO, "%s: %s", info->name, info->value);
  882. info++;
  883. }
  884. if (options.test_no == -1)
  885. { /* run all the tests */
  886. for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
  887. {
  888. failures = 0;
  889. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  890. rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
  891. }
  892. }
  893. else
  894. {
  895. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  896. rc = tests[options.test_no](options); /* run just the selected test */
  897. }
  898. if (failures == 0)
  899. MyLog(LOGA_INFO, "verdict pass");
  900. else
  901. MyLog(LOGA_INFO, "verdict fail");
  902. return rc;
  903. }