1mod dispch;
4pub mod rr;
5pub mod rt;
6pub mod task;
7pub mod thread;
8
9use core::{
10 ffi::c_void,
11 sync::atomic::{AtomicBool, Ordering},
12};
13
14use crate::hal::{self, Schedable};
15
16use crate::{
17 error::Result,
18 mem,
19 sync::{self, atomic::AtomicU64, spinlock::SpinLocked},
20 time::{self},
21 types::{
22 array::BitReclaimMap,
23 rbtree::RbTree,
24 traits::{Get, GetMut},
25 view::ViewMut,
26 },
27};
28
29type ThreadMap<const N: usize> = BitReclaimMap<thread::UId, thread::Thread, N>;
30type TaskMap<const N: usize> = BitReclaimMap<task::UId, task::Task, N>;
31
32pub(crate) const THREAD_COUNT: usize = 32;
33type GlobalScheduler = Scheduler<THREAD_COUNT>;
34
35static SCHED: SpinLocked<GlobalScheduler> = SpinLocked::new(GlobalScheduler::new());
36
37static DISABLED: AtomicBool = AtomicBool::new(true);
38static NEXT_TICK: AtomicU64 = AtomicU64::new(0);
39
40type WaiterView<'a, const N: usize> = ViewMut<'a, thread::UId, thread::Waiter, ThreadMap<N>>;
41
42pub struct Scheduler<const N: usize> {
43 threads: ThreadMap<N>,
44 tasks: TaskMap<N>,
45
46 rt_scheduler: rt::Scheduler<N>,
47 rr_scheduler: rr::Scheduler<N>,
48
49 wakeup: RbTree<thread::WakupTree, thread::UId>,
50
51 current: Option<thread::UId>,
52 last_tick: u64,
53}
54
55unsafe impl<const N: usize> Send for Scheduler<N> {}
58unsafe impl<const N: usize> Sync for Scheduler<N> {}
60
/// Detaches a thread from every scheduling structure before it is removed.
///
/// `$sched` is an expression (normally `self`) exposing the `threads`,
/// `rt_scheduler`, `rr_scheduler` and `wakeup` fields; `N` must name the
/// scheduler's size parameter at the expansion site.
///
/// This is a macro rather than a method so that only those individual fields
/// are borrowed: `kill_by_task` expands it while still holding a mutable
/// borrow of `tasks`.
///
/// The thread is dequeued from both policies and removed from the wakeup tree;
/// each of those results is deliberately discarded, since the thread is
/// usually in at most one of them. The expansion always evaluates to `Ok(())`.
macro_rules! kill {
    ($sched:expr, $id:expr) => {{
        let victim = $id;

        let _ = rt::ServerView::<N>::with(&mut $sched.threads, |view| {
            $sched.rt_scheduler.dequeue(victim, view)
        });
        let _ = $sched.rr_scheduler.dequeue(victim, &mut $sched.threads);
        let _ = $sched
            .wakeup
            .remove(victim, &mut WaiterView::<N>::new(&mut $sched.threads));

        // The thread is no longer in the wakeup tree, so clear its wait state.
        if let Some(thread) = $sched.threads.get_mut(victim) {
            thread.resume();
        }

        Ok::<(), crate::error::Error>(())
    }};
}
77
78impl<const N: usize> Scheduler<N> {
79 const fn new() -> Self {
80 Self {
81 threads: ThreadMap::new(),
82 tasks: TaskMap::new(),
83 rt_scheduler: rt::Scheduler::new(),
84 rr_scheduler: rr::Scheduler::new(),
85 wakeup: RbTree::new(),
86 current: None,
87 last_tick: 0,
88 }
89 }
90
91 fn land(&mut self, ctx: *mut c_void) {
92 if let Some(current) = self.current {
93 let mut kill = None;
94 if let Some(thread) = self.threads.get_mut(current) {
95 if thread.save_ctx(ctx).is_err() {
96 warn!(
97 "failed to save context (SP: {:x}) of thread {}.",
98 ctx as usize, current
99 );
100 kill = Some(thread.task_id());
101 }
102 } else {
103 bug!("failed to land thread {}. Does not exist.", current);
104 }
105
106 if let Some(task_id) = kill {
107 if self.kill_by_task(task_id).is_err() {
108 bug!("failed to kill task {}", task_id);
110 }
111 }
112 }
113 }
114
115 fn next_resched(now: u64, next: u64) {
121 let old = NEXT_TICK.load(Ordering::Acquire);
122
123 if old > now && old <= next {
124 return;
125 }
126
127 NEXT_TICK.store(next, Ordering::Release);
128 }
129
130 pub fn enqueue(&mut self, now: u64, uid: thread::UId) -> Result<()> {
137 let thread = self.threads.get(uid).ok_or(kerr!(EINVAL))?;
138
139 if thread.rt_server().is_some() {
140 let mut view = rt::ServerView::<N>::new(&mut self.threads);
141 self.rt_scheduler.enqueue(uid, now, &mut view)?;
142 } else {
143 if self.rr_scheduler.enqueue(uid, &mut self.threads).is_err() {
144 bug!("failed to enqueue thread {} into RR scheduler.", uid);
148 }
149 }
150 reschedule();
151 Ok(())
152 }
153
154 fn do_wakeups(&mut self, now: u64) {
155 while let Some(uid) = self.wakeup.min() {
156 let mut stop = false;
157 WaiterView::<N>::with(&mut self.threads, |view| {
158 if let Some(waiter) = view.get(uid) {
159 if waiter.until() > now {
160 Self::next_resched(now, waiter.until());
161 stop = true;
162 return;
163 }
164
165 if let Err(_) = self.wakeup.remove(uid, view) {
166 bug!("failed to remove thread {} from wakeup tree.", uid);
167 }
168 } else {
169 bug!("failed to get thread {} from wakeup tree.", uid);
170 }
171 });
172
173 if stop {
174 break;
175 }
176
177 if let Some(thread) = self.threads.get_mut(uid) {
178 thread.resume();
179 } else {
180 bug!("failed to wake thread {}. Does not exist.", uid);
182 }
183
184 if self.enqueue(now, uid).is_err() {
185 bug!("failed to enqueue thread {} after wakeup.", uid);
186 }
187 }
188 }
189
190 fn sync_to_sched(&mut self, now: u64) {
192 let dt = now - self.last_tick;
193 self.last_tick = now;
194
195 if let Some(old) = self.current {
196 let throttle = rt::ServerView::<N>::with(&mut self.threads, |view| {
197 self.rt_scheduler.put(old, dt, view)
198 });
199
200 if let Some(throttle) = throttle {
201 if throttle <= now {
202 rt::ServerView::<N>::with(&mut self.threads, |view| {
203 let _ = self.rt_scheduler.dequeue(old, view);
204 let _ = self.rt_scheduler.enqueue(old, now, view);
205 });
206 } else {
207 self.current = None;
209 let _ = self.sleep_until(Some(old), throttle, now);
210 self.current = Some(old);
211 }
212 } else {
213 self.rr_scheduler.put(old, dt as u32);
214 }
215 }
216
217 self.do_wakeups(now);
218 }
219
220 fn select_next(&mut self) -> (thread::UId, u32) {
221 rt::ServerView::<N>::with(&mut self.threads, |view| self.rt_scheduler.pick(view))
222 .or_else(|| self.rr_scheduler.pick(&mut self.threads))
223 .unwrap_or((thread::IDLE_THREAD, 1000))
224 }
225
226 fn do_sched(&mut self, now: u64) -> Option<(*mut c_void, &mut task::Task)> {
228 self.sync_to_sched(now);
230
231 let (new, budget) = self.select_next();
233
234 let Some(thread) = self.threads.get(new) else {
236 bug!("failed to pick thread {}. Does not exist.", new);
237 };
238 let (ctx, task_id) = (thread.ctx(), thread.task_id());
239
240 let Some(task) = self.tasks.get_mut(task_id) else {
241 bug!("failed to get task {}. Does not exist.", task_id);
242 };
243
244 self.current = Some(new);
246 Self::next_resched(now, now.saturating_add(budget as u64));
247 Some((ctx, task))
248 }
249
250 pub fn sleep_until(&mut self, uid: Option<thread::UId>, until: u64, now: u64) -> Result<()> {
258 if until <= now {
259 return Ok(());
260 }
261 let uid = match uid {
262 Some(uid) => uid,
263 None => self.current.ok_or(kerr!(EINVAL))?,
264 };
265 let _ = self.dequeue(uid);
268
269 let already = match self.threads.get_mut(uid) {
271 Some(thread) if thread.is_waiting() => true,
272 Some(_) => false,
273 None => return Err(kerr!(EINVAL)),
274 };
275
276 if already {
278 WaiterView::with(&mut self.threads, |view| self.wakeup.remove(uid, view))?;
279 }
280
281 if let Some(thread) = self.threads.get_mut(uid) {
283 thread.wait(until);
284 } else {
285 bug!("failed to set thread {} to waiting. Does not exist.", uid);
287 }
288
289 let res = WaiterView::with(&mut self.threads, |view| self.wakeup.insert(uid, view));
291
292 if res.is_err() {
293 bug!("failed to insert thread {} into wakeup tree.", uid);
295 }
296 Ok(())
297 }
298
299 pub fn kick_by_uid(&mut self, uid: usize) -> Result<()> {
301 let lookup_uid = thread::UId::new(uid, thread::Id::new(0, crate::sched::task::UId::new(0)));
302 self.kick(lookup_uid)
303 }
304
305 pub fn current_uid(&self) -> Option<usize> {
306 self.current.map(|uid| uid.as_usize())
307 }
308
309 pub fn kick(&mut self, uid: thread::UId) -> Result<()> {
313 let now = time::tick();
314 let res = WaiterView::with(&mut self.threads, |view| self.wakeup.remove(uid, view));
315
316 if let Some(thread) = self.threads.get_mut(uid) {
317 thread.resume();
318 } else {
319 return Err(kerr!(EINVAL)); }
321
322 if res.is_ok() {
323 self.enqueue(now, uid)?;
324 }
325 Ok(())
326 }
327
328 pub fn dequeue(&mut self, uid: thread::UId) -> Result<()> {
333 rt::ServerView::<N>::with(&mut self.threads, |view| {
334 self.rt_scheduler.dequeue(uid, view)
335 })
336 .or_else(|_| self.rr_scheduler.dequeue(uid, &mut self.threads))?;
337
338 if Some(uid) == self.current {
339 reschedule();
340 }
341 Ok(())
342 }
343
344 pub fn create_task(&mut self, attrs: task::Attributes) -> Result<task::UId> {
345 let task_id = self.tasks.insert_with(|idx| {
346 let task = task::Task::new(task::UId::new(idx), attrs);
347 task.map(|t| (task::UId::new(idx), t))
348 })?;
349
350 #[cfg(any(feature = "metrics", metrics))]
351 if let Some(task) = self.tasks.get(task_id) {
352 crate::metrics::store::write_task_heap(
353 task_id.as_usize(),
354 task.allocator_metrics().into(),
355 );
356 }
357
358 Ok(task_id)
359 }
360
361 pub fn kill_by_task(&mut self, uid: task::UId) -> Result<()> {
365 let task = self.tasks.get_mut(uid).ok_or(kerr!(EINVAL))?;
366
367 while let Some(id) = task.threads().head() {
368 kill!(self, id)?;
369
370 if task.threads_mut().remove(id, &mut self.threads).is_err() {
371 bug!("failed to remove thread {} from task {}.", id, uid);
373 }
374
375 if self.threads.remove(&id).is_none() {
376 bug!("failed to remove thread {} from thread list.", id);
378 }
379
380 #[cfg(any(feature = "metrics", metrics))]
381 crate::metrics::store::clear_thread_stack(id.as_usize());
382
383 if Some(id) == self.current {
384 self.current = None;
385 reschedule();
386 }
387 }
388
389 self.tasks.remove(&uid).ok_or(kerr!(EINVAL))?;
390
391 #[cfg(any(feature = "metrics", metrics))]
392 crate::metrics::store::clear_task_heap(uid.as_usize());
393
394 Ok(())
395 }
396
397 pub fn create_thread(
398 &mut self,
399 task: Option<task::UId>,
400 attrs: &thread::Attributes,
401 ) -> Result<thread::UId> {
402 let task = match task {
403 Some(t) => t,
404 None => self.current.ok_or(kerr!(EINVAL))?.owner(),
405 };
406 let task = self.tasks.get_mut(task).ok_or(kerr!(EINVAL))?;
407
408 let uid = self
409 .threads
410 .insert_with(|idx| {
411 let uid = task.allocate_tid().get_uid(idx);
412 let stack = task.allocate_stack(attrs)?;
413 let thread = thread::Thread::new(uid, stack, attrs.attrs);
414 Ok((uid, thread))
415 })
416 .and_then(|k| {
417 task.register_thread(k, &mut self.threads)?;
418 Ok(k)
419 })?;
420
421 #[cfg(any(feature = "metrics", metrics))]
422 if let Some(thread) = self.threads.get(uid) {
423 crate::metrics::store::write_thread_stack(
424 uid.as_usize(),
425 thread.stack_metrics().into(),
426 );
427 }
428
429 Ok(uid)
430 }
431
432 pub fn kill_by_thread(&mut self, uid: Option<thread::UId>) -> Result<()> {
438 let uid = match uid {
439 Some(uid) => uid,
440 None => self.current.ok_or(kerr!(EINVAL))?,
441 };
442 kill!(self, uid)?;
443
444 self.tasks
445 .get_mut(uid.tid().owner())
446 .ok_or(kerr!(EINVAL))?
447 .threads_mut()
448 .remove(uid, &mut self.threads)?;
449
450 self.threads.remove(&uid).ok_or(kerr!(EINVAL))?;
451
452 #[cfg(any(feature = "metrics", metrics))]
453 crate::metrics::store::clear_thread_stack(uid.as_usize());
454
455 if Some(uid) == self.current {
456 self.current = None;
457 reschedule();
458 }
459 Ok(())
460 }
461
462 #[cfg(any(feature = "metrics", metrics))]
465 fn mirror_stats(&self) {
466 use crate::metrics::store;
467
468 store::write_global_heap(crate::mem::global_metrics().into());
469
470 if let Some(uid) = self.current {
471 if let Some(thread) = self.threads.get(uid) {
472 store::write_thread_stack(uid.as_usize(), thread.stack_metrics().into());
473
474 let task_id = thread.task_id();
475 if let Some(task) = self.tasks.get(task_id) {
476 store::write_task_heap(task_id.as_usize(), task.allocator_metrics().into());
477 }
478 }
479 }
480 }
481}
482
483pub fn with<T, F: FnOnce(&mut GlobalScheduler) -> T>(f: F) -> T {
486 sync::atomic::irq_free(|| {
490 let mut sched = SCHED.lock();
491 f(&mut sched)
492 })
493}
494
495pub fn init(kaddr_space: mem::vmm::AddressSpace) {
501 with(|sched| {
502 let attrs = task::Attributes {
503 resrv_pgs: None,
504 address_space: Some(kaddr_space),
505 };
506
507 sched.create_task(attrs).unwrap_or_else(|e| {
508 panic!("failed to create kernel task: {}", e);
509 });
510 })
511}
512
513pub fn needs_reschedule(now: u64) -> bool {
517 if DISABLED.load(Ordering::Acquire) {
518 return false;
519 }
520
521 now >= NEXT_TICK.load(Ordering::Acquire)
522}
523
524#[inline]
526#[allow(dead_code)]
527pub fn disable() {
528 DISABLED.store(true, Ordering::Release);
529}
530
531#[inline]
532pub fn enable() {
533 DISABLED.store(false, Ordering::Release);
534}
535
536pub fn reschedule() {
539 if DISABLED.load(Ordering::Acquire) {
540 return;
541 }
542
543 hal::Machine::trigger_reschedule();
544}
545
546#[unsafe(no_mangle)]
550pub extern "C" fn kick_thread(uid: u32) {
551 with(|sched| {
552 let _ = sched.kick_by_uid(uid as usize);
553 });
554 reschedule();
555}
556
557#[unsafe(no_mangle)]
559pub extern "C" fn sched_enter(mut ctx: *mut c_void) -> *mut c_void {
560 with(|sched| {
561 let old = sched.current.map(|c| c.owner());
562 sched.land(ctx);
563
564 #[cfg(any(feature = "metrics", metrics))]
568 sched.mirror_stats();
569
570 if let Some((new, task)) = sched.do_sched(time::tick()) {
571 if old != Some(task.id) {
572 dispch::prepare(task);
573 }
574 ctx = new;
575 }
576
577 ctx
578 })
579}
580
581extern "C" fn thread_finalizer() -> ! {
582 with(|sched| {
583 if sched.kill_by_thread(None).is_err() {
584 bug!("failed to terminate returned thread.");
585 }
586 });
587 loop {
588 hal::asm::nop!();
589 }
590}