scheduling: add Data Processing scheduler type

The DP scheduler is a scheduler based on Zephyr preemptible
threads. It will start each SOF task as a separate Zephyr
thread.
In the current implementation the scheduler can trigger each
task/thread periodically or on demand.
TODO: more sophisticated scheduling decisions, with deadline
and task budgets calculations.

Signed-off-by: Marcin Szkudlinski <marcin.szkudlinski@intel.com>
This commit is contained in:
Marcin Szkudlinski 2023-02-14 11:12:11 +01:00 committed by Liam Girdwood
parent 62e358231e
commit 3ee1d78738
6 changed files with 484 additions and 3 deletions

View File

@ -0,0 +1,94 @@
/* SPDX-License-Identifier: BSD-3-Clause */
/*
* Copyright(c) 2023 Intel Corporation. All rights reserved.
*
* Author: Marcin Szkudlinski
*/
#ifndef __SOF_SCHEDULE_DP_SCHEDULE_H__
#define __SOF_SCHEDULE_DP_SCHEDULE_H__
#include <rtos/task.h>
#include <sof/trace/trace.h>
#include <user/trace.h>
#include <stdint.h>
/**
*
* DP scheduler is a scheduler that creates a separate preemptible Zephyr thread for each SOF task
* There's only one instance of DP in the system, however, threads can be assigned and pinned
* to any core in the system for its execution, there's no SMP processing.
*
* The task execution may be delayed and task may be re-scheduled periodically
* NOTE: delayed start and rescheduling takes place in sync with LL scheduler, meaning the
* DP scheduler is triggered as the last task of LL running on a primary core.
* That implies a limitation: LL scheduler MUST be running on primary core in order to have
* this feature working.
* It is fine, because rescheduling is a feature used for data processing when a pipeline is
* running.
*
* Other possible usage of DP scheduler is to schedule task with SCHEDULER_DP_RUN_TASK_IMMEDIATELY
* as start parameter. It will force the task to work without any delays and async to LL.
* This kind of scheduling may be used for starting regular zephyr tasks using SOF api
*
* Task run() may return:
* SOF_TASK_STATE_RESCHEDULE - the task will be rescheduled as specified in scheduler period
* note that task won't ever be rescheduled if LL is not running
* SOF_TASK_STATE_COMPLETED - the task will be removed from scheduling,
* calling schedule_task will add the task to processing again
* task_complete() will be called
* SOF_TASK_STATE_CANCEL - the task will be removed from scheduling,
* calling schedule_task will add the task to processing again
* task_complete() won't be called
* other statuses - assert will go off
*
* NOTE: task - means a SOF task
* thread - means a Zephyr preemptible thread
*
* TODO - EDF:
* Threads run on the same priority, lower than thread running LL tasks. Zephyr EDF mechanism
* is used for decision which thread/task is to be scheduled next. The DP scheduler calculates
* the task deadline and set it in Zephyr thread properties, the final scheduling decision is made
* by Zephyr.
*
* Each time tick the scheduler iterates through the list of all active tasks and calculates
* a deadline based on
* - knowledge how the modules are bound
* - declared time required by a task to complete processing
* - the deadline of the last module
*
*/
/** \brief tell the scheduler to run the task immediately, even if LL tick is not yet running */
#define SCHEDULER_DP_RUN_TASK_IMMEDIATELY ((uint64_t)-1)
/**
* \brief Init the Data Processing scheduler
*/
int scheduler_dp_init(void);
/**
* \brief Set the Data Processing scheduler to be accessible at secondary cores
*/
int scheduler_dp_init_secondary_core(void);
/**
* \brief initialize a DP task and add it to scheduling
*
* \param[out] task pointer, pointer to allocated task structure will be returned
* \param[in] uid pointer to UUID of the task
* \param[in] ops pointer to task functions
* \param[in] data pointer to the thread private data
* \param[in] core CPU the thread should run on
* \param[in] stack_size size of stack for a zephyr task
* \param[in] task_priority priority of the zephyr task
*/
int scheduler_dp_task_init(struct task **task,
const struct sof_uuid_entry *uid,
const struct task_ops *ops,
void *data,
uint16_t core,
size_t stack_size,
uint32_t task_priority);
#endif /* __SOF_SCHEDULE_DP_SCHEDULE_H__ */

