1mod dispch;
4pub mod rr;
5pub mod rt;
6pub mod task;
7pub mod thread;
8
9use core::{
10 ffi::c_void,
11 sync::atomic::{AtomicBool, Ordering},
12};
13
14use hal::Schedable;
15
16use crate::{
17 error::Result,
18 mem,
19 sched::thread::Waiter,
20 sync::{self, atomic::AtomicU64, spinlock::SpinLocked},
21 time::{self},
22 types::{
23 array::BitReclaimMap,
24 rbtree::RbTree,
25 traits::{Get, GetMut},
26 view::ViewMut,
27 },
28};
29
/// Storage for all threads, keyed by [`thread::UId`].
type ThreadMap<const N: usize> = BitReclaimMap<thread::UId, thread::Thread, N>;
/// Storage for all tasks, keyed by [`task::UId`].
type TaskMap<const N: usize> = BitReclaimMap<task::UId, task::Task, N>;

/// Concrete scheduler type of the global instance (`N = 32`).
type GlobalScheduler = Scheduler<32>;

/// The one global scheduler instance, protected by a spinlock. Use [`with`] to
/// access it.
static SCHED: SpinLocked<GlobalScheduler> = SpinLocked::new(GlobalScheduler::new());

/// While `true`, [`needs_reschedule`] always reports `false` and [`reschedule`]
/// does nothing. Starts out `true`; toggled by [`enable`] and [`disable`].
static DISABLED: AtomicBool = AtomicBool::new(true);
/// Tick at (or after) which [`needs_reschedule`] reports `true`. Updated by
/// `Scheduler::next_resched`.
static NEXT_TICK: AtomicU64 = AtomicU64::new(0);

/// Mutable view over a [`ThreadMap`] that yields [`thread::Waiter`] values by
/// [`thread::UId`]; this is what the wakeup tree operates on.
// NOTE(review): the exact projection is defined by `ViewMut` — confirm there.
type WaiterView<'a, const N: usize> = ViewMut<'a, thread::UId, thread::Waiter, ThreadMap<N>>;
41
/// Scheduler state: thread and task storage, the `rt` and `rr` sub-schedulers,
/// the wakeup tree and bookkeeping about the thread that is currently running.
///
/// `N` is forwarded to the thread/task maps and to both sub-schedulers.
pub struct Scheduler<const N: usize> {
    /// Every thread known to the scheduler.
    threads: ThreadMap<N>,
    /// Every task known to the scheduler.
    tasks: TaskMap<N>,

    /// Consulted first when picking the next thread; holds the threads that
    /// have an `rt_server` (presumably "real-time").
    rt_scheduler: rt::Scheduler<N>,
    /// Fallback for all other threads (presumably "round-robin").
    rr_scheduler: rr::Scheduler<N>,

    /// Threads that were put to sleep and wait for their wake-up tick.
    wakeup: RbTree<thread::WakupTree, thread::UId>,

