orka_b455160b/orka/src/orka/implementation/orka-jobs-executors.adb

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
--  SPDX-License-Identifier: Apache-2.0
--
--  Copyright (c) 2017 onox <denkpadje@gmail.com>
--
--  Licensed under the Apache License, Version 2.0 (the "License");
--  you may not use this file except in compliance with the License.
--  You may obtain a copy of the License at
--
--      http://www.apache.org/licenses/LICENSE-2.0
--
--  Unless required by applicable law or agreed to in writing, software
--  distributed under the License is distributed on an "AS IS" BASIS,
--  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--  See the License for the specific language governing permissions and
--  limitations under the License.

with Ada.Exceptions;
with Ada.Tags;

with Orka.Containers.Bounded_Vectors;
with Orka.Futures;
with Orka.Loggers;
with Orka.Logging.Default;

package body Orka.Jobs.Executors is

   use all type Orka.Logging.Default_Module;
   use all type Orka.Logging.Severity;

   procedure Log is new Orka.Logging.Default.Generic_Log (Engine);

   function Get_Root_Dependent (Element : Job_Ptr) return Job_Ptr is
      Result : Job_Ptr := Element;
   begin
      while Result.Dependent /= Null_Job loop
         Result := Result.Dependent;
      end loop;
      return Result;
   end Get_Root_Dependent;

   procedure Execute_Jobs
     (Name  : String;
      Kind  : Queues.Executor_Kind;
      Queue : Queues.Queue_Ptr)
   is
      use type Futures.Status;
      use Ada.Exceptions;

      Pair : Queues.Pair;
      Stop : Boolean := False;

      Null_Pair : constant Queues.Pair := (others => <>);

      package Vectors is new Orka.Containers.Bounded_Vectors (Positive, Job_Ptr);

      type Executor_Context (Jobs : not null access Vectors.Vector)
        is new Execution_Context with null record;

      overriding
      procedure Enqueue (Object : Executor_Context; Element : Job_Ptr) is
      begin
         Object.Jobs.Append (Element);
      end Enqueue;
   begin
      loop
         Queue.Dequeue (Kind) (Pair, Stop);
         exit when Stop;

         declare
            Job     : Job_Ptr renames Pair.Job;

            Future  : Futures.Pointers.Reference renames Pair.Future.Get;
            Promise : Futures.Promise'Class renames Futures.Promise'Class (Future.Value.all);

            Jobs    : aliased Vectors.Vector (Capacity => Maximum_Enqueued_By_Job);
            Context : Executor_Context (Jobs'Access);

            procedure Set_Root_Dependent (Last_Job : Job_Ptr) is
               Root_Dependents : Vectors.Vector (Capacity => Jobs.Length);

               procedure Set_Dependencies (Elements : Vectors.Element_Array) is
               begin
                  Last_Job.Set_Dependencies (Dependency_Array (Elements));
               end Set_Dependencies;
            begin
               for Job of Jobs loop
                  declare
                     Root : constant Job_Ptr := Get_Root_Dependent (Job);
                  begin
                     if (for all Dependent of Root_Dependents => Root /= Dependent) then
                        Root_Dependents.Append (Root);
                     end if;
                  end;
               end loop;

               Root_Dependents.Query (Set_Dependencies'Access);
            end Set_Root_Dependent;

            Tag : String renames Ada.Tags.Expanded_Name (Job'Tag);
         begin
            if Future.Current_Status = Futures.Waiting then
               Promise.Set_Status (Futures.Running);
            end if;

            if Future.Current_Status = Futures.Running then
               begin
                  Job.Execute (Context);
               exception
                  when Error : others =>
                     Promise.Set_Failed (Error);

                     Log (Loggers.Error,
                       Kind'Image & " job " & Tag & " " & Exception_Information (Error));
               end;
            else
               Log (Warning,
                 Kind'Image & " job " & Tag & " already " & Future.Current_Status'Image);
            end if;

            if Job.Dependent /= Null_Job then
               --  Make the root dependents of the jobs in Jobs
               --  dependencies of Job.Dependent
               if not Jobs.Is_Empty then
                  Set_Root_Dependent (Job.Dependent);
               end if;

               --  If another job depends on this job, decrement its dependencies counter
               --  and if it has reached zero then it can be scheduled
               if Job.Dependent.Decrement_Dependencies then
                  pragma Assert (Jobs.Is_Empty);
                  Queue.Enqueue (Job.Dependent, Pair.Future);
               end if;
            elsif Jobs.Is_Empty then
               Promise.Set_Status (Futures.Done);
            else
               --  If the job has enqueued new jobs, we need to create an
               --  empty job which has the root dependents of these new jobs
               --  as dependencies. This is so that the empty job will be the
               --  last job that is given Pair.Future
               Set_Root_Dependent (Create_Empty_Job);
            end if;

            if not Jobs.Is_Empty then
               for Job of Jobs loop
                  Queue.Enqueue (Job, Pair.Future);
               end loop;
            end if;

            Free (Job);
         end;

         --  Finalize the smart pointer (Pair.Future) to reduce the number
         --  of references to the Future object
         Pair := Null_Pair;
      end loop;
   exception
      when Error : others =>
         Log (Loggers.Error, Exception_Information (Error));
   end Execute_Jobs;

end Orka.Jobs.Executors;