rxada_0.1.0_6ff779c7/src/body/rx-src-interval.adb

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
with Ada.Calendar;

with Rx.Debug;
with Rx.Dispatchers;
with Rx.Impl.Shared_Observer;
with Rx.Src.Create;
with Rx.Subscriptions;

package body Rx.Src.Interval is

   --  Shared-observer wrapper instantiated for this package's Typed
   --  instance; Runner stores its downstream observer as a Shared.Observer.
   package Shared is new Rx.Impl.Shared_Observer (Typed);

   --  NOTE(review): presumably this is what makes the unary "+" conversions
   --  between Typed.T and Typed.D (used in Run and Create) directly
   --  visible -- confirm against Typed.Conversions.
   use Typed.Conversions;

   --  Unit of work handed to the scheduler for every emission. Each run
   --  emits Value, then schedules an updated copy of itself (see Run).
   --  Component order matters to any positional aggregate of this type.
   type Runner is new Dispatchers.Runnable with record
      --  Scheduler on which the follow-up run is queued
      Sched : Schedulers.Scheduler;

      --  Repetitive period
      Pause : Duration;

      --  Next value to emit
      Value : Typed.D;

      --  Reference for next deadline
      Next  : Ada.Calendar.Time;

      --  Reduce copy stress with a shared observer across runnables
      Child : Shared.Observer;
   end record;

   ---------
   -- Run --
   ---------

   --  Emits the runner's current value to the child observer, then queues
   --  an updated copy of the runner (successor value, deadline advanced by
   --  Pause) on the runner's own scheduler.
   --
   --  If No_Longer_Subscribed propagates out of any statement, it is only
   --  logged; since the handler does not reschedule, the chain of runs
   --  ends there. Any other exception is passed, together with the child
   --  observer, to Typed.Defaults.Default_Error_Handler.
   overriding procedure Run (R : Runner) is
      use Ada.Calendar;

      --  R is an "in" parameter, so all updates are made on a working copy
      Upcoming : Runner := R;
   begin
      --  Deliver the current value first ...
      Upcoming.Child.On_Next (+Upcoming.Value);

      --  ... then prepare the state for the following emission
      Upcoming.Value := +Succ (+Upcoming.Value);
      Upcoming.Next  := Upcoming.Next + Upcoming.Pause;

      --  Deadlines are derived from the previous reference (Next), not
      --  from the current clock reading
      Upcoming.Sched.Schedule (Upcoming, Upcoming.Next);
   exception
      when No_Longer_Subscribed =>
         Debug.Log ("Interval runner: caught No_Longer_Subscribed", Debug.Note);
      when E : others =>
         Typed.Defaults.Default_Error_Handler (Upcoming.Child, E);
   end Run;

   --  Parameters captured by Create and handed to On_Subscribe for every
   --  subscription. Component order matters to any positional aggregate.
   type State is record
      --  Value used to initialise the runner's first emission
      First       : Typed.D;

      --  Period copied into the runner (time added between deadlines)
      Pause       : Duration;

      --  Delay added to the clock to obtain the first deadline
      First_Pause : Duration;

      --  Scheduler on which the runner is queued
      Scheduler   : Schedulers.Scheduler;
   end record;

   ------------------
   -- On_Subscribe --
   ------------------

   --  Subscription hook: builds the first Runner from the captured State
   --  and queues it on the State's scheduler.
   --
   --  S        : parameters captured by Create.
   --  Observer : downstream observer; wrapped with Shared.Create so that
   --             the successive runner copies share one observer.
   --
   --  The first deadline is computed from a single clock reading and used
   --  both as the runner's Next reference and as the instant passed to
   --  Schedule. Reading the clock twice would make the scheduled instant
   --  later than the stored reference by the time spent building the
   --  runner.
   procedure On_Subscribe (S : State; Observer : in out Typed.Observer) is
      use Ada.Calendar;

      First_Deadline : constant Ada.Calendar.Time := Clock + S.First_Pause;

      R : constant Runner := (Sched => S.Scheduler,
                              Pause => S.Pause,
                              Value => S.First,
                              Next  => First_Deadline,
                              Child => Shared.Create (Observer));
   begin
      S.Scheduler.Schedule (R, First_Deadline);
   end On_Subscribe;

   --  Observable factory for this package's Typed instance.
   package Pre is new Src.Create (Typed);

   --  Stateful source: State is the per-observable data and On_Subscribe
   --  is the subscription hook.
   --  NOTE(review): Autocompletes => False presumably means the source does
   --  not signal completion once On_Subscribe returns, since emissions
   --  continue from the scheduled runners -- confirm against Rx.Src.Create.
   package Source is new Pre.With_State (State, On_Subscribe, Autocompletes => False);

   ------------
   -- Create --
   ------------

   --  Builds an observable from the given parameters by packing them into
   --  a State record and handing it to Source.Create.
   --
   --  First       : value stored as the State's first value.
   --  Period      : stored as the State's Pause (default 1.0).
   --  First_Pause : stored as the State's First_Pause (default 1.0).
   --  Scheduler   : scheduler stored in the State (default
   --                Schedulers.Computation).
   function Create
     (First       : Typed.T;
      Period      : Duration := 1.0;
      First_Pause : Duration := 1.0;
      Scheduler   : Schedulers.Scheduler := Schedulers.Computation)
      return Typed.Observable
   is
      Config : constant State := (First       => +First,
                                  Pause       => Period,
                                  First_Pause => First_Pause,
                                  Scheduler   => Scheduler);
   begin
      return Source.Create (Config);
   end Create;

end Rx.Src.Interval;