View Javadoc
1   /*
2    * Copyright (C) 2007-2012 Argeo GmbH
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *         http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.argeo.cms.tabular;
17  
18  import java.io.InputStream;
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  
23  import javax.jcr.Binary;
24  import javax.jcr.Node;
25  import javax.jcr.NodeIterator;
26  import javax.jcr.Property;
27  import javax.jcr.PropertyType;
28  import javax.jcr.RepositoryException;
29  
30  import org.apache.commons.io.IOUtils;
31  import org.argeo.api.tabular.ArrayTabularRow;
32  import org.argeo.api.tabular.TabularColumn;
33  import org.argeo.api.tabular.TabularRow;
34  import org.argeo.api.tabular.TabularRowIterator;
35  import org.argeo.cms.ArgeoTypes;
36  import org.argeo.jcr.ArgeoJcrException;
37  import org.argeo.util.CsvParser;
38  
39  /** Iterates over the rows of a {@link ArgeoTypes#ARGEO_TABLE} node. */
40  public class JcrTabularRowIterator implements TabularRowIterator {
41  	private Boolean hasNext = null;
42  	private Boolean parsingCompleted = false;
43  
44  	private Long currentRowNumber = 0l;
45  
46  	private List<TabularColumn> header = new ArrayList<TabularColumn>();
47  
48  	/** referenced so that we can close it */
49  	private Binary binary;
50  	private InputStream in;
51  
52  	private CsvParser csvParser;
53  	private ArrayBlockingQueue<List<String>> textLines;
54  
55  	public JcrTabularRowIterator(Node tableNode) {
56  		try {
57  			for (NodeIterator it = tableNode.getNodes(); it.hasNext();) {
58  				Node node = it.nextNode();
59  				if (node.isNodeType(ArgeoTypes.ARGEO_COLUMN)) {
60  					Integer type = PropertyType.valueFromName(node.getProperty(
61  							Property.JCR_REQUIRED_TYPE).getString());
62  					TabularColumn tc = new TabularColumn(node.getProperty(
63  							Property.JCR_TITLE).getString(), type);
64  					header.add(tc);
65  				}
66  			}
67  			Node contentNode = tableNode.getNode(Property.JCR_CONTENT);
68  			if (contentNode.isNodeType(ArgeoTypes.ARGEO_CSV)) {
69  				textLines = new ArrayBlockingQueue<List<String>>(1000);
70  				csvParser = new CsvParser() {
71  					protected void processLine(Integer lineNumber,
72  							List<String> header, List<String> tokens) {
73  						try {
74  							textLines.put(tokens);
75  						} catch (InterruptedException e) {
76  							// TODO Auto-generated catch block
77  							e.printStackTrace();
78  						}
79  						// textLines.add(tokens);
80  						if (hasNext == null) {
81  							hasNext = true;
82  							synchronized (JcrTabularRowIterator.this) {
83  								JcrTabularRowIterator.this.notifyAll();
84  							}
85  						}
86  					}
87  				};
88  				csvParser.setNoHeader(true);
89  				binary = contentNode.getProperty(Property.JCR_DATA).getBinary();
90  				in = binary.getStream();
91  				Thread thread = new Thread(contentNode.getPath() + " reader") {
92  					public void run() {
93  						try {
94  							csvParser.parse(in);
95  						} finally {
96  							parsingCompleted = true;
97  							IOUtils.closeQuietly(in);
98  						}
99  					}
100 				};
101 				thread.start();
102 			}
103 		} catch (RepositoryException e) {
104 			throw new ArgeoJcrException("Cannot read table " + tableNode, e);
105 		}
106 	}
107 
108 	public synchronized boolean hasNext() {
109 		// we don't know if there is anything available
110 		// while (hasNext == null)
111 		// try {
112 		// wait();
113 		// } catch (InterruptedException e) {
114 		// // silent
115 		// // FIXME better deal with interruption
116 		// Thread.currentThread().interrupt();
117 		// break;
118 		// }
119 
120 		// buffer not empty
121 		if (!textLines.isEmpty())
122 			return true;
123 
124 		// maybe the parsing is finished but the flag has not been set
125 		while (!parsingCompleted && textLines.isEmpty())
126 			try {
127 				wait(100);
128 			} catch (InterruptedException e) {
129 				// silent
130 				// FIXME better deal with interruption
131 				Thread.currentThread().interrupt();
132 				break;
133 			}
134 
135 		// buffer not empty
136 		if (!textLines.isEmpty())
137 			return true;
138 
139 		// (parsingCompleted && textLines.isEmpty())
140 		return false;
141 
142 		// if (!hasNext && textLines.isEmpty()) {
143 		// if (in != null) {
144 		// IOUtils.closeQuietly(in);
145 		// in = null;
146 		// }
147 		// if (binary != null) {
148 		// JcrUtils.closeQuietly(binary);
149 		// binary = null;
150 		// }
151 		// return false;
152 		// } else
153 		// return true;
154 	}
155 
156 	public synchronized TabularRow next() {
157 		try {
158 			List<String> tokens = textLines.take();
159 			List<Object> objs = new ArrayList<Object>(tokens.size());
160 			for (String token : tokens) {
161 				// TODO convert to other formats using header
162 				objs.add(token);
163 			}
164 			currentRowNumber++;
165 			return new ArrayTabularRow(objs);
166 		} catch (InterruptedException e) {
167 			// silent
168 			// FIXME better deal with interruption
169 		}
170 		return null;
171 	}
172 
173 	public void remove() {
174 		throw new UnsupportedOperationException();
175 	}
176 
177 	public Long getCurrentRowNumber() {
178 		return currentRowNumber;
179 	}
180 
181 	public List<TabularColumn> getHeader() {
182 		return header;
183 	}
184 
185 }