123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- /*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2019, Sangoma Technologies Corporation
- *
- * Kevin Harwell <kharwell@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
- #include "asterisk.h"
- #include "asterisk/astobj2.h"
- #include "asterisk/serializer.h"
- #include "asterisk/taskprocessor.h"
- #include "asterisk/threadpool.h"
- #include "asterisk/utils.h"
- #include "asterisk/vector.h"
- struct ast_serializer_pool {
- /*! Shutdown group to monitor serializers. */
- struct ast_serializer_shutdown_group *shutdown_group;
- /*! Time to wait if using a shutdown group. */
- int shutdown_group_timeout;
- /*! A pool of taskprocessor(s) */
- AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;
- /*! Base name for the pool */
- char name[];
- };
- int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
- {
- if (!pool) {
- return 0;
- }
- /* Clear out the serializers */
- AST_VECTOR_RW_WRLOCK(&pool->serializers);
- AST_VECTOR_RESET(&pool->serializers, ast_taskprocessor_unreference);
- AST_VECTOR_RW_UNLOCK(&pool->serializers);
- /* If using a shutdown group then wait for all threads to complete */
- if (pool->shutdown_group) {
- int remaining;
- ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
- remaining = ast_serializer_shutdown_group_join(
- pool->shutdown_group, pool->shutdown_group_timeout);
- if (remaining) {
- /* If we've timed out don't fully cleanup yet */
- ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
- "'%d' dependencies still processing.\n", pool->name, remaining);
- return remaining;
- }
- ao2_ref(pool->shutdown_group, -1);
- pool->shutdown_group = NULL;
- }
- AST_VECTOR_RW_FREE(&pool->serializers);
- ast_free(pool);
- return 0;
- }
- struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
- unsigned int size, struct ast_threadpool *threadpool, int timeout)
- {
- struct ast_serializer_pool *pool;
- char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
- size_t idx;
- ast_assert(size > 0);
- pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
- if (!pool) {
- return NULL;
- }
- strcpy(pool->name, name); /* safe */
- pool->shutdown_group_timeout = timeout;
- pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL;
- AST_VECTOR_RW_INIT(&pool->serializers, size);
- for (idx = 0; idx < size; ++idx) {
- struct ast_taskprocessor *tps;
- /* Create name with seq number appended. */
- ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
- tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);
- if (!tps) {
- ast_serializer_pool_destroy(pool);
- ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
- tps_name);
- return NULL;
- }
- if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
- ast_serializer_pool_destroy(pool);
- ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
- tps_name);
- return NULL;
- }
- }
- return pool;
- }
- const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
- {
- return pool->name;
- }
- struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool)
- {
- struct ast_taskprocessor *res;
- size_t idx;
- if (!pool) {
- return NULL;
- }
- AST_VECTOR_RW_RDLOCK(&pool->serializers);
- if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
- AST_VECTOR_RW_UNLOCK(&pool->serializers);
- return NULL;
- }
- res = AST_VECTOR_GET(&pool->serializers, 0);
- /* Choose the taskprocessor with the smallest queue */
- for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
- struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
- if (ast_taskprocessor_size(cur) < ast_taskprocessor_size(res)) {
- res = cur;
- }
- }
- AST_VECTOR_RW_UNLOCK(&pool->serializers);
- return res;
- }
- int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
- {
- size_t idx;
- long tps_queue_high;
- long tps_queue_low;
- if (!pool) {
- return 0;
- }
- tps_queue_high = high;
- if (tps_queue_high <= 0) {
- ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
- "trigger level '%ld'\n", pool->name, tps_queue_high);
- tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
- }
- tps_queue_low = low;
- if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
- ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
- "level '%ld'\n", pool->name, tps_queue_low);
- tps_queue_low = -1;
- }
- for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
- struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
- if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {
- ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
- ast_taskprocessor_name(cur));
- }
- }
- return 0;
- }
|