Line data Source code
1 : /*!
2 : * \file
3 : * \author Deniz Armagan
4 : * \date 2019-03-27
5 : * \version 0.1
6 : *
7 : * \brief Universal JAPI Push Services library.
8 : *
9 : * \details
10 : * japi_pushsrv is a universal JSON API library.
11 : *
12 : * \copyright
13 : * Copyright (c) 2023 Fraunhofer IIS
14 : *
15 : * Permission is hereby granted, free of charge, to any person obtaining a copy
16 : * of this software and associated documentation files (the “Software”), to deal
17 : * in the Software without restriction, including without limitation the rights
18 : * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
19 : * copies of the Software, and to permit persons to whom the Software is
20 : * furnished to do so, subject to the following conditions:
21 : *
22 : * The above copyright notice and this permission notice shall be included in
23 : * all copies or substantial portions of the Software.
24 : *
25 : * THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
26 : * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
27 : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
28 : * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
29 : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
30 : * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
31 : * THE SOFTWARE.
32 : */
33 :
34 : #include <assert.h>
35 : #include <json-c/json.h>
36 : #include <stdbool.h>
37 : #include <stdio.h>
38 : #include <string.h>
39 : #include <strings.h>
40 : #include <unistd.h>
41 :
42 : #include "japi_intern.h"
43 : #include "japi_pushsrv_intern.h"
44 : #include "japi_utils.h"
45 : #include "prntdbg.h"
46 :
47 : #include "rw_n.h"
48 :
49 : /*!
50 : * \brief Add client to push service
51 : *
52 : * Add client socket to given push service.
53 : *
54 : * \param socket Socket to add
55 : * \param pushsrv_name The name of the push service
56 : * \param psc JAPI push service context
57 : *
58 : * \returns On success, 0 is returned. On error, -1 if memory allocation failed.
59 : */
60 6 : static int japi_pushsrv_add_client(japi_pushsrv_context *psc, int socket)
61 : {
62 : japi_client *client;
63 :
64 : /* Error handling */
65 6 : assert(psc != NULL);
66 6 : assert(socket >= 0);
67 :
68 6 : client = (japi_client *)malloc(sizeof(japi_client));
69 6 : if (client == NULL) {
70 0 : perror("ERROR: malloc() failed\n");
71 0 : return -1;
72 : }
73 :
74 6 : pthread_mutex_lock(&(psc->lock));
75 6 : client->socket = socket;
76 6 : client->next = psc->clients;
77 6 : psc->clients = client;
78 6 : pthread_mutex_unlock(&(psc->lock));
79 :
80 6 : return 0;
81 : }
82 :
83 : /*
84 : * Remove the client socket for the respective push service
85 : */
86 5 : int japi_pushsrv_remove_client(japi_pushsrv_context *psc, int socket)
87 : {
88 : japi_client *client, *prev;
89 5 : int ret = -1;
90 :
91 : /* Error handling */
92 5 : assert(psc != NULL);
93 5 : assert(socket >= 0);
94 :
95 5 : client = psc->clients;
96 5 : prev = NULL;
97 :
98 : /* Remove socket from list */
99 18 : while (client != NULL) {
100 : /* If first element */
101 11 : if ((client->socket == socket) && (prev == NULL)) {
102 1 : psc->clients = client->next;
103 1 : prntdbg("removing client %d from pushsrv %s\n", client->socket,
104 : psc->pushsrv_name);
105 1 : free(client);
106 1 : ret = 0;
107 1 : break;
108 : }
109 : /* If last element */
110 10 : if ((client->socket == socket) && (client->next == NULL)) {
111 0 : prev->next = NULL;
112 0 : prntdbg("removing client %d from pushsrv %s\n", client->socket,
113 : psc->pushsrv_name);
114 0 : free(client);
115 0 : ret = 0;
116 0 : break;
117 : }
118 10 : if (client->socket == socket) {
119 2 : prev->next = client->next;
120 2 : prntdbg("removing client %d from pushsrv %s\n", client->socket,
121 : psc->pushsrv_name);
122 2 : free(client);
123 2 : ret = 0;
124 2 : break;
125 : }
126 :
127 8 : prev = client;
128 8 : client = client->next;
129 : }
130 :
131 5 : return ret;
132 : }
133 :
134 : /*
135 : * Removes clients from all push services
136 : */
137 4 : void japi_pushsrv_remove_client_from_all_pushsrv(japi_context *ctx, int socket)
138 : {
139 : japi_pushsrv_context *psc;
140 :
141 : /* Error handling */
142 4 : assert(ctx != NULL);
143 4 : assert(socket >= 0);
144 :
145 4 : prntdbg("removing client %i from all pushsrv\n", socket);
146 :
147 4 : psc = ctx->push_services;
148 8 : while (psc != NULL) {
149 0 : pthread_mutex_lock(&(psc->lock));
150 0 : japi_pushsrv_remove_client(psc, socket);
151 0 : pthread_mutex_unlock(&(psc->lock));
152 0 : psc = psc->next;
153 : }
154 4 : }
155 :
156 : /*
157 : * Saves client socket, if passed push service is registered
158 : */
159 10 : void japi_pushsrv_subscribe(japi_context *ctx, json_object *jreq, json_object *jresp)
160 : {
161 : japi_pushsrv_context *psc;
162 : json_object *jval;
163 : const char *pushsrv_name;
164 : int socket, ret;
165 :
166 : /* Error handling */
167 10 : assert(ctx != NULL);
168 10 : assert(jresp != NULL);
169 :
170 10 : psc = ctx->push_services;
171 :
172 : /* Get the push service name */
173 10 : if (!json_object_object_get_ex(jreq, "service", &jval) || jval == NULL) {
174 2 : json_object_object_add(jresp, "success", json_object_new_boolean(false));
175 2 : json_object_object_add(jresp, "message",
176 : json_object_new_string("Push service not found."));
177 2 : return;
178 : }
179 8 : pushsrv_name = json_object_get_string(jval);
180 8 : ret = json_object_object_get_ex(jreq, "socket", &jval);
181 8 : socket = json_object_get_int(jval);
182 8 : if (!ret || socket < 0) {
183 1 : json_object_object_add(jresp, "success", json_object_new_boolean(false));
184 1 : json_object_object_add(
185 : jresp, "message",
186 : json_object_new_string("Subscribing push service to non-existing socket"));
187 1 : return;
188 : }
189 :
190 : /* Search for push service in list and save socket, if found */
191 14 : while (psc != NULL) {
192 6 : if (strcasecmp(pushsrv_name, psc->pushsrv_name) == 0) {
193 6 : ret = japi_pushsrv_add_client(psc, socket);
194 6 : break;
195 : }
196 0 : psc = psc->next;
197 : }
198 :
199 7 : json_object_object_add(jresp, "service", json_object_new_string(pushsrv_name));
200 :
201 : /* Create JSON response object */
202 7 : if (psc == NULL || ret < 0) {
203 1 : json_object_object_add(jresp, "success", json_object_new_boolean(false));
204 1 : json_object_object_add(jresp, "message",
205 : json_object_new_string("Push service not found."));
206 : } else {
207 6 : json_object_object_add(jresp, "success", json_object_new_boolean(true));
208 : }
209 : }
210 :
211 : /*
212 : * Removes client socket, if passed push service is registered
213 : */
214 8 : void japi_pushsrv_unsubscribe(japi_context *ctx, json_object *jreq, json_object *jresp)
215 : {
216 : japi_pushsrv_context *psc;
217 : json_object *jval;
218 : const char *pushsrv_name;
219 : int ret, socket;
220 :
221 : /* Error handling */
222 8 : assert(ctx != NULL);
223 8 : assert(jresp != NULL);
224 :
225 8 : psc = ctx->push_services;
226 8 : bool registered = false; /* Service registered? */
227 8 : bool unsubscribed = false; /* Service unsubscribed? */
228 :
229 : /* Get the push service name */
230 8 : if (!json_object_object_get_ex(jreq, "service", &jval) || jval == NULL) {
231 2 : json_object_object_add(jresp, "success", json_object_new_boolean(false));
232 2 : json_object_object_add(jresp, "message",
233 : json_object_new_string("Push service not found."));
234 2 : return;
235 : }
236 6 : pushsrv_name = json_object_get_string(jval);
237 :
238 6 : ret = json_object_object_get_ex(jreq, "socket", &jval);
239 6 : socket = json_object_get_int(jval);
240 6 : if (!ret || socket < 0) {
241 0 : json_object_object_add(jresp, "success", json_object_new_boolean(false));
242 0 : json_object_object_add(
243 : jresp, "message",
244 : json_object_new_string(
245 : "Unsubscribing push service from non-existing socket"));
246 0 : return;
247 : }
248 :
249 : /* Search for push service in list and remove socket, if found & socket is
250 : * registered */
251 15 : while (psc != NULL) {
252 6 : if (strcasecmp(pushsrv_name, psc->pushsrv_name) == 0) {
253 5 : registered = true;
254 5 : if (japi_pushsrv_remove_client(psc, socket) >= 0) {
255 3 : unsubscribed = true;
256 3 : break;
257 : }
258 : }
259 3 : psc = psc->next;
260 : }
261 :
262 6 : json_object_object_add(jresp, "service", json_object_new_string(pushsrv_name));
263 :
264 : /* Create JSON response object */
265 6 : if (registered && unsubscribed) { /* Subscribed */
266 3 : json_object_object_add(jresp, "success", json_object_new_boolean(true));
267 3 : } else if (registered && !unsubscribed) { /* Registered, but not subscribed */
268 2 : json_object_object_add(jresp, "success", json_object_new_boolean(false));
269 2 : json_object_object_add(
270 : jresp, "message",
271 : json_object_new_string(
272 : "Can't unsubscribe a service that wasn't subscribed before."));
273 : } else { /* Not registered */
274 1 : json_object_object_add(jresp, "success", json_object_new_boolean(false));
275 1 : json_object_object_add(jresp, "message",
276 : json_object_new_string("Push service not found."));
277 : }
278 : }
279 :
280 : /* Check if there is a duplicate name for a request */
281 15 : static bool pushsrv_isredundant(japi_context *ctx, const char *pushsrv_name)
282 : {
283 : japi_pushsrv_context *psc;
284 : bool duplicate;
285 :
286 15 : duplicate = false;
287 15 : psc = ctx->push_services;
288 :
289 45 : while (psc != NULL) {
290 16 : if (strcmp(psc->pushsrv_name, pushsrv_name) == 0) {
291 1 : duplicate = true;
292 1 : break;
293 : }
294 15 : psc = psc->next;
295 : }
296 15 : return duplicate;
297 : }
298 :
299 : /* Free memory for duplicated push service name and element */
300 9 : static void free_pushsrv(japi_pushsrv_context *psc)
301 : {
302 9 : free(psc->pushsrv_name);
303 9 : free(psc);
304 9 : }
305 :
306 : /*
307 : * Registers push-service and returns pointer to that service object.
308 : */
309 18 : japi_pushsrv_context *japi_pushsrv_register(japi_context *ctx, const char *pushsrv_name)
310 : {
311 : japi_pushsrv_context *psc;
312 :
313 18 : if (ctx == NULL) {
314 1 : fprintf(stderr, "ERROR: JAPI context is NULL.\n");
315 1 : return NULL;
316 : }
317 :
318 17 : if ((pushsrv_name == NULL) || (strcmp(pushsrv_name, "") == 0)) {
319 2 : fprintf(stderr, "ERROR: Push service name is NULL or empty.\n");
320 2 : return NULL;
321 : }
322 :
323 15 : if (pushsrv_isredundant(ctx, pushsrv_name)) {
324 1 : fprintf(stderr, "ERROR: A push service called '%s' was already registered.\n",
325 : pushsrv_name);
326 1 : return NULL;
327 : }
328 :
329 : /* Reserve memory for japi_pushsrv_context struct */
330 14 : psc = (japi_pushsrv_context *)malloc(sizeof(japi_pushsrv_context));
331 14 : if (psc == NULL) {
332 0 : perror("ERROR: malloc() failed");
333 0 : return NULL;
334 : }
335 :
336 : /* Duplicate string in case that pushsrv_name comes from
337 : a namespace that is left after this function */
338 14 : psc->pushsrv_name = strdup(pushsrv_name);
339 14 : if (psc->pushsrv_name == NULL) {
340 0 : free(psc);
341 0 : return NULL;
342 : }
343 :
344 : /* Initialize struct */
345 14 : psc->thread_id = 0;
346 14 : psc->routine = NULL;
347 14 : psc->clients = NULL;
348 14 : psc->enabled = false;
349 14 : psc->userptr = ctx->userptr;
350 :
351 14 : if (pthread_mutex_init(&(psc->lock), NULL) != 0) {
352 0 : fprintf(stderr, "ERROR: mutex initialization has failed\n");
353 0 : return NULL;
354 : }
355 :
356 : /* Point to last struct */
357 14 : psc->next = ctx->push_services;
358 14 : ctx->push_services = psc;
359 :
360 14 : return psc;
361 : }
362 :
363 : /*
364 : * Remove push service context from japi context, unsubscribe for all clients and free
365 : * memory
366 : */
367 10 : int japi_pushsrv_destroy(japi_context *ctx, japi_pushsrv_context *psc)
368 : {
369 : japi_pushsrv_context *psc_iter, *psc_prev, *psc_next;
370 : japi_client *client, *client_next;
371 :
372 10 : assert(ctx != NULL);
373 :
374 10 : if (psc == NULL) {
375 1 : fprintf(stderr, "ERROR: push service context is NULL\n");
376 1 : return -1;
377 : }
378 :
379 : /* clean up linked list in ctx->push_service */
380 9 : psc_prev = NULL;
381 9 : psc_iter = ctx->push_services;
382 :
383 23 : while (psc_iter != NULL) {
384 14 : psc_next = psc_iter->next;
385 14 : if (psc_iter == psc) {
386 : /* If first element */
387 9 : if (psc_prev == NULL) {
388 7 : ctx->push_services = psc_next;
389 : } else {
390 2 : psc_prev->next = psc_next;
391 : }
392 9 : break;
393 : }
394 5 : psc_prev = psc_iter;
395 5 : psc_iter = psc_next;
396 : }
397 :
398 : /* Iterates through push service client list and frees memory for every element and
399 : * for the push service themself */
400 9 : client = psc->clients;
401 9 : pthread_mutex_lock(&(psc->lock));
402 18 : while (client != NULL) {
403 0 : client_next = client->next;
404 0 : japi_pushsrv_remove_client(psc, client->socket);
405 0 : client = client_next;
406 : }
407 9 : pthread_mutex_unlock(&(psc->lock));
408 :
409 9 : japi_pushsrv_stop(psc);
410 :
411 9 : pthread_mutex_destroy(&(psc->lock));
412 9 : free_pushsrv(psc);
413 :
414 9 : return 0;
415 : }
416 :
417 : /*
418 : * Provide the names of all registered push-services as a JAPI response.
419 : */
420 4 : void japi_pushsrv_list(japi_context *ctx, json_object *request, json_object *response)
421 : {
422 : japi_pushsrv_context *psc;
423 : json_object *jstring;
424 : json_object *jarray;
425 :
426 4 : assert(ctx != NULL);
427 4 : assert(response != NULL);
428 :
429 4 : jarray = json_object_new_array();
430 4 : psc = ctx->push_services;
431 :
432 : /* Iterate through push service list and return JSON object */
433 20 : while (psc != NULL) {
434 12 : jstring = json_object_new_string(psc->pushsrv_name); /* Create JSON-string */
435 12 : json_object_array_add(jarray, jstring); /* Add string to JSON array */
436 12 : psc = psc->next;
437 : }
438 :
439 : /* Add array to JSON-object */
440 4 : json_object_object_add(response, "services", jarray);
441 4 : }
442 :
443 : /*
444 : * Send message to all subscribed clients of a push service
445 : */
446 0 : int japi_pushsrv_sendmsg(japi_pushsrv_context *psc, json_object *jmsg_data)
447 : {
448 : char *msg;
449 : int ret;
450 : int success; /* number of successfull send messages */
451 : japi_client *client, *following_client;
452 : json_object *jmsg;
453 : json_object *jdata;
454 :
455 : /* Return -1 if there is no message to send */
456 0 : if (jmsg_data == NULL) {
457 0 : fprintf(stderr, "ERROR: Nothing to send.\n");
458 0 : return -1;
459 : }
460 :
461 : /* Return 0 if no client is subscribed */
462 0 : if (psc->clients == NULL) {
463 0 : return 0;
464 : }
465 :
466 0 : ret = 0;
467 0 : success = 0;
468 0 : jmsg = json_object_new_object();
469 0 : jdata = NULL;
470 :
471 0 : json_object_object_add(jmsg, "japi_pushsrv",
472 0 : json_object_new_string(psc->pushsrv_name));
473 0 : jdata = json_object_get(
474 : jmsg_data); // increment refcount before calling json_object_object_add as
475 : // jmesg_data may still be in use by the caller
476 0 : json_object_object_add(jmsg, "data", jdata);
477 :
478 0 : msg = japi_get_jobj_as_ndstr(jmsg);
479 0 : json_object_put(jmsg);
480 :
481 0 : pthread_mutex_lock(&(psc->lock));
482 0 : client = psc->clients;
483 :
484 0 : while (client != NULL) {
485 0 : prntdbg("pushsrv '%s': Sending message to client %d\n. Message: '%s'",
486 : psc->pushsrv_name, client->socket, msg);
487 0 : following_client = client->next; // Save pointer to next element
488 :
489 0 : ret = write_n(client->socket, msg, strlen(msg));
490 :
491 0 : if (ret <= 0) {
492 : /* If write failed print error and unsubscribe client */
493 0 : fprintf(stderr,
494 : "ERROR: Failed to send push service message to client %i (write "
495 : "returned %i)\n",
496 : client->socket, ret);
497 : /* Remove client from respective push service and free */
498 0 : japi_pushsrv_remove_client(psc, client->socket);
499 : } else {
500 0 : success++;
501 : }
502 0 : client = following_client;
503 : }
504 0 : pthread_mutex_unlock(&(psc->lock));
505 :
506 0 : free(msg);
507 :
508 0 : return success;
509 : }
510 :
511 : /*
512 : * Wrapper function that is executed by pthread_create and starts the desired push
513 : * service routine
514 : */
515 0 : static void *generic_pushsrv_runner(void *arg)
516 : {
517 : japi_pushsrv_context *psc;
518 : japi_pushsrv_routine routine;
519 :
520 0 : psc = (japi_pushsrv_context *)arg;
521 :
522 0 : assert(psc != NULL);
523 :
524 : /* Start user routine */
525 0 : routine = psc->routine;
526 0 : routine(psc);
527 :
528 0 : return NULL;
529 : }
530 :
531 : /*
532 : * Save thread infos and create thread
533 : */
534 0 : int japi_pushsrv_start(japi_pushsrv_context *psc, japi_pushsrv_routine routine)
535 : {
536 0 : if (psc == NULL) {
537 0 : fprintf(stderr,
538 : "ERROR: No push service context passed. Not starting thread.\n");
539 0 : return -1;
540 : }
541 :
542 0 : if (routine == NULL) {
543 0 : fprintf(stderr, "ERROR: No routine passed. Not starting thread.\n");
544 0 : return -2;
545 : }
546 :
547 0 : psc->enabled = true;
548 0 : psc->routine = routine;
549 :
550 0 : if (pthread_create(&(psc->thread_id), NULL, generic_pushsrv_runner, (void *)psc) !=
551 : 0) {
552 0 : fprintf(stderr, "ERROR: Error creating push service thread.\n");
553 0 : psc->enabled = false;
554 0 : return -3;
555 : }
556 :
557 0 : return 0;
558 : }
559 :
560 : /*
561 : * Stop pushsrv routine
562 : */
563 9 : int japi_pushsrv_stop(japi_pushsrv_context *psc)
564 : {
565 9 : if (psc == NULL) {
566 0 : fprintf(stderr, "ERROR: No push service context passed. Can't stop thread.\n");
567 0 : return -1;
568 : }
569 :
570 9 : if (psc->enabled == false) {
571 9 : fprintf(stderr, "ERROR: Thread not running.\n");
572 9 : return -2;
573 : }
574 :
575 : /* Tell routine to end */
576 0 : psc->enabled = false;
577 :
578 : /* Wait for thread to end and close it */
579 0 : if (pthread_join(psc->thread_id, NULL) != 0) {
580 0 : fprintf(stderr, "ERROR: Error joining push service routine '%s'\n",
581 : psc->pushsrv_name);
582 0 : return -3;
583 : }
584 :
585 0 : return 0;
586 : }
|