rxada_0.1.1_dd9da799/src/rx-schedulers.ads

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
private with Ada.Task_Identification;

with Rx.Dispatchers;

private with Rx.Dispatchers.Immediate;
private with Rx.Dispatchers.Pools;

private with System.Multiprocessors;

package Rx.Schedulers is

   --  Note: in the current implementation, created threads are never reclaimed.
   --  So you may want to be conservative (i.e. use Idle_Thread instead of New_Thread).
   --  New_Thread may be justified in setup stages where you are reserving threads for tasks.

   --  For custom allocation of threads, see Schedulers.Pools

   type Thread is access all Rx.Dispatchers.Dispatcher'Class;
   --  An actual scheduler thread: a general access value that may designate
   --  any object in the Rx.Dispatchers.Dispatcher class

   type Pool is limited interface;
   --  Contract for a provider of threads; implementers must supply Get_Thread

   type Pool_Access is access all Pool'Class;
   --  General access to any implementation of Pool

   function Get_Thread (This : in out Pool) return Thread is abstract;
   --  Returns a thread supplied by This. The pool is passed "in out", so an
   --  implementation is allowed to modify the pool while serving the request

   type Thread_Allocator is access function return Thread;
   --  Access to a parameterless function that yields a Thread on each call

   -----------------
   --  Scheduler  --
   -----------------

   type Scheduler is tagged private;
   --  A source of threads, built either from a Thread_Allocator or from a Pool
   --  (see To_Scheduler). A default-initialized Scheduler holds neither and
   --  cannot provide threads

   function Get_Thread (This : Scheduler) return Thread;
   --  This retrieves one thread reference for use.
   --  Raises Program_Error when This holds neither an allocator nor a pool

   function To_Scheduler (This : Thread_Allocator)   return Scheduler;
   function To_Scheduler (This : aliased in out Pool'Class) return Scheduler;
   --  Two ways of obtaining a Scheduler.
   --  The Pool variant stores an unchecked access to This, so the pool object
   --  must outlive every use of the resulting Scheduler
   --  Two ways of obtaining a Scheduler

   --  Operators that change thread (Observe_On/Subscribe_On) do so only once,
   --    during Subscribe. Hence these operators must store a scheduler

   --------------------------
   --  Default Schedulers  --
   --------------------------

   function Input_Output return Scheduler;
   function IO 		 return Scheduler renames Input_Output; -- Rx usual name
   --  Backed by a thread pool that always hands out an idle thread,
   --  so the pool can grow unboundedly

   function Computation return Scheduler;
   --  Backed by a thread pool with as many threads as CPUs
   --  (the count comes from System.Multiprocessors.Number_Of_CPUs)

   function Idle_Thread return Scheduler;
   --  Backed by a thread pool whose idle threads are reused whenever possible.
   --  Like IO, it can grow as much as needed

   function New_Thread  return Scheduler;
   --  Provides a new thread that won't be used anywhere else
   --  (nor garbage collected!)

   function Immediate 	return Scheduler;
   --  Does nothing special: code is executed as it arrives.
   --  Every request is served by the same single immediate dispatcher object

   procedure Shutdown renames Dispatchers.Shutdown;
   --  Signal schedulers to exit (this is Rx.Dispatchers.Shutdown under a local name).
   --  Necessary when there are infinite sequences going on (e.g. Interval)

   function Terminating return Boolean renames Dispatchers.Terminating;
   --  Will be true after shutdown has been invoked
   --  (this is Rx.Dispatchers.Terminating under a local name)

   function Current_Thread_Id return String;
   --  Image of the calling task, as produced by
   --  Ada.Task_Identification.Image (Ada.Task_Identification.Current_Task)

private

   --  Full view of Scheduler. Get_Thread checks Allocator first and Pool
   --  second; when both are null (the default) it raises Program_Error.
   type Scheduler is tagged record
      Allocator : Thread_Allocator;  --  Set by To_Scheduler (Thread_Allocator)
      Pool      : Pool_Access;       --  Set by To_Scheduler (Pool'Class)
   end record;

   --  Builds a Scheduler that obtains its threads by calling This;
   --  no pool is attached.
   function To_Scheduler (This : Thread_Allocator) return Scheduler is
     (Scheduler'(Pool      => null,
                 Allocator => This));

   --  Builds a Scheduler that obtains its threads from the given pool;
   --  no allocator function is attached. The pool is referenced through an
   --  unchecked access, so it has to outlive the returned Scheduler.
   function To_Scheduler (This : aliased in out Pool'Class) return Scheduler is
     (Scheduler'(Pool      => This'Unchecked_Access,
                 Allocator => null));

   --  The allocator function, when present, takes precedence over the pool.
   --  A Scheduler holding neither cannot provide threads: Program_Error.
   function Get_Thread (This : Scheduler) return Thread is
     (if This.Allocator /= null
      then This.Allocator.all
      else (if This.Pool /= null
            then This.Pool.Get_Thread
            else raise Program_Error with "Uninitialized Scheduler"));

   use Rx.Dispatchers;

   --  Pools backing the default schedulers. The numeric discriminant is
   --  presumably the pool size and the string its name -- confirm against
   --  Rx.Dispatchers.Pools.
   Pool_CPU  : Pools.Pool (Positive (System.Multiprocessors.Number_Of_CPUs), new String'("CPU")); --  Used by Computation
   Pool_IO   : Pools.Pool (1, new String'("IO"));    --  Used by Input_Output
   Pool_Idle : Pools.Pool (1, new String'("Idle"));  --  Used by Idle_Thread
   Pool_Excl : Pools.Pool (1, new String'("Excl"));  --  Used by New_Thread

   --  Single immediate dispatcher object shared by every Immediate scheduler
   Real_Immed : aliased Dispatchers.Immediate.Dispatcher;

   --  Allocator: always designates the shared Real_Immed object
   function Immediate_Impl return Thread is
     (Real_Immed'Access);

   function Immediate return Scheduler is
     (To_Scheduler (This => Immediate_Impl'Access));

   --  Allocator: asks Pool_IO for an idle thread
   function IO_Impl return Thread is
     (Thread (Pool_IO.Find_Idle));

   function Input_Output return Scheduler is
     (To_Scheduler (This => IO_Impl'Access));

   --  Allocator: takes a thread from the CPU-sized pool
   function Computation_Impl return Thread is
     (Thread (Pool_CPU.Get));

   function Computation return Scheduler is
     (To_Scheduler (This => Computation_Impl'Access));

   --  Allocator: asks Pool_Idle for an idle thread
   function Idle_Thread_Impl return Thread is
     (Thread (Pool_Idle.Find_Idle));

   function Idle_Thread return Scheduler is
     (To_Scheduler (This => Idle_Thread_Impl'Access));

   --  Allocator: requests a brand new thread from Pool_Excl on every call
   function New_Thread_Impl return Thread is
     (Thread (Pool_Excl.New_One));

   function New_Thread return Scheduler is
     (To_Scheduler (This => New_Thread_Impl'Access));

   use Ada.Task_Identification;

   --  Textual image of the task that is executing the call
   function Current_Thread_Id return String is
     (Ada.Task_Identification.Image
        (Ada.Task_Identification.Current_Task));

end Rx.Schedulers;