osiris/
sched.rs

1//! This module provides access to the scheduler.
2
3mod dispch;
4pub mod rr;
5pub mod rt;
6pub mod task;
7pub mod thread;
8
9use core::{
10    ffi::c_void,
11    sync::atomic::{AtomicBool, Ordering},
12};
13
14use hal::Schedable;
15
16use crate::{
17    error::Result,
18    mem,
19    sched::thread::Waiter,
20    sync::{self, atomic::AtomicU64, spinlock::SpinLocked},
21    time::{self},
22    types::{
23        array::BitReclaimMap,
24        rbtree::RbTree,
25        traits::{Get, GetMut},
26        view::ViewMut,
27    },
28};
29
30type ThreadMap<const N: usize> = BitReclaimMap<thread::UId, thread::Thread, N>;
31type TaskMap<const N: usize> = BitReclaimMap<task::UId, task::Task, N>;
32
33type GlobalScheduler = Scheduler<32>;
34
35static SCHED: SpinLocked<GlobalScheduler> = SpinLocked::new(GlobalScheduler::new());
36
37static DISABLED: AtomicBool = AtomicBool::new(true);
38static NEXT_TICK: AtomicU64 = AtomicU64::new(0);
39
40type WaiterView<'a, const N: usize> = ViewMut<'a, thread::UId, thread::Waiter, ThreadMap<N>>;
41
42pub struct Scheduler<const N: usize> {
43    threads: ThreadMap<N>,
44    tasks: TaskMap<N>,
45
46    rt_scheduler: rt::Scheduler<N>,
47    rr_scheduler: rr::Scheduler<N>,
48
49    wakeup: RbTree<thread::WakupTree, thread::UId>,
50
51    current: Option<thread::UId>,
52    last_tick: u64,
53}
54
55/// We define dequeue as a macro in order to avoid borrow checker issues.
56macro_rules! dequeue {
57    ($self:expr, $uid:expr) => {
58        rt::ServerView::<N>::with(&mut $self.threads, |view| {
59            $self.rt_scheduler.dequeue($uid, view)
60        })
61        .or_else(|_| $self.rr_scheduler.dequeue($uid, &mut $self.threads))
62        .or_else(|_| {
63            $self
64                .wakeup
65                .remove($uid, &mut WaiterView::<N>::new(&mut $self.threads))
66        })
67    };
68}
69
70impl<const N: usize> Scheduler<N> {
71    const fn new() -> Self {
72        Self {
73            threads: ThreadMap::new(),
74            tasks: TaskMap::new(),
75            rt_scheduler: rt::Scheduler::new(),
76            rr_scheduler: rr::Scheduler::new(),
77            wakeup: RbTree::new(),
78            current: None,
79            last_tick: 0,
80        }
81    }
82
83    fn land(&mut self, ctx: *mut c_void) {
84        if let Some(current) = self.current {
85            let mut kill = None;
86            if let Some(thread) = self.threads.get_mut(current) {
87                if thread.save_ctx(ctx).is_err() {
88                    warn!(
89                        "failed to save context (SP: {:x}) of thread {}.",
90                        ctx as usize, current
91                    );
92                    kill = Some(thread.task_id());
93                }
94            } else {
95                bug!("failed to land thread {}. Does not exist.", current);
96            }
97
98            if let Some(task_id) = kill {
99                if self.kill_by_task(task_id).is_err() {
100                    // Should not be possible. The thread exists, so the task must exist.
101                    bug!("failed to kill task {}", task_id);
102                }
103            }
104        }
105    }
106
107    /// Triggers a reschedule at *latest* when we hit timepoint `next`.
108    /// Note that we may reschedule earlier than `next` if another thread wakes up or is enqueued, but we will never reschedule later than `next`.
109    ///
110    /// `now` - The current timepoint, in ticks.
111    /// `next` - The next timepoint to reschedule at, in ticks.
112    fn next_resched(now: u64, next: u64) {
113        let old = NEXT_TICK.load(Ordering::Acquire);
114
115        if old > now && old <= next {
116            return;
117        }
118
119        NEXT_TICK.store(next, Ordering::Release);
120    }
121
122    /// Enqueues a thread into the scheduler. This will trigger a reschedule.
123    ///
124    /// `uid` - The UID of the thread to enqueue.
125    /// `now` - The current timepoint, in ticks. This is used for RT threads to calculate their deadlines.
126    ///
127    /// Returns an error if the thread does not exist.
128    pub fn enqueue(&mut self, now: u64, uid: thread::UId) -> Result<()> {
129        let thread = self.threads.get(uid).ok_or(kerr!(InvalidArgument))?;
130
131        if thread.rt_server().is_some() {
132            let mut view = rt::ServerView::<N>::new(&mut self.threads);
133            self.rt_scheduler.enqueue(uid, now, &mut view)?;
134        } else {
135            if self.rr_scheduler.enqueue(uid, &mut self.threads).is_err() {
136                // This should not be possible.
137                // - Thread is in the thread list.
138                // - Thread is not linked into a different list.
139                bug!("failed to enqueue thread {} into RR scheduler.", uid);
140            }
141        }
142        reschedule();
143        Ok(())
144    }
145
146    fn do_wakeups(&mut self, now: u64) {
147        while let Some(uid) = self.wakeup.min() {
148            let mut done = false;
149            WaiterView::<N>::with(&mut self.threads, |view| {
150                if let Some(waiter) = view.get(uid) {
151                    if waiter.until() > now {
152                        Self::next_resched(now, waiter.until());
153                        done = true;
154                        return;
155                    }
156
157                    if let Err(_) = self.wakeup.remove(uid, view) {
158                        bug!("failed to remove thread {} from wakeup tree.", uid);
159                    }
160                } else {
161                    bug!("failed to get thread {} from wakeup tree.", uid);
162                }
163            });
164
165            if done {
166                break;
167            }
168
169            if self.enqueue(now, uid).is_err() {
170                bug!("failed to enqueue thread {} after wakeup.", uid);
171            }
172        }
173    }
174
175    /// Syncs the new state after the last do_sched call to the scheduler, and returns whether we need to immediately reschedule.
176    fn sync_to_sched(&mut self, now: u64) -> bool {
177        let dt = now - self.last_tick;
178        self.last_tick = now;
179
180        if let Some(old) = self.current {
181            let throttle = rt::ServerView::<N>::with(&mut self.threads, |view| {
182                self.rt_scheduler.put(old, dt, view)
183            });
184
185            if let Some(throttle) = throttle {
186                let _ = self.sleep_until(throttle, now);
187                return true;
188            }
189
190            self.rr_scheduler.put(old, dt as u32);
191        }
192
193        self.do_wakeups(now);
194        false
195    }
196
197    fn select_next(&mut self) -> (thread::UId, u32) {
198        rt::ServerView::<N>::with(&mut self.threads, |view| self.rt_scheduler.pick(view))
199            .or_else(|| self.rr_scheduler.pick(&mut self.threads))
200            .unwrap_or((thread::IDLE_THREAD, 1000))
201    }
202
203    /// Picks the next thread to run and returns its context and task. This should only be called by sched_enter after land.
204    fn do_sched(&mut self, now: u64) -> Option<(*mut c_void, &mut task::Task)> {
205        // Sync the new state to the scheduler.
206        if self.sync_to_sched(now) {
207            // Trigger reschedule after interrupts are enabled.
208            return None;
209        }
210
211        // Pick the next thread to run.
212        let (new, budget) = self.select_next();
213
214        // At this point, the task/thread must exist. Everything else is a bug.
215        let Some(thread) = self.threads.get(new) else {
216            bug!("failed to pick thread {}. Does not exist.", new);
217        };
218        let (ctx, task_id) = (thread.ctx(), thread.task_id());
219
220        let Some(task) = self.tasks.get_mut(task_id) else {
221            bug!("failed to get task {}. Does not exist.", task_id);
222        };
223
224        // We don't need to resched if the thread has budget.
225        self.current = Some(new);
226        Self::next_resched(now, now.saturating_add(budget as u64));
227        Some((ctx, task))
228    }
229
230    /// Puts the current thread to sleep until the specified timepoint. This will trigger a reschedule.
231    ///
232    /// `until` - The timepoint to sleep until, in ticks. This is an absolute time, not a relative time.
233    /// `now` - The current timepoint, in ticks.
234    ///
235    /// Returns an error if there is no current thread, it is not enqueued, or if the specified timepoint is in the past.
236    pub fn sleep_until(&mut self, until: u64, now: u64) -> Result<()> {
237        if until <= now {
238            return Ok(());
239        }
240        let uid = self.current.ok_or(kerr!(InvalidArgument))?;
241
242        if let Some(thread) = self.threads.get_mut(uid) {
243            thread.set_waiter(Some(Waiter::new(until, uid)));
244        } else {
245            // This should not be possible. The thread must exist since it's the current thread.
246            bug!(
247                "failed to put current thread {} to sleep. Does not exist.",
248                uid
249            );
250        }
251
252        dequeue!(self, uid)?;
253
254        if self
255            .wakeup
256            .insert(uid, &mut WaiterView::<N>::new(&mut self.threads))
257            .is_err()
258        {
259            // This should not be possible. The thread exists.
260            bug!("failed to insert thread {} into wakeup tree.", uid);
261        }
262
263        reschedule();
264        Ok(())
265    }
266
267    /// If the thread is currently sleeping, this will trigger a wakeup on the next reschedule. Note this does not trigger an immediate reschedule.
268    ///
269    /// Returns an error if the thread does not exist, or if the thread is not currently sleeping.
270    pub fn kick(&mut self, uid: thread::UId) -> Result<()> {
271        WaiterView::<N>::with(&mut self.threads, |view| {
272            self.wakeup.remove(uid, view)?;
273            let thread = view.get_mut(uid).unwrap_or_else(|| {
274                // This should not be possible. The thread must exist since it's in the wakeup tree.
275                bug!("failed to get thread {} from wakeup tree.", uid);
276            });
277            thread.set_until(0);
278            self.wakeup.insert(uid, view).unwrap_or_else(|_| {
279                // This should not be possible. The thread exists and we just removed it from the wakeup tree, so it must be able to be re-inserted.
280                bug!("failed to re-insert thread {} into wakeup tree.", uid);
281            });
282            Ok(())
283        })
284    }
285
286    /// This will just remove the thread from the scheduler, but it will not trigger a reschedule, even if the thread is currently running.
287    ///
288    /// Returns an error if the thread does not exist, or if the thread is not currently enqueued in any scheduler.
289    pub fn dequeue(&mut self, uid: thread::UId) -> Result<()> {
290        dequeue!(self, uid)
291    }
292
293    pub fn create_task(&mut self, attrs: task::Attributes) -> Result<task::UId> {
294        self.tasks.insert_with(|idx| {
295            let task = task::Task::new(task::UId::new(idx), attrs);
296            task.map(|t| (task::UId::new(idx), t))
297        })
298    }
299
300    /// Dequeues all threads of the task and removes the task. If the current thread belongs to the task, reschedule will be triggered.
301    ///
302    /// If the task does not exist, an error will be returned.
303    pub fn kill_by_task(&mut self, uid: task::UId) -> Result<()> {
304        let task = self.tasks.get_mut(uid).ok_or(kerr!(InvalidArgument))?;
305
306        while let Some(id) = task.threads().head() {
307            dequeue!(self, id)?;
308
309            if task.threads_mut().remove(id, &mut self.threads).is_err() {
310                // This should not be possible. The thread ID is from the thread list of the task, so it must exist.
311                bug!("failed to remove thread {} from task {}.", id, uid);
312            }
313
314            if self.threads.remove(&id).is_none() {
315                // This should not be possible. The thread ID is from the thread list of the task, so it must exist.
316                bug!("failed to remove thread {} from thread list.", id);
317            }
318
319            if Some(id) == self.current {
320                self.current = None;
321                reschedule();
322            }
323        }
324
325        self.tasks.remove(&uid).ok_or(kerr!(InvalidArgument))?;
326        Ok(())
327    }
328
329    pub fn create_thread(
330        &mut self,
331        task: Option<task::UId>,
332        attrs: &thread::Attributes,
333    ) -> Result<thread::UId> {
334        let task = match task {
335            Some(t) => t,
336            None => self.current.ok_or(kerr!(InvalidArgument))?.owner(),
337        };
338        let task = self.tasks.get_mut(task).ok_or(kerr!(InvalidArgument))?;
339
340        self.threads
341            .insert_with(|idx| {
342                let uid = task.allocate_tid().get_uid(idx);
343                let stack = task.allocate_stack(attrs)?;
344                let thread = thread::Thread::new(uid, stack, attrs.attrs);
345                Ok((uid, thread))
346            })
347            .and_then(|k| {
348                task.register_thread(k, &mut self.threads)?;
349                Ok(k)
350            })
351    }
352
353    /// Dequeues a thread and removes it from its corresponding task. If the thread is currently running, reschedule will be triggered.
354    ///
355    /// `uid` - The UID of the thread to kill, or None to kill the current thread.
356    ///
357    /// If the thread does not exist, or if `uid` is None and there is no current thread, an error will be returned.
358    pub fn kill_by_thread(&mut self, uid: Option<thread::UId>) -> Result<()> {
359        let uid = uid.unwrap_or(self.current.ok_or(kerr!(InvalidArgument))?);
360        self.dequeue(uid)?;
361
362        self.tasks
363            .get_mut(uid.tid().owner())
364            .ok_or(kerr!(InvalidArgument))?
365            .threads_mut()
366            .remove(uid, &mut self.threads)?;
367
368        self.threads.remove(&uid).ok_or(kerr!(InvalidArgument))?;
369
370        if Some(uid) == self.current {
371            self.current = None;
372            reschedule();
373        }
374        Ok(())
375    }
376}
377
378/// This function provides safe access to the global scheduler.
379/// It disables interrupts and locks the scheduler. Use with caution!
380pub fn with<T, F: FnOnce(&mut GlobalScheduler) -> T>(f: F) -> T {
381    sync::atomic::irq_free(|| {
382        let mut sched = SCHED.lock();
383        f(&mut sched)
384    })
385}
386
387/// Initializes the scheduler. This should be called once during kernel initialization, before any threads are created.
388///
389/// `kaddr_space` - The address space of the kernel task. This is used to create the kernel task, which is required for the scheduler to function.
390///
391/// If the kernel task cannot be created, this function will panic. Note that the kernel task is essential for the system to function, so we cannot continue without it.
392pub fn init(kaddr_space: mem::vmm::AddressSpace) {
393    with(|sched| {
394        let attrs = task::Attributes {
395            resrv_pgs: None,
396            address_space: Some(kaddr_space),
397        };
398
399        sched.create_task(attrs).unwrap_or_else(|e| {
400            panic!("failed to create kernel task: {}", e);
401        });
402    })
403}
404
405/// This should be called on each timer tick, and if it returns true, sched_enter should be called to reschedule.
406///
407/// `now` - The current timepoint, in ticks.
408pub fn needs_reschedule(now: u64) -> bool {
409    if DISABLED.load(Ordering::Acquire) {
410        return false;
411    }
412
413    now >= NEXT_TICK.load(Ordering::Acquire)
414}
415
416/// This will disable rescheduling until the next call to enable. Use with caution!
417#[inline]
418#[allow(dead_code)]
419pub fn disable() {
420    DISABLED.store(true, Ordering::Release);
421}
422
423#[inline]
424pub fn enable() {
425    DISABLED.store(false, Ordering::Release);
426}
427
428/// Triggers a reschedule immediately, when interrupts are enabled.
429/// This must be called after enqueueing a thread, or after waking up a thread, or putting the current thread to sleep.
430pub fn reschedule() {
431    if DISABLED.load(Ordering::Acquire) {
432        return;
433    }
434
435    hal::Machine::trigger_reschedule();
436}
437
438/// This will be called by the architecture-specific code to enter the scheduler. It will land the current thread, pick the next thread to run, and return its context and task.
439#[unsafe(no_mangle)]
440pub extern "C" fn sched_enter(mut ctx: *mut c_void) -> *mut c_void {
441    with(|sched| {
442        let old = sched.current.map(|c| c.owner());
443        sched.land(ctx);
444
445        if let Some((new, task)) = sched.do_sched(time::tick()) {
446            if old != Some(task.id) {
447                dispch::prepare(task);
448            }
449            ctx = new;
450        }
451
452        ctx
453    })
454}