View File

@ -31,6 +31,12 @@ enum {
SOF_SCHEDULE_LL_DMA, /**< Low latency DMA, schedules immediately
* on scheduling component's DMA interrupt
*/
SOF_SCHEDULE_DP, /**< DataProcessing scheduler
* Scheduler based on Zephyr preemptive threads
* TODO: DP will become the Zephyr EDF scheduler type
* and will be unified with SOF_SCHEDULE_EDF for Zephyr builds
* current implementation of Zephyr based EDF is deprecated now
*/
SOF_SCHEDULE_COUNT /**< indicates number of scheduler types */
};
@ -38,7 +44,6 @@ enum {
#define SOF_SCHEDULER_FREE_IRQ_ONLY BIT(0) /**< Free function disables only
* interrupts
*/
/**
* Scheduler operations.
*

View File

@ -26,6 +26,7 @@
#include <rtos/idc.h>
#include <sof/schedule/schedule.h>
#include <sof/schedule/edf_schedule.h>
#include <sof/schedule/dp_schedule.h>
#include <sof/schedule/ll_schedule.h>
#include <sof/schedule/ll_schedule_domain.h>
#include <ipc/trace.h>
@ -183,6 +184,12 @@ int secondary_core_init(struct sof *sof)
if (dma_domain)
scheduler_init_ll(dma_domain);
#if CONFIG_ZEPHYR_DP_SCHEDULER
err = scheduler_dp_init_secondary_core();
if (err < 0)
return err;
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */
/* initialize IDC mechanism */
trace_point(TRACE_BOOT_PLATFORM_IDC);
err = idc_init();

View File

@ -15,6 +15,7 @@
#include <sof/lib/mm_heap.h>
#include <sof/lib/watchdog.h>
#include <sof/schedule/edf_schedule.h>
#include <sof/schedule/dp_schedule.h>
#include <sof/schedule/ll_schedule.h>
#include <sof/schedule/ll_schedule_domain.h>
#include <sof/trace/trace.h>
@ -84,9 +85,17 @@ int platform_init(struct sof *sof)
trace_point(TRACE_BOOT_PLATFORM_SCHED);
scheduler_init_edf();
/* init low latency timer domain and scheduler */
/* init low latency timer domain and scheduler. Any failure is fatal */
sof->platform_timer_domain = zephyr_domain_init(PLATFORM_DEFAULT_CLOCK);
scheduler_init_ll(sof->platform_timer_domain);
ret = scheduler_init_ll(sof->platform_timer_domain);
if (ret < 0)
return ret;
#if CONFIG_ZEPHYR_DP_SCHEDULER
ret = scheduler_dp_init();
if (ret < 0)
return ret;
#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */
/* init the system agent */
trace_point(TRACE_BOOT_PLATFORM_AGENT);

View File

@ -0,0 +1,362 @@
// SPDX-License-Identifier: BSD-3-Clause
/*
* Copyright(c) 2023 Intel Corporation. All rights reserved.
*
* Author: Marcin Szkudlinski
*/
#include <sof/audio/component.h>
#include <rtos/task.h>
#include <stdint.h>
#include <sof/schedule/dp_schedule.h>
#include <sof/schedule/ll_schedule_domain.h>
#include <sof/trace/trace.h>
#include <rtos/wait.h>
#include <rtos/interrupt.h>
#include <zephyr/kernel.h>
#include <zephyr/sys_clock.h>
#include <sof/lib/notifier.h>
#include <zephyr/kernel/thread.h>
LOG_MODULE_REGISTER(dp_schedule, CONFIG_SOF_LOG_LEVEL);
/* 87858bc2-baa9-40b6-8e4c-2c95ba8b1545 */
DECLARE_SOF_UUID("dp-schedule", dp_sched_uuid, 0x87858bc2, 0xbaa9, 0x40b6,
0x8e, 0x4c, 0x2c, 0x95, 0xba, 0x8b, 0x15, 0x45);
DECLARE_TR_CTX(dp_tr, SOF_UUID(dp_sched_uuid), LOG_LEVEL_INFO);
/* scheduler instance data - a single instance is shared by all cores */
struct scheduler_dp_data {
struct list_item tasks; /* list of active dp tasks */
struct k_spinlock lock; /* synchronization between cores */
};
/* per-task private data, allocated together with the task in scheduler_dp_task_init() */
struct task_dp_pdata {
k_tid_t thread_id; /* zephyr thread ID */
k_thread_stack_t __sparse_cache *p_stack; /* pointer to thread stack */
uint32_t ticks_period; /* period the task should be scheduled in LL ticks */
uint32_t ticks_to_trigger; /* number of ticks the task should be triggered after */
struct k_sem sem; /* semaphore for task scheduling */
};
/*
* there's only one instance of DP scheduler for all cores
* Keep pointer to it here
*/
static struct scheduler_dp_data *dp_sch;
/* take the cross-core scheduler spinlock; returns the key needed to release it */
static inline k_spinlock_key_t scheduler_dp_lock(void)
{
return k_spin_lock(&dp_sch->lock);
}
/* release the cross-core scheduler spinlock taken by scheduler_dp_lock() */
static inline void scheduler_dp_unlock(k_spinlock_key_t key)
{
k_spin_unlock(&dp_sch->lock, key);
}
/*
* function called after every LL tick (NOTIFIER_ID_LL_POST_RUN callback)
*
* Counts down the per-task tick counters and, when a queued task's trigger
* time arrives, marks it RUNNING and releases its semaphore so the backing
* Zephyr thread wakes up and calls task_run().
*
* \param receiver_data unused
* \param event_type unused
* \param caller_data unused
*
* TODO:
* the scheduler should here calculate deadlines of all task and tell Zephyr about them
* Currently there's an assumption that the task is always ready to run
*/
void scheduler_dp_ll_tick(void *receiver_data, enum notify_id event_type, void *caller_data)
{
(void)receiver_data;
(void)event_type;
(void)caller_data;
struct list_item *tlist;
struct task *curr_task;
struct task_dp_pdata *pdata;
k_spinlock_key_t lock_key;
/* tick bookkeeping is done only on the primary core, in sync with LL there */
if (cpu_get_id() != PLATFORM_PRIMARY_CORE_ID)
return;
/* nothing to do before scheduler_dp_init() has run */
if (!dp_sch)
return;
lock_key = scheduler_dp_lock();
list_for_item(tlist, &dp_sch->tasks) {
curr_task = container_of(tlist, struct task, list);
pdata = curr_task->priv_data;
if (pdata->ticks_to_trigger == 0) {
if (curr_task->state == SOF_TASK_STATE_QUEUED) {
/* set new trigger time, start the thread */
pdata->ticks_to_trigger = pdata->ticks_period;
curr_task->state = SOF_TASK_STATE_RUNNING;
k_sem_give(&pdata->sem);
}
} else {
if (curr_task->state == SOF_TASK_STATE_QUEUED ||
curr_task->state == SOF_TASK_STATE_RUNNING)
/* decrease num of ticks to re-schedule */
pdata->ticks_to_trigger--;
}
}
scheduler_dp_unlock(lock_key);
}
/*
 * schedule_task_cancel operation for the DP scheduler.
 *
 * Asynchronous cancel: mark the task as canceled and detach it from the
 * scheduler's active list under the cross-core lock. The backing thread is
 * left alive; it will skip task_run() once it sees the CANCEL state.
 */
static int scheduler_dp_task_cancel(void *data, struct task *task)
{
	k_spinlock_key_t key;

	(void)data;

	key = scheduler_dp_lock();
	task->state = SOF_TASK_STATE_CANCEL;
	list_item_del(&task->list);
	scheduler_dp_unlock(key);

	return 0;
}
/*
 * schedule_task_free operation for the DP scheduler.
 *
 * Kills the backing Zephyr thread, detaches the task from scheduling and
 * releases the thread stack. The task/pdata/thread memory was allocated as
 * one block in scheduler_dp_task_init() and is freed later by the caller.
 */
static int scheduler_dp_task_free(void *data, struct task *task)
{
	struct task_dp_pdata *pdata = task->priv_data;
	k_spinlock_key_t key;

	/* stop the thread first so it can no longer touch the task */
	k_thread_abort(pdata->thread_id);

	key = scheduler_dp_lock();
	list_item_del(&task->list);
	task->state = SOF_TASK_STATE_FREE;
	task->priv_data = NULL;
	scheduler_dp_unlock(key);

	/* the stack was a separate aligned allocation */
	rfree(pdata->p_stack);

	return 0;
}
/*
* Thread function called in component context, on target core
*
* One such Zephyr thread backs each DP task. It loops forever: it sleeps on
* the task semaphore, runs the task once released by the scheduler (LL tick
* countdown or immediate schedule), then updates the task state under the
* cross-core lock according to the value task_run() returned.
*/
static void dp_thread_fn(void *p1, void *p2, void *p3)
{
struct task *task = p1; /* p1 carries the SOF task (set in scheduler_dp_task_init) */
(void)p2;
(void)p3;
struct task_dp_pdata *task_pdata = task->priv_data;
k_spinlock_key_t lock_key;
enum task_state state;
while (1) {
/*
* the thread is started immediately after creation, it will stop on semaphore
* Semaphore will be released once the task is ready to process
*/
k_sem_take(&task_pdata->sem, K_FOREVER);
/* the task may have been canceled while this thread slept - run only if RUNNING */
if (task->state == SOF_TASK_STATE_RUNNING)
state = task_run(task);
else
state = task->state; /* to avoid undefined variable warning */
lock_key = scheduler_dp_lock();
/*
* check if task is still running, may have been canceled by external call
* if not, set the state returned by run procedure
*/
if (task->state == SOF_TASK_STATE_RUNNING) {
task->state = state;
switch (state) {
case SOF_TASK_STATE_RESCHEDULE:
/* mark to reschedule, schedule time is already calculated */
task->state = SOF_TASK_STATE_QUEUED;
break;
case SOF_TASK_STATE_CANCEL:
case SOF_TASK_STATE_COMPLETED:
/* remove from scheduling */
list_item_del(&task->list);
break;
default:
/* illegal state, serious defect, won't happen */
k_panic();
}
}
/* call task_complete */
if (task->state == SOF_TASK_STATE_COMPLETED) {
/* call task_complete out of lock, it may eventually call schedule again */
scheduler_dp_unlock(lock_key);
task_complete(task);
} else {
scheduler_dp_unlock(lock_key);
}
};
/* never be here - the loop only exits when the thread is aborted */
}
/*
* schedule_task operation for the DP scheduler
*
* \param data scheduler private data (struct scheduler_dp_data)
* \param task the task to add to scheduling; must be INIT, CANCEL or COMPLETED
* \param start delay in microseconds before the first run, or
*        SCHEDULER_DP_RUN_TASK_IMMEDIATELY to trigger now, async to LL
* \param period re-schedule period in microseconds
*
* NOTE(review): the divisions below truncate - a period or start shorter than
* LL_TIMER_PERIOD_US becomes 0 LL ticks (i.e. trigger every tick) - confirm
* this is the intended behavior
*/
static int scheduler_dp_task_shedule(void *data, struct task *task, uint64_t start,
uint64_t period)
{
struct scheduler_dp_data *sch = data;
struct task_dp_pdata *pdata = task->priv_data;
k_spinlock_key_t lock_key;
lock_key = scheduler_dp_lock();
/* reject tasks that are currently queued or running */
if (task->state != SOF_TASK_STATE_INIT &&
task->state != SOF_TASK_STATE_CANCEL &&
task->state != SOF_TASK_STATE_COMPLETED) {
scheduler_dp_unlock(lock_key);
return -EINVAL;
}
/* calculate period and start time in LL ticks */
pdata->ticks_period = period / LL_TIMER_PERIOD_US;
/* add a task to DP scheduler list */
list_item_prepend(&task->list, &sch->tasks);
if (start == SCHEDULER_DP_RUN_TASK_IMMEDIATELY) {
/* trigger the task immediately, don't wait for LL tick */
pdata->ticks_to_trigger = 0;
task->state = SOF_TASK_STATE_RUNNING;
k_sem_give(&pdata->sem);
} else {
/* wait for tick */
pdata->ticks_to_trigger = start / LL_TIMER_PERIOD_US;
task->state = SOF_TASK_STATE_QUEUED;
}
scheduler_dp_unlock(lock_key);
return 0;
}
/* scheduler_ops registered for the SOF_SCHEDULE_DP type */
static struct scheduler_ops schedule_dp_ops = {
.schedule_task = scheduler_dp_task_shedule,
.schedule_task_cancel = scheduler_dp_task_cancel,
.schedule_task_free = scheduler_dp_task_free,
};
/*
 * Make the (single, shared) DP scheduler instance accessible on a secondary
 * core. The instance itself is created by the primary core in
 * scheduler_dp_init(); if it does not exist yet, primary-core init has not
 * run or its allocation failed.
 */
int scheduler_dp_init_secondary_core(void)
{
	if (!dp_sch)
		return -ENOMEM;

	/* register the shared instance with this core's scheduler list */
	scheduler_init(SOF_SCHEDULE_DP, &schedule_dp_ops, dp_sch);

	return 0;
}
/*
 * Create the single DP scheduler instance (primary core only).
 *
 * Allocates the shared scheduler data, registers the DP scheduler type and
 * hooks scheduler_dp_ll_tick() to the LL post-run notification so delayed
 * and periodic DP tasks advance in sync with LL ticks.
 *
 * \return 0 on success, -ENOMEM on allocation failure, or the error code
 *         returned by notifier_register()
 */
int scheduler_dp_init(void)
{
	int ret;

	dp_sch = rzalloc(SOF_MEM_ZONE_SYS_SHARED, 0, SOF_MEM_CAPS_RAM, sizeof(*dp_sch));
	if (!dp_sch)
		return -ENOMEM;

	list_init(&dp_sch->tasks);

	/*
	 * Fix: the original ignored this return value; if registration fails,
	 * queued DP tasks would silently never be triggered. Propagate it.
	 */
	ret = notifier_register(NULL, NULL, NOTIFIER_ID_LL_POST_RUN, scheduler_dp_ll_tick, 0);
	if (ret < 0)
		return ret;

	scheduler_init(SOF_SCHEDULE_DP, &schedule_dp_ops, dp_sch);

	return 0;
}
/*
* Initialize a DP task and create the Zephyr thread backing it.
*
* The task, its private data and the thread object are allocated as one
* block (freed later by the caller); the stack is a separate aligned
* allocation freed in scheduler_dp_task_free(). The thread is created
* pinned to the requested core and started immediately - it blocks on the
* task semaphore until the task is scheduled.
*
* \param[out] task on success, points to the allocated task structure
* \param[in] uid UUID of the task
* \param[in] ops task functions; run is mandatory, complete/get_deadline copied
* \param[in] data thread private data passed to run()
* \param[in] core CPU the thread is pinned to
* \param[in] stack_size requested stack size, adjusted for Zephyr overhead
* \param[in] task_priority Zephyr priority of the thread
*/
int scheduler_dp_task_init(struct task **task,
const struct sof_uuid_entry *uid,
const struct task_ops *ops,
void *data,
uint16_t core,
size_t stack_size,
uint32_t task_priority)
{
void *p_stack = NULL;
/* memory allocation helper structure */
struct {
struct task task;
struct task_dp_pdata pdata;
struct k_thread thread;
} *task_memory;
k_tid_t thread_id = NULL;
int ret;
/*
* allocate memory
* to avoid multiple malloc operations allocate all required memory as a single structure
* and return pointer to task_memory->task
*/
task_memory = rzalloc(SOF_MEM_ZONE_RUNTIME, 0, SOF_MEM_CAPS_RAM, sizeof(*task_memory));
if (!task_memory) {
tr_err(&dp_tr, "zephyr_dp_task_init(): memory alloc failed");
return -ENOMEM;
}
/* allocate stack - must be aligned so a separate alloc */
stack_size = Z_KERNEL_STACK_SIZE_ADJUST(stack_size);
p_stack = rballoc_align(0, SOF_MEM_CAPS_RAM, stack_size, Z_KERNEL_STACK_OBJ_ALIGN);
if (!p_stack) {
tr_err(&dp_tr, "zephyr_dp_task_init(): stack alloc failed");
ret = -ENOMEM;
goto err;
}
/* create a zephyr thread for the task; K_FOREVER = do not start it yet */
thread_id = k_thread_create(&task_memory->thread, p_stack, stack_size, dp_thread_fn,
&task_memory->task, NULL, NULL, task_priority,
K_USER, K_FOREVER);
/* NOTE(review): k_thread_create is documented to always return a valid id;
* this check looks defensive only - confirm against the Zephyr version used
*/
if (!thread_id) {
ret = -EFAULT;
tr_err(&dp_tr, "zephyr_dp_task_init(): zephyr thread create failed");
goto err;
}
/* pin the thread to specific core */
ret = k_thread_cpu_pin(thread_id, core);
if (ret < 0) {
ret = -EFAULT;
tr_err(&dp_tr, "zephyr_dp_task_init(): zephyr task pin to core failed");
goto err;
}
/* internal SOF task init */
ret = schedule_task_init(&task_memory->task, uid, SOF_SCHEDULE_DP, 0, ops->run,
data, core, 0);
if (ret < 0) {
tr_err(&dp_tr, "zephyr_dp_task_init(): schedule_task_init failed");
goto err;
}
/* initialize other task structures */
task_memory->task.ops.complete = ops->complete;
task_memory->task.ops.get_deadline = ops->get_deadline;
task_memory->task.state = SOF_TASK_STATE_INIT;
task_memory->task.core = core;
/* initialize semaphore */
k_sem_init(&task_memory->pdata.sem, 0, 1);
/* success, fill the structures */
task_memory->task.priv_data = &task_memory->pdata;
task_memory->pdata.thread_id = thread_id;
task_memory->pdata.p_stack = p_stack;
*task = &task_memory->task;
/* start the thread - it will immediately stop at a semaphore */
k_thread_start(thread_id);
return 0;
err:
/* cleanup - free all allocated resources */
if (thread_id)
k_thread_abort(thread_id);
rfree(p_stack);
rfree(task_memory);
return ret;
}

View File

@ -220,6 +220,10 @@ if (CONFIG_ACE_VERSION_1_5)
${SOF_SRC_PATH}/schedule/zephyr_ll.c
)
zephyr_library_sources_ifdef(CONFIG_ZEPHYR_DP_SCHEDULER
${SOF_SRC_PATH}/schedule/zephyr_dp_schedule.c
)
# Sources for virtual heap management
zephyr_library_sources(
lib/regions_mm.c