Skip to main content

osiris/
sched.rs

1//! This module provides access to the scheduler.
2
3mod dispch;
4pub mod rr;
5pub mod rt;
6pub mod task;
7pub mod thread;
8
9use core::{
10    ffi::c_void,
11    sync::atomic::{AtomicBool, Ordering},
12};
13
14use crate::hal::{self, Schedable};
15
16use crate::{
17    error::Result,
18    mem,
19    sync::{self, atomic::AtomicU64, spinlock::SpinLocked},
20    time::{self},
21    types::{
22        array::BitReclaimMap,
23        rbtree::RbTree,
24        traits::{Get, GetMut},
25        view::ViewMut,
26    },
27};
28
29type ThreadMap<const N: usize> = BitReclaimMap<thread::UId, thread::Thread, N>;
30type TaskMap<const N: usize> = BitReclaimMap<task::UId, task::Task, N>;
31
32pub(crate) const THREAD_COUNT: usize = 32;
33type GlobalScheduler = Scheduler<THREAD_COUNT>;
34
35static SCHED: SpinLocked<GlobalScheduler> = SpinLocked::new(GlobalScheduler::new());
36
37static DISABLED: AtomicBool = AtomicBool::new(true);
38static NEXT_TICK: AtomicU64 = AtomicU64::new(0);
39
40type WaiterView<'a, const N: usize> = ViewMut<'a, thread::UId, thread::Waiter, ThreadMap<N>>;
41
42pub struct Scheduler<const N: usize> {
43    threads: ThreadMap<N>,
44    tasks: TaskMap<N>,
45
46    rt_scheduler: rt::Scheduler<N>,
47    rr_scheduler: rr::Scheduler<N>,
48
49    wakeup: RbTree<thread::WakupTree, thread::UId>,
50
51    current: Option<thread::UId>,
52    last_tick: u64,
53}
54
55// Safety: The scheduler is not Copy or Clone.
56// The scheduler owns all its data exclusively.
57unsafe impl<const N: usize> Send for Scheduler<N> {}
58// Safety: The scheduler does only allow access to its data through &mut self, which is synchronized by the SCHED spinlock.
59unsafe impl<const N: usize> Sync for Scheduler<N> {}
60
61/// We define kill as a macro in order to avoid borrow checker issues.
62macro_rules! kill {
63    ($self:expr, $uid:expr) => {{
64        let _ = rt::ServerView::<N>::with(&mut $self.threads, |view| {
65            $self.rt_scheduler.dequeue($uid, view)
66        });
67        let _ = $self.rr_scheduler.dequeue($uid, &mut $self.threads);
68        let _ = $self
69            .wakeup
70            .remove($uid, &mut WaiterView::<N>::new(&mut $self.threads));
71        if let Some(thread) = $self.threads.get_mut($uid) {
72            thread.resume();
73        }
74        Ok::<(), crate::error::Error>(())
75    }};
76}
77
78impl<const N: usize> Scheduler<N> {
79    const fn new() -> Self {
80        Self {
81            threads: ThreadMap::new(),
82            tasks: TaskMap::new(),
83            rt_scheduler: rt::Scheduler::new(),
84            rr_scheduler: rr::Scheduler::new(),
85            wakeup: RbTree::new(),
86            current: None,
87            last_tick: 0,
88        }
89    }
90
91    fn land(&mut self, ctx: *mut c_void) {
92        if let Some(current) = self.current {
93            let mut kill = None;
94            if let Some(thread) = self.threads.get_mut(current) {
95                if thread.save_ctx(ctx).is_err() {
96                    warn!(
97                        "failed to save context (SP: {:x}) of thread {}.",
98                        ctx as usize, current
99                    );
100                    kill = Some(thread.task_id());
101                }
102            } else {
103                bug!("failed to land thread {}. Does not exist.", current);
104            }
105
106            if let Some(task_id) = kill {
107                if self.kill_by_task(task_id).is_err() {
108                    // Should not be possible. The thread exists, so the task must exist.
109                    bug!("failed to kill task {}", task_id);
110                }
111            }
112        }
113    }
114
115    /// Triggers a reschedule at *latest* when we hit timepoint `next`.
116    /// Note that we may reschedule earlier than `next` if another thread wakes up or is enqueued, but we will never reschedule later than `next`.
117    ///
118    /// `now` - The current timepoint, in ticks.
119    /// `next` - The next timepoint to reschedule at, in ticks.
120    fn next_resched(now: u64, next: u64) {
121        let old = NEXT_TICK.load(Ordering::Acquire);
122
123        if old > now && old <= next {
124            return;
125        }
126
127        NEXT_TICK.store(next, Ordering::Release);
128    }
129
130    /// Enqueues a thread into the scheduler. This will trigger a reschedule.
131    ///
132    /// `uid` - The UID of the thread to enqueue.
133    /// `now` - The current timepoint, in ticks. This is used for RT threads to calculate their deadlines.
134    ///
135    /// Returns an error if the thread does not exist.
136    pub fn enqueue(&mut self, now: u64, uid: thread::UId) -> Result<()> {
137        let thread = self.threads.get(uid).ok_or(kerr!(EINVAL))?;
138
139        if thread.rt_server().is_some() {
140            let mut view = rt::ServerView::<N>::new(&mut self.threads);
141            self.rt_scheduler.enqueue(uid, now, &mut view)?;
142        } else {
143            if self.rr_scheduler.enqueue(uid, &mut self.threads).is_err() {
144                // This should not be possible.
145                // - Thread is in the thread list.
146                // - Thread is not linked into a different list.
147                bug!("failed to enqueue thread {} into RR scheduler.", uid);
148            }
149        }
150        reschedule();
151        Ok(())
152    }
153
154    fn do_wakeups(&mut self, now: u64) {
155        while let Some(uid) = self.wakeup.min() {
156            let mut stop = false;
157            WaiterView::<N>::with(&mut self.threads, |view| {
158                if let Some(waiter) = view.get(uid) {
159                    if waiter.until() > now {
160                        Self::next_resched(now, waiter.until());
161                        stop = true;
162                        return;
163                    }
164
165                    if let Err(_) = self.wakeup.remove(uid, view) {
166                        bug!("failed to remove thread {} from wakeup tree.", uid);
167                    }
168                } else {
169                    bug!("failed to get thread {} from wakeup tree.", uid);
170                }
171            });
172
173            if stop {
174                break;
175            }
176
177            if let Some(thread) = self.threads.get_mut(uid) {
178                thread.resume();
179            } else {
180                // This should not be possible. The thread is in the wakeup tree, so it must exist.
181                bug!("failed to wake thread {}. Does not exist.", uid);
182            }
183
184            if self.enqueue(now, uid).is_err() {
185                bug!("failed to enqueue thread {} after wakeup.", uid);
186            }
187        }
188    }
189
190    /// Syncs the scheduler state at the beginning of a reschedule.
191    fn sync_to_sched(&mut self, now: u64) {
192        let dt = now - self.last_tick;
193        self.last_tick = now;
194
195        if let Some(old) = self.current {
196            let throttle = rt::ServerView::<N>::with(&mut self.threads, |view| {
197                self.rt_scheduler.put(old, dt, view)
198            });
199
200            if let Some(throttle) = throttle {
201                if throttle <= now {
202                    rt::ServerView::<N>::with(&mut self.threads, |view| {
203                        let _ = self.rt_scheduler.dequeue(old, view);
204                        let _ = self.rt_scheduler.enqueue(old, now, view);
205                    });
206                } else {
207                    // This ensures that sleep_until will not trigger a reschedule.
208                    self.current = None;
209                    let _ = self.sleep_until(Some(old), throttle, now);
210                    self.current = Some(old);
211                }
212            } else {
213                self.rr_scheduler.put(old, dt as u32);
214            }
215        }
216
217        self.do_wakeups(now);
218    }
219
220    fn select_next(&mut self) -> (thread::UId, u32) {
221        rt::ServerView::<N>::with(&mut self.threads, |view| self.rt_scheduler.pick(view))
222            .or_else(|| self.rr_scheduler.pick(&mut self.threads))
223            .unwrap_or((thread::IDLE_THREAD, 1000))
224    }
225
226    /// Picks the next thread to run and returns its context and task. This should only be called by sched_enter after land.
227    fn do_sched(&mut self, now: u64) -> Option<(*mut c_void, &mut task::Task)> {
228        // Sync the new state to the scheduler.
229        self.sync_to_sched(now);
230
231        // Pick the next thread to run.
232        let (new, budget) = self.select_next();
233
234        // At this point, the task/thread must exist. Everything else is a bug.
235        let Some(thread) = self.threads.get(new) else {
236            bug!("failed to pick thread {}. Does not exist.", new);
237        };
238        let (ctx, task_id) = (thread.ctx(), thread.task_id());
239
240        let Some(task) = self.tasks.get_mut(task_id) else {
241            bug!("failed to get task {}. Does not exist.", task_id);
242        };
243
244        // We don't need to resched if the thread has budget.
245        self.current = Some(new);
246        Self::next_resched(now, now.saturating_add(budget as u64));
247        Some((ctx, task))
248    }
249
250    /// Puts a thread to sleep until the specified timepoint. This will trigger a reschedule if the thread is currently running.
251    ///
252    /// `uid` - The UID of the thread to put to sleep, or None to put the current thread to sleep.
253    /// `until` - The timepoint to sleep until, in ticks. This is an absolute time, not a relative time.
254    /// `now` - The current timepoint, in ticks.
255    ///
256    /// Returns an error if there is no current thread, it is not enqueued, or if the specified timepoint is in the past.
257    pub fn sleep_until(&mut self, uid: Option<thread::UId>, until: u64, now: u64) -> Result<()> {
258        if until <= now {
259            return Ok(());
260        }
261        let uid = match uid {
262            Some(uid) => uid,
263            None => self.current.ok_or(kerr!(EINVAL))?,
264        };
265        // Make the thread not runnable. Triggers a reschedule if the thread is currently running.
266        // If it fails, it means the thread was not enqueued, which is fine.
267        let _ = self.dequeue(uid);
268
269        // Check if the thread is already sleeping.
270        let already = match self.threads.get_mut(uid) {
271            Some(thread) if thread.is_waiting() => true,
272            Some(_) => false,
273            None => return Err(kerr!(EINVAL)),
274        };
275
276        // If the thread already sleeps, remove it from the wakeup tree.
277        if already {
278            WaiterView::with(&mut self.threads, |view| self.wakeup.remove(uid, view))?;
279        }
280
281        // Put the thread to sleep until the specified timepoint.
282        if let Some(thread) = self.threads.get_mut(uid) {
283            thread.wait(until);
284        } else {
285            // This should not be possible. The thread was just checked to exist.
286            bug!("failed to set thread {} to waiting. Does not exist.", uid);
287        }
288
289        // Insert the thread into the wakeup tree.
290        let res = WaiterView::with(&mut self.threads, |view| self.wakeup.insert(uid, view));
291
292        if res.is_err() {
293            // This should not be possible. The thread was just checked to exist.
294            bug!("failed to insert thread {} into wakeup tree.", uid);
295        }
296        Ok(())
297    }
298
299    /// `kick` lookup by raw `UId::as_usize()`. Synthetic `tid` is a placeholder.
300    pub fn kick_by_uid(&mut self, uid: usize) -> Result<()> {
301        let lookup_uid = thread::UId::new(uid, thread::Id::new(0, crate::sched::task::UId::new(0)));
302        self.kick(lookup_uid)
303    }
304
305    pub fn current_uid(&self) -> Option<usize> {
306        self.current.map(|uid| uid.as_usize())
307    }
308
309    /// If the thread is currently sleeping, this will trigger a wakeup on the immediately following reschedule.
310    ///
311    /// Returns an error if the thread does not exist, or if the thread is not currently sleeping.
312    pub fn kick(&mut self, uid: thread::UId) -> Result<()> {
313        let now = time::tick();
314        let res = WaiterView::with(&mut self.threads, |view| self.wakeup.remove(uid, view));
315
316        if let Some(thread) = self.threads.get_mut(uid) {
317            thread.resume();
318        } else {
319            return Err(kerr!(EINVAL)); // Thread does not exist.
320        }
321
322        if res.is_ok() {
323            self.enqueue(now, uid)?;
324        }
325        Ok(())
326    }
327
328    /// This will make the thread not runnable, but it will not remove it from other lists.
329    /// If the thread is currently running, reschedule will be triggered.
330    ///
331    /// Returns an error if the thread does not exist, or if the thread is not currently enqueued in any scheduler.
332    pub fn dequeue(&mut self, uid: thread::UId) -> Result<()> {
333        rt::ServerView::<N>::with(&mut self.threads, |view| {
334            self.rt_scheduler.dequeue(uid, view)
335        })
336        .or_else(|_| self.rr_scheduler.dequeue(uid, &mut self.threads))?;
337
338        if Some(uid) == self.current {
339            reschedule();
340        }
341        Ok(())
342    }
343
344    pub fn create_task(&mut self, attrs: task::Attributes) -> Result<task::UId> {
345        let task_id = self.tasks.insert_with(|idx| {
346            let task = task::Task::new(task::UId::new(idx), attrs);
347            task.map(|t| (task::UId::new(idx), t))
348        })?;
349
350        #[cfg(any(feature = "metrics", metrics))]
351        if let Some(task) = self.tasks.get(task_id) {
352            crate::metrics::store::write_task_heap(
353                task_id.as_usize(),
354                task.allocator_metrics().into(),
355            );
356        }
357
358        Ok(task_id)
359    }
360
361    /// Dequeues all threads of the task and removes the task. If the current thread belongs to the task, reschedule will be triggered.
362    ///
363    /// If the task does not exist, an error will be returned.
364    pub fn kill_by_task(&mut self, uid: task::UId) -> Result<()> {
365        let task = self.tasks.get_mut(uid).ok_or(kerr!(EINVAL))?;
366
367        while let Some(id) = task.threads().head() {
368            kill!(self, id)?;
369
370            if task.threads_mut().remove(id, &mut self.threads).is_err() {
371                // This should not be possible. The thread ID is from the thread list of the task, so it must exist.
372                bug!("failed to remove thread {} from task {}.", id, uid);
373            }
374
375            if self.threads.remove(&id).is_none() {
376                // This should not be possible. The thread ID is from the thread list of the task, so it must exist.
377                bug!("failed to remove thread {} from thread list.", id);
378            }
379
380            #[cfg(any(feature = "metrics", metrics))]
381            crate::metrics::store::clear_thread_stack(id.as_usize());
382
383            if Some(id) == self.current {
384                self.current = None;
385                reschedule();
386            }
387        }
388
389        self.tasks.remove(&uid).ok_or(kerr!(EINVAL))?;
390
391        #[cfg(any(feature = "metrics", metrics))]
392        crate::metrics::store::clear_task_heap(uid.as_usize());
393
394        Ok(())
395    }
396
397    pub fn create_thread(
398        &mut self,
399        task: Option<task::UId>,
400        attrs: &thread::Attributes,
401    ) -> Result<thread::UId> {
402        let task = match task {
403            Some(t) => t,
404            None => self.current.ok_or(kerr!(EINVAL))?.owner(),
405        };
406        let task = self.tasks.get_mut(task).ok_or(kerr!(EINVAL))?;
407
408        let uid = self
409            .threads
410            .insert_with(|idx| {
411                let uid = task.allocate_tid().get_uid(idx);
412                let stack = task.allocate_stack(attrs)?;
413                let thread = thread::Thread::new(uid, stack, attrs.attrs);
414                Ok((uid, thread))
415            })
416            .and_then(|k| {
417                task.register_thread(k, &mut self.threads)?;
418                Ok(k)
419            })?;
420
421        #[cfg(any(feature = "metrics", metrics))]
422        if let Some(thread) = self.threads.get(uid) {
423            crate::metrics::store::write_thread_stack(
424                uid.as_usize(),
425                thread.stack_metrics().into(),
426            );
427        }
428
429        Ok(uid)
430    }
431
432    /// Dequeues a thread and removes it from its corresponding task. If the thread is currently running, reschedule will be triggered.
433    ///
434    /// `uid` - The UID of the thread to kill, or None to kill the current thread.
435    ///
436    /// If the thread does not exist, or if `uid` is None and there is no current thread, an error will be returned.
437    pub fn kill_by_thread(&mut self, uid: Option<thread::UId>) -> Result<()> {
438        let uid = match uid {
439            Some(uid) => uid,
440            None => self.current.ok_or(kerr!(EINVAL))?,
441        };
442        kill!(self, uid)?;
443
444        self.tasks
445            .get_mut(uid.tid().owner())
446            .ok_or(kerr!(EINVAL))?
447            .threads_mut()
448            .remove(uid, &mut self.threads)?;
449
450        self.threads.remove(&uid).ok_or(kerr!(EINVAL))?;
451
452        #[cfg(any(feature = "metrics", metrics))]
453        crate::metrics::store::clear_thread_stack(uid.as_usize());
454
455        if Some(uid) == self.current {
456            self.current = None;
457            reschedule();
458        }
459        Ok(())
460    }
461
462    /// Updates the lock-free mirror for the currently scheduled thread and its task.
463    /// Called on every reschedule; only the thread that just ran needs updating.
464    #[cfg(any(feature = "metrics", metrics))]
465    fn mirror_stats(&self) {
466        use crate::metrics::store;
467
468        store::write_global_heap(crate::mem::global_metrics().into());
469
470        if let Some(uid) = self.current {
471            if let Some(thread) = self.threads.get(uid) {
472                store::write_thread_stack(uid.as_usize(), thread.stack_metrics().into());
473
474                let task_id = thread.task_id();
475                if let Some(task) = self.tasks.get(task_id) {
476                    store::write_task_heap(task_id.as_usize(), task.allocator_metrics().into());
477                }
478            }
479        }
480    }
481}
482
483/// This function provides safe access to the global scheduler.
484/// It disables interrupts and locks the scheduler. Use with caution!
485pub fn with<T, F: FnOnce(&mut GlobalScheduler) -> T>(f: F) -> T {
486    // Must mask *all* interrupts: the ISR-callable `kick_thread` re-enters
487    // `with`, so any priority-selective mask would deadlock if an ISR
488    // preempted a holder.
489    sync::atomic::irq_free(|| {
490        let mut sched = SCHED.lock();
491        f(&mut sched)
492    })
493}
494
495/// Initializes the scheduler. This should be called once during kernel initialization, before any threads are created.
496///
497/// `kaddr_space` - The address space of the kernel task. This is used to create the kernel task, which is required for the scheduler to function.
498///
499/// If the kernel task cannot be created, this function will panic. Note that the kernel task is essential for the system to function, so we cannot continue without it.
500pub fn init(kaddr_space: mem::vmm::AddressSpace) {
501    with(|sched| {
502        let attrs = task::Attributes {
503            resrv_pgs: None,
504            address_space: Some(kaddr_space),
505        };
506
507        sched.create_task(attrs).unwrap_or_else(|e| {
508            panic!("failed to create kernel task: {}", e);
509        });
510    })
511}
512
513/// This should be called on each timer tick, and if it returns true, sched_enter should be called to reschedule.
514///
515/// `now` - The current timepoint, in ticks.
516pub fn needs_reschedule(now: u64) -> bool {
517    if DISABLED.load(Ordering::Acquire) {
518        return false;
519    }
520
521    now >= NEXT_TICK.load(Ordering::Acquire)
522}
523
524/// This will disable rescheduling until the next call to enable. Use with caution!
525#[inline]
526#[allow(dead_code)]
527pub fn disable() {
528    DISABLED.store(true, Ordering::Release);
529}
530
531#[inline]
532pub fn enable() {
533    DISABLED.store(false, Ordering::Release);
534}
535
536/// Triggers a reschedule immediately, when interrupts are enabled.
537/// This must be called after enqueueing a thread, or after waking up a thread, or putting the current thread to sleep.
538pub fn reschedule() {
539    if DISABLED.load(Ordering::Acquire) {
540        return;
541    }
542
543    hal::Machine::trigger_reschedule();
544}
545
546/// Wake a thread by raw `uid`. C-FFI so ISR-context callers can use it
547/// without going through the syscall path. Errors are swallowed:
548/// not-yet-sleeping is normal.
549#[unsafe(no_mangle)]
550pub extern "C" fn kick_thread(uid: u32) {
551    with(|sched| {
552        let _ = sched.kick_by_uid(uid as usize);
553    });
554    reschedule();
555}
556
557/// This will be called by the architecture-specific code to enter the scheduler. It will land the current thread, pick the next thread to run, and return its context and task.
558#[unsafe(no_mangle)]
559pub extern "C" fn sched_enter(mut ctx: *mut c_void) -> *mut c_void {
560    with(|sched| {
561        let old = sched.current.map(|c| c.owner());
562        sched.land(ctx);
563
564        // Mirror stats while self.current still points to the outgoing thread —
565        // its stack context was just saved by land() and its task reflects any
566        // allocations made since the last reschedule.
567        #[cfg(any(feature = "metrics", metrics))]
568        sched.mirror_stats();
569
570        if let Some((new, task)) = sched.do_sched(time::tick()) {
571            if old != Some(task.id) {
572                dispch::prepare(task);
573            }
574            ctx = new;
575        }
576
577        ctx
578    })
579}
580
581extern "C" fn thread_finalizer() -> ! {
582    with(|sched| {
583        if sched.kill_by_thread(None).is_err() {
584            bug!("failed to terminate returned thread.");
585        }
586    });
587    loop {
588        hal::asm::nop!();
589    }
590}