1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 | with Ada.Tags;
with Rx.Debug;
package body Rx.Impl.Transformers is
----------------
-- Debug_Dump --
----------------
procedure Debug_Dump (This : in out Operator'Class) is
   --  Traces, through Debug.Trace, the expanded tag name of This, followed
   --  by the chain of parents reachable through Get_Parent ("upst:" lines)
   --  and the chain of observers reachable through Get_Observer ("down:"
   --  lines). Each chain is closed with a separator line.

   Separator : constant String := "----";

   function Name_Of (T : Ada.Tags.Tag) return String
     renames Ada.Tags.Expanded_Name;

   --  Prints Node, then recurses into its parent for as long as the parent
   --  is itself an Operator'Class. A parent of any other type is printed
   --  and ends the walk.
   procedure Walk_Up (Node : Operator'Class) is
   begin
      Debug.Trace ("upst: " & Name_Of (Node'Tag));

      if not Node.Has_Parent then
         Debug.Trace (Separator);
         return;
      end if;

      if Node.Get_Parent in Operator'Class then
         Walk_Up (Operator'Class (Node.Get_Parent));
      else
         Debug.Trace ("upst: " & Name_Of (Node.Get_Parent'Tag));
         Debug.Trace (Separator);
      end if;
   end Walk_Up;

   --  Prints Node, then recurses into its observer for as long as the
   --  observer is itself an Operator'Class. An observer of any other type
   --  is printed and ends the walk.
   procedure Walk_Down (Node : in out Operator'Class) is
   begin
      Debug.Trace ("down: " & Name_Of (Node'Tag));

      if not Node.Is_Subscribed then
         Debug.Trace (Separator);
         return;
      end if;

      if Node.Get_Observer.Actual.all in Operator'Class then
         Walk_Down (Operator'Class (Node.Get_Observer.Actual.all));
      else
         Debug.Trace ("down: " & Name_Of (Node.Get_Observer.Actual.all'Tag));
         Debug.Trace (Separator);
      end if;
   end Walk_Down;

begin
   Debug.Trace ("SELF: " & Name_Of (This'Tag));

   Debug.Trace ("UPSTREAM:");
   Walk_Up (This);

   Debug.Trace ("DOWNSTREAM:");
   Walk_Down (This);
end Debug_Dump;
------------------
-- Get_Observer --
------------------
not overriding function Get_Observer (This : in out Operator)
                                      return Into.Holders.Observers.Reference
is
   --  Returns a reference to the observer stored in This.Downstream.
   --  Raises No_Longer_Subscribed when This.Is_Subscribed is False.
   --
   --  Original author's note: this same function, written as an expression
   --  in the spec, bugs out with access checks (???) in 7.3, hence the
   --  regular body here.
begin
   if not This.Is_Subscribed then
      raise No_Longer_Subscribed;
   end if;

   return This.Downstream.Ref;
end Get_Observer;
------------------
-- Set_Observer --
------------------
procedure Set_Observer (This     : in out Operator;
                        Consumer :        Into.Observer'Class)
is
   --  Stores Consumer in This.Downstream.
   --  Raises Constraint_Error when This.Downstream already holds a value.
begin
   if not This.Downstream.Is_Empty then
      raise Constraint_Error with "Downstream Observer already set";
   end if;

   This.Downstream.Hold (Consumer);
end Set_Observer;
---------------
-- Subscribe --
---------------
overriding procedure Subscribe (This     : in out Operator;
                                Consumer : in out Into.Observer'Class)
is
   --  Registers Consumer as the downstream observer of This and then
   --  subscribes This to its parent observable.
   --  Raises Constraint_Error when This has no parent. Set_Observer may
   --  also raise if a downstream observer is already set.
begin
   if not This.Has_Parent then
      raise Constraint_Error with "Attempting subscription without producer observable";
   end if;

   declare
      --  Local copy of the parent; the subscription is made on this copy.
      Upstream : From.Observable := This.Get_Parent;
   begin
      This.Set_Observer (Consumer);
      Upstream.Subscribe (This);
   end;
end Subscribe;
------------------
-- On_Complete --
------------------
overriding procedure On_Complete (This : in out Operator) is
   --  Forwards completion to the downstream observer, then unsubscribes.
   --  Get_Observer raises if This is no longer subscribed.
begin
   This.Get_Observer.On_Complete;
   Unsubscribe (This);
end On_Complete;
--------------
-- On_Error --
--------------
overriding procedure On_Error (This  : in out Operator;
                               Error :        Errors.Occurrence)
is
   --  Forwards Error to the downstream observer, then unsubscribes.
   --  Get_Observer raises if This is no longer subscribed.
begin
   This.Get_Observer.On_Error (Error);
   Unsubscribe (This);
end On_Error;
-------------
-- On_Next --
-------------
overriding procedure On_Next (This : in out Operator;
                              V    :        From.T)
is
   --  Placeholder body: always raises Program_Error. Neither This nor V is
   --  used; the message indicates that a real implementation is expected
   --  to be supplied by overriding.
begin
   raise Program_Error with "Must be overriden";
end On_Next;
-------------------
-- Unsubscribe --
-------------------
overriding procedure Unsubscribe (This : in out Operator) is
   --  Drops the stored downstream observer by clearing This.Downstream.
begin
   This.Downstream.Clear;
end Unsubscribe;
------------------
-- Concatenate --
------------------
function Concatenate
  (Producer : From.Observable;
   Consumer : Operator'Class) return Into.Observable
is
   --  Returns a copy of Consumer whose parent has been set to Producer.
   --  Consumer itself is not modified.
begin
   return Linked : Operator'Class := Consumer do
      Linked.Set_Parent (Producer);
   end return;
end Concatenate;
end Rx.Impl.Transformers;
|