/* * Copyright (c) 2022 Rodrigo Peixoto * SPDX-License-Identifier: Apache-2.0 */ #include #include #include #include #include #include #include LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL); #if defined(CONFIG_ZBUS_PRIORITY_BOOST) /* Available only when the priority boost is enabled */ static struct k_spinlock _zbus_chan_slock; #endif /* CONFIG_ZBUS_PRIORITY_BOOST */ static struct k_spinlock obs_slock; #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC) NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE, sizeof(struct zbus_channel *), NULL); static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size, k_timeout_t timeout) { return net_buf_alloc_len(pool, size, timeout); } #else NET_BUF_POOL_FIXED_DEFINE(_zbus_msg_subscribers_pool, (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE), (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE), sizeof(struct zbus_channel *), NULL); static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size, k_timeout_t timeout) { __ASSERT(size <= CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE, "CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater or equal to " "%d", (int)size); return net_buf_alloc(pool, timeout); } #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */ #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ int _zbus_init(void) { const struct zbus_channel *curr = NULL; const struct zbus_channel *prev = NULL; STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) { curr = observation->chan; if (prev != curr) { if (prev == NULL) { curr->data->observers_start_idx = 0; curr->data->observers_end_idx = 0; } else { curr->data->observers_start_idx = prev->data->observers_end_idx; curr->data->observers_end_idx = prev->data->observers_end_idx; } prev = curr; } ++(curr->data->observers_end_idx); } return 0; } SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY); static inline int _zbus_notify_observer(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timepoint_t end_time, struct net_buf *buf) { switch (obs->type) { case ZBUS_OBSERVER_LISTENER_TYPE: { obs->callback(chan); break; } case ZBUS_OBSERVER_SUBSCRIBER_TYPE: { return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time)); } #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: { struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time)); if (cloned_buf == NULL) { return -ENOMEM; } k_fifo_put(obs->message_fifo, cloned_buf); break; } #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ default: _ZBUS_ASSERT(false, "Unreachable"); } return 0; } static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time) { int err = 0; int last_error = 0; struct net_buf *buf = NULL; /* Static observer event dispatcher logic */ struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) struct net_buf_pool *pool = COND_CODE_1(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION, (chan->data->msg_subscriber_pool), (&_zbus_msg_subscribers_pool)); buf = _zbus_create_net_buf(pool, zbus_chan_msg_size(chan), sys_timepoint_timeout(end_time)); _ZBUS_ASSERT(buf != NULL, "net_buf zbus_msg_subscribers_pool is " "unavailable or heap is full"); memcpy(net_buf_user_data(buf), &chan, sizeof(struct zbus_channel *)); net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan)); #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ LOG_DBG("Notifing %s's observers. Starting VDED:", _ZBUS_CHAN_NAME(chan)); int __maybe_unused index = 0; for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; i < limit; ++i) { STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); const struct zbus_observer *obs = observation->obs; if (!obs->data->enabled || observation_mask->enabled) { continue; } err = _zbus_notify_observer(chan, obs, end_time, buf); if (err) { last_error = err; LOG_ERR("could not deliver notification to observer %s. Error code %d", _ZBUS_OBS_NAME(obs), err); if (err == -ENOMEM) { if (IS_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER)) { net_buf_unref(buf); } return err; } } LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs)); } #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) /* Dynamic observer event dispatcher logic */ struct zbus_observer_node *obs_nd, *tmp; SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { const struct zbus_observer *obs = obs_nd->obs; if (!obs->data->enabled) { continue; } err = _zbus_notify_observer(chan, obs, end_time, buf); if (err) { last_error = err; } } #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ IF_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER, (net_buf_unref(buf);)) return last_error; } #if defined(CONFIG_ZBUS_PRIORITY_BOOST) static inline void chan_update_hop(const struct zbus_channel *chan) { struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; int chan_highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY; K_SPINLOCK(&_zbus_chan_slock) { const int limit = chan->data->observers_end_idx; for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) { STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); __ASSERT(observation != NULL, "observation must be not NULL"); const struct zbus_observer *obs = observation->obs; if (!obs->data->enabled || observation_mask->enabled) { continue; } if (chan_highest_observer_priority > obs->data->priority) { chan_highest_observer_priority = obs->data->priority; } } chan->data->highest_observer_priority = chan_highest_observer_priority; } } static inline void update_all_channels_hop(const struct zbus_observer *obs) { STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) { if (obs != observation->obs) { continue; } chan_update_hop(observation->chan); } } int zbus_obs_attach_to_thread(const struct zbus_observer *obs) { _ZBUS_ASSERT(!k_is_in_isr(), "cannot attach to an ISR"); _ZBUS_ASSERT(obs != NULL, "obs is required"); int current_thread_priority = k_thread_priority_get(k_current_get()); K_SPINLOCK(&obs_slock) { if (obs->data->priority != current_thread_priority) { obs->data->priority = current_thread_priority; update_all_channels_hop(obs); } } return 0; } int zbus_obs_detach_from_thread(const struct zbus_observer *obs) { _ZBUS_ASSERT(!k_is_in_isr(), "cannot detach from an ISR"); _ZBUS_ASSERT(obs != NULL, "obs is required"); K_SPINLOCK(&obs_slock) { obs->data->priority = ZBUS_MIN_THREAD_PRIORITY; update_all_channels_hop(obs); } return 0; } #else static inline void update_all_channels_hop(const struct zbus_observer *obs) { } #endif /* CONFIG_ZBUS_PRIORITY_BOOST */ static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio) { bool boosting = false; #if defined(CONFIG_ZBUS_PRIORITY_BOOST) if (!k_is_in_isr()) { *prio = k_thread_priority_get(k_current_get()); K_SPINLOCK(&_zbus_chan_slock) { if (*prio > chan->data->highest_observer_priority) { int new_prio = chan->data->highest_observer_priority - 1; new_prio = MAX(new_prio, 0); /* Elevating priority since the highest_observer_priority is * greater than the current thread */ k_thread_priority_set(k_current_get(), new_prio); boosting = true; } } } #endif /* CONFIG_ZBUS_PRIORITY_BOOST */ int err = k_sem_take(&chan->data->sem, timeout); if (err) { /* When the priority boost is disabled, this IF will be optimized out. */ if (boosting) { /* Restoring thread priority since the semaphore is not available */ k_thread_priority_set(k_current_get(), *prio); } return err; } return 0; } static inline void chan_unlock(const struct zbus_channel *chan, int prio) { k_sem_give(&chan->data->sem); #if defined(CONFIG_ZBUS_PRIORITY_BOOST) /* During the unlock phase, with the priority boost enabled, the priority must be * restored to the original value in case it was elevated */ if (prio < ZBUS_MIN_THREAD_PRIORITY) { k_thread_priority_set(k_current_get(), prio); } #endif /* CONFIG_ZBUS_PRIORITY_BOOST */ } int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout) { int err; _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); if (k_is_in_isr()) { timeout = K_NO_WAIT; } k_timepoint_t end_time = sys_timepoint_calc(timeout); if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) { return -ENOMSG; } int context_priority = ZBUS_MIN_THREAD_PRIORITY; err = chan_lock(chan, timeout, &context_priority); if (err) { return err; } #if defined(CONFIG_ZBUS_CHANNEL_PUBLISH_STATS) chan->data->publish_timestamp = k_uptime_ticks(); chan->data->publish_count += 1; #endif /* CONFIG_ZBUS_CHANNEL_PUBLISH_STATS */ memcpy(chan->message, msg, chan->message_size); err = _zbus_vded_exec(chan, end_time); chan_unlock(chan, context_priority); return err; } int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout) { _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); if (k_is_in_isr()) { timeout = K_NO_WAIT; } int err = k_sem_take(&chan->data->sem, timeout); if (err) { return err; } memcpy(msg, chan->message, chan->message_size); k_sem_give(&chan->data->sem); return 0; } int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout) { int err; _ZBUS_ASSERT(chan != NULL, "chan is required"); if (k_is_in_isr()) { timeout = K_NO_WAIT; } k_timepoint_t end_time = sys_timepoint_calc(timeout); int context_priority = ZBUS_MIN_THREAD_PRIORITY; err = chan_lock(chan, timeout, &context_priority); if (err) { return err; } err = _zbus_vded_exec(chan, end_time); chan_unlock(chan, context_priority); return err; } int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout) { _ZBUS_ASSERT(chan != NULL, "chan is required"); if (k_is_in_isr()) { timeout = K_NO_WAIT; } int err = k_sem_take(&chan->data->sem, timeout); if (err) { return err; } return 0; } int zbus_chan_finish(const struct zbus_channel *chan) { _ZBUS_ASSERT(chan != NULL, "chan is required"); k_sem_give(&chan->data->sem); return 0; } int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout) { _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs"); _ZBUS_ASSERT(sub != NULL, "sub is required"); _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER"); _ZBUS_ASSERT(sub->queue != NULL, "sub queue is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); return k_msgq_get(sub->queue, chan, timeout); } #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, k_timeout_t timeout) { _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs"); _ZBUS_ASSERT(sub != NULL, "sub is required"); _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, "sub must be a MSG_SUBSCRIBER"); _ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); struct net_buf *buf = k_fifo_get(sub->message_fifo, timeout); if (buf == NULL) { return -ENOMSG; } *chan = *((struct zbus_channel **)net_buf_user_data(buf)); memcpy(msg, net_buf_remove_mem(buf, zbus_chan_msg_size(*chan)), zbus_chan_msg_size(*chan)); net_buf_unref(buf); return 0; } #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs, const struct zbus_channel *chan, bool masked) { _ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); int err = -ESRCH; struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; K_SPINLOCK(&obs_slock) { for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; i < limit; ++i) { STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); __ASSERT(observation != NULL, "observation must be not NULL"); if (observation->obs == obs) { if (observation_mask->enabled != masked) { observation_mask->enabled = masked; update_all_channels_hop(obs); } err = 0; K_SPINLOCK_BREAK; } } } return err; } int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, const struct zbus_channel *chan, bool *masked) { _ZBUS_ASSERT(obs != NULL, "obs is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); int err = -ESRCH; struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; K_SPINLOCK(&obs_slock) { const int limit = chan->data->observers_end_idx; for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) { STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); __ASSERT(observation != NULL, "observation must be not NULL"); if (observation->obs == obs) { *masked = observation_mask->enabled; err = 0; K_SPINLOCK_BREAK; } } } return err; } int zbus_obs_set_enable(const struct zbus_observer *obs, bool enabled) { _ZBUS_ASSERT(obs != NULL, "obs is required"); K_SPINLOCK(&obs_slock) { if (obs->data->enabled != enabled) { obs->data->enabled = enabled; update_all_channels_hop(obs); } } return 0; }