spat_1.3.0_4ad4ab14/src/util/spat-spark_files.adb

------------------------------------------------------------------------------
--  Copyright (C) 2020 by Heisenbug Ltd. (gh+spat@heisenbug.eu)
--
--  This work is free. You can redistribute it and/or modify it under the
--  terms of the Do What The Fuck You Want To Public License, Version 2,
--  as published by Sam Hocevar. See the LICENSE file for more details.
------------------------------------------------------------------------------
pragma License (Unrestricted);

with Ada.Containers.Synchronized_Queue_Interfaces;
with Ada.Containers.Unbounded_Synchronized_Queues;
with Ada.Exceptions;
with Ada.IO_Exceptions;
with Ada.Text_IO;

with SPAT.Log;
with SPAT.Strings;

with System.Multiprocessors;

package body SPAT.Spark_Files is

   ---------------------------------------------------------------------------
   --  Parallelization support.
   --
   --  Reading and parsing the JSON formatted .spark files can easily be
   --  parallelized, as there are no dependencies between them.
   ---------------------------------------------------------------------------
   package Worker_Tasks is

      --  Just use the number of CPUs on the system.
      Num_Workers : constant System.Multiprocessors.CPU :=
        System.Multiprocessors.Number_Of_CPUs;

      --  The tasks get an input file name and return the read JSON result.
      --  To be able to map each file name to its corresponding result, we
      --  merge them into a single result record type.
      type Parse_Result is
         record
            Key    : SPARK_File_Name;
            Result : GNATCOLL.JSON.Read_Result;
         end record;

      package Implementation is

         --  Needed queue (interface) instantiations for input and output types.

         package File_Queue_Interface is new
           Ada.Containers.Synchronized_Queue_Interfaces
             (Element_Type => SPARK_File_Name);

         package File_Queues is new
           Ada.Containers.Unbounded_Synchronized_Queues
             (Queue_Interfaces => File_Queue_Interface);

         package Result_Queue_Interface is new
           Ada.Containers.Synchronized_Queue_Interfaces
             (Element_Type => Parse_Result);

         package Result_Queues is new
           Ada.Containers.Unbounded_Synchronized_Queues
             (Queue_Interfaces => Result_Queue_Interface);

      end Implementation;

      --  Establish the input queue.  File names to be parsed are enqueued
      --  here, and the worker tasks pick them up from there.
      Input_File_Queue : Implementation.File_Queues.Queue;

      --  Establish the output queue.  The parsing results can be picked up
      --  here.
      Result_Queue     : Implementation.Result_Queues.Queue;
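
      --  Typical interaction, as done in Read below:  enqueue each file
      --  name on Input_File_Queue, then dequeue one Parse_Result per
      --  queued file from Result_Queue.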

      ------------------------------------------------------------------------
      --  Stop
      --
      --  Tell all worker tasks to terminate.
      ------------------------------------------------------------------------
      procedure Stop;

   end Worker_Tasks;

   package body Worker_Tasks is

      task type Parse_Task;

      type Task_List is
        array (System.Multiprocessors.CPU range <>) of Parse_Task;

      Workers : Task_List (1 .. Num_Workers);

      ------------------------------------------------------------------------
      --  Parse_File
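      --
      --  Open the file given by Name, read its complete content and return
      --  the result of parsing it as JSON.  Exceptions from opening or
      --  reading the file (e.g. Name_Error) propagate to the caller.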
      ------------------------------------------------------------------------
      function Parse_File
        (Name : in SPARK_File_Name) return GNATCOLL.JSON.Read_Result;

      ------------------------------------------------------------------------
      --  Parse_File
      ------------------------------------------------------------------------
      function Parse_File (Name : in SPARK_File_Name)
                           return GNATCOLL.JSON.Read_Result
      is
         JSON_File    : Ada.Text_IO.File_Type;
         File_Content : JSON_Data;
      begin
         Ada.Text_IO.Open (File => JSON_File,
                           Mode => Ada.Text_IO.In_File,
                           Name => To_String (Source => Name));

         while not Ada.Text_IO.End_Of_File (File => JSON_File) loop
            Ada.Strings.Unbounded.Append
              (Source   => File_Content,
               New_Item => Ada.Text_IO.Get_Line (File => JSON_File));
         end loop;

         Ada.Text_IO.Close (File => JSON_File);

         return GNATCOLL.JSON.Read (Strm => File_Content);
      end Parse_File;

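      ------------------------------------------------------------------------
      --  Parse_Task
      --
      --  Worker task: blocks on the input queue until a file name arrives
      --  (or the task is aborted via Stop), parses that file and posts the
      --  result to the result queue.
      ------------------------------------------------------------------------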
      task body Parse_Task is
         Element : SPARK_File_Name;
         Result  : Worker_Tasks.Parse_Result;
      begin
         Main_Loop :
         loop
            Input_File_Queue.Dequeue (Element => Element);

            Result.Key := Element;

            begin
               Result.Result := Parse_File (Name => Element);
            exception
               when E : Ada.IO_Exceptions.Name_Error =>
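                  --  A file that could not be opened is reported as a
                  --  pseudo parsing error, so the caller still receives
                  --  a result for this file name.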
                  Result.Result :=
                    GNATCOLL.JSON.Read_Result'
                      (Success => False,
                       Error   =>
                          GNATCOLL.JSON.Parsing_Error'
                            (Line    => 1,
                             Column  => 1,
                             Message =>
                                To_Name (Ada.Exceptions.Exception_Message (E))));
            end;

            Result_Queue.Enqueue (New_Item => Result);
         end loop Main_Loop;
      end Parse_Task;

      ------------------------------------------------------------------------
      --  Stop
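      --
      --  The workers block indefinitely in Dequeue, so they are simply
      --  aborted rather than being signalled to terminate.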
      ------------------------------------------------------------------------
      procedure Stop is
      begin
         for Worker of Workers loop
            abort Worker;
         end loop;
      end Stop;

   end Worker_Tasks;

   ---------------------------------------------------------------------------
   --  Num_Workers
   --
   --  Report the number of tasks used for parallel file reads.
   ---------------------------------------------------------------------------
   function Num_Workers return Positive is
     (Positive (Worker_Tasks.Num_Workers));

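   --  Placeholder result stored in the file map for each queued file.  It
   --  is replaced by the real parsing result once the corresponding worker
   --  response has been collected (see Read below).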
   Pending : constant GNATCOLL.JSON.Read_Result :=
     GNATCOLL.JSON.Read_Result'
       (Success => False,
        Error   =>
          GNATCOLL.JSON.Parsing_Error'(Line    => Positive'Last,
                                       Column  => Positive'Last,
                                       Message => To_Name ("Queued...")));

   ---------------------------------------------------------------------------
   --  Read
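   --
   --  Enqueue all given file names to the worker tasks and collect the
   --  parsing results into the map.  The call blocks until all results
   --  have been received.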
   ---------------------------------------------------------------------------
   not overriding
   procedure Read (This  : in out T;
                   Names : in     Strings.SPARK_File_Names)
   is
      use type File_Maps.Cursor;
   begin
      --  Clear any old remnants and reserve capacity for the number of
      --  files we will be trying to add, so we avoid rehashing during
      --  the loop.
      This.Clear;
      This.Reserve_Capacity (Capacity => Names.Length);

      Run_Parsers :
      declare
         Num_Expected : Natural := 0; --  How many responses to expect.
      begin
         --  Queue input files to the worker tasks.
         for Name of Names loop
            if This.Find (Key => Name) = File_Maps.No_Element then
               This.Insert (Key      => Name,
                            New_Item => Pending);
               --  Establish key/value pair. The actual value will be
               --  established later, but we need the key in the table,
               --  otherwise Find above will never find anything.

               Log.Debug
                 (Message =>
                    "Queuing """ & To_String (Source => Name) & """...");

               Num_Expected := Num_Expected + 1;
               Worker_Tasks.Input_File_Queue.Enqueue (New_Item => Name);
            else
               --  Skip the file, we already have this one.
               --
               --  TODO: Normally we shouldn't be too concerned about
               --        duplicates. This check is a leftover from the times
               --        when we still got these values from the command-line
               --        which could have specified the same file more than
               --        once.
               Log.Warning
                 (Message =>
                    "Duplicate file """ & To_String (Source => Name) &
                    """ ignored.");
            end if;
         end loop;

         --  Collect results.
         Collect_Results :
         declare
            Result : Worker_Tasks.Parse_Result;
         begin
            Log.Debug (Message => "Picking up results...");

            for i in 1 .. Num_Expected loop
               Wait_Loop :
               loop
                  select
                     Worker_Tasks.Result_Queue.Dequeue (Element => Result);
                     This.Replace (Key      => Result.Key,
                                   New_Item => Result.Result);
                     Log.Debug
                       (Message =>
                           """" & To_String (Result.Key) & """..." &
                        (if Result.Result.Success
                           then "ok."
                           else "error."));
                      --  The actual error message is reported by the caller.
                     exit Wait_Loop;
                  or
                     delay 10.0;

                     --  Inform the user if this is taking way too long...
                     Log.Warning (Message => "Ten seconds have passed.");
                     Log.Warning (Message => "Still waiting for results.");
                     Log.Warning (Message => "This is taking awfully long!?");
                  end select;
               end loop Wait_Loop;
            end loop;
         end Collect_Results;
      end Run_Parsers;
   end Read;

   ---------------------------------------------------------------------------
   --  Shutdown
   ---------------------------------------------------------------------------
   procedure Shutdown is
   begin
      Worker_Tasks.Stop;
   end Shutdown;

end SPAT.Spark_Files;