View Javadoc
1   /*
2    * Copyright (C) 2007-2012 Argeo GmbH
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *         http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.argeo.slc.core.execution;
17  
18  import java.security.AccessControlContext;
19  import java.security.AccessController;
20  import java.security.PrivilegedActionException;
21  import java.security.PrivilegedExceptionAction;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Set;
27  
28  import javax.security.auth.Subject;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.argeo.slc.SlcException;
33  import org.argeo.slc.execution.ExecutionModulesManager;
34  import org.argeo.slc.execution.ExecutionProcess;
35  import org.argeo.slc.execution.ExecutionStep;
36  import org.argeo.slc.execution.RealizedFlow;
37  
38  /**
39   * Main thread coordinating an {@link ExecutionProcess}, launching parallel or
40   * sequential {@link ExecutionThread}s.
41   */
42  public class ProcessThread extends Thread {
43  	private final static Log log = LogFactory.getLog(ProcessThread.class);
44  
45  	private final ExecutionModulesManager executionModulesManager;
46  	private final ExecutionProcess process;
47  	private final ProcessThreadGroup processThreadGroup;
48  
49  	private Set<ExecutionThread> executionThreads = Collections.synchronizedSet(new HashSet<ExecutionThread>());
50  
51  	// private Boolean hadAnError = false;
52  	private Boolean killed = false;
53  
54  	private final AccessControlContext accessControlContext;
55  
56  	public ProcessThread(ThreadGroup processesThreadGroup, ExecutionModulesManager executionModulesManager,
57  			ExecutionProcess process) {
58  		super(processesThreadGroup, "SLC Process #" + process.getUuid());
59  		this.executionModulesManager = executionModulesManager;
60  		this.process = process;
61  		processThreadGroup = new ProcessThreadGroup(process);
62  		accessControlContext = AccessController.getContext();
63  	}
64  
65  	public final void run() {
66  		// authenticate thread
67  		// Authentication authentication = getProcessThreadGroup()
68  		// .getAuthentication();
69  		// if (authentication == null)
70  		// throw new SlcException("Can only execute authenticated threads");
71  		// SecurityContextHolder.getContext().setAuthentication(authentication);
72  
73  		log.info("\n##\n## SLC Process #" + process.getUuid() + " STARTED\n##\n");
74  
75  		// Start logging
76  		new LoggingThread().start();
77  
78  		process.setStatus(ExecutionProcess.RUNNING);
79  		try {
80  			Subject subject = Subject.getSubject(accessControlContext);
81  			try {
82  				Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
83  
84  					@Override
85  					public Void run() throws Exception {
86  						process();
87  						return null;
88  					}
89  
90  				});
91  			} catch (PrivilegedActionException privilegedActionException) {
92  				Throwable cause = privilegedActionException.getCause();
93  				if (cause instanceof InterruptedException)
94  					throw (InterruptedException) cause;
95  				else
96  					throw new SlcException("Cannot process", cause);
97  			}
98  			// process();
99  		} catch (InterruptedException e) {
100 			die();
101 			return;
102 		} catch (Exception e) {
103 			String msg = "Process " + getProcess().getUuid() + " failed unexpectedly.";
104 			log.error(msg, e);
105 			getProcessThreadGroup()
106 					.dispatchAddStep(new ExecutionStep("Process", ExecutionStep.ERROR, msg + " " + e.getMessage()));
107 		}
108 
109 		// waits for all execution threads to complete (in case they were
110 		// started asynchronously)
111 		for (ExecutionThread executionThread : executionThreads) {
112 			if (executionThread.isAlive()) {
113 				try {
114 					executionThread.join();
115 				} catch (InterruptedException e) {
116 					die();
117 					return;
118 				}
119 			}
120 		}
121 
122 		computeFinalStatus();
123 	}
124 
125 	/** Make sure this is called BEFORE all the threads are interrupted. */
126 	private void computeFinalStatus() {
127 		// String oldStatus = process.getStatus();
128 		// TODO: error management at flow level?
129 		if (killed)
130 			process.setStatus(ExecutionProcess.KILLED);
131 		else if (processThreadGroup.hadAnError())
132 			process.setStatus(ExecutionProcess.ERROR);
133 		else
134 			process.setStatus(ExecutionProcess.COMPLETED);
135 		// executionModulesManager.dispatchUpdateStatus(process, oldStatus,
136 		// process.getStatus());
137 		log.info("\n## SLC Process #" + process.getUuid() + " " + process.getStatus() + "\n");
138 	}
139 
140 	/** Called when being killed */
141 	private synchronized void die() {
142 		killed = true;
143 		computeFinalStatus();
144 		for (ExecutionThread executionThread : executionThreads) {
145 			try {
146 				executionThread.interrupt();
147 			} catch (Exception e) {
148 				log.error("Cannot interrupt " + executionThread);
149 			}
150 		}
151 		processThreadGroup.interrupt();
152 	}
153 
154 	/**
155 	 * Implementation specific execution. To be overridden in order to deal with
156 	 * custom process types. Default expects an {@link SlcExecution}.
157 	 */
158 	protected void process() throws InterruptedException {
159 		List<RealizedFlow> flowsToProcess = new ArrayList<RealizedFlow>();
160 		flowsToProcess.addAll(process.getRealizedFlows());
161 		while (flowsToProcess.size() > 0) {
162 			RealizedFlow realizedFlow = flowsToProcess.remove(0);
163 			execute(realizedFlow, true);
164 		}
165 	}
166 
167 	/** @return the (distinct) thread used for this execution */
168 	protected final void execute(RealizedFlow realizedFlow, Boolean synchronous) throws InterruptedException {
169 		if (killed)
170 			return;
171 
172 		ExecutionThread thread = new ExecutionThread(processThreadGroup, executionModulesManager, realizedFlow);
173 		executionThreads.add(thread);
174 		thread.start();
175 
176 		if (synchronous)
177 			thread.join();
178 
179 		return;
180 	}
181 
182 	// public void notifyError() {
183 	// hadAnError = true;
184 	// }
185 	//
186 	// public synchronized void flowCompleted() {
187 	// // notifyAll();
188 	// }
189 
190 	public ExecutionProcess getProcess() {
191 		return process;
192 	}
193 
194 	public ProcessThreadGroup getProcessThreadGroup() {
195 		return processThreadGroup;
196 	}
197 
198 	public ExecutionModulesManager getExecutionModulesManager() {
199 		return executionModulesManager;
200 	}
201 
202 	private class LoggingThread extends Thread {
203 
204 		public LoggingThread() {
205 			super("SLC Process Logger #" + process.getUuid());
206 		}
207 
208 		public void run() {
209 			boolean run = true;
210 			while (run) {
211 				List<ExecutionStep> newSteps = new ArrayList<ExecutionStep>();
212 				processThreadGroup.getSteps().drainTo(newSteps);
213 				if (newSteps.size() > 0) {
214 					// System.out.println(steps.size() + " steps");
215 					process.addSteps(newSteps);
216 				}
217 
218 				try {
219 					Thread.sleep(1000);
220 				} catch (InterruptedException e) {
221 					break;
222 				}
223 
224 				if (!ProcessThread.this.isAlive() && processThreadGroup.getSteps().size() == 0)
225 					run = false;
226 			}
227 		}
228 
229 	}
230 }