serializer.c 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2019, Sangoma Technologies Corporation
  5. *
  6. * Kevin Harwell <kharwell@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. #include "asterisk.h"
  19. #include "asterisk/astobj2.h"
  20. #include "asterisk/serializer.h"
  21. #include "asterisk/taskprocessor.h"
  22. #include "asterisk/threadpool.h"
  23. #include "asterisk/utils.h"
  24. #include "asterisk/vector.h"
  25. struct ast_serializer_pool {
  26. /*! Shutdown group to monitor serializers. */
  27. struct ast_serializer_shutdown_group *shutdown_group;
  28. /*! Time to wait if using a shutdown group. */
  29. int shutdown_group_timeout;
  30. /*! A pool of taskprocessor(s) */
  31. AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;
  32. /*! Base name for the pool */
  33. char name[];
  34. };
  35. int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
  36. {
  37. if (!pool) {
  38. return 0;
  39. }
  40. /* Clear out the serializers */
  41. AST_VECTOR_RW_WRLOCK(&pool->serializers);
  42. AST_VECTOR_RESET(&pool->serializers, ast_taskprocessor_unreference);
  43. AST_VECTOR_RW_UNLOCK(&pool->serializers);
  44. /* If using a shutdown group then wait for all threads to complete */
  45. if (pool->shutdown_group) {
  46. int remaining;
  47. ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
  48. remaining = ast_serializer_shutdown_group_join(
  49. pool->shutdown_group, pool->shutdown_group_timeout);
  50. if (remaining) {
  51. /* If we've timed out don't fully cleanup yet */
  52. ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
  53. "'%d' dependencies still processing.\n", pool->name, remaining);
  54. return remaining;
  55. }
  56. ao2_ref(pool->shutdown_group, -1);
  57. pool->shutdown_group = NULL;
  58. }
  59. AST_VECTOR_RW_FREE(&pool->serializers);
  60. ast_free(pool);
  61. return 0;
  62. }
  63. struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
  64. unsigned int size, struct ast_threadpool *threadpool, int timeout)
  65. {
  66. struct ast_serializer_pool *pool;
  67. char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
  68. size_t idx;
  69. ast_assert(size > 0);
  70. pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
  71. if (!pool) {
  72. return NULL;
  73. }
  74. strcpy(pool->name, name); /* safe */
  75. pool->shutdown_group_timeout = timeout;
  76. pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL;
  77. AST_VECTOR_RW_INIT(&pool->serializers, size);
  78. for (idx = 0; idx < size; ++idx) {
  79. struct ast_taskprocessor *tps;
  80. /* Create name with seq number appended. */
  81. ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
  82. tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);
  83. if (!tps) {
  84. ast_serializer_pool_destroy(pool);
  85. ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
  86. tps_name);
  87. return NULL;
  88. }
  89. if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
  90. ast_serializer_pool_destroy(pool);
  91. ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
  92. tps_name);
  93. return NULL;
  94. }
  95. }
  96. return pool;
  97. }
  98. const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
  99. {
  100. return pool->name;
  101. }
  102. struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool)
  103. {
  104. struct ast_taskprocessor *res;
  105. size_t idx;
  106. if (!pool) {
  107. return NULL;
  108. }
  109. AST_VECTOR_RW_RDLOCK(&pool->serializers);
  110. if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
  111. AST_VECTOR_RW_UNLOCK(&pool->serializers);
  112. return NULL;
  113. }
  114. res = AST_VECTOR_GET(&pool->serializers, 0);
  115. /* Choose the taskprocessor with the smallest queue */
  116. for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
  117. struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
  118. if (ast_taskprocessor_size(cur) < ast_taskprocessor_size(res)) {
  119. res = cur;
  120. }
  121. }
  122. AST_VECTOR_RW_UNLOCK(&pool->serializers);
  123. return res;
  124. }
  125. int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
  126. {
  127. size_t idx;
  128. long tps_queue_high;
  129. long tps_queue_low;
  130. if (!pool) {
  131. return 0;
  132. }
  133. tps_queue_high = high;
  134. if (tps_queue_high <= 0) {
  135. ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
  136. "trigger level '%ld'\n", pool->name, tps_queue_high);
  137. tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
  138. }
  139. tps_queue_low = low;
  140. if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
  141. ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
  142. "level '%ld'\n", pool->name, tps_queue_low);
  143. tps_queue_low = -1;
  144. }
  145. for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
  146. struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
  147. if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {
  148. ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
  149. ast_taskprocessor_name(cur));
  150. }
  151. }
  152. return 0;
  153. }