View Javadoc
1   /*
2    * Copyright (C) 2007-2012 Argeo GmbH
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *         http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.argeo.slc.core.execution;
17  
18  import java.security.AccessControlContext;
19  import java.security.AccessController;
20  import java.security.PrivilegedActionException;
21  import java.security.PrivilegedExceptionAction;
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import javax.security.auth.Subject;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.argeo.slc.SlcException;
30  import org.argeo.slc.execution.ExecutionFlowDescriptor;
31  import org.argeo.slc.execution.ExecutionModulesManager;
32  import org.argeo.slc.execution.ExecutionStep;
33  import org.argeo.slc.execution.RealizedFlow;
34  
35  /** Thread of a single execution */
36  public class ExecutionThread extends Thread {
37  	public final static String SYSPROP_EXECUTION_AUTO_UPGRADE = "slc.execution.autoupgrade";
38  	private final static Log log = LogFactory.getLog(ExecutionThread.class);
39  
40  	private ExecutionModulesManager executionModulesManager;
41  	private final RealizedFlow realizedFlow;
42  	private final AccessControlContext accessControlContext;
43  
44  	private List<Runnable> destructionCallbacks = new ArrayList<Runnable>();
45  
46  	public ExecutionThread(ProcessThreadGroup processThreadGroup, ExecutionModulesManager executionModulesManager,
47  			RealizedFlow realizedFlow) {
48  		super(processThreadGroup, "Flow " + realizedFlow.getFlowDescriptor().getName());
49  		this.realizedFlow = realizedFlow;
50  		this.executionModulesManager = executionModulesManager;
51  		accessControlContext = AccessController.getContext();
52  	}
53  
54  	public void run() {
55  		// authenticate thread
56  		// Authentication authentication = getProcessThreadGroup()
57  		// .getAuthentication();
58  		// if (authentication == null)
59  		// throw new SlcException("Can only execute authenticated threads");
60  		// SecurityContextHolder.getContext().setAuthentication(authentication);
61  
62  		// Retrieve execution flow descriptor
63  		ExecutionFlowDescriptor executionFlowDescriptor = realizedFlow.getFlowDescriptor();
64  		String flowName = executionFlowDescriptor.getName();
65  
66  		getProcessThreadGroup().dispatchAddStep(
67  				new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_START, "Flow " + flowName));
68  
69  		try {
70  			Subject subject = Subject.getSubject(accessControlContext);
71  			try {
72  				Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
73  
74  					@Override
75  					public Void run() throws Exception {
76  						String autoUpgrade = System.getProperty(SYSPROP_EXECUTION_AUTO_UPGRADE);
77  						if (autoUpgrade != null && autoUpgrade.equals("true"))
78  							executionModulesManager.upgrade(realizedFlow.getModuleNameVersion());
79  						executionModulesManager.start(realizedFlow.getModuleNameVersion());
80  						//
81  						// START FLOW
82  						//
83  						executionModulesManager.execute(realizedFlow);
84  						// END FLOW
85  						return null;
86  					}
87  
88  				});
89  			} catch (PrivilegedActionException privilegedActionException) {
90  				throw (Exception) privilegedActionException.getCause();
91  			}
92  		} catch (FlowConfigurationException e) {
93  			String msg = "Configuration problem with flow " + flowName + ":\n" + e.getMessage();
94  			log.error(msg);
95  			getProcessThreadGroup().dispatchAddStep(
96  					new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
97  		} catch (Exception e) {
98  			// TODO: re-throw exception ?
99  			String msg = "Execution of flow " + flowName + " failed.";
100 			log.error(msg, e);
101 			getProcessThreadGroup().dispatchAddStep(
102 					new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.ERROR, msg + " " + e.getMessage()));
103 		} finally {
104 			getProcessThreadGroup().dispatchAddStep(
105 					new ExecutionStep(realizedFlow.getModuleName(), ExecutionStep.PHASE_END, "Flow " + flowName));
106 			processDestructionCallbacks();
107 		}
108 	}
109 
110 	private synchronized void processDestructionCallbacks() {
111 		for (int i = destructionCallbacks.size() - 1; i >= 0; i--) {
112 			try {
113 				destructionCallbacks.get(i).run();
114 			} catch (Exception e) {
115 				log.warn("Could not process destruction callback " + i + " in thread " + getName(), e);
116 			}
117 		}
118 	}
119 
120 	/**
121 	 * Gather object destruction callback to be called in reverse order at the
122 	 * end of the thread
123 	 */
124 	synchronized void registerDestructionCallback(String name, Runnable callback) {
125 		destructionCallbacks.add(callback);
126 	}
127 
128 	protected ProcessThreadGroup getProcessThreadGroup() {
129 		return (ProcessThreadGroup) getThreadGroup();
130 	}
131 }