View Javadoc
1   /*
2    * Copyright (C) 2007-2012 Argeo GmbH
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *         http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.argeo.slc.core.execution;
17  
18  import java.io.UnsupportedEncodingException;
19  import java.net.URI;
20  import java.net.URLDecoder;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.LinkedHashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.UUID;
28  
29  import org.argeo.slc.DefaultNameVersion;
30  import org.argeo.slc.NameVersion;
31  import org.argeo.slc.SlcException;
32  import org.argeo.slc.execution.ExecutionModuleDescriptor;
33  import org.argeo.slc.execution.ExecutionModulesManager;
34  import org.argeo.slc.execution.ExecutionProcess;
35  import org.argeo.slc.execution.SlcAgent;
36  
37  /** Implements the base methods of an SLC agent. */
38  public class DefaultAgent implements SlcAgent {
39  	// private final static Log log = LogFactory.getLog(DefaultAgent.class);
40  	/** UTF-8 charset for encoding. */
41  	private final static String UTF8 = "UTF-8";
42  
43  	private String agentUuid = null;
44  	private ExecutionModulesManager modulesManager;
45  
46  	private ThreadGroup processesThreadGroup;
47  	private Map<String, ProcessThread> runningProcesses = Collections
48  			.synchronizedMap(new HashMap<String, ProcessThread>());
49  
50  	private String defaultModulePrefix = null;
51  
52  	/*
53  	 * LIFECYCLE
54  	 */
55  	/** Initialization */
56  	public void init() {
57  		agentUuid = initAgentUuid();
58  		processesThreadGroup = new ThreadGroup("SLC Processes of Agent #"
59  				+ agentUuid);
60  	}
61  
62  	/** Clean up (needs to be called by overriding method) */
63  	public void destroy() {
64  	}
65  
66  	/**
67  	 * Called during initialization in order to determines the agent UUID. To be
68  	 * overridden. By default creates a new one per instance.
69  	 */
70  	protected String initAgentUuid() {
71  		return UUID.randomUUID().toString();
72  	}
73  
74  	/*
75  	 * SLC AGENT
76  	 */
77  	public void process(ExecutionProcess process) {
78  		ProcessThread processThread = createProcessThread(processesThreadGroup,
79  				modulesManager, process);
80  		processThread.start();
81  		runningProcesses.put(process.getUuid(), processThread);
82  
83  		// clean up old processes
84  		Iterator<ProcessThread> it = runningProcesses.values().iterator();
85  		while (it.hasNext()) {
86  			ProcessThread pThread = it.next();
87  			if (!pThread.isAlive())
88  				it.remove();
89  		}
90  	}
91  
92  	public String process(List<URI> uris) {
93  		DefaultProcess process = new DefaultProcess();
94  		for (URI uri : uris) {
95  			String[] path = uri.getPath().split("/");
96  			if (path.length < 3)
97  				throw new SlcException("Badly formatted URI: " + uri);
98  			NameVersion nameVersion = new DefaultNameVersion(path[1]);
99  			StringBuilder flow = new StringBuilder();
100 			for (int i = 2; i < path.length; i++)
101 				flow.append('/').append(path[i]);
102 
103 			Map<String, Object> values = getQueryMap(uri.getQuery());
104 			// Get execution module descriptor
105 			ExecutionModuleDescriptor emd = getExecutionModuleDescriptor(
106 					nameVersion.getName(), nameVersion.getVersion());
107 			process.getRealizedFlows().add(
108 					emd.asRealizedFlow(flow.toString(), values));
109 		}
110 		process(process);
111 		return process.getUuid();
112 	}
113 
114 	public void kill(String processUuid) {
115 		if (runningProcesses.containsKey(processUuid)) {
116 			runningProcesses.get(processUuid).interrupt();
117 		} else {
118 			// assume is finished
119 		}
120 	}
121 
122 	public void waitFor(String processUuid, Long millis) {
123 		if (runningProcesses.containsKey(processUuid)) {
124 			try {
125 				if (millis != null)
126 					runningProcesses.get(processUuid).join(millis);
127 				else
128 					runningProcesses.get(processUuid).join();
129 			} catch (InterruptedException e) {
130 				// silent
131 			}
132 		} else {
133 			// assume is finished
134 		}
135 	}
136 
137 	/** Creates the thread which will coordinate the execution for this agent. */
138 	protected ProcessThread createProcessThread(
139 			ThreadGroup processesThreadGroup,
140 			ExecutionModulesManager modulesManager, ExecutionProcess process) {
141 		ProcessThread processThread = new ProcessThread(processesThreadGroup,
142 				modulesManager, process);
143 		return processThread;
144 	}
145 
146 	public ExecutionModuleDescriptor getExecutionModuleDescriptor(
147 			String moduleName, String moduleVersion) {
148 		// Get execution module descriptor
149 		ExecutionModuleDescriptor emd;
150 		try {
151 			modulesManager
152 					.start(new DefaultNameVersion(moduleName, moduleVersion));
153 			emd = modulesManager.getExecutionModuleDescriptor(moduleName,
154 					moduleVersion);
155 		} catch (SlcException e) {
156 			if (defaultModulePrefix != null) {
157 				moduleName = defaultModulePrefix + "." + moduleName;
158 				modulesManager.start(new DefaultNameVersion(moduleName,
159 						moduleVersion));
160 				emd = modulesManager.getExecutionModuleDescriptor(moduleName,
161 						moduleVersion);
162 			} else
163 				throw e;
164 		}
165 		return emd;
166 	}
167 
168 	public List<ExecutionModuleDescriptor> listExecutionModuleDescriptors() {
169 		return modulesManager.listExecutionModules();
170 	}
171 
172 	public boolean ping() {
173 		return true;
174 	}
175 
176 	/*
177 	 * UTILITIES
178 	 */
179 	/**
180 	 * @param query
181 	 *            can be null
182 	 */
183 	static Map<String, Object> getQueryMap(String query) {
184 		Map<String, Object> map = new LinkedHashMap<String, Object>();
185 		if (query == null)
186 			return map;
187 		String[] params = query.split("&");
188 		for (String param : params) {
189 			String[] arr = param.split("=");
190 			String name = arr[0];
191 			Object value = arr.length > 1 ? param.split("=")[1] : Boolean.TRUE;
192 			try {
193 				map.put(URLDecoder.decode(name, UTF8),
194 						URLDecoder.decode(value.toString(), UTF8));
195 			} catch (UnsupportedEncodingException e) {
196 				throw new SlcException("Cannot decode '" + param + "'", e);
197 			}
198 		}
199 		return map;
200 	}
201 
202 	/*
203 	 * BEAN
204 	 */
205 	public void setModulesManager(ExecutionModulesManager modulesManager) {
206 		this.modulesManager = modulesManager;
207 	}
208 
209 	public void setDefaultModulePrefix(String defaultModulePrefix) {
210 		this.defaultModulePrefix = defaultModulePrefix;
211 	}
212 
213 	public String getAgentUuid() {
214 		return agentUuid;
215 	}
216 
217 	@Override
218 	public String toString() {
219 		return "Agent #" + getAgentUuid();
220 	}
221 }