rxada_0.1.1_dd9da799/src/body/rx-impl-transformers.adb

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
with Ada.Tags;
with Rx.Debug;

package body Rx.Impl.Transformers is

   ----------------
   -- Debug_Dump --
   ----------------

   procedure Debug_Dump (This : in out Operator'Class) is
      --  Debugging aid: traces the tag of This, then the chain of stages
      --  reachable through Get_Parent ("upst: " lines), then the chain
      --  reachable through Get_Observer ("down: " lines). Each walk is
      --  terminated by a separator line.

      Separator : constant String := "----";

      --  Traces the tag of Stage and recurses into its parent for as long
      --  as the parent is itself an Operator'Class. A parent of any other
      --  type is traced once and ends the walk.
      procedure Print_Upstream (Stage : Operator'Class) is
      begin
         Debug.Trace ("upst: " & Ada.Tags.Expanded_Name (Stage'Tag));

         if not Stage.Has_Parent then
            Debug.Trace (Separator);
            return;
         end if;

         if Stage.Get_Parent in Operator'Class then
            Print_Upstream (Operator'Class (Stage.Get_Parent));
         else
            Debug.Trace ("upst: " & Ada.Tags.Expanded_Name (Stage.Get_Parent'Tag));
            Debug.Trace (Separator);
         end if;
      end Print_Upstream;

      --  Traces the tag of Stage and recurses into its downstream observer
      --  for as long as that observer is itself an Operator'Class. An
      --  observer of any other type is traced once and ends the walk.
      --  Get_Observer is only called after Is_Subscribed has been checked.
      procedure Print_Downstream (Stage : in out Operator'Class) is
      begin
         Debug.Trace ("down: " & Ada.Tags.Expanded_Name (Stage'Tag));

         if not Stage.Is_Subscribed then
            Debug.Trace (Separator);
            return;
         end if;

         if Stage.Get_Observer.Actual.all in Operator'Class then
            Print_Downstream (Operator'Class (Stage.Get_Observer.Actual.all));
         else
            Debug.Trace ("down: " & Ada.Tags.Expanded_Name (Stage.Get_Observer.Actual.all'Tag));
            Debug.Trace (Separator);
         end if;
      end Print_Downstream;

   begin
      Debug.Trace ("SELF: " & Ada.Tags.Expanded_Name (This'Tag));

      Debug.Trace ("UPSTREAM:");
      Print_Upstream (This);

      Debug.Trace ("DOWNSTREAM:");
      Print_Downstream (This);
   end Debug_Dump;

   ------------------
   -- Get_Observer --
   ------------------

   not overriding function Get_Observer (This : in out Operator) return Into.Holders.Observers.Reference is
      --  Returns a reference to the downstream observer held by This.
      --  Raises No_Longer_Subscribed when This.Is_Subscribed is False.
      --
      --  NOTE: kept as a regular body on purpose. The same function written
      --  as an expression function in the spec failed with access checks
      --  under "7.3" (presumably the GNAT/GCC version; cause not understood).
   begin
      if not This.Is_Subscribed then
         raise No_Longer_Subscribed;
      end if;

      return This.Downstream.Ref;
   end Get_Observer;

   ------------------
   -- Set_Observer --
   ------------------

   procedure Set_Observer (This : in out Operator; Consumer : Into.Observer'Class) is
      --  Stores Consumer as the downstream observer of This.
      --  Raises Constraint_Error if a downstream observer is already held;
      --  an existing observer is never replaced.
   begin
      if not This.Downstream.Is_Empty then
         raise Constraint_Error with "Downstream Observer already set";
      end if;

      This.Downstream.Hold (Consumer);
   end Set_Observer;

   ---------------
   -- Subscribe --
   ---------------

   overriding procedure Subscribe
     (This     : in out Operator;
      Consumer : in out Into.Observer'Class)
   is
      --  Links Consumer as the downstream observer of This and then
      --  subscribes This to its parent observable.
      --  Raises Constraint_Error when This has no parent; Set_Observer may
      --  also raise Constraint_Error if a downstream observer is already set.
   begin
      if not This.Has_Parent then
         raise Constraint_Error with "Attempting subscription without producer observable";
      end if;

      declare
         --  Work on a private copy of the parent, taken before This is
         --  modified by Set_Observer
         Producer_Copy : From.Observable := This.Get_Parent;
      begin
         This.Set_Observer (Consumer);
         Producer_Copy.Subscribe (This);
      end;
   end Subscribe;

   -----------------
   -- On_Complete --
   -----------------

   overriding procedure On_Complete (This : in out Operator) is
      --  Forwards the completion signal to the downstream observer and then
      --  drops the subscription. Get_Observer raises No_Longer_Subscribed
      --  when there is no active subscription, in which case Unsubscribe is
      --  not reached.
   begin
      This.Get_Observer.On_Complete;
      This.Unsubscribe;
   end On_Complete;

   --------------
   -- On_Error --
   --------------

   overriding procedure On_Error
     (This  : in out Operator;
      Error :        Errors.Occurrence)
   is
      --  Forwards Error to the downstream observer and then drops the
      --  subscription. Get_Observer raises No_Longer_Subscribed when there
      --  is no active subscription, in which case Unsubscribe is not reached.
   begin
      This.Get_Observer.On_Error (Error);
      This.Unsubscribe;
   end On_Error;

   -------------
   -- On_Next --
   -------------

   overriding procedure On_Next (This : in out Operator; V : From.T) is
      --  Placeholder implementation: unconditionally raises Program_Error.
      --  Neither This nor V is used. As the exception message states,
      --  concrete operators are expected to override this procedure.
   begin
      raise Program_Error with "Must be overriden";
   end On_Next;

   -----------------
   -- Unsubscribe --
   -----------------

   overriding procedure Unsubscribe (This : in out Operator) is
      --  Releases the downstream observer by clearing the Downstream holder.
      --  NOTE(review): presumably Is_Subscribed reports False afterwards;
      --  confirm against its definition in the spec.
   begin
      This.Downstream.Clear;
   end Unsubscribe;

   ------------------
   -- Concatenate --
   ------------------

   function Concatenate
     (Producer : From.Observable;
      Consumer : Operator'Class)
      return Into.Observable
   is
      --  Builds a chained observable: returns a copy of Consumer whose
      --  parent has been set to Producer. Consumer itself is left untouched.
   begin
      return Chained : Operator'Class := Consumer do
         Chained.Set_Parent (Producer);
      end return;
   end Concatenate;

end Rx.Impl.Transformers;