stasis_cache.c 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis Message API.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. #include "asterisk/astobj2.h"
  29. #include "asterisk/hashtab.h"
  30. #include "asterisk/stasis_internal.h"
  31. #include "asterisk/stasis.h"
  32. #include "asterisk/utils.h"
  33. #include "asterisk/vector.h"
  34. #ifdef LOW_MEMORY
  35. #define NUM_CACHE_BUCKETS 17
  36. #else
  37. #define NUM_CACHE_BUCKETS 563
  38. #endif
  39. /*! \internal */
  40. struct stasis_cache {
  41. struct ao2_container *entries;
  42. snapshot_get_id id_fn;
  43. cache_aggregate_calc_fn aggregate_calc_fn;
  44. cache_aggregate_publish_fn aggregate_publish_fn;
  45. int registered;
  46. };
  47. /*! \internal */
  48. struct stasis_caching_topic {
  49. struct stasis_cache *cache;
  50. struct stasis_topic *topic;
  51. struct stasis_topic *original_topic;
  52. struct stasis_subscription *sub;
  53. };
  54. static void stasis_caching_topic_dtor(void *obj)
  55. {
  56. struct stasis_caching_topic *caching_topic = obj;
  57. /* Caching topics contain subscriptions, and must be manually
  58. * unsubscribed. */
  59. ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
  60. /* If there are any messages in flight to this subscription; that would
  61. * be bad. */
  62. ast_assert(stasis_subscription_is_done(caching_topic->sub));
  63. ao2_container_unregister(stasis_topic_name(caching_topic->topic));
  64. ao2_cleanup(caching_topic->sub);
  65. caching_topic->sub = NULL;
  66. ao2_cleanup(caching_topic->cache);
  67. caching_topic->cache = NULL;
  68. ao2_cleanup(caching_topic->topic);
  69. caching_topic->topic = NULL;
  70. ao2_cleanup(caching_topic->original_topic);
  71. caching_topic->original_topic = NULL;
  72. }
  73. struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
  74. {
  75. return caching_topic->topic;
  76. }
  77. int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
  78. struct stasis_message_type *type)
  79. {
  80. int res;
  81. if (!caching_topic) {
  82. return -1;
  83. }
  84. /* We wait to accept the stasis specific message types until now so that by default everything
  85. * will flow to us.
  86. */
  87. res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type());
  88. res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type());
  89. res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
  90. return res;
  91. }
  92. int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
  93. enum stasis_subscription_message_filter filter)
  94. {
  95. if (!caching_topic) {
  96. return -1;
  97. }
  98. return stasis_subscription_set_filter(caching_topic->sub, filter);
  99. }
  100. struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
  101. {
  102. if (!caching_topic) {
  103. return NULL;
  104. }
  105. /*
  106. * The subscription may hold the last reference to this caching
  107. * topic, but we want to make sure the unsubscribe finishes
  108. * before kicking of the caching topic's dtor.
  109. */
  110. ao2_ref(caching_topic, +1);
  111. if (stasis_subscription_is_subscribed(caching_topic->sub)) {
  112. /*
  113. * Increment the reference to hold on to it past the
  114. * unsubscribe. Will be cleaned up in dtor.
  115. */
  116. ao2_ref(caching_topic->sub, +1);
  117. stasis_unsubscribe(caching_topic->sub);
  118. } else {
  119. ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
  120. }
  121. ao2_cleanup(caching_topic);
  122. return NULL;
  123. }
  124. struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
  125. {
  126. if (!caching_topic) {
  127. return NULL;
  128. }
  129. /* Hold a ref past the unsubscribe */
  130. ao2_ref(caching_topic, +1);
  131. stasis_caching_unsubscribe(caching_topic);
  132. stasis_subscription_join(caching_topic->sub);
  133. ao2_cleanup(caching_topic);
  134. return NULL;
  135. }
  136. /*!
  137. * \brief The key for an entry in the cache
  138. * \note The items in this struct must be immutable for the item in the cache
  139. */
  140. struct cache_entry_key {
  141. /*! The message type of the item stored in the cache */
  142. struct stasis_message_type *type;
  143. /*! The unique ID of the item stored in the cache */
  144. const char *id;
  145. /*! The hash, computed from \c type and \c id */
  146. unsigned int hash;
  147. };
  148. struct stasis_cache_entry {
  149. struct cache_entry_key key;
  150. /*! Aggregate snapshot of the stasis cache. */
  151. struct stasis_message *aggregate;
  152. /*! Local entity snapshot of the stasis event. */
  153. struct stasis_message *local;
  154. /*! Remote entity snapshots of the stasis event. */
  155. AST_VECTOR(, struct stasis_message *) remote;
  156. };
  157. static void cache_entry_dtor(void *obj)
  158. {
  159. struct stasis_cache_entry *entry = obj;
  160. size_t idx;
  161. entry->key.type = NULL;
  162. ast_free((char *) entry->key.id);
  163. entry->key.id = NULL;
  164. ao2_cleanup(entry->aggregate);
  165. entry->aggregate = NULL;
  166. ao2_cleanup(entry->local);
  167. entry->local = NULL;
  168. for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
  169. struct stasis_message *remote;
  170. remote = AST_VECTOR_GET(&entry->remote, idx);
  171. ao2_cleanup(remote);
  172. }
  173. AST_VECTOR_FREE(&entry->remote);
  174. }
  175. static void cache_entry_compute_hash(struct cache_entry_key *key)
  176. {
  177. key->hash = stasis_message_type_hash(key->type);
  178. key->hash += ast_hashtab_hash_string(key->id);
  179. }
  180. static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
  181. {
  182. struct stasis_cache_entry *entry;
  183. int is_remote;
  184. ast_assert(id != NULL);
  185. ast_assert(snapshot != NULL);
  186. if (!type) {
  187. return NULL;
  188. }
  189. entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
  190. AO2_ALLOC_OPT_LOCK_NOLOCK);
  191. if (!entry) {
  192. return NULL;
  193. }
  194. entry->key.id = ast_strdup(id);
  195. if (!entry->key.id) {
  196. ao2_cleanup(entry);
  197. return NULL;
  198. }
  199. /*
  200. * Normal ao2 ref counting rules says we should increment the message
  201. * type ref here and decrement it in cache_entry_dtor(). However, the
  202. * stasis message snapshot is cached here, will always have the same type
  203. * as the cache entry, and can legitimately cause the type ref count to
  204. * hit the excessive ref count assertion. Since the cache entry will
  205. * always have a snapshot we can get away with not holding a ref here.
  206. */
  207. ast_assert(type == stasis_message_type(snapshot));
  208. entry->key.type = type;
  209. cache_entry_compute_hash(&entry->key);
  210. is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
  211. if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
  212. ao2_cleanup(entry);
  213. return NULL;
  214. }
  215. if (is_remote) {
  216. if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
  217. ao2_cleanup(entry);
  218. return NULL;
  219. }
  220. } else {
  221. entry->local = snapshot;
  222. }
  223. ao2_bump(snapshot);
  224. return entry;
  225. }
  226. static int cache_entry_hash(const void *obj, int flags)
  227. {
  228. const struct stasis_cache_entry *object;
  229. const struct cache_entry_key *key;
  230. switch (flags & OBJ_SEARCH_MASK) {
  231. case OBJ_SEARCH_KEY:
  232. key = obj;
  233. break;
  234. case OBJ_SEARCH_OBJECT:
  235. object = obj;
  236. key = &object->key;
  237. break;
  238. default:
  239. /* Hash can only work on something with a full key. */
  240. ast_assert(0);
  241. return 0;
  242. }
  243. return (int)key->hash;
  244. }
  245. static int cache_entry_cmp(void *obj, void *arg, int flags)
  246. {
  247. const struct stasis_cache_entry *object_left = obj;
  248. const struct stasis_cache_entry *object_right = arg;
  249. const struct cache_entry_key *right_key = arg;
  250. int cmp;
  251. switch (flags & OBJ_SEARCH_MASK) {
  252. case OBJ_SEARCH_OBJECT:
  253. right_key = &object_right->key;
  254. /* Fall through */
  255. case OBJ_SEARCH_KEY:
  256. cmp = object_left->key.type != right_key->type
  257. || strcmp(object_left->key.id, right_key->id);
  258. break;
  259. case OBJ_SEARCH_PARTIAL_KEY:
  260. /* Not supported by container */
  261. ast_assert(0);
  262. cmp = -1;
  263. break;
  264. default:
  265. /*
  266. * What arg points to is specific to this traversal callback
  267. * and has no special meaning to astobj2.
  268. */
  269. cmp = 0;
  270. break;
  271. }
  272. if (cmp) {
  273. return 0;
  274. }
  275. /*
  276. * At this point the traversal callback is identical to a sorted
  277. * container.
  278. */
  279. return CMP_MATCH;
  280. }
  281. static void cache_dtor(void *obj)
  282. {
  283. struct stasis_cache *cache = obj;
  284. ao2_cleanup(cache->entries);
  285. cache->entries = NULL;
  286. }
  287. struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
  288. cache_aggregate_calc_fn aggregate_calc_fn,
  289. cache_aggregate_publish_fn aggregate_publish_fn)
  290. {
  291. struct stasis_cache *cache;
  292. cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
  293. AO2_ALLOC_OPT_LOCK_NOLOCK);
  294. if (!cache) {
  295. return NULL;
  296. }
  297. cache->entries = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
  298. NUM_CACHE_BUCKETS, cache_entry_hash, NULL, cache_entry_cmp);
  299. if (!cache->entries) {
  300. ao2_cleanup(cache);
  301. return NULL;
  302. }
  303. cache->id_fn = id_fn;
  304. cache->aggregate_calc_fn = aggregate_calc_fn;
  305. cache->aggregate_publish_fn = aggregate_publish_fn;
  306. return cache;
  307. }
  308. struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
  309. {
  310. return stasis_cache_create_full(id_fn, NULL, NULL);
  311. }
  312. struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
  313. {
  314. return entry->aggregate;
  315. }
  316. struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
  317. {
  318. return entry->local;
  319. }
  320. struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
  321. {
  322. if (idx < AST_VECTOR_SIZE(&entry->remote)) {
  323. return AST_VECTOR_GET(&entry->remote, idx);
  324. }
  325. return NULL;
  326. }
  327. /*!
  328. * \internal
  329. * \brief Find the cache entry in the cache entries container.
  330. *
  331. * \param entries Container of cached entries.
  332. * \param type Type of message to retrieve the cache entry.
  333. * \param id Identity of the snapshot to retrieve the cache entry.
  334. *
  335. * \note The entries container is already locked.
  336. *
  337. * \return Cache-entry on success.
  338. * \retval NULL Not in cache.
  339. */
  340. static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
  341. {
  342. struct cache_entry_key search_key;
  343. struct stasis_cache_entry *entry;
  344. search_key.type = type;
  345. search_key.id = id;
  346. cache_entry_compute_hash(&search_key);
  347. entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  348. /* Ensure that what we looked for is what we found. */
  349. ast_assert(!entry
  350. || (!strcmp(stasis_message_type_name(entry->key.type),
  351. stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
  352. return entry;
  353. }
  354. /*!
  355. * \internal
  356. * \brief Remove the stasis snapshot in the cache entry determined by eid.
  357. *
  358. * \param entries Container of cached entries.
  359. * \param cached_entry The entry to remove the snapshot from.
  360. * \param eid Which snapshot in the cached entry.
  361. *
  362. * \note The entries container is already locked.
  363. *
  364. * \return Previous stasis entry snapshot.
  365. */
  366. static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
  367. {
  368. struct stasis_message *old_snapshot;
  369. int is_remote;
  370. is_remote = ast_eid_cmp(eid, &ast_eid_default);
  371. if (!is_remote) {
  372. old_snapshot = cached_entry->local;
  373. cached_entry->local = NULL;
  374. } else {
  375. int idx;
  376. old_snapshot = NULL;
  377. for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
  378. struct stasis_message *cur;
  379. cur = AST_VECTOR_GET(&cached_entry->remote, idx);
  380. if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
  381. old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
  382. break;
  383. }
  384. }
  385. }
  386. if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
  387. ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
  388. }
  389. return old_snapshot;
  390. }
  391. /*!
  392. * \internal
  393. * \brief Update the stasis snapshot in the cache entry determined by eid.
  394. *
  395. * \param cached_entry The entry to remove the snapshot from.
  396. * \param eid Which snapshot in the cached entry.
  397. * \param new_snapshot Snapshot to replace the old snapshot.
  398. *
  399. * \return Previous stasis entry snapshot.
  400. */
  401. static struct stasis_message *cache_update(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
  402. {
  403. struct stasis_message *old_snapshot;
  404. int is_remote;
  405. int idx;
  406. is_remote = ast_eid_cmp(eid, &ast_eid_default);
  407. if (!is_remote) {
  408. old_snapshot = cached_entry->local;
  409. cached_entry->local = ao2_bump(new_snapshot);
  410. return old_snapshot;
  411. }
  412. old_snapshot = NULL;
  413. for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
  414. struct stasis_message *cur;
  415. cur = AST_VECTOR_GET(&cached_entry->remote, idx);
  416. if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
  417. old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
  418. break;
  419. }
  420. }
  421. if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
  422. ao2_bump(new_snapshot);
  423. }
  424. return old_snapshot;
  425. }
  426. struct cache_put_snapshots {
  427. /*! Old cache eid snapshot. */
  428. struct stasis_message *old;
  429. /*! Old cache aggregate snapshot. */
  430. struct stasis_message *aggregate_old;
  431. /*! New cache aggregate snapshot. */
  432. struct stasis_message *aggregate_new;
  433. };
  434. static struct cache_put_snapshots cache_put(struct stasis_cache *cache,
  435. struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
  436. struct stasis_message *new_snapshot)
  437. {
  438. struct stasis_cache_entry *cached_entry;
  439. struct cache_put_snapshots snapshots;
  440. ast_assert(cache->entries != NULL);
  441. ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
  442. ast_assert(new_snapshot == NULL ||
  443. type == stasis_message_type(new_snapshot));
  444. memset(&snapshots, 0, sizeof(snapshots));
  445. ao2_wrlock(cache->entries);
  446. cached_entry = cache_find(cache->entries, type, id);
  447. /* Update the eid snapshot. */
  448. if (!new_snapshot) {
  449. /* Remove snapshot from cache */
  450. if (cached_entry) {
  451. snapshots.old = cache_remove(cache->entries, cached_entry, eid);
  452. }
  453. } else if (cached_entry) {
  454. /* Update snapshot in cache */
  455. snapshots.old = cache_update(cached_entry, eid, new_snapshot);
  456. } else {
  457. /* Insert into the cache */
  458. cached_entry = cache_entry_create(type, id, new_snapshot);
  459. if (cached_entry) {
  460. ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
  461. }
  462. }
  463. /* Update the aggregate snapshot. */
  464. if (cache->aggregate_calc_fn && cached_entry) {
  465. snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
  466. snapshots.aggregate_old = cached_entry->aggregate;
  467. cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
  468. }
  469. ao2_unlock(cache->entries);
  470. ao2_cleanup(cached_entry);
  471. return snapshots;
  472. }
  473. /*!
  474. * \internal
  475. * \brief Dump all entity snapshots in the cache entry into the given container.
  476. *
  477. * \param snapshots Container to put all snapshots in the cache entry.
  478. * \param entry Cache entry to use.
  479. *
  480. * \retval 0 on success.
  481. * \retval non-zero on error.
  482. */
  483. static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
  484. {
  485. int idx;
  486. int err = 0;
  487. ast_assert(snapshots != NULL);
  488. ast_assert(entry != NULL);
  489. /* The aggregate snapshot is not a snapshot from an entity. */
  490. if (entry->local) {
  491. err |= !ao2_link(snapshots, entry->local);
  492. }
  493. for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
  494. struct stasis_message *snapshot;
  495. snapshot = AST_VECTOR_GET(&entry->remote, idx);
  496. err |= !ao2_link(snapshots, snapshot);
  497. }
  498. return err;
  499. }
  500. struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
  501. {
  502. struct stasis_cache_entry *cached_entry;
  503. struct ao2_container *found;
  504. ast_assert(cache != NULL);
  505. ast_assert(cache->entries != NULL);
  506. ast_assert(id != NULL);
  507. if (!type) {
  508. return NULL;
  509. }
  510. found = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
  511. if (!found) {
  512. return NULL;
  513. }
  514. ao2_rdlock(cache->entries);
  515. cached_entry = cache_find(cache->entries, type, id);
  516. if (cached_entry && cache_entry_dump(found, cached_entry)) {
  517. ao2_cleanup(found);
  518. found = NULL;
  519. }
  520. ao2_unlock(cache->entries);
  521. ao2_cleanup(cached_entry);
  522. return found;
  523. }
  524. /*!
  525. * \internal
  526. * \brief Retrieve an item from the cache entry for a specific eid.
  527. *
  528. * \param entry Cache entry to use.
  529. * \param eid Specific entity id to retrieve. NULL for aggregate.
  530. *
  531. * \note The returned snapshot has not had its reference bumped.
  532. *
  533. * \return Snapshot from the cache.
  534. * \retval NULL if snapshot is not found.
  535. */
  536. static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
  537. {
  538. int is_remote;
  539. int idx;
  540. if (!eid) {
  541. /* Get aggregate. */
  542. return entry->aggregate;
  543. }
  544. /* Get snapshot with specific eid. */
  545. is_remote = ast_eid_cmp(eid, &ast_eid_default);
  546. if (!is_remote) {
  547. return entry->local;
  548. }
  549. for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
  550. struct stasis_message *cur;
  551. cur = AST_VECTOR_GET(&entry->remote, idx);
  552. if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
  553. return cur;
  554. }
  555. }
  556. return NULL;
  557. }
  558. struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
  559. {
  560. struct stasis_cache_entry *cached_entry;
  561. struct stasis_message *snapshot = NULL;
  562. ast_assert(cache != NULL);
  563. ast_assert(cache->entries != NULL);
  564. ast_assert(id != NULL);
  565. if (!type) {
  566. return NULL;
  567. }
  568. ao2_rdlock(cache->entries);
  569. cached_entry = cache_find(cache->entries, type, id);
  570. if (cached_entry) {
  571. snapshot = cache_entry_by_eid(cached_entry, eid);
  572. ao2_bump(snapshot);
  573. }
  574. ao2_unlock(cache->entries);
  575. ao2_cleanup(cached_entry);
  576. return snapshot;
  577. }
  578. struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
  579. {
  580. return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
  581. }
  582. struct cache_dump_data {
  583. struct ao2_container *container;
  584. struct stasis_message_type *type;
  585. const struct ast_eid *eid;
  586. };
  587. static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
  588. {
  589. struct cache_dump_data *cache_dump = arg;
  590. struct stasis_cache_entry *entry = obj;
  591. if (!cache_dump->type || entry->key.type == cache_dump->type) {
  592. struct stasis_message *snapshot;
  593. snapshot = cache_entry_by_eid(entry, cache_dump->eid);
  594. if (snapshot) {
  595. if (!ao2_link(cache_dump->container, snapshot)) {
  596. ao2_cleanup(cache_dump->container);
  597. cache_dump->container = NULL;
  598. return CMP_STOP;
  599. }
  600. }
  601. }
  602. return 0;
  603. }
  604. struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
  605. {
  606. struct cache_dump_data cache_dump;
  607. ast_assert(cache != NULL);
  608. ast_assert(cache->entries != NULL);
  609. cache_dump.eid = eid;
  610. cache_dump.type = type;
  611. cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
  612. if (!cache_dump.container) {
  613. return NULL;
  614. }
  615. ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_by_eid_cb, &cache_dump);
  616. return cache_dump.container;
  617. }
  618. struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
  619. {
  620. return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
  621. }
  622. static int cache_dump_all_cb(void *obj, void *arg, int flags)
  623. {
  624. struct cache_dump_data *cache_dump = arg;
  625. struct stasis_cache_entry *entry = obj;
  626. if (!cache_dump->type || entry->key.type == cache_dump->type) {
  627. if (cache_entry_dump(cache_dump->container, entry)) {
  628. ao2_cleanup(cache_dump->container);
  629. cache_dump->container = NULL;
  630. return CMP_STOP;
  631. }
  632. }
  633. return 0;
  634. }
  635. struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
  636. {
  637. struct cache_dump_data cache_dump;
  638. ast_assert(cache != NULL);
  639. ast_assert(cache->entries != NULL);
  640. cache_dump.eid = NULL;
  641. cache_dump.type = type;
  642. cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
  643. if (!cache_dump.container) {
  644. return NULL;
  645. }
  646. ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_all_cb, &cache_dump);
  647. return cache_dump.container;
  648. }
  649. STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
  650. STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
  651. struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
  652. {
  653. return stasis_message_create(stasis_cache_clear_type(), id_message);
  654. }
  655. static void stasis_cache_update_dtor(void *obj)
  656. {
  657. struct stasis_cache_update *update = obj;
  658. ao2_cleanup(update->old_snapshot);
  659. update->old_snapshot = NULL;
  660. ao2_cleanup(update->new_snapshot);
  661. update->new_snapshot = NULL;
  662. ao2_cleanup(update->type);
  663. update->type = NULL;
  664. }
  665. static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
  666. {
  667. struct stasis_cache_update *update;
  668. struct stasis_message *msg;
  669. ast_assert(old_snapshot != NULL || new_snapshot != NULL);
  670. if (!stasis_cache_update_type()) {
  671. return NULL;
  672. }
  673. update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
  674. AO2_ALLOC_OPT_LOCK_NOLOCK);
  675. if (!update) {
  676. return NULL;
  677. }
  678. if (old_snapshot) {
  679. ao2_ref(old_snapshot, +1);
  680. update->old_snapshot = old_snapshot;
  681. if (!new_snapshot) {
  682. ao2_ref(stasis_message_type(old_snapshot), +1);
  683. update->type = stasis_message_type(old_snapshot);
  684. }
  685. }
  686. if (new_snapshot) {
  687. ao2_ref(new_snapshot, +1);
  688. update->new_snapshot = new_snapshot;
  689. ao2_ref(stasis_message_type(new_snapshot), +1);
  690. update->type = stasis_message_type(new_snapshot);
  691. }
  692. msg = stasis_message_create(stasis_cache_update_type(), update);
  693. ao2_cleanup(update);
  694. return msg;
  695. }
  696. static void caching_topic_exec(void *data, struct stasis_subscription *sub,
  697. struct stasis_message *message)
  698. {
  699. struct stasis_caching_topic *caching_topic_needs_unref;
  700. struct stasis_caching_topic *caching_topic = data;
  701. struct stasis_message *msg;
  702. struct stasis_message *msg_put;
  703. struct stasis_message_type *msg_type;
  704. const struct ast_eid *msg_eid;
  705. const char *msg_id;
  706. ast_assert(caching_topic != NULL);
  707. ast_assert(caching_topic->topic != NULL);
  708. ast_assert(caching_topic->cache != NULL);
  709. ast_assert(caching_topic->cache->id_fn != NULL);
  710. if (stasis_subscription_final_message(sub, message)) {
  711. caching_topic_needs_unref = caching_topic;
  712. } else {
  713. caching_topic_needs_unref = NULL;
  714. }
  715. msg_type = stasis_message_type(message);
  716. if (stasis_subscription_change_type() == msg_type) {
  717. struct stasis_subscription_change *change = stasis_message_data(message);
  718. /*
  719. * If this change type is an unsubscribe, we need to find the original
  720. * subscribe and remove it from the cache otherwise the cache will
  721. * continue to grow unabated.
  722. */
  723. if (strcmp(change->description, "Unsubscribe") == 0) {
  724. struct stasis_cache_entry *cached_sub;
  725. ao2_wrlock(caching_topic->cache->entries);
  726. cached_sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
  727. if (cached_sub) {
  728. ao2_cleanup(cache_remove(caching_topic->cache->entries, cached_sub, stasis_message_eid(message)));
  729. ao2_cleanup(cached_sub);
  730. }
  731. ao2_unlock(caching_topic->cache->entries);
  732. ao2_cleanup(caching_topic_needs_unref);
  733. return;
  734. }
  735. msg_put = message;
  736. msg = message;
  737. } else if (stasis_cache_clear_type() == msg_type) {
  738. /* Cache clear event. */
  739. msg_put = NULL;
  740. msg = stasis_message_data(message);
  741. msg_type = stasis_message_type(msg);
  742. } else {
  743. /* Normal cache update event. */
  744. msg_put = message;
  745. msg = message;
  746. }
  747. ast_assert(msg_type != NULL);
  748. msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
  749. msg_id = caching_topic->cache->id_fn(msg);
  750. if (msg_id && msg_eid) {
  751. struct stasis_message *update;
  752. struct cache_put_snapshots snapshots;
  753. /* Update the cache */
  754. snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
  755. if (snapshots.old || msg_put) {
  756. if (stasis_topic_subscribers(caching_topic->topic)) {
  757. update = update_create(snapshots.old, msg_put);
  758. if (update) {
  759. stasis_publish(caching_topic->topic, update);
  760. ao2_ref(update, -1);
  761. }
  762. }
  763. } else {
  764. ast_debug(1,
  765. "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
  766. stasis_topic_name(caching_topic->topic),
  767. stasis_message_type_name(msg_type), msg_id);
  768. }
  769. if (snapshots.aggregate_old != snapshots.aggregate_new) {
  770. if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
  771. caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
  772. snapshots.aggregate_new);
  773. }
  774. if (stasis_topic_subscribers(caching_topic->topic)) {
  775. update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
  776. if (update) {
  777. stasis_publish(caching_topic->topic, update);
  778. ao2_ref(update, -1);
  779. }
  780. }
  781. }
  782. ao2_cleanup(snapshots.old);
  783. ao2_cleanup(snapshots.aggregate_old);
  784. ao2_cleanup(snapshots.aggregate_new);
  785. }
  786. ao2_cleanup(caching_topic_needs_unref);
  787. }
  788. static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
  789. {
  790. struct stasis_cache_entry *entry = v_obj;
  791. if (!entry) {
  792. return;
  793. }
  794. prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type),
  795. entry->key.id, entry->key.hash);
  796. }
  797. struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
  798. {
  799. struct stasis_caching_topic *caching_topic;
  800. static int caching_id;
  801. char *new_name;
  802. int ret;
  803. ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
  804. if (ret < 0) {
  805. return NULL;
  806. }
  807. caching_topic = ao2_alloc_options(sizeof(*caching_topic),
  808. stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
  809. if (caching_topic == NULL) {
  810. ast_free(new_name);
  811. return NULL;
  812. }
  813. caching_topic->topic = stasis_topic_create(new_name);
  814. if (caching_topic->topic == NULL) {
  815. ao2_ref(caching_topic, -1);
  816. ast_free(new_name);
  817. return NULL;
  818. }
  819. ao2_ref(cache, +1);
  820. caching_topic->cache = cache;
  821. if (!cache->registered) {
  822. if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
  823. ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
  824. cache->entries, new_name);
  825. } else {
  826. cache->registered = 1;
  827. }
  828. }
  829. ast_free(new_name);
  830. caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
  831. if (caching_topic->sub == NULL) {
  832. ao2_ref(caching_topic, -1);
  833. return NULL;
  834. }
  835. ao2_ref(original_topic, +1);
  836. caching_topic->original_topic = original_topic;
  837. /* The subscription holds the reference, so no additional ref bump. */
  838. return caching_topic;
  839. }
  840. static void stasis_cache_cleanup(void)
  841. {
  842. STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
  843. STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
  844. }
  845. int stasis_cache_init(void)
  846. {
  847. ast_register_cleanup(stasis_cache_cleanup);
  848. if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_clear_type) != 0) {
  849. return -1;
  850. }
  851. if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_update_type) != 0) {
  852. return -1;
  853. }
  854. return 0;
  855. }