1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.argeo.slc.core.execution;
17
18 import java.security.AccessControlContext;
19 import java.security.AccessController;
20 import java.security.PrivilegedActionException;
21 import java.security.PrivilegedExceptionAction;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Set;
27
28 import javax.security.auth.Subject;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.argeo.slc.SlcException;
33 import org.argeo.slc.execution.ExecutionModulesManager;
34 import org.argeo.slc.execution.ExecutionProcess;
35 import org.argeo.slc.execution.ExecutionStep;
36 import org.argeo.slc.execution.RealizedFlow;
37
38
39
40
41
42 public class ProcessThread extends Thread {
43 private final static Log log = LogFactory.getLog(ProcessThread.class);
44
45 private final ExecutionModulesManager executionModulesManager;
46 private final ExecutionProcess process;
47 private final ProcessThreadGroup processThreadGroup;
48
49 private Set<ExecutionThread> executionThreads = Collections.synchronizedSet(new HashSet<ExecutionThread>());
50
51
52 private Boolean killed = false;
53
54 private final AccessControlContext accessControlContext;
55
56 public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
57 ExecutionProcess process) {
58 super(processesThreadGroup, "SLC Process #" + process.getUuid());
59 this.executionModulesManager = executionModulesManager;
60 this.process = process;
61 processThreadGroup = new ProcessThreadGroup(process);
62 accessControlContext = AccessController.getContext();
63 }
64
65 public final void run() {
66
67
68
69
70
71
72
73 log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n");
74
75
76 new LoggingThread().start();
77
78 process.setStatus(ExecutionProcess.RUNNING);
79 try {
80 Subject subject = Subject.getSubject(accessControlContext);
81 try {
82 Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
83
84 @Override
85 public Void run() throws Exception {
86 process();
87 return null;
88 }
89
90 });
91 } catch (PrivilegedActionException privilegedActionException) {
92 Throwable cause = privilegedActionException.getCause();
93 if (cause instanceof InterruptedException)
94 throw (InterruptedException) cause;
95 else
96 throw new SlcException("Cannot process", cause);
97 }
98
99 } catch (InterruptedException e) {
100 die();
101 return;
102 } catch (Exception e) {
103 String msg = "Process " + getProcess().getUuid() + " failed unexpectedly.";
104 log.error(msg, e);
105 getProcessThreadGroup()
106 .dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage()));
107 }
108
109
110
111 for (ExecutionThread executionThread : executionThreads) {
112 if (executionThread.isAlive()) {
113 try {
114 executionThread.join();
115 } catch (InterruptedException e) {
116 die();
117 return;
118 }
119 }
120 }
121
122 computeFinalStatus();
123 }
124
125
126 private void computeFinalStatus() {
127
128
129 if (killed)
130 process.setStatus(ExecutionProcess.KILLED);
131 else if (processThreadGroup.hadAnError())
132 process.setStatus(ExecutionProcess.ERROR);
133 else
134 process.setStatus(ExecutionProcess.COMPLETED);
135
136
137 log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n");
138 }
139
140
141 private synchronized void die() {
142 killed = true;
143 computeFinalStatus();
144 for (ExecutionThread executionThread : executionThreads) {
145 try {
146 executionThread.interrupt();
147 } catch (Exception e) {
148 log.error("Cannot interrupt " + executionThread);
149 }
150 }
151 processThreadGroup.interrupt();
152 }
153
154
155
156
157
158 protected void process() throws InterruptedException {
159 List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
160 flowsToProcess.addAll(process.getRealizedFlows());
161 while (flowsToProcess.size() > 0) {
162 RealizedFlow realizedFlow = flowsToProcess.remove(0);
163 execute(realizedFlow, true);
164 }
165 }
166
167
168 protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException {
169 if (killed)
170 return;
171
172 ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow);
173 executionThreads.add(thread);
174 thread.start();
175
176 if (synchronous)
177 thread.join();
178
179 return;
180 }
181
182
183
184
185
186
187
188
189
190 public ExecutionProcess getProcess() {
191 return process;
192 }
193
194 public ProcessThreadGroup getProcessThreadGroup() {
195 return processThreadGroup;
196 }
197
198 public ExecutionModulesManager getExecutionModulesManager() {
199 return executionModulesManager;
200 }
201
202 private class LoggingThread extends Thread {
203
204 public LoggingThread() {
205 super("SLC Process Logger #" + process.getUuid());
206 }
207
208 public void run() {
209 boolean run = true;
210 while (run) {
211 List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
212 processThreadGroup.getSteps().drainTo(newSteps);
213 if (newSteps.size() > 0) {
214
215 process.addSteps(newSteps);
216 }
217
218 try {
219 Thread.sleep(1000);
220 } catch (InterruptedException e) {
221 break;
222 }
223
224 if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0)
225 run = false;
226 }
227 }
228
229 }
230 }