stasis_endpoints.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis endpoint API.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. #include "asterisk/astobj2.h"
  29. #include "asterisk/stasis.h"
  30. #include "asterisk/stasis_endpoints.h"
  31. /*** DOCUMENTATION
  32. <managerEvent language="en_US" name="PeerStatus">
  33. <managerEventInstance class="EVENT_FLAG_SYSTEM">
  34. <synopsis>Raised when the state of a peer changes.</synopsis>
  35. <syntax>
  36. <parameter name="ChannelType">
  37. <para>The channel technology of the peer.</para>
  38. </parameter>
  39. <parameter name="Peer">
  40. <para>The name of the peer (including channel technology).</para>
  41. </parameter>
  42. <parameter name="PeerStatus">
  43. <para>New status of the peer.</para>
  44. <enumlist>
  45. <enum name="Unknown"/>
  46. <enum name="Registered"/>
  47. <enum name="Unregistered"/>
  48. <enum name="Rejected"/>
  49. <enum name="Lagged"/>
  50. </enumlist>
  51. </parameter>
  52. <parameter name="Cause">
  53. <para>The reason the status has changed.</para>
  54. </parameter>
  55. <parameter name="Address">
  56. <para>New address of the peer.</para>
  57. </parameter>
  58. <parameter name="Port">
  59. <para>New port for the peer.</para>
  60. </parameter>
  61. <parameter name="Time">
  62. <para>Time it takes to reach the peer and receive a response.</para>
  63. </parameter>
  64. </syntax>
  65. </managerEventInstance>
  66. </managerEvent>
  67. <managerEvent language="en_US" name="ContactStatus">
  68. <managerEventInstance class="EVENT_FLAG_SYSTEM">
  69. <synopsis>Raised when the state of a contact changes.</synopsis>
  70. <syntax>
  71. <parameter name="URI">
  72. <para>This contact's URI.</para>
  73. </parameter>
  74. <parameter name="ContactStatus">
  75. <para>New status of the contact.</para>
  76. <enumlist>
  77. <enum name="Unknown"/>
  78. <enum name="Unreachable"/>
  79. <enum name="Reachable"/>
  80. <enum name="Unqualified"/>
  81. <enum name="Removed"/>
  82. <enum name="Updated"/>
  83. </enumlist>
  84. </parameter>
  85. <parameter name="AOR">
  86. <para>The name of the associated aor.</para>
  87. </parameter>
  88. <parameter name="EndpointName">
  89. <para>The name of the associated endpoint.</para>
  90. </parameter>
  91. <parameter name="RoundtripUsec">
  92. <para>The RTT measured during the last qualify.</para>
  93. </parameter>
  94. </syntax>
  95. </managerEventInstance>
  96. </managerEvent>
  97. ***/
  98. static struct stasis_cp_all *endpoint_cache_all;
  99. struct stasis_cp_all *ast_endpoint_cache_all(void)
  100. {
  101. return endpoint_cache_all;
  102. }
  103. struct stasis_cache *ast_endpoint_cache(void)
  104. {
  105. return stasis_cp_all_cache(endpoint_cache_all);
  106. }
  107. struct stasis_topic *ast_endpoint_topic_all(void)
  108. {
  109. return stasis_cp_all_topic(endpoint_cache_all);
  110. }
  111. struct stasis_topic *ast_endpoint_topic_all_cached(void)
  112. {
  113. return stasis_cp_all_topic_cached(endpoint_cache_all);
  114. }
  115. STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type);
  116. static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg)
  117. {
  118. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  119. RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free);
  120. const char *value;
  121. /* peer_status is the only *required* thing */
  122. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) {
  123. return NULL;
  124. }
  125. ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value);
  126. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) {
  127. ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value);
  128. }
  129. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) {
  130. ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value);
  131. }
  132. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) {
  133. ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value);
  134. }
  135. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) {
  136. ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value);
  137. }
  138. return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "PeerStatus",
  139. "ChannelType: %s\r\n"
  140. "Peer: %s/%s\r\n"
  141. "%s",
  142. obj->snapshot->tech,
  143. obj->snapshot->tech,
  144. obj->snapshot->resource,
  145. ast_str_buffer(peerstatus_event_string));
  146. }
  147. static struct ast_json *peerstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
  148. {
  149. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  150. struct ast_json *json_endpoint;
  151. struct ast_json *json_peer;
  152. struct ast_json *json_final;
  153. const struct timeval *tv = stasis_message_timestamp(msg);
  154. json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
  155. if (!json_endpoint) {
  156. return NULL;
  157. }
  158. json_peer = ast_json_object_create();
  159. if (!json_peer) {
  160. ast_json_unref(json_endpoint);
  161. return NULL;
  162. }
  163. /* Copy all fields from the blob */
  164. ast_json_object_update(json_peer, obj->blob);
  165. json_final = ast_json_pack("{s: s, s: o, s: o, s: o }",
  166. "type", "PeerStatusChange",
  167. "timestamp", ast_json_timeval(*tv, NULL),
  168. "endpoint", json_endpoint,
  169. "peer", json_peer);
  170. if (!json_final) {
  171. ast_json_unref(json_endpoint);
  172. ast_json_unref(json_peer);
  173. }
  174. return json_final;
  175. }
  176. STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type,
  177. .to_ami = peerstatus_to_ami,
  178. .to_json = peerstatus_to_json,
  179. );
  180. static struct ast_manager_event_blob *contactstatus_to_ami(struct stasis_message *msg)
  181. {
  182. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  183. RAII_VAR(struct ast_str *, contactstatus_event_string, ast_str_create(64), ast_free);
  184. const char *value;
  185. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "uri")))) {
  186. return NULL;
  187. }
  188. ast_str_append(&contactstatus_event_string, 0, "URI: %s\r\n", value);
  189. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "contact_status")))) {
  190. return NULL;
  191. }
  192. ast_str_append(&contactstatus_event_string, 0, "ContactStatus: %s\r\n", value);
  193. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "aor")))) {
  194. return NULL;
  195. }
  196. ast_str_append(&contactstatus_event_string, 0, "AOR: %s\r\n", value);
  197. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "endpoint_name")))) {
  198. return NULL;
  199. }
  200. ast_str_append(&contactstatus_event_string, 0, "EndpointName: %s\r\n", value);
  201. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec")))) {
  202. ast_str_append(&contactstatus_event_string, 0, "RoundtripUsec: %s\r\n", value);
  203. }
  204. return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "ContactStatus",
  205. "%s", ast_str_buffer(contactstatus_event_string));
  206. }
  207. static struct ast_json *contactstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
  208. {
  209. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  210. struct ast_json *json_endpoint;
  211. struct ast_json *json_final;
  212. const char *rtt;
  213. const struct timeval *tv = stasis_message_timestamp(msg);
  214. json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
  215. if (!json_endpoint) {
  216. return NULL;
  217. }
  218. /* The roundtrip time is optional. */
  219. rtt = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec"));
  220. if (!ast_strlen_zero(rtt)) {
  221. json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s, s: s } } ",
  222. "type", "ContactStatusChange",
  223. "timestamp", ast_json_timeval(*tv, NULL),
  224. "endpoint", json_endpoint,
  225. "contact_info",
  226. "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
  227. "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
  228. "contact_status")),
  229. "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")),
  230. "roundtrip_usec", rtt);
  231. } else {
  232. json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s } } ",
  233. "type", "ContactStatusChange",
  234. "timestamp", ast_json_timeval(*tv, NULL),
  235. "endpoint", json_endpoint,
  236. "contact_info",
  237. "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
  238. "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
  239. "contact_status")),
  240. "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")));
  241. }
  242. if (!json_final) {
  243. ast_json_unref(json_endpoint);
  244. }
  245. return json_final;
  246. }
  247. STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_contact_state_type,
  248. .to_ami = contactstatus_to_ami,
  249. .to_json = contactstatus_to_json
  250. );
  251. static void endpoint_blob_dtor(void *obj)
  252. {
  253. struct ast_endpoint_blob *event = obj;
  254. ao2_cleanup(event->snapshot);
  255. ast_json_unref(event->blob);
  256. }
  257. struct stasis_message *ast_endpoint_blob_create(struct ast_endpoint *endpoint,
  258. struct stasis_message_type *type, struct ast_json *blob)
  259. {
  260. struct ast_endpoint_blob *obj;
  261. struct stasis_message *msg;
  262. if (!type) {
  263. return NULL;
  264. }
  265. if (!blob) {
  266. blob = ast_json_null();
  267. }
  268. if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
  269. return NULL;
  270. }
  271. if (endpoint) {
  272. if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
  273. ao2_ref(obj, -1);
  274. return NULL;
  275. }
  276. }
  277. obj->blob = ast_json_ref(blob);
  278. msg = stasis_message_create(type, obj);
  279. ao2_ref(obj, -1);
  280. return msg;
  281. }
  282. void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type,
  283. struct ast_json *blob)
  284. {
  285. struct stasis_message *message;
  286. if (!blob) {
  287. return;
  288. }
  289. message = ast_endpoint_blob_create(endpoint, type, blob);
  290. if (message) {
  291. stasis_publish(ast_endpoint_topic(endpoint), message);
  292. ao2_ref(message, -1);
  293. }
  294. }
  295. struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
  296. const char *name)
  297. {
  298. char *id = NULL;
  299. struct stasis_message *msg;
  300. struct ast_endpoint_snapshot *snapshot;
  301. if (ast_strlen_zero(name)) {
  302. ast_asprintf(&id, "%s", tech);
  303. } else {
  304. ast_asprintf(&id, "%s/%s", tech, name);
  305. }
  306. if (!id) {
  307. return NULL;
  308. }
  309. ast_tech_to_upper(id);
  310. msg = stasis_cache_get(ast_endpoint_cache(), ast_endpoint_snapshot_type(), id);
  311. ast_free(id);
  312. if (!msg) {
  313. return NULL;
  314. }
  315. snapshot = stasis_message_data(msg);
  316. ast_assert(snapshot != NULL);
  317. ao2_ref(snapshot, +1);
  318. ao2_ref(msg, -1);
  319. return snapshot;
  320. }
  321. /*!
  322. * \brief Callback extract a unique identity from a snapshot message.
  323. *
  324. * This identity is unique to the underlying object of the snapshot, such as the
  325. * UniqueId field of a channel.
  326. *
  327. * \param message Message to extract id from.
  328. * \return String representing the snapshot's id.
  329. * \retval NULL if the message_type of the message isn't a handled snapshot.
  330. * \since 12
  331. */
  332. static const char *endpoint_snapshot_get_id(struct stasis_message *message)
  333. {
  334. struct ast_endpoint_snapshot *snapshot;
  335. if (ast_endpoint_snapshot_type() != stasis_message_type(message)) {
  336. return NULL;
  337. }
  338. snapshot = stasis_message_data(message);
  339. return snapshot->id;
  340. }
  341. struct ast_json *ast_endpoint_snapshot_to_json(
  342. const struct ast_endpoint_snapshot *snapshot,
  343. const struct stasis_message_sanitizer *sanitize)
  344. {
  345. struct ast_json *json;
  346. struct ast_json *channel_array;
  347. int i;
  348. json = ast_json_pack("{s: s, s: s, s: s, s: []}",
  349. "technology", snapshot->tech,
  350. "resource", snapshot->resource,
  351. "state", ast_endpoint_state_to_string(snapshot->state),
  352. "channel_ids");
  353. if (json == NULL) {
  354. return NULL;
  355. }
  356. if (snapshot->max_channels != -1) {
  357. int res = ast_json_object_set(json, "max_channels",
  358. ast_json_integer_create(snapshot->max_channels));
  359. if (res != 0) {
  360. ast_json_unref(json);
  361. return NULL;
  362. }
  363. }
  364. channel_array = ast_json_object_get(json, "channel_ids");
  365. ast_assert(channel_array != NULL);
  366. for (i = 0; i < snapshot->num_channels; ++i) {
  367. int res;
  368. if (sanitize && sanitize->channel_id
  369. && sanitize->channel_id(snapshot->channel_ids[i])) {
  370. continue;
  371. }
  372. res = ast_json_array_append(channel_array,
  373. ast_json_string_create(snapshot->channel_ids[i]));
  374. if (res != 0) {
  375. ast_json_unref(json);
  376. return NULL;
  377. }
  378. }
  379. return json;
  380. }
  381. static void endpoints_stasis_cleanup(void)
  382. {
  383. STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_snapshot_type);
  384. STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_state_type);
  385. STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_contact_state_type);
  386. ao2_cleanup(endpoint_cache_all);
  387. endpoint_cache_all = NULL;
  388. }
  389. int ast_endpoint_stasis_init(void)
  390. {
  391. int res = 0;
  392. ast_register_cleanup(endpoints_stasis_cleanup);
  393. endpoint_cache_all = stasis_cp_all_create("endpoint:all",
  394. endpoint_snapshot_get_id);
  395. if (!endpoint_cache_all) {
  396. return -1;
  397. }
  398. res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type);
  399. res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type);
  400. res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_contact_state_type);
  401. return res;
  402. }