View Javadoc
1   package org.argeo.util;
2   
3   import java.io.IOException;
4   import java.nio.ByteBuffer;
5   import java.nio.channels.AsynchronousByteChannel;
6   import java.nio.channels.CompletionHandler;
7   import java.nio.channels.ReadableByteChannel;
8   import java.nio.channels.WritableByteChannel;
9   import java.util.concurrent.ExecutorService;
10  import java.util.concurrent.Future;
11  
/**
 * An {@link AsynchronousByteChannel} based on an {@link ExecutorService}.
 * <p>
 * Reads are delegated to a {@link ReadableByteChannel} and writes to a
 * {@link WritableByteChannel}; every transfer is run as a task on the executor
 * supplied at construction time. The executor is not shut down by this class.
 */
public class ServiceChannel implements AsynchronousByteChannel {
	private final ReadableByteChannel in;
	private final WritableByteChannel out;

	/** Guarded by this object's monitor (see {@link #close()} and {@link #isOpen()}). */
	private boolean open = true;

	private final ExecutorService executor;

	/**
	 * @param in       the channel that read operations are delegated to
	 * @param out      the channel that write operations are delegated to
	 * @param executor the service on which the read and write tasks are run
	 */
	public ServiceChannel(ReadableByteChannel in, WritableByteChannel out, ExecutorService executor) {
		this.in = in;
		this.out = out;
		this.executor = executor;
	}

	/**
	 * Submits a read from the underlying input channel to the executor.
	 *
	 * @param dst the buffer the bytes are transferred into
	 * @return a future holding the value returned by the underlying
	 *         {@link ReadableByteChannel#read(ByteBuffer)}
	 */
	@Override
	public Future<Integer> read(ByteBuffer dst) {
		return executor.submit(() -> in.read(dst));
	}

	/**
	 * Schedules a read from the underlying input channel and returns without
	 * waiting for it. The handler is invoked from the executor task once the read
	 * has completed or failed.
	 *
	 * @param dst        the buffer the bytes are transferred into
	 * @param attachment an object passed back to the handler, may be {@code null}
	 * @param handler    receives the number of bytes read, or the failure
	 * @throws NullPointerException if {@code handler} is {@code null}
	 */
	@Override
	public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
		dispatch(() -> in.read(dst), attachment, handler);
	}

	/**
	 * Submits a write to the underlying output channel to the executor.
	 *
	 * @param src the buffer the bytes are taken from
	 * @return a future holding the value returned by the underlying
	 *         {@link WritableByteChannel#write(ByteBuffer)}
	 */
	@Override
	public Future<Integer> write(ByteBuffer src) {
		return executor.submit(() -> out.write(src));
	}

	/**
	 * Schedules a write to the underlying output channel and returns without
	 * waiting for it. The handler is invoked from the executor task once the write
	 * has completed or failed.
	 *
	 * @param src        the buffer the bytes are taken from
	 * @param attachment an object passed back to the handler, may be {@code null}
	 * @param handler    receives the number of bytes written, or the failure
	 * @throws NullPointerException if {@code handler} is {@code null}
	 */
	@Override
	public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
		dispatch(() -> out.write(src), attachment, handler);
	}

	/**
	 * Closes the input channel, then the output channel, and marks this channel as
	 * closed. Both underlying channels are closed even if closing the first one
	 * fails. Calling this method on an already closed channel has no effect.
	 *
	 * @throws IOException if closing an underlying channel failed; when both
	 *                     fail, the failure of the input channel is thrown and
	 *                     the other one is attached as a suppressed exception
	 */
	@Override
	public synchronized void close() throws IOException {
		if (!open)
			return;
		// Resources are closed in reverse declaration order, so "in" is closed
		// first and "out" second; try-with-resources guarantees the second close
		// happens and records a second failure as suppressed.
		try (WritableByteChannel closedSecond = out; ReadableByteChannel closedFirst = in) {
			// nothing to do, the statement is only used for its closing semantics
		} finally {
			open = false;
			// wakes up any thread waiting on this channel's monitor
			notifyAll();
		}
	}

	/** @return {@code false} once {@link #close()} has been called */
	@Override
	public synchronized boolean isOpen() {
		return open;
	}

	/**
	 * Runs a transfer on the executor and reports its outcome to the handler.
	 * Exactly one of the handler's methods is invoked: a failure of the transfer
	 * is passed as-is (not wrapped), and an exception thrown by
	 * {@code completed} is not redirected to {@code failed}.
	 */
	private <A> void dispatch(Transfer transfer, A attachment, CompletionHandler<Integer, ? super A> handler) {
		if (handler == null)
			throw new NullPointerException("handler");
		try {
			executor.execute(() -> {
				int count;
				try {
					count = transfer.run();
				} catch (Exception e) {
					handler.failed(e, attachment);
					return;
				}
				handler.completed(count, attachment);
			});
		} catch (java.util.concurrent.RejectedExecutionException e) {
			// the executor refused the task (e.g. it was shut down): the operation
			// will never run, so report it through the handler
			handler.failed(e, attachment);
		}
	}

	/** A single blocking transfer on one of the underlying channels. */
	private interface Transfer {
		int run() throws IOException;
	}

}