1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.argeo.cms.tabular;
17
18 import java.io.InputStream;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.concurrent.ArrayBlockingQueue;
22
23 import javax.jcr.Binary;
24 import javax.jcr.Node;
25 import javax.jcr.NodeIterator;
26 import javax.jcr.Property;
27 import javax.jcr.PropertyType;
28 import javax.jcr.RepositoryException;
29
30 import org.apache.commons.io.IOUtils;
31 import org.argeo.api.tabular.ArrayTabularRow;
32 import org.argeo.api.tabular.TabularColumn;
33 import org.argeo.api.tabular.TabularRow;
34 import org.argeo.api.tabular.TabularRowIterator;
35 import org.argeo.cms.ArgeoTypes;
36 import org.argeo.jcr.ArgeoJcrException;
37 import org.argeo.util.CsvParser;
38
39
40 public class JcrTabularRowIterator implements TabularRowIterator {
41 private Boolean hasNext = null;
42 private Boolean parsingCompleted = false;
43
44 private Long currentRowNumber = 0l;
45
46 private List<TabularColumn> header = new ArrayList<TabularColumn>();
47
48
49 private Binary binary;
50 private InputStream in;
51
52 private CsvParser csvParser;
53 private ArrayBlockingQueue<List<String>> textLines;
54
55 public JcrTabularRowIterator(Node tableNode) {
56 try {
57 for (NodeIterator it = tableNode.getNodes(); it.hasNext();) {
58 Node node = it.nextNode();
59 if (node.isNodeType(ArgeoTypes.ARGEO_COLUMN)) {
60 Integer type = PropertyType.valueFromName(node.getProperty(
61 Property.JCR_REQUIRED_TYPE).getString());
62 TabularColumn tc = new TabularColumn(node.getProperty(
63 Property.JCR_TITLE).getString(), type);
64 header.add(tc);
65 }
66 }
67 Node contentNode = tableNode.getNode(Property.JCR_CONTENT);
68 if (contentNode.isNodeType(ArgeoTypes.ARGEO_CSV)) {
69 textLines = new ArrayBlockingQueue<List<String>>(1000);
70 csvParser = new CsvParser() {
71 protected void processLine(Integer lineNumber,
72 List<String> header, List<String> tokens) {
73 try {
74 textLines.put(tokens);
75 } catch (InterruptedException e) {
76
77 e.printStackTrace();
78 }
79
80 if (hasNext == null) {
81 hasNext = true;
82 synchronized (JcrTabularRowIterator.this) {
83 JcrTabularRowIterator.this.notifyAll();
84 }
85 }
86 }
87 };
88 csvParser.setNoHeader(true);
89 binary = contentNode.getProperty(Property.JCR_DATA).getBinary();
90 in = binary.getStream();
91 Thread thread = new Thread(contentNode.getPath() + " reader") {
92 public void run() {
93 try {
94 csvParser.parse(in);
95 } finally {
96 parsingCompleted = true;
97 IOUtils.closeQuietly(in);
98 }
99 }
100 };
101 thread.start();
102 }
103 } catch (RepositoryException e) {
104 throw new ArgeoJcrException("Cannot read table " + tableNode, e);
105 }
106 }
107
108 public synchronized boolean hasNext() {
109
110
111
112
113
114
115
116
117
118
119
120
121 if (!textLines.isEmpty())
122 return true;
123
124
125 while (!parsingCompleted && textLines.isEmpty())
126 try {
127 wait(100);
128 } catch (InterruptedException e) {
129
130
131 Thread.currentThread().interrupt();
132 break;
133 }
134
135
136 if (!textLines.isEmpty())
137 return true;
138
139
140 return false;
141
142
143
144
145
146
147
148
149
150
151
152
153
154 }
155
156 public synchronized TabularRow next() {
157 try {
158 List<String> tokens = textLines.take();
159 List<Object> objs = new ArrayList<Object>(tokens.size());
160 for (String token : tokens) {
161
162 objs.add(token);
163 }
164 currentRowNumber++;
165 return new ArrayTabularRow(objs);
166 } catch (InterruptedException e) {
167
168
169 }
170 return null;
171 }
172
173 public void remove() {
174 throw new UnsupportedOperationException();
175 }
176
177 public Long getCurrentRowNumber() {
178 return currentRowNumber;
179 }
180
181 public List<TabularColumn> getHeader() {
182 return header;
183 }
184
185 }