test_taskprocessor.c 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2012-2013, Digium, Inc.
  5. *
  6. * Mark Michelson <mmichelson@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file
  20. * \brief taskprocessor unit tests
  21. *
  22. * \author Mark Michelson <mmichelson@digium.com>
  23. *
  24. */
  25. /*** MODULEINFO
  26. <depend>TEST_FRAMEWORK</depend>
  27. <support_level>core</support_level>
  28. ***/
  29. #include "asterisk.h"
  30. #include "asterisk/test.h"
  31. #include "asterisk/taskprocessor.h"
  32. #include "asterisk/module.h"
  33. #include "asterisk/astobj2.h"
  34. #include "asterisk/serializer.h"
  35. #include "asterisk/threadpool.h"
  36. /*!
  37. * \brief userdata associated with baseline taskprocessor test
  38. */
  39. struct task_data {
  40. /* Condition used to signal to queuing thread that task was executed */
  41. ast_cond_t cond;
  42. /* Lock protecting the condition */
  43. ast_mutex_t lock;
  44. /*! Boolean indicating that the task was run */
  45. int task_complete;
  46. /*! Milliseconds to wait before returning */
  47. unsigned long wait_time;
  48. };
  49. static void task_data_dtor(void *obj)
  50. {
  51. struct task_data *task_data = obj;
  52. ast_mutex_destroy(&task_data->lock);
  53. ast_cond_destroy(&task_data->cond);
  54. }
  55. /*! \brief Create a task_data object */
  56. static struct task_data *task_data_create(void)
  57. {
  58. struct task_data *task_data =
  59. ao2_alloc(sizeof(*task_data), task_data_dtor);
  60. if (!task_data) {
  61. return NULL;
  62. }
  63. ast_cond_init(&task_data->cond, NULL);
  64. ast_mutex_init(&task_data->lock);
  65. task_data->task_complete = 0;
  66. task_data->wait_time = 0;
  67. return task_data;
  68. }
  69. /*!
  70. * \brief Queued task for baseline test.
  71. *
  72. * The task simply sets a boolean to indicate the
  73. * task has been run and then signals a condition
  74. * saying it's complete
  75. */
  76. static int task(void *data)
  77. {
  78. struct task_data *task_data = data;
  79. SCOPED_MUTEX(lock, &task_data->lock);
  80. if (task_data->wait_time > 0) {
  81. usleep(task_data->wait_time * 1000);
  82. }
  83. task_data->task_complete = 1;
  84. ast_cond_signal(&task_data->cond);
  85. return 0;
  86. }
  87. /*!
  88. * \brief Wait for a task to execute.
  89. */
  90. static int task_wait(struct task_data *task_data)
  91. {
  92. struct timeval start = ast_tvnow();
  93. struct timespec end;
  94. SCOPED_MUTEX(lock, &task_data->lock);
  95. end.tv_sec = start.tv_sec + 30;
  96. end.tv_nsec = start.tv_usec * 1000;
  97. while (!task_data->task_complete) {
  98. int res;
  99. res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
  100. &end);
  101. if (res == ETIMEDOUT) {
  102. return -1;
  103. }
  104. }
  105. return 0;
  106. }
  107. /*!
  108. * \brief Baseline test for default taskprocessor
  109. *
  110. * This test ensures that when a task is added to a taskprocessor that
  111. * has been allocated with a default listener that the task gets executed
  112. * as expected
  113. */
  114. AST_TEST_DEFINE(default_taskprocessor)
  115. {
  116. RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
  117. RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
  118. int res;
  119. switch (cmd) {
  120. case TEST_INIT:
  121. info->name = "default_taskprocessor";
  122. info->category = "/main/taskprocessor/";
  123. info->summary = "Test of default taskprocessor";
  124. info->description =
  125. "Ensures that a queued task gets executed.";
  126. return AST_TEST_NOT_RUN;
  127. case TEST_EXECUTE:
  128. break;
  129. }
  130. tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
  131. if (!tps) {
  132. ast_test_status_update(test, "Unable to create test taskprocessor\n");
  133. return AST_TEST_FAIL;
  134. }
  135. task_data = task_data_create();
  136. if (!task_data) {
  137. ast_test_status_update(test, "Unable to create task_data\n");
  138. return AST_TEST_FAIL;
  139. }
  140. if (ast_taskprocessor_push(tps, task, task_data)) {
  141. ast_test_status_update(test, "Failed to queue task\n");
  142. return AST_TEST_FAIL;
  143. }
  144. res = task_wait(task_data);
  145. if (res != 0) {
  146. ast_test_status_update(test, "Queued task did not execute!\n");
  147. return AST_TEST_FAIL;
  148. }
  149. return AST_TEST_PASS;
  150. }
  151. /*!
  152. * \brief Baseline test for subsystem alert
  153. */
  154. AST_TEST_DEFINE(subsystem_alert)
  155. {
  156. RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
  157. #define TEST_DATA_ARRAY_SIZE 10
  158. #define LOW_WATER_MARK 3
  159. #define HIGH_WATER_MARK 6
  160. struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
  161. int res = 0;
  162. int i;
  163. long queue_count;
  164. unsigned int alert_level;
  165. unsigned int subsystem_alert_level;
  166. switch (cmd) {
  167. case TEST_INIT:
  168. info->name = "subsystem_alert";
  169. info->category = "/main/taskprocessor/";
  170. info->summary = "Test of subsystem alerts";
  171. info->description =
  172. "Ensures alerts are generated properly.";
  173. return AST_TEST_NOT_RUN;
  174. case TEST_EXECUTE:
  175. break;
  176. }
  177. tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
  178. if (!tps) {
  179. ast_test_status_update(test, "Unable to create test taskprocessor\n");
  180. return AST_TEST_FAIL;
  181. }
  182. ast_taskprocessor_alert_set_levels(tps, LOW_WATER_MARK, HIGH_WATER_MARK);
  183. ast_taskprocessor_suspend(tps);
  184. for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
  185. task_data[i] = task_data_create();
  186. if (!task_data[i]) {
  187. ast_test_status_update(test, "Unable to create task_data\n");
  188. res = -1;
  189. goto data_cleanup;
  190. }
  191. task_data[i]->wait_time = 500;
  192. ast_test_status_update(test, "Pushing task %d\n", i);
  193. if (ast_taskprocessor_push(tps, task, task_data[i])) {
  194. ast_test_status_update(test, "Failed to queue task\n");
  195. res = -1;
  196. goto data_cleanup;
  197. }
  198. queue_count = ast_taskprocessor_size(tps);
  199. alert_level = ast_taskprocessor_alert_get();
  200. subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
  201. if (queue_count == HIGH_WATER_MARK) {
  202. if (subsystem_alert_level) {
  203. ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
  204. }
  205. if (alert_level) {
  206. ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
  207. }
  208. } else if (queue_count < HIGH_WATER_MARK) {
  209. if (subsystem_alert_level > 0) {
  210. ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
  211. res = -1;
  212. }
  213. if (alert_level > 0) {
  214. ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
  215. res = -1;
  216. }
  217. } else {
  218. if (subsystem_alert_level == 0) {
  219. ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
  220. res = -1;
  221. }
  222. if (alert_level == 0) {
  223. ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
  224. res = -1;
  225. }
  226. }
  227. }
  228. ast_taskprocessor_unsuspend(tps);
  229. for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
  230. ast_test_status_update(test, "Waiting on task %d\n", i);
  231. if (task_wait(task_data[i])) {
  232. ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
  233. res = -1;
  234. goto data_cleanup;
  235. }
  236. queue_count = ast_taskprocessor_size(tps);
  237. alert_level = ast_taskprocessor_alert_get();
  238. subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
  239. if (queue_count == LOW_WATER_MARK) {
  240. if (!subsystem_alert_level) {
  241. ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
  242. }
  243. if (!alert_level) {
  244. ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
  245. }
  246. } else if (queue_count > LOW_WATER_MARK) {
  247. if (subsystem_alert_level == 0) {
  248. ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
  249. res = -1;
  250. }
  251. if (alert_level == 0) {
  252. ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
  253. res = -1;
  254. }
  255. } else {
  256. if (subsystem_alert_level > 0) {
  257. ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
  258. res = -1;
  259. }
  260. if (alert_level > 0) {
  261. ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
  262. res = -1;
  263. }
  264. }
  265. }
  266. data_cleanup:
  267. for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
  268. ao2_cleanup(task_data[i]);
  269. }
  270. return res ? AST_TEST_FAIL : AST_TEST_PASS;
  271. }
  272. #define NUM_TASKS 20000
  273. /*!
  274. * \brief Relevant data associated with taskprocessor load test
  275. */
  276. static struct load_task_data {
  277. /*! Condition used to indicate a task has completed executing */
  278. ast_cond_t cond;
  279. /*! Lock used to protect the condition */
  280. ast_mutex_t lock;
  281. /*! Counter of the number of completed tasks */
  282. int tasks_completed;
  283. /*! Storage for task-specific data */
  284. int task_rand[NUM_TASKS];
  285. } load_task_results;
  286. /*!
  287. * \brief a queued task to be used in the taskprocessor load test
  288. *
  289. * The task increments the number of tasks executed and puts the passed-in
  290. * data into the next slot in the array of random data.
  291. */
  292. static int load_task(void *data)
  293. {
  294. int *randdata = data;
  295. SCOPED_MUTEX(lock, &load_task_results.lock);
  296. load_task_results.task_rand[load_task_results.tasks_completed++] = *randdata;
  297. ast_cond_signal(&load_task_results.cond);
  298. return 0;
  299. }
  300. /*!
  301. * \brief Load test for taskprocessor with default listener
  302. *
  303. * This test queues a large number of tasks, each with random data associated.
  304. * The test ensures that all of the tasks are run and that the tasks are executed
  305. * in the same order that they were queued
  306. */
  307. AST_TEST_DEFINE(default_taskprocessor_load)
  308. {
  309. struct ast_taskprocessor *tps;
  310. struct timeval start;
  311. struct timespec ts;
  312. enum ast_test_result_state res = AST_TEST_PASS;
  313. int timedwait_res;
  314. int i;
  315. int rand_data[NUM_TASKS];
  316. switch (cmd) {
  317. case TEST_INIT:
  318. info->name = "default_taskprocessor_load";
  319. info->category = "/main/taskprocessor/";
  320. info->summary = "Load test of default taskprocessor";
  321. info->description =
  322. "Ensure that a large number of queued tasks are executed in the proper order.";
  323. return AST_TEST_NOT_RUN;
  324. case TEST_EXECUTE:
  325. break;
  326. }
  327. tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
  328. if (!tps) {
  329. ast_test_status_update(test, "Unable to create test taskprocessor\n");
  330. return AST_TEST_FAIL;
  331. }
  332. start = ast_tvnow();
  333. ts.tv_sec = start.tv_sec + 60;
  334. ts.tv_nsec = start.tv_usec * 1000;
  335. ast_cond_init(&load_task_results.cond, NULL);
  336. ast_mutex_init(&load_task_results.lock);
  337. load_task_results.tasks_completed = 0;
  338. for (i = 0; i < NUM_TASKS; ++i) {
  339. rand_data[i] = ast_random();
  340. if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
  341. ast_test_status_update(test, "Failed to queue task\n");
  342. res = AST_TEST_FAIL;
  343. goto test_end;
  344. }
  345. }
  346. ast_mutex_lock(&load_task_results.lock);
  347. while (load_task_results.tasks_completed < NUM_TASKS) {
  348. timedwait_res = ast_cond_timedwait(&load_task_results.cond, &load_task_results.lock, &ts);
  349. if (timedwait_res == ETIMEDOUT) {
  350. break;
  351. }
  352. }
  353. ast_mutex_unlock(&load_task_results.lock);
  354. if (load_task_results.tasks_completed != NUM_TASKS) {
  355. ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
  356. NUM_TASKS, load_task_results.tasks_completed);
  357. res = AST_TEST_FAIL;
  358. goto test_end;
  359. }
  360. for (i = 0; i < NUM_TASKS; ++i) {
  361. if (rand_data[i] != load_task_results.task_rand[i]) {
  362. ast_test_status_update(test, "Queued tasks did not execute in order\n");
  363. res = AST_TEST_FAIL;
  364. goto test_end;
  365. }
  366. }
  367. test_end:
  368. tps = ast_taskprocessor_unreference(tps);
  369. ast_mutex_destroy(&load_task_results.lock);
  370. ast_cond_destroy(&load_task_results.cond);
  371. return res;
  372. }
  373. /*!
  374. * \brief Private data for the test taskprocessor listener
  375. */
  376. struct test_listener_pvt {
  377. /* Counter of number of tasks pushed to the queue */
  378. int num_pushed;
  379. /* Counter of number of times the queue was emptied */
  380. int num_emptied;
  381. /* Counter of number of times that a pushed task occurred on an empty queue */
  382. int num_was_empty;
  383. /* Boolean indicating whether the shutdown callback was called */
  384. int shutdown;
  385. };
  386. /*!
  387. * \brief test taskprocessor listener's alloc callback
  388. */
  389. static void *test_listener_pvt_alloc(void)
  390. {
  391. struct test_listener_pvt *pvt;
  392. pvt = ast_calloc(1, sizeof(*pvt));
  393. return pvt;
  394. }
  395. /*!
  396. * \brief test taskprocessor listener's start callback
  397. */
  398. static int test_start(struct ast_taskprocessor_listener *listener)
  399. {
  400. return 0;
  401. }
  402. /*!
  403. * \brief test taskprocessor listener's task_pushed callback
  404. *
  405. * Adjusts private data's stats as indicated by the parameters.
  406. */
  407. static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
  408. {
  409. struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
  410. ++pvt->num_pushed;
  411. if (was_empty) {
  412. ++pvt->num_was_empty;
  413. }
  414. }
  415. /*!
  416. * \brief test taskprocessor listener's emptied callback.
  417. */
  418. static void test_emptied(struct ast_taskprocessor_listener *listener)
  419. {
  420. struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
  421. ++pvt->num_emptied;
  422. }
  423. /*!
  424. * \brief test taskprocessor listener's shutdown callback.
  425. */
  426. static void test_shutdown(struct ast_taskprocessor_listener *listener)
  427. {
  428. struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
  429. pvt->shutdown = 1;
  430. }
  431. static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
  432. .start = test_start,
  433. .task_pushed = test_task_pushed,
  434. .emptied = test_emptied,
  435. .shutdown = test_shutdown,
  436. };
  437. /*!
  438. * \brief Queued task for taskprocessor listener test.
  439. *
  440. * Does nothing.
  441. */
  442. static int listener_test_task(void *ignore)
  443. {
  444. return 0;
  445. }
  446. /*!
  447. * \brief helper to ensure that statistics the listener is keeping are what we expect
  448. *
  449. * \param test The currently-running test
  450. * \param pvt The private data for the taskprocessor listener
  451. * \param num_pushed The expected current number of tasks pushed to the processor
  452. * \param num_emptied The expected current number of times the taskprocessor has become empty
  453. * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
  454. * \retval -1 Stats were not as expected
  455. * \retval 0 Stats were as expected
  456. */
  457. static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
  458. {
  459. if (pvt->num_pushed != num_pushed) {
  460. ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
  461. num_pushed, pvt->num_pushed);
  462. return -1;
  463. }
  464. if (pvt->num_emptied != num_emptied) {
  465. ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
  466. num_emptied, pvt->num_emptied);
  467. return -1;
  468. }
  469. if (pvt->num_was_empty != num_was_empty) {
  470. ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
  471. num_was_empty, pvt->num_emptied);
  472. return -1;
  473. }
  474. return 0;
  475. }
  476. /*!
  477. * \brief Test for a taskprocessor with custom listener.
  478. *
  479. * This test pushes tasks to a taskprocessor with a custom listener, executes the tasks,
  480. * and destroys the taskprocessor.
  481. *
  482. * The test ensures that the listener's callbacks are called when expected and that the data
  483. * being passed in is accurate.
  484. */
  485. AST_TEST_DEFINE(taskprocessor_listener)
  486. {
  487. struct ast_taskprocessor *tps = NULL;
  488. struct ast_taskprocessor_listener *listener = NULL;
  489. struct test_listener_pvt *pvt = NULL;
  490. enum ast_test_result_state res = AST_TEST_PASS;
  491. switch (cmd) {
  492. case TEST_INIT:
  493. info->name = "taskprocessor_listener";
  494. info->category = "/main/taskprocessor/";
  495. info->summary = "Test of taskprocessor listeners";
  496. info->description =
  497. "Ensures that listener callbacks are called when expected.";
  498. return AST_TEST_NOT_RUN;
  499. case TEST_EXECUTE:
  500. break;
  501. }
  502. pvt = test_listener_pvt_alloc();
  503. if (!pvt) {
  504. ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
  505. return AST_TEST_FAIL;
  506. }
  507. listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
  508. if (!listener) {
  509. ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
  510. res = AST_TEST_FAIL;
  511. goto test_exit;
  512. }
  513. tps = ast_taskprocessor_create_with_listener("test_listener", listener);
  514. if (!tps) {
  515. ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
  516. res = AST_TEST_FAIL;
  517. goto test_exit;
  518. }
  519. if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
  520. ast_test_status_update(test, "Failed to queue task\n");
  521. res = AST_TEST_FAIL;
  522. goto test_exit;
  523. }
  524. if (check_stats(test, pvt, 1, 0, 1) < 0) {
  525. res = AST_TEST_FAIL;
  526. goto test_exit;
  527. }
  528. if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
  529. ast_test_status_update(test, "Failed to queue task\n");
  530. res = AST_TEST_FAIL;
  531. goto test_exit;
  532. }
  533. if (check_stats(test, pvt, 2, 0, 1) < 0) {
  534. res = AST_TEST_FAIL;
  535. goto test_exit;
  536. }
  537. ast_taskprocessor_execute(tps);
  538. if (check_stats(test, pvt, 2, 0, 1) < 0) {
  539. res = AST_TEST_FAIL;
  540. goto test_exit;
  541. }
  542. ast_taskprocessor_execute(tps);
  543. if (check_stats(test, pvt, 2, 1, 1) < 0) {
  544. res = AST_TEST_FAIL;
  545. goto test_exit;
  546. }
  547. tps = ast_taskprocessor_unreference(tps);
  548. if (!pvt->shutdown) {
  549. res = AST_TEST_FAIL;
  550. goto test_exit;
  551. }
  552. test_exit:
  553. ao2_cleanup(listener);
  554. /* This is safe even if tps is NULL */
  555. ast_taskprocessor_unreference(tps);
  556. ast_free(pvt);
  557. return res;
  558. }
  559. struct shutdown_data {
  560. ast_cond_t in;
  561. ast_cond_t out;
  562. ast_mutex_t lock;
  563. int task_complete;
  564. int task_started;
  565. int task_stop_waiting;
  566. };
  567. static void shutdown_data_dtor(void *data)
  568. {
  569. struct shutdown_data *shutdown_data = data;
  570. ast_mutex_destroy(&shutdown_data->lock);
  571. ast_cond_destroy(&shutdown_data->in);
  572. ast_cond_destroy(&shutdown_data->out);
  573. }
  574. static struct shutdown_data *shutdown_data_create(int dont_wait)
  575. {
  576. RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup);
  577. shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor);
  578. if (!shutdown_data) {
  579. return NULL;
  580. }
  581. ast_mutex_init(&shutdown_data->lock);
  582. ast_cond_init(&shutdown_data->in, NULL);
  583. ast_cond_init(&shutdown_data->out, NULL);
  584. shutdown_data->task_stop_waiting = dont_wait;
  585. ao2_ref(shutdown_data, +1);
  586. return shutdown_data;
  587. }
  588. static int shutdown_task_exec(void *data)
  589. {
  590. struct shutdown_data *shutdown_data = data;
  591. SCOPED_MUTEX(lock, &shutdown_data->lock);
  592. shutdown_data->task_started = 1;
  593. ast_cond_signal(&shutdown_data->out);
  594. while (!shutdown_data->task_stop_waiting) {
  595. ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
  596. }
  597. shutdown_data->task_complete = 1;
  598. ast_cond_signal(&shutdown_data->out);
  599. return 0;
  600. }
  601. static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
  602. {
  603. struct timeval start = ast_tvnow();
  604. struct timespec end = {
  605. .tv_sec = start.tv_sec + 5,
  606. .tv_nsec = start.tv_usec * 1000
  607. };
  608. SCOPED_MUTEX(lock, &shutdown_data->lock);
  609. while (!shutdown_data->task_complete) {
  610. if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
  611. break;
  612. }
  613. }
  614. return shutdown_data->task_complete;
  615. }
  616. static int shutdown_has_completed(struct shutdown_data *shutdown_data)
  617. {
  618. SCOPED_MUTEX(lock, &shutdown_data->lock);
  619. return shutdown_data->task_complete;
  620. }
  621. static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
  622. {
  623. struct timeval start = ast_tvnow();
  624. struct timespec end = {
  625. .tv_sec = start.tv_sec + 5,
  626. .tv_nsec = start.tv_usec * 1000
  627. };
  628. SCOPED_MUTEX(lock, &shutdown_data->lock);
  629. while (!shutdown_data->task_started) {
  630. if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
  631. break;
  632. }
  633. }
  634. return shutdown_data->task_started;
  635. }
  636. static void shutdown_poke(struct shutdown_data *shutdown_data)
  637. {
  638. SCOPED_MUTEX(lock, &shutdown_data->lock);
  639. shutdown_data->task_stop_waiting = 1;
  640. ast_cond_signal(&shutdown_data->in);
  641. }
  642. static void *tps_shutdown_thread(void *data)
  643. {
  644. struct ast_taskprocessor *tps = data;
  645. ast_taskprocessor_unreference(tps);
  646. return NULL;
  647. }
  648. AST_TEST_DEFINE(taskprocessor_shutdown)
  649. {
  650. RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
  651. RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
  652. RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
  653. int push_res;
  654. int wait_res;
  655. int pthread_res;
  656. pthread_t shutdown_thread;
  657. switch (cmd) {
  658. case TEST_INIT:
  659. info->name = "taskprocessor_shutdown";
  660. info->category = "/main/taskprocessor/";
  661. info->summary = "Test of taskprocessor shutdown sequence";
  662. info->description =
  663. "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
  664. return AST_TEST_NOT_RUN;
  665. case TEST_EXECUTE:
  666. break;
  667. }
  668. tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
  669. task1 = shutdown_data_create(0); /* task1 waits to be poked */
  670. task2 = shutdown_data_create(1); /* task2 waits for nothing */
  671. if (!tps || !task1 || !task2) {
  672. ast_test_status_update(test, "Allocation error\n");
  673. return AST_TEST_FAIL;
  674. }
  675. push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
  676. if (push_res != 0) {
  677. ast_test_status_update(test, "Could not push task1\n");
  678. return AST_TEST_FAIL;
  679. }
  680. push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
  681. if (push_res != 0) {
  682. ast_test_status_update(test, "Could not push task2\n");
  683. return AST_TEST_FAIL;
  684. }
  685. wait_res = shutdown_waitfor_start(task1);
  686. if (!wait_res) {
  687. ast_test_status_update(test, "Task1 didn't start\n");
  688. return AST_TEST_FAIL;
  689. }
  690. pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
  691. if (pthread_res != 0) {
  692. ast_test_status_update(test, "Failed to create shutdown thread\n");
  693. return AST_TEST_FAIL;
  694. }
  695. tps = NULL;
  696. /* Wakeup task1; it should complete */
  697. shutdown_poke(task1);
  698. wait_res = shutdown_waitfor_completion(task1);
  699. if (!wait_res) {
  700. ast_test_status_update(test, "Task1 didn't complete\n");
  701. return AST_TEST_FAIL;
  702. }
  703. /* Wait for shutdown to complete */
  704. pthread_join(shutdown_thread, NULL);
  705. /* Should have also completed task2 */
  706. wait_res = shutdown_has_completed(task2);
  707. if (!wait_res) {
  708. ast_test_status_update(test, "Task2 didn't finish\n");
  709. return AST_TEST_FAIL;
  710. }
  711. return AST_TEST_PASS;
  712. }
  713. static int local_task_exe(struct ast_taskprocessor_local *local)
  714. {
  715. int *local_data = local->local_data;
  716. struct task_data *task_data = local->data;
  717. *local_data = 1;
  718. task(task_data);
  719. return 0;
  720. }
  721. AST_TEST_DEFINE(taskprocessor_push_local)
  722. {
  723. RAII_VAR(struct ast_taskprocessor *, tps, NULL,
  724. ast_taskprocessor_unreference);
  725. RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
  726. int local_data;
  727. int res;
  728. switch (cmd) {
  729. case TEST_INIT:
  730. info->name = __func__;
  731. info->category = "/main/taskprocessor/";
  732. info->summary = "Test of pushing local data";
  733. info->description =
  734. "Ensures that local data is passed along.";
  735. return AST_TEST_NOT_RUN;
  736. case TEST_EXECUTE:
  737. break;
  738. }
  739. tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
  740. if (!tps) {
  741. ast_test_status_update(test, "Unable to create test taskprocessor\n");
  742. return AST_TEST_FAIL;
  743. }
  744. task_data = task_data_create();
  745. if (!task_data) {
  746. ast_test_status_update(test, "Unable to create task_data\n");
  747. return AST_TEST_FAIL;
  748. }
  749. local_data = 0;
  750. ast_taskprocessor_set_local(tps, &local_data);
  751. if (ast_taskprocessor_push_local(tps, local_task_exe, task_data)) {
  752. ast_test_status_update(test, "Failed to queue task\n");
  753. return AST_TEST_FAIL;
  754. }
  755. res = task_wait(task_data);
  756. if (res != 0) {
  757. ast_test_status_update(test, "Queued task did not execute!\n");
  758. return AST_TEST_FAIL;
  759. }
  760. if (local_data != 1) {
  761. ast_test_status_update(test,
  762. "Queued task did not set local_data!\n");
  763. return AST_TEST_FAIL;
  764. }
  765. return AST_TEST_PASS;
  766. }
  767. /*!
  768. * \brief Baseline test for a serializer pool
  769. *
  770. * This test ensures that when a task is added to a taskprocessor that
  771. * has been allocated with a default listener that the task gets executed
  772. * as expected
  773. */
  774. AST_TEST_DEFINE(serializer_pool)
  775. {
  776. RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);
  777. RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);
  778. RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
  779. struct ast_threadpool_options options = {
  780. .version = AST_THREADPOOL_OPTIONS_VERSION,
  781. .idle_timeout = 0,
  782. .auto_increment = 0,
  783. .initial_size = 1,
  784. .max_size = 0,
  785. };
  786. /* struct ast_taskprocessor *tps; */
  787. switch (cmd) {
  788. case TEST_INIT:
  789. info->name = "serializer_pool";
  790. info->category = "/main/taskprocessor/";
  791. info->summary = "Test using a serializer pool";
  792. info->description =
  793. "Ensures that a queued task gets executed.";
  794. return AST_TEST_NOT_RUN;
  795. case TEST_EXECUTE:
  796. break;
  797. }
  798. ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
  799. ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
  800. "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
  801. ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
  802. ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
  803. ast_test_validate(test, task_data = task_data_create());
  804. task_data->wait_time = 4000; /* task takes 4 seconds */
  805. ast_test_validate(test, !ast_taskprocessor_push(
  806. ast_serializer_pool_get(serializer_pool), task, task_data));
  807. if (!ast_serializer_pool_destroy(serializer_pool)) {
  808. ast_test_status_update(test, "Unexpected pool destruction!\n");
  809. /*
  810. * The pool should have timed out, so if it destruction reports success
  811. * we need to fail.
  812. */
  813. serializer_pool = NULL;
  814. return AST_TEST_FAIL;
  815. }
  816. ast_test_validate(test, !task_wait(task_data));
  817. /* The first attempt should have failed. Second try should destroy successfully */
  818. if (ast_serializer_pool_destroy(serializer_pool)) {
  819. ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
  820. /*
  821. * If this fails we'll try again on return to hopefully avoid a memory leak.
  822. * If it again times out a third time, well not much we can do.
  823. */
  824. return AST_TEST_FAIL;
  825. }
  826. /* Test passed, so set pool to NULL to avoid "re-running" destroy */
  827. serializer_pool = NULL;
  828. return AST_TEST_PASS;
  829. }
  830. static int unload_module(void)
  831. {
  832. ast_test_unregister(default_taskprocessor);
  833. ast_test_unregister(default_taskprocessor_load);
  834. ast_test_unregister(subsystem_alert);
  835. ast_test_unregister(taskprocessor_listener);
  836. ast_test_unregister(taskprocessor_shutdown);
  837. ast_test_unregister(taskprocessor_push_local);
  838. ast_test_unregister(serializer_pool);
  839. return 0;
  840. }
  841. static int load_module(void)
  842. {
  843. ast_test_register(default_taskprocessor);
  844. ast_test_register(default_taskprocessor_load);
  845. ast_test_register(subsystem_alert);
  846. ast_test_register(taskprocessor_listener);
  847. ast_test_register(taskprocessor_shutdown);
  848. ast_test_register(taskprocessor_push_local);
  849. ast_test_register(serializer_pool);
  850. return AST_MODULE_LOAD_SUCCESS;
  851. }
  852. AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");