View Javadoc
1   package org.argeo.cms.internal.kernel;
2   
3   import java.util.GregorianCalendar;
4   import java.util.concurrent.LinkedBlockingDeque;
5   import java.util.concurrent.atomic.AtomicBoolean;
6   
7   import javax.jcr.Node;
8   import javax.jcr.Property;
9   import javax.jcr.PropertyType;
10  import javax.jcr.RepositoryException;
11  import javax.jcr.Session;
12  import javax.jcr.Value;
13  import javax.jcr.nodetype.NodeType;
14  import javax.jcr.observation.Event;
15  import javax.jcr.observation.EventIterator;
16  import javax.jcr.observation.EventListener;
17  import javax.jcr.version.VersionManager;
18  
19  import org.apache.commons.logging.Log;
20  import org.apache.commons.logging.LogFactory;
21  import org.apache.jackrabbit.api.JackrabbitValue;
22  import org.apache.jackrabbit.core.RepositoryImpl;
23  import org.argeo.jcr.JcrUtils;
24  
25  /** Ensure consistency of files, folder and last modified nodes. */
26  class CmsWorkspaceIndexer implements EventListener {
27  	private final static Log log = LogFactory.getLog(CmsWorkspaceIndexer.class);
28  
29  //	private final static String MIX_ETAG = "mix:etag";
30  	private final static String JCR_ETAG = "jcr:etag";
31  //	private final static String JCR_LAST_MODIFIED = "jcr:lastModified";
32  //	private final static String JCR_LAST_MODIFIED_BY = "jcr:lastModifiedBy";
33  //	private final static String JCR_MIXIN_TYPES = "jcr:mixinTypes";
34  	private final static String JCR_DATA = "jcr:data";
35  	private final static String JCR_CONTENT = "jcr:data";
36  
37  	private String cn;
38  	private String workspaceName;
39  	private RepositoryImpl repositoryImpl;
40  	private Session session;
41  	private VersionManager versionManager;
42  
43  	private LinkedBlockingDeque<Event> toProcess = new LinkedBlockingDeque<>();
44  	private IndexingThread indexingThread;
45  	private AtomicBoolean stopping = new AtomicBoolean(false);
46  
47  	public CmsWorkspaceIndexer(RepositoryImpl repositoryImpl, String cn, String workspaceName)
48  			throws RepositoryException {
49  		this.cn = cn;
50  		this.workspaceName = workspaceName;
51  		this.repositoryImpl = repositoryImpl;
52  	}
53  
54  	public void init() {
55  		session = KernelUtils.openAdminSession(repositoryImpl, workspaceName);
56  		try {
57  			String[] nodeTypes = { NodeType.NT_FILE, NodeType.MIX_LAST_MODIFIED };
58  			session.getWorkspace().getObservationManager().addEventListener(this,
59  					Event.NODE_ADDED | Event.PROPERTY_CHANGED, "/", true, null, nodeTypes, true);
60  			versionManager = session.getWorkspace().getVersionManager();
61  
62  			indexingThread = new IndexingThread();
63  			indexingThread.start();
64  		} catch (RepositoryException e1) {
65  			throw new IllegalStateException(e1);
66  		}
67  	}
68  
69  	public void destroy() {
70  		stopping.set(true);
71  		indexingThread.interrupt();
72  		// TODO make it configurable
73  		try {
74  			indexingThread.join(10 * 60 * 1000);
75  		} catch (InterruptedException e1) {
76  			log.warn("Indexing thread interrupted. Will log out session.");
77  		}
78  
79  		try {
80  			session.getWorkspace().getObservationManager().removeEventListener(this);
81  		} catch (RepositoryException e) {
82  			if (log.isTraceEnabled())
83  				log.warn("Cannot unregistered JCR event listener", e);
84  		} finally {
85  			JcrUtils.logoutQuietly(session);
86  		}
87  	}
88  
89  	private synchronized void processEvents(EventIterator events) {
90  		long begin = System.currentTimeMillis();
91  		long count = 0;
92  		while (events.hasNext()) {
93  			Event event = events.nextEvent();
94  			try {
95  				toProcess.put(event);
96  			} catch (InterruptedException e) {
97  				e.printStackTrace();
98  			}
99  //			processEvent(event);
100 			count++;
101 		}
102 		long duration = System.currentTimeMillis() - begin;
103 		if (log.isTraceEnabled())
104 			log.trace("Processed " + count + " events in " + duration + " ms");
105 		notifyAll();
106 	}
107 
108 	protected synchronized void processEvent(Event event) {
109 		try {
110 			String eventPath = event.getPath();
111 			if (event.getType() == Event.NODE_ADDED) {
112 				if (!versionManager.isCheckedOut(eventPath))
113 					return;// ignore checked-in nodes
114 				if (log.isTraceEnabled())
115 					log.trace("NODE_ADDED " + eventPath);
116 //				session.refresh(true);
117 				session.refresh(false);
118 				Node node = session.getNode(eventPath);
119 				Node parentNode = node.getParent();
120 				if (parentNode.isNodeType(NodeType.NT_FILE)) {
121 					if (node.isNodeType(NodeType.NT_UNSTRUCTURED)) {
122 						if (!node.isNodeType(NodeType.MIX_LAST_MODIFIED))
123 							node.addMixin(NodeType.MIX_LAST_MODIFIED);
124 						Property property = node.getProperty(Property.JCR_DATA);
125 						String etag = toEtag(property.getValue());
126 						session.save();
127 						node.setProperty(JCR_ETAG, etag);
128 						if (log.isTraceEnabled())
129 							log.trace("ETag and last modified added to new " + node);
130 					} else if (node.isNodeType(NodeType.NT_RESOURCE)) {
131 //						if (!node.isNodeType(MIX_ETAG))
132 //							node.addMixin(MIX_ETAG);
133 //						session.save();
134 //						Property property = node.getProperty(Property.JCR_DATA);
135 //						String etag = toEtag(property.getValue());
136 //						node.setProperty(JCR_ETAG, etag);
137 //						session.save();
138 					}
139 //					setLastModifiedRecursive(parentNode, event);
140 //					session.save();
141 //					if (log.isTraceEnabled())
142 //						log.trace("ETag and last modified added to new " + node);
143 				}
144 
145 //				if (node.isNodeType(NodeType.NT_FOLDER)) {
146 //					setLastModifiedRecursive(node, event);
147 //					session.save();
148 //					if (log.isTraceEnabled())
149 //						log.trace("Last modified added to new " + node);
150 //				}
151 			} else if (event.getType() == Event.PROPERTY_CHANGED) {
152 				String propertyName = extractItemName(eventPath);
153 				// skip if last modified properties are explicitly set
154 				if (!propertyName.equals(JCR_DATA))
155 					return;
156 //				if (propertyName.equals(JCR_LAST_MODIFIED))
157 //					return;
158 //				if (propertyName.equals(JCR_LAST_MODIFIED_BY))
159 //					return;
160 //				if (propertyName.equals(JCR_MIXIN_TYPES))
161 //					return;
162 //				if (propertyName.equals(JCR_ETAG))
163 //					return;
164 
165 				if (log.isTraceEnabled())
166 					log.trace("PROPERTY_CHANGED " + eventPath);
167 
168 				if (!session.propertyExists(eventPath))
169 					return;
170 				session.refresh(false);
171 				Property property = session.getProperty(eventPath);
172 				Node node = property.getParent();
173 				if (property.getType() == PropertyType.BINARY && propertyName.equals(JCR_DATA)
174 						&& node.isNodeType(NodeType.NT_UNSTRUCTURED)) {
175 					String etag = toEtag(property.getValue());
176 					node.setProperty(JCR_ETAG, etag);
177 					Node parentNode = node.getParent();
178 					if (parentNode.isNodeType(NodeType.MIX_LAST_MODIFIED)) {
179 						setLastModified(parentNode, event);
180 					}
181 					if (log.isTraceEnabled())
182 						log.trace("ETag and last modified updated for " + node);
183 				}
184 //				setLastModified(node, event);
185 //				session.save();
186 //				if (log.isTraceEnabled())
187 //					log.trace("ETag and last modified updated for " + node);
188 			} else if (event.getType() == Event.NODE_REMOVED) {
189 				String removeNodePath = eventPath;
190 				String nodeName = extractItemName(eventPath);
191 				if (JCR_CONTENT.equals(nodeName)) // parent is a file, deleted anyhow
192 					return;
193 				if (log.isTraceEnabled())
194 					log.trace("NODE_REMOVED " + eventPath);
195 //				String parentPath = JcrUtils.parentPath(removeNodePath);
196 //				session.refresh(true);
197 //				setLastModified(parentPath, event);
198 //				session.save();
199 				if (log.isTraceEnabled())
200 					log.trace("Last modified updated for parents of removed " + removeNodePath);
201 			}
202 		} catch (Exception e) {
203 			if (log.isTraceEnabled())
204 				log.warn("Cannot process event " + event, e);
205 		} finally {
206 //			try {
207 //				session.refresh(true);
208 //				if (session.hasPendingChanges())
209 //					session.save();
210 ////				session.refresh(false);
211 //			} catch (RepositoryException e) {
212 //				if (log.isTraceEnabled())
213 //					log.warn("Cannot refresh JCR session", e);
214 //			}
215 		}
216 
217 	}
218 
219 	private String extractItemName(String path) {
220 		if (path == null || path.length() <= 1)
221 			return null;
222 		int lastIndex = path.lastIndexOf('/');
223 		if (lastIndex >= 0) {
224 			return path.substring(lastIndex + 1);
225 		} else {
226 			return path;
227 		}
228 	}
229 
230 	@Override
231 	public void onEvent(EventIterator events) {
232 		processEvents(events);
233 //		Runnable toRun = new Runnable() {
234 //
235 //			@Override
236 //			public void run() {
237 //				processEvents(events);
238 //			}
239 //		};
240 //		Future<?> future = Activator.getInternalExecutorService().submit(toRun);
241 //		try {
242 //			// make the call synchronous
243 //			future.get(60, TimeUnit.SECONDS);
244 //		} catch (TimeoutException | ExecutionException | InterruptedException e) {
245 //			// silent
246 //		}
247 	}
248 
249 	static String toEtag(Value v) {
250 		if (v instanceof JackrabbitValue) {
251 			JackrabbitValue value = (JackrabbitValue) v;
252 			return '\"' + value.getContentIdentity() + '\"';
253 		} else {
254 			return null;
255 		}
256 
257 	}
258 
259 	protected synchronized void setLastModified(Node node, Event event) throws RepositoryException {
260 		GregorianCalendar calendar = new GregorianCalendar();
261 		calendar.setTimeInMillis(event.getDate());
262 		node.setProperty(Property.JCR_LAST_MODIFIED, calendar);
263 		node.setProperty(Property.JCR_LAST_MODIFIED_BY, event.getUserID());
264 		if (log.isTraceEnabled())
265 			log.trace("Last modified set on " + node);
266 	}
267 
268 	/** Recursively set the last updated time on parents. */
269 	protected synchronized void setLastModifiedRecursive(Node node, Event event) throws RepositoryException {
270 		if (versionManager.isCheckedOut(node.getPath())) {
271 			if (node.isNodeType(NodeType.MIX_LAST_MODIFIED)) {
272 				setLastModified(node, event);
273 			}
274 			if (node.isNodeType(NodeType.NT_FOLDER) && !node.isNodeType(NodeType.MIX_LAST_MODIFIED)) {
275 				node.addMixin(NodeType.MIX_LAST_MODIFIED);
276 				if (log.isTraceEnabled())
277 					log.trace("Last modified mix-in added to " + node);
278 			}
279 
280 		}
281 
282 		// end condition
283 		if (node.getDepth() == 0) {
284 //			try {
285 //				node.getSession().save();
286 //			} catch (RepositoryException e) {
287 //				log.warn("Cannot index workspace", e);
288 //			}
289 			return;
290 		} else {
291 			Node parent = node.getParent();
292 			setLastModifiedRecursive(parent, event);
293 		}
294 	}
295 
296 	/**
297 	 * Recursively set the last updated time on parents. Useful to use paths when
298 	 * dealing with deletions.
299 	 */
300 	protected synchronized void setLastModifiedRecursive(String path, Event event) throws RepositoryException {
301 		// root node will always exist, so end condition is delegated to the other
302 		// recursive setLastModified method
303 		if (session.nodeExists(path)) {
304 			setLastModifiedRecursive(session.getNode(path), event);
305 		} else {
306 			setLastModifiedRecursive(JcrUtils.parentPath(path), event);
307 		}
308 	}
309 
310 	@Override
311 	public String toString() {
312 		return "Indexer for workspace " + workspaceName + " of repository " + cn;
313 	}
314 
315 	class IndexingThread extends Thread {
316 
317 		public IndexingThread() {
318 			super(CmsWorkspaceIndexer.this.toString());
319 			// TODO Auto-generated constructor stub
320 		}
321 
322 		@Override
323 		public void run() {
324 			life: while (session != null && session.isLive()) {
325 				try {
326 					Event nextEvent = toProcess.take();
327 					processEvent(nextEvent);
328 				} catch (InterruptedException e) {
329 					// silent
330 					interrupted();
331 				}
332 
333 				if (stopping.get() && toProcess.isEmpty()) {
334 					break life;
335 				}
336 			}
337 			if (log.isDebugEnabled())
338 				log.debug(CmsWorkspaceIndexer.this.toString() + " has shut down.");
339 		}
340 
341 	}
342 
343 }