    /// Thread selected by the most recent scheduling decision; `None` initially
    /// and after that thread has been killed.
    current: Option<thread::UId>,
    /// The `now` value of the previous `sync_to_sched` call, used to compute
    /// the elapsed time.
    last_tick: u64,
}
54
/// Removes thread `$uid` from whichever scheduling structure currently holds it.
///
/// Tries, in order, the `rt` scheduler, the `rr` scheduler and finally the
/// wakeup tree. Evaluates to the first successful result, or to the error of
/// the wakeup-tree removal if all three attempts fail.
///
/// `$self` is expanded (and therefore evaluated) several times, and the
/// expansion names the const generic `N`, so the macro can only be used inside
/// `impl<const N: usize> Scheduler<N>` with a plain place expression.
// NOTE(review): presumably a macro rather than a method so that callers can
// keep a borrow of `self.tasks` alive across the call — confirm.
macro_rules! dequeue {
    ($self:expr, $uid:expr) => {
        rt::ServerView::<N>::with(&mut $self.threads, |view| {
            $self.rt_scheduler.dequeue($uid, view)
        })
        .or_else(|_| $self.rr_scheduler.dequeue($uid, &mut $self.threads))
        .or_else(|_| {
            $self
                .wakeup
                .remove($uid, &mut WaiterView::<N>::new(&mut $self.threads))
        })
    };
}
69
70impl<const N: usize> Scheduler<N> {
    /// Creates an empty scheduler: no threads, no tasks, no running thread and
    /// `last_tick` at 0. `const` so it can initialise the `SCHED` static.
    const fn new() -> Self {
        Self {
            threads: ThreadMap::new(),
            tasks: TaskMap::new(),
            rt_scheduler: rt::Scheduler::new(),
            rr_scheduler: rr::Scheduler::new(),
            wakeup: RbTree::new(),
            current: None,
            last_tick: 0,
        }
    }
82
83 fn land(&mut self, ctx: *mut c_void) {
84 if let Some(current) = self.current {
85 let mut kill = None;
86 if let Some(thread) = self.threads.get_mut(current) {
87 if thread.save_ctx(ctx).is_err() {
88 warn!(
89 "failed to save context (SP: {:x}) of thread {}.",
90 ctx as usize, current
91 );
92 kill = Some(thread.task_id());
93 }
94 } else {
95 bug!("failed to land thread {}. Does not exist.", current);
96 }
97
98 if let Some(task_id) = kill {
99 if self.kill_by_task(task_id).is_err() {
100 bug!("failed to kill task {}", task_id);
102 }
103 }
104 }
105 }
106
107 fn next_resched(now: u64, next: u64) {
113 let old = NEXT_TICK.load(Ordering::Acquire);
114
115 if old > now && old <= next {
116 return;
117 }
118
119 NEXT_TICK.store(next, Ordering::Release);
120 }
121
122 pub fn enqueue(&mut self, now: u64, uid: thread::UId) -> Result<()> {
129 let thread = self.threads.get(uid).ok_or(kerr!(InvalidArgument))?;
130
131 if thread.rt_server().is_some() {
132 let mut view = rt::ServerView::<N>::new(&mut self.threads);
133 self.rt_scheduler.enqueue(uid, now, &mut view)?;
134 } else {
135 if self.rr_scheduler.enqueue(uid, &mut self.threads).is_err() {
136 bug!("failed to enqueue thread {} into RR scheduler.", uid);
140 }
141 }
142 reschedule();
143 Ok(())
144 }
145
146 fn do_wakeups(&mut self, now: u64) {
147 while let Some(uid) = self.wakeup.min() {
148 let mut done = false;
149 WaiterView::<N>::with(&mut self.threads, |view| {
150 if let Some(waiter) = view.get(uid) {
151 if waiter.until() > now {
152 Self::next_resched(now, waiter.until());
153 done = true;
154 return;
155 }
156
157 if let Err(_) = self.wakeup.remove(uid, view) {
158 bug!("failed to remove thread {} from wakeup tree.", uid);
159 }
160 } else {
161 bug!("failed to get thread {} from wakeup tree.", uid);
162 }
163 });
164
165 if done {
166 break;
167 }
168
169 if self.enqueue(now, uid).is_err() {
170 bug!("failed to enqueue thread {} after wakeup.", uid);
171 }
172 }
173 }
174
175 fn sync_to_sched(&mut self, now: u64) -> bool {
177 let dt = now - self.last_tick;
178 self.last_tick = now;
179
180 if let Some(old) = self.current {
181 let throttle = rt::ServerView::<N>::with(&mut self.threads, |view| {
182 self.rt_scheduler.put(old, dt, view)
183 });
184
185 if let Some(throttle) = throttle {
186 let _ = self.sleep_until(throttle, now);
187 return true;
188 }
189
190 self.rr_scheduler.put(old, dt as u32);
191 }
192
193 self.do_wakeups(now);
194 false
195 }
196
197 fn select_next(&mut self) -> (thread::UId, u32) {
198 rt::ServerView::<N>::with(&mut self.threads, |view| self.rt_scheduler.pick(view))
199 .or_else(|| self.rr_scheduler.pick(&mut self.threads))
200 .unwrap_or((thread::IDLE_THREAD, 1000))
201 }
202
203 fn do_sched(&mut self, now: u64) -> Option<(*mut c_void, &mut task::Task)> {
205 if self.sync_to_sched(now) {
207 return None;
209 }
210
211 let (new, budget) = self.select_next();
213
214 let Some(thread) = self.threads.get(new) else {
216 bug!("failed to pick thread {}. Does not exist.", new);
217 };
218 let (ctx, task_id) = (thread.ctx(), thread.task_id());
219
220 let Some(task) = self.tasks.get_mut(task_id) else {
221 bug!("failed to get task {}. Does not exist.", task_id);
222 };
223
224 self.current = Some(new);
226 Self::next_resched(now, now.saturating_add(budget as u64));
227 Some((ctx, task))
228 }
229
230 pub fn sleep_until(&mut self, until: u64, now: u64) -> Result<()> {
237 if until <= now {
238 return Ok(());
239 }
240 let uid = self.current.ok_or(kerr!(InvalidArgument))?;
241
242 if let Some(thread) = self.threads.get_mut(uid) {
243 thread.set_waiter(Some(Waiter::new(until, uid)));
244 } else {
245 bug!(
247 "failed to put current thread {} to sleep. Does not exist.",
248 uid
249 );
250 }
251
252 dequeue!(self, uid)?;
253
254 if self
255 .wakeup
256 .insert(uid, &mut WaiterView::<N>::new(&mut self.threads))
257 .is_err()
258 {
259 bug!("failed to insert thread {} into wakeup tree.", uid);
261 }
262
263 reschedule();
264 Ok(())
265 }
266
267 pub fn kick(&mut self, uid: thread::UId) -> Result<()> {
271 WaiterView::<N>::with(&mut self.threads, |view| {
272 self.wakeup.remove(uid, view)?;
273 let thread = view.get_mut(uid).unwrap_or_else(|| {
274 bug!("failed to get thread {} from wakeup tree.", uid);
276 });
277 thread.set_until(0);
278 self.wakeup.insert(uid, view).unwrap_or_else(|_| {
279 bug!("failed to re-insert thread {} into wakeup tree.", uid);
281 });
282 Ok(())
283 })
284 }
285
    /// Removes thread `uid` from the `rt` scheduler, the `rr` scheduler or the
    /// wakeup tree, whichever holds it (tried in that order).
    ///
    /// # Errors
    ///
    /// Returns the error of the last attempt if none of the three succeeds.
    pub fn dequeue(&mut self, uid: thread::UId) -> Result<()> {
        dequeue!(self, uid)
    }
292
293 pub fn create_task(&mut self, attrs: task::Attributes) -> Result<task::UId> {
294 self.tasks.insert_with(|idx| {
295 let task = task::Task::new(task::UId::new(idx), attrs);
296 task.map(|t| (task::UId::new(idx), t))
297 })
298 }
299
300 pub fn kill_by_task(&mut self, uid: task::UId) -> Result<()> {
304 let task = self.tasks.get_mut(uid).ok_or(kerr!(InvalidArgument))?;
305
306 while let Some(id) = task.threads().head() {
307 dequeue!(self, id)?;
308
309 if task.threads_mut().remove(id, &mut self.threads).is_err() {
310 bug!("failed to remove thread {} from task {}.", id, uid);
312 }
313
314 if self.threads.remove(&id).is_none() {
315 bug!("failed to remove thread {} from thread list.", id);
317 }
318
319 if Some(id) == self.current {
320 self.current = None;
321 reschedule();
322 }
323 }
324
325 self.tasks.remove(&uid).ok_or(kerr!(InvalidArgument))?;
326 Ok(())
327 }
328
329 pub fn create_thread(
330 &mut self,
331 task: Option<task::UId>,
332 attrs: &thread::Attributes,
333 ) -> Result<thread::UId> {
334 let task = match task {
335 Some(t) => t,
336 None => self.current.ok_or(kerr!(InvalidArgument))?.owner(),
337 };
338 let task = self.tasks.get_mut(task).ok_or(kerr!(InvalidArgument))?;
339
340 self.threads
341 .insert_with(|idx| {
342 let uid = task.allocate_tid().get_uid(idx);
343 let stack = task.allocate_stack(attrs)?;
344 let thread = thread::Thread::new(uid, stack, attrs.attrs);
345 Ok((uid, thread))
346 })
347 .and_then(|k| {
348 task.register_thread(k, &mut self.threads)?;
349 Ok(k)
350 })
351 }
352
353 pub fn kill_by_thread(&mut self, uid: Option<thread::UId>) -> Result<()> {
359 let uid = uid.unwrap_or(self.current.ok_or(kerr!(InvalidArgument))?);
360 self.dequeue(uid)?;
361
362 self.tasks
363 .get_mut(uid.tid().owner())
364 .ok_or(kerr!(InvalidArgument))?
365 .threads_mut()
366 .remove(uid, &mut self.threads)?;
367
368 self.threads.remove(&uid).ok_or(kerr!(InvalidArgument))?;
369
370 if Some(uid) == self.current {
371 self.current = None;
372 reschedule();
373 }
374 Ok(())
375 }
376}
377
378pub fn with<T, F: FnOnce(&mut GlobalScheduler) -> T>(f: F) -> T {
381 sync::atomic::irq_free(|| {
382 let mut sched = SCHED.lock();
383 f(&mut sched)
384 })
385}
386
387pub fn init(kaddr_space: mem::vmm::AddressSpace) {
393 with(|sched| {
394 let attrs = task::Attributes {
395 resrv_pgs: None,
396 address_space: Some(kaddr_space),
397 };
398
399 sched.create_task(attrs).unwrap_or_else(|e| {
400 panic!("failed to create kernel task: {}", e);
401 });
402 })
403}
404
405pub fn needs_reschedule(now: u64) -> bool {
409 if DISABLED.load(Ordering::Acquire) {
410 return false;
411 }
412
413 now >= NEXT_TICK.load(Ordering::Acquire)
414}
415
/// Disables scheduling: afterwards `needs_reschedule` reports `false` and
/// `reschedule` does nothing, until `enable` is called.
#[inline]
#[allow(dead_code)]
pub fn disable() {
    DISABLED.store(true, Ordering::Release);
}
422
/// Enables scheduling by clearing the `DISABLED` flag, which starts out set.
#[inline]
pub fn enable() {
    DISABLED.store(false, Ordering::Release);
}
427
428pub fn reschedule() {
431 if DISABLED.load(Ordering::Acquire) {
432 return;
433 }
434
435 hal::Machine::trigger_reschedule();
436}
437
438#[unsafe(no_mangle)]
440pub extern "C" fn sched_enter(mut ctx: *mut c_void) -> *mut c_void {
441 with(|sched| {
442 let old = sched.current.map(|c| c.owner());
443 sched.land(ctx);
444
445 if let Some((new, task)) = sched.do_sched(time::tick()) {
446 if old != Some(task.id) {
447 dispch::prepare(task);
448 }
449 ctx = new;
450 }
451
452 ctx
453 })
454}