with Ada.Calendar;
with Rx.Debug;
with Rx.Dispatchers;
with Rx.Impl.Shared_Observer;
with Rx.Src.Create;
with Rx.Subscriptions;
package body Rx.Src.Interval is
package Shared is new Rx.Impl.Shared_Observer (Typed);
use Typed.Conversions;
type Runner is new Dispatchers.Runnable with record
Sched : Schedulers.Scheduler;
Pause : Duration; -- Repetitive period
Value : Typed.D; -- Next value to emit
Next : Ada.Calendar.Time; -- Reference for next deadline
Child : Shared.Observer; -- Reduce copy stress with a shared observer across runnables
end record;
---------
-- Run --
---------
overriding procedure Run (R : Runner) is
use Ada.Calendar;
RW : Runner := R;
begin
RW.Child.On_Next (+R.Value);
RW.Value := +Succ (+R.Value);
RW.Next := R.Next + R.Pause;
RW.Sched.Schedule (RW, RW.Next);
exception
when No_Longer_Subscribed =>
Debug.Log ("Interval runner: caught No_Longer_Subscribed", Debug.Note);
when E : others =>
Typed.Defaults.Default_Error_Handler (RW.Child, E);
end Run;
type State is record
First : Typed.D;
Pause,
First_Pause : Duration;
Scheduler : Schedulers.Scheduler;
end record;
------------------
-- On_Subscribe --
------------------
procedure On_Subscribe (S : State; Observer : in out Typed.Observer) is
use Ada.Calendar;
R : constant Runner := (S.Scheduler,
S.Pause,
S.First,
Clock + S.First_Pause,
Shared.Create (Observer));
begin
S.Scheduler.Schedule (R, Clock + S.First_Pause);
end On_Subscribe;
package Pre is new Src.Create (Typed);
package Source is new Pre.With_State (State, On_Subscribe, Autocompletes => False);
------------
-- Create --
------------
function Create
(First : Typed.T;
Period : Duration := 1.0;
First_Pause : Duration := 1.0;
Scheduler : Schedulers.Scheduler := Schedulers.Computation)
return Typed.Observable
is
begin
return Source.Create (State'(+First, Period, First_Pause, Scheduler));
end Create;
end Rx.Src.Interval;