1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 | private with Ada.Task_Identification;
with Rx.Dispatchers;
private with Rx.Dispatchers.Immediate;
private with Rx.Dispatchers.Pools;
private with System.Multiprocessors;
package Rx.Schedulers is
-- Note: there's is never, in the current implementation, reclaiming of created threads.
-- So you might try to be conservative (i.e. use Idle_Thread instead of New_Thread).
-- New_Thread may be justified in setup stages where you are reserving threads for tasks.
-- For custom allocation of threads, see Schedulers.Pools
type Thread is access all Rx.Dispatchers.Dispatcher'Class;
-- An actual scheduler thread
type Pool is limited interface;
type Pool_Access is access all Pool'Class;
function Get_Thread (This : in out Pool) return Thread is abstract;
type Thread_Allocator is access function return Thread;
-----------------
-- Scheduler --
-----------------
type Scheduler is tagged private;
function Get_Thread (This : Scheduler) return Thread;
-- This retrieves one thread reference for use
function To_Scheduler (This : Thread_Allocator) return Scheduler;
function To_Scheduler (This : aliased in out Pool'Class) return Scheduler;
-- Two ways of obtaining a Scheduler
-- Operators that change thread (Observe_On/Subscribe_On) do so only once,
-- during Subscribe. Hence these operators must store a scheduler
--------------------------
-- Default Schedulers --
--------------------------
function Input_Output return Scheduler;
function IO return Scheduler renames Input_Output; -- Rx usual name
-- This is backed by a thread pool that always returns an idle thread
-- so it can grow unboundedly
function Computation return Scheduler;
-- This is backed by a thread pool with as many threads as CPUs
function Idle_Thread return Scheduler;
-- This is backed by a thread pool that is reused for idle threads whenever possible
-- Like IO, can grow as much as needed
function New_Thread return Scheduler;
-- Returns a new thread that won't be used anywhere else (nor garbage collected!)
function Immediate return Scheduler;
-- Does nothing, code is executed as it arrives
procedure Shutdown renames Dispatchers.Shutdown;
-- Signal schedulers to exit.
-- Necessary when there are infinite sequences going on (e.g. Interval)
function Terminating return Boolean renames Dispatchers.Terminating;
-- Will be true after shutdown has been invoked
function Current_Thread_Id return String;
-- Shortcut for Ada.Task_Identification
private
type Scheduler is tagged record
Allocator : Thread_Allocator;
Pool : Pool_Access;
end record;
function To_Scheduler (This : Thread_Allocator) return Scheduler is
(Scheduler'(Allocator => This,
Pool => null));
function To_Scheduler (This : aliased in out Pool'Class) return Scheduler is
(Scheduler'(Allocator => null,
Pool => This'Unchecked_Access));
function Get_Thread (This : Scheduler) return Thread is
(if This.Allocator /= null then This.Allocator.all
elsif This.Pool /= null then This.Pool.Get_Thread
else raise Program_Error with "Uninitialized Scheduler");
use Rx.Dispatchers;
Pool_CPU : Pools.Pool (Positive (System.Multiprocessors.Number_Of_CPUs), new String'("CPU"));
Pool_IO : Pools.Pool (1, new String'("IO"));
Pool_Idle : Pools.Pool (1, new String'("Idle"));
Pool_Excl : Pools.Pool (1, new String'("Excl"));
Real_Immed : aliased Dispatchers.Immediate.Dispatcher;
function Immediate_Impl return Thread is (Real_Immed'Access);
function Immediate return Scheduler is (To_Scheduler (Immediate_Impl'Access));
function IO_Impl return Thread is (Thread (Pool_IO.Find_Idle));
function Input_Output return Scheduler is (To_Scheduler (IO_Impl'Access));
function Computation_Impl return Thread is (Thread (Pool_CPU.Get));
function Computation return Scheduler is (To_Scheduler (Computation_Impl'Access));
function Idle_Thread_Impl return Thread is (Thread (Pool_Idle.Find_Idle));
function Idle_Thread return Scheduler is (To_Scheduler (Idle_Thread_Impl'Access));
function New_Thread_Impl return Thread is (Thread (Pool_Excl.New_One));
function New_Thread return Scheduler is (To_Scheduler (New_Thread_Impl'Access));
use Ada.Task_Identification;
function Current_Thread_Id return String is (Image (Current_Task));
end Rx.Schedulers;
|