java-nio

Java NIO

为什么要使用 NIO

在大多数情况下,Java 应用程序并非真的受着 I/O 的束缚。操作系统并非不能快速传送数据,让 Java 有事可做;相反,是 JVM 自身在 I/O 方面效率欠佳。操作系统与 Java 基于流的 I/O模型有些不匹配。操作系统要移动的是大块数据(缓冲区),这往往是在硬件直接存储器存取(DMA)的协助下完成的。而 JVM 的 I/O 类喜欢操作小块数据——单个字节、几行文本。结果,操作系统送来整缓冲区的数据,java.io的流数据类再花大量时间把它们拆成小块,往往拷贝一个小块就要往返于几层对象。操作系统喜欢整卡车地运来数据,java.io 类则喜欢一铲子一铲子地加工数据。有了 NIO,就可以轻松地把一卡车数据备份到您能直接使用的地方(ByteBuffer 对象)。

这并不是说使用传统的 I/O 模型无法移动大量数据——当然可以(现在依然可以)。具体地说,RandomAccessFile 类在这方面的效率就不低,只要坚持使用基于数组的 read( )write( )方法。这些方法与底层操作系统调用相当接近,尽管必须保留至少一份缓冲区拷贝。

内存页面调度

为了支持虚拟内存的第二个特性(寻址空间大于物理内存),就必须进行虚拟内存分页(经常称为交换,虽然真正的交换是在进程层面完成,而非页层面)。依照该方案,虚拟内存空间的页面能够继续存在于外部磁盘存储,这样就为物理内存中的其他虚拟页面腾出了空间。从本质上说,物理内存充当了分页区的高速缓存;而所谓分页区,即从物理内存置换出来,转而存储于磁盘上的内存页面。

Buffer 线程安全吗?

缓冲区并不是多线程安全的。如果您想以多线程同时存取特定的缓冲区,您需要在存取缓冲区之前进行同步(例如对缓冲区对象进行跟踪) 。

ByteBuffer 的重要属性

  • ByteBuffer.position(): 下一个要被读或写的元素的索引。位置会自动由相应的 get()put() 函数更新。
  • ByteBuffer.limit(): 缓冲区的第一个不能被读或写的元素。 或者说,缓冲区中现存元素的计数。
1
int length = byteBuffer.limit();
  • ByteBuffer.hasRemaining(): Tells whether there are any elements between the current position and the limit.
1
2
3
4
5
6
7
public long transferTo(WritableByteChannel target, long position) throws IOException {
if (this.byteBufferHeader.hasRemaining()) {
transferred += target.write(this.byteBufferHeader);
return transferred;
}
return 0;
}

读取与写入:

① 准备写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ByteBuffer result = ByteBuffer.allocate(size);
result.put(...);
result.put(...);
result.put(...);
// ...
// ...
// ...
// flip it !
result.flip();
// ...
// ...
// ...
while (result.hasRemaining()) {
int length = socketChannel.write(result);
// ...
}

② 准备读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ByteBuffer byteBufferBody = ByteBuffer.allocate(size);
while (byteBufferBody.hasRemaining()) {
int length = socketChannel.read(byteBufferBody);
// ...
}
// ...
// ...
// ...
// flip it !
byteBufferBody.flip();
// ...
// ...
// ...
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);

byte[] headerData2 = new byte[headerLength];
byteBuffer.get(headerData2);

byte[] headerData3 = new byte[headerLength];
byteBuffer.get(headerData3);

复制缓冲区:

  • duplicate(): 创建了一个与原始缓冲区相似的新缓冲区。两个缓冲区共享数据元素,拥有同样的容量,但每个缓冲区拥有各自的位置,上界和标记属性。对一个缓冲区内的数据元素所做的改变会反映在另外一个缓冲区上。这一副本缓冲区具有与原始缓冲区同样的数据视图。如果原始的缓冲区为只读,或者为直接缓冲区,新的缓冲区将继承这些属性。
  • asReadOnlyBuffer():生成一个只读的缓冲区视图。这与 duplicate() 相同,除了这个新的缓冲区不允许使用 put(),并且其 isReadOnly() 函数将会返回 true
  • slice(): 创建一个从原始缓冲区的当前位置开始的新缓冲区,并且其容量是原始缓冲区的剩余元素数量(limit - position)。这个新缓冲区与原始缓冲区共享一段数据元素子序列。
1
2
3
CharBuffer buffer = CharBuffer.allocate(8);
buffer.position(3).limit (5);
CharBuffer sliceBuffer = buffer.slice();

假设原来的数据集是这样的:

1
|A|B|C|D|E|F|G|H| |
  • 8 位置不存储字符,capacity 为 8
  • position 当前指向 3,即 D 字符
  • limit 当前指向 5,即 F 字符

那么,当调用 slice 之后,新创建的 CharBuffer 的数据集是这样的:

1
|D|E| |
  • 2 位置不存储字符,capacity 为 2
  • position 当前指向 0,即 D 字符
  • limit 当前指向 2,即结束字符

ZooKeeper 中的直接应用:

1
2
3
4
5
6
7
8
9
10
11
if (LOG.isTraceEnabled()) {
LOG.trace("after readBytes message readable "
+ message.readableBytes()
+ " bb len " + bb.remaining() + " " + bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("after readbytes "
+ Long.toHexString(sessionId)
+ " bb 0x"
+ ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(dat)));
}

  • clear() :将缓冲区重置为空状态。 它并不改变缓冲区中的任何数据元素,而是仅仅将上界设为容量的值,并把位置设回 0

创建缓冲区

新的缓冲区是由分配或包装操作创建的。allocate 操作创建一个缓冲区对象并分配一个私有的空间来储存容量大小的数据元素。wrap 操作创建一个缓冲区对象但是不分配任何空间来储存数据元素。它使用您所提供的数组作为存储空间来储存缓冲区中的数据元素。

要分配一个容量为 100 个 char 变量的 Charbuffer:

1
CharBuffer charBuffer = CharBuffer.allocate (100);

这段代码隐含地从堆空间中分配了一个 char 型数组作为备份存储器来储存 100 个 char 变量。

如果您想提供您自己的数组用做缓冲区的备份存储器,请调用 wrap() 函数:

1
2
char [] myArray = new char [100];
CharBuffer charbuffer = CharBuffer.wrap (myArray);

这段代码构造了一个新的缓冲区对象,但数据元素会存在于数组中。这意味着通过调用 put() 函数造成的对缓冲区的改动会直接影响这个数组,而且对这个数组的任何改动也会对这个缓冲区对象可见。


ZooKeeper 中的应用:

1
2
3
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());

翻转

我们已经写满了缓冲区,现在我们必须准备将其清空。我们想把这个缓冲区传递给一个通道,以使内容能被全部写出。但如果通道现在在缓冲区上执行 get(),那么它将从我们刚刚插入的有用数据之外取出未定义数据。如果我们将位置值重新设为 0,通道就会从正确位置开始获取,但是它是怎样知道何时到达我们所插入数据末端的呢?这就是上界属性被引入的目的。上界属性指明了缓冲区有效内容的末端。我们需要将上界属性设置为当前位置,然后将位置重置为 0。我们可以人工用下面的代码实现:

1
buffer.limit(buffer.position()).position(0);

但这种从填充到释放状态的缓冲区翻转是 API 设计者预先设计好的,他们为我们提供了一个非常便利的函数:

1
Buffer.flip();

flip() 函数将一个能够继续添加数据元素的填充状态的缓冲区翻转成一个准备读出元素的释放状态

rewind() 函数与 flip() 相似,但不影响上界属性。它只是将位置值设回 0。您可以使用 rewind() 后退,重读已经被翻转的缓冲区中的数据。

如果将缓冲区翻转两次会怎样呢?它实际上会大小变为 0。把上界设为位置的值,并把位置设为 0。上界和位置都变成 0。尝试对缓冲区上
位置和上界都为 0 的 get() 操作会导致 BufferUnderflowException 异常。而 put() 则会导致 BufferOverflowException 异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final int remaining() {
return limit - position;
}

public final Buffer flip() {
limit = position; // 比 rewind() 多了这一行
position = 0;
mark = -1;
return this;
}

public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}

Selector

仅用单个线程来处理多个 Channels 的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。

下面是单线程使用一个 Selector 处理 3 个 channel 的示例图:

Java NIO 根据操作系统不同, 针对 nio 中的 Selector 有不同的实现:


参考:

MappedFileAppending

使用 MappedByteBuffer 来向一个文件里面追加数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 分配 1M 内存
int count = 1024 * 1000;
String bigFile = "D:\\android\\nio\\FileUtils.java";
String date = "abclasdjfladslfjaldsfj";

// 打开一个文件
RandomAccessFile memoryMappedFile = new RandomAccessFile(bigFile, "rw");
int fileSize = (int) memoryMappedFile.length();
FileChannel fileChannel = memoryMappedFile.getChannel();

// 将文件映射到内存
MappedByteBuffer mBuff = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, count);

//指针移动到文件最后
mBuff.position(fileSize);

// 往 memory mapped file 中添加数据
mBuff.put(data.getBytes());
int newFileSize = mBuff.position();

// We have to unmap the memory file first
Class<?> fcClass = fileChannel.getClass();
try {
java.lang.reflect.Method unmapMethod =
fcClass.getDeclaredMethod("unmap", new Class[]{ java.nio.MappedByteBuffer.class });
unmapMethod.setAccessible(true);
unmapMethod.invoke(null, new Object[]{ mBuff });
} catch (Exception e) {
e.printStackTrace();
}

// 我们分配了 1MB, but we just use newFileSize bytes
if (newFileSize < count) {
fileChannel.truncate(newFileSize);
}
memoryMappedFile.close();
fileChannel.close();

ByteOrder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bbuf.order(ByteOrder.BIG_ENDIAN);
// ==> [7,9,0,0,0,0,0,0,0,0]
bbuf.asShortBuffer().put(0, (short) 0x0709);
// ==> [0,0,7,9,0,0,0,0,0,0]
// bbuf.asShortBuffer().put(1, (short) 0x0709);
// ==> [0,0,0,0,7,9,0,0,0,0]
// bbuf.asShortBuffer().put(2, (short) 0x0709);
System.out.println(Arrays.toString(bbuf.array()));

bbuf.order(ByteOrder.LITTLE_ENDIAN);
// ==> [9,7,0,0,0,0,0,0,0,0]
bbuf.asShortBuffer().put(0, (short) 0x0709);
// ==> [0,0,9,7,0,0,0,0,0,0]
// bbuf.asShortBuffer().put(1, (short) 0x0709);
// ==> [0,0,0,0,9,7,0,0,0,0]
// bbuf.asShortBuffer().put(1, (short) 0x0709);
System.out.println(Arrays.toString(bbuf.array()));

// Endian byte ordering affects integer and floating point data, does not affect character strings as they maintain the string order as viewed and intended by the programmer
// 没有影响
bbuf.order(ByteOrder.LITTLE_ENDIAN);
bbuf.put("ABC".getBytes());

使用 FileChannel 来拷贝文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
File srcFile = new File(srcFileName);
File dstFile = new File(dstFileName);
if (!dstFile.exists()) {
dstFile.createNewFile();
}

FileInputStream in = new FileInputStream(srcFile);
FileOutputStream out = new FileOutputStream(dstFile);
FileChannel inc = in.getChannel();
FileChannel outc = out.getChannel();

long count = 0;
long size = inc.size();
while (count < size) {
count += outc.transferFrom(inc, 0, size - count);
}

in.close();
out.close();
inc.close();
outc.close();

使用 AsynchronousFileChannel 来读取大文件

ByteBuffer 转为字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
String content = "0123456789";
// 分配 ByteBuffer
ByteBuffer buffer = ByteBuffer.wrap(content.getBytes());

// ByteBuffer buffer = ByteBuffer.allocateDirect(100);
// buffer.put(content.getBytes());
// buffer.position(0);

String s;

// way 1
// Remember to check hasArray() before using this method
// hasArray() return false if we allocate the buffer using allocateDirect() method
if (buffer.hasArray()) {
s = new String(buffer.array(), Charset.forName("US-ASCII"));
}

// way 2
CharBuffer cb = Charset.forName("US-ASCII").decode(buffer);
s = sb.toString();

// way 3
buffer.position(0);
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
v = new String(bytes, Charset.forName("US-ASCII"));

从字符串创建 ByteBuffer


Working with Buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 1. Filling the buffer
char[] mbuff = new char[20];

// [x,x,x,1,2,3,4,5,6,7,8,9,x,x,x,x,x,x,x,x]
// we want to put char from position=3, and max length that allows is 9
// CharBuffer cbuf = CharBuffer.wrap(mbuff, 3, 9);
// capacity: 20, limit: 12, position: 12
// cbuf.put("1234456789");

// BufferOverflowException
// cbuf.put("1234567890");

CharBuffer cbuf = CharBuffer.wrap(mbuff);
cbuf.put("1234456789");

// [1,2,3,4,5,6,7,8,9,x,x,x,x,x,x,x,x,x,x,x]
// Arrays.toString(cbuf.array());

// 20
// cbuf.capacity();

// 20
// cbuf.limit();

// 9
int currentPos = cbuf.position();

// ====================================================

cbuf.position(0);
// 加上这句话,输出 123456789;否则:123456789xxxxxxxxxxx
cbuf.limit(currentPos);

// we can replace position() and limit() by flip()
// cbuf.flip();

while (cbuf.hasRemaining()) {
System.out.println(cbuf.get());
}

// ================================

// Once you done reading data out of the buffer
// you have to make the buffer ready for write again
// You can do so either by calling clear() or by calling compact()

// 123456789
cbuf.rewind();

// 123456789xxxxxxxxxxx
// cbuf.compact();

// 123456789xxxxxxxxxxx
// cbuf.clear();
while (cbuf.hasRemaining()) {
System.out.println(cbuf.get());
}

// ===================================

int k = 0;
while (cbuf.hasRemaining()) {
if (k == 5)
cbuf.mark();
System.out.println(cbuf.get());
k++;
}

// 6789, 不加这句话,还是 123456789
// The reset() method sets the position to current mark
// If the mark is undefined, calling reset() throw InvalidMarkException
cbuf.reset();
while (cbuf.hasRemaining()) {
System.out.println(cbuf.get());
}

使用 FileChannel 读取文件的几种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// ====Read file using ByteBuffer of a size========
Path path = Paths.get(fileName);
FileChannel fileChannel = FileChannel.open(path);
long fileSize = fileChannel.size();
ByteBuffer buf = ByteBuffer.allocate((int) fileSize);
fileChannel.read(buf);
// set limit = current position, set position = 0
// 如果不加这句话,那么会报异常:
// BufferUnderflowException
buf.flip();
for (int i=0; i<fileSize; i++) {
System.out.print((char) buf.get());
}
fileChannel.close();

// ====Read a file with fixed size ByteBuffer====
int BUFFER = 1024;

Path path = Paths.get(fileName);
FileChannel fileChannel = FileChannel.open(path);
long fileSize = fileChannel.size();
ByteBuffer buf = ByteBuffer.allocate(BUFFER);

int count = 0;
int bread = 0;
while ((bread = fileChannel.read(buf)) != -1) {
count += bread;
buf.flip();
while (buf.hasRemaining()) {
System.out.print((char) buf.get());
}
// 如果不加这句话,屏幕上一直重复输出读取的这个文件的内容…
buf.clear();
}

for (int i=0; i<fileSize; i++) {
System.out.print((char) buf.get());
}
fileChannel.close();

//=======Read Small Text File=======
Path path = Paths.get(fileName);
List<String> list = Files.readAllLines(path, Charset.forName("US-ASCII"));
for (int i=0; i<list.size(); i++) {
System.out.print(list.get(i));
}

//=======Read Large Text File=======
Path path = Paths.get(fileName);
Scanner scanner = new Scanner(path, Charset.forName("US-ASCII").name());
while (scanner.hasNextLine()) {
System.out.println(scanner.nextLine());
}
scanner.close();

//======Read Large Text File Way 2======
Path path = Paths.get(fileName);
BufferedReader reader = Files.newBufferedReader(path, Charset.forName("US-ASCII"));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
reader.close();

//======Read File with MappedByteBuffer======
Path path = Paths.get(fileName);
FileChannel fileChannel = FileChannel.open(path);
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY,0,fileChannel.size());
buffer.load();
int count = buffer.limit();
for (int i=0; i<count; i++) {
System.out.print((char) buffer.get());
}

使用 FileChannel 写文件的几种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
String input = "";
String fileName = "D:\\output.txt";

//=========Write String to File============
ByteBuffer buf = ByteBuffer.wrap(input.getBytes());
FileOutputStream fos = new FileOutputStream(fileName);
FileChannel fileChannel = fos.getChannel();
fileChannel.write(buf);
fileChannel.close();
fos.close();

//=========Write String to File Way 2============
ByteBuffer buf = ByteBuffer.wrap(input.getBytes());
FileChannel fileChannel = openFile();
// NonWritableChannelException
// 将 openFile 改为 createFile 即可
fileChannel.write(buf);
fileChannel.close();

//=========Write Small Text to File============
List<String> list = Collections.unmodifiableList(Arrays.asList("hello", "world"));
Path path = Paths.get(fileName);
Files.write(file, list, charset);

//=========Write Large Text to File============
Path path = Paths.get(fileName);
try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
for (String line: list) {
writer.write(line);
writer.newLine();
}
}

//========Write Ints to File===============
int[] ints = new int[]{ 23,456,67,45 };

void writeIntsToFile() throws IOException {
FileChannel fileChannel = createFile();
// 4 可以替换为 Integer.SIZE / 8
ByteBuffer buf = ByteBuffer.allocate(ints.length * 4);
IntBuffer ib = buf.asIntBuffer();
for (int i=0; i<ints.length; i++) {
ib.put(ints[i]);
}
fileChannel.write(buf);
fileChannel.close();
}

void readIntsFromFile() throws IOException {
FileChannel fileChannel = openFile();
ByteBuffer buf = ByteBuffer.allocate((int) fileChannel.size());
IntBuffer = buf.asIntBuffer();
fileChannel.read(buf);
while (ib.hasRemaining()) {
System.out.println(ib.get());
}
fileChannel.close();
}

FileChannel openFile() throws IOException {
Path path = Paths.get(fileName);
FileChannel fileChannel = FileChannel.open(path);
return fileChannel();
}

FileChannel createFile() throws IOException {
Path path = Paths.get(filename);
FileChannel fileChannel = FileChannel.open(path,
EnumSet.of(StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE));
return fileChannel;
}

使用 SocketChannel 来传输一个 Object

使用 SocketChannel 来接受对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Transfer a object over SocketChannel in non blocking mode
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.socket().bind(new InetSocketAddress("localhost", 9000));
server.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
continue;
}

if (key.isReadable()) {
SocketChannel channel = key.channel();
getSocketObject(channel);
channel.close();
}
}
}

void getSocketObject(SocketChannel channel) {
ByteBuffer data = ByteBuffer.allocate(100);
channel.read(data);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data.array());
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
Student student = (Student) objectInputStream.readObject();
}

public class Student implements Serializable {
}

使用 SocketChannel 来发送一个对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Selector selector = Selector.open();
SocketChannel clientSocket = SocketChannel.open();
clientSocket.configureBlocking(false);
clientSocket.connect(new InetSocketAddress("localhost", 9000));
clientSocket.register(selector, SelectionKey.OP_CONNECT);
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
iterator.remove();
SocketChannel client = (SocketChannel) key.channel();
// Check a connection was established with a remote server
if (key.isConnectable()) {
if (client.isConnectionPending()) {
try {
client.finishConnect();
} catch (IOException e) {

}
}

client.register(selector, SelectionKey.OP_WRITE);
continue;
}

if (key.isWritable()) {
sendSocketObject(client);
client.close();
return;
}
}

void sendSocketObject(SocketChannel client) throws IOException {
Student student = new Student();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutptuStream();

ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(student);
objectOutputStream.flush();

client.write(ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
}

Transfer Large file through SocketChannel

SocketChannel 读取大文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void readFileFromSocketChannel(SocketChannel socketChannel) throws IOException {
Path path = Paths.get("d:\\a.pdf");
FileChannel fileChannel = FileChannel.open(path,
EnumSet.of(StandardOpenOperation.CREATE,
StandardOpenOperation.TRUNCATE_EXISTING,
StandradOpenOperation.WRITE));
// Allocate a ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (socketChannel.read(buffer) > 0) {
buffer.flip();
fileChannel.write(buffer);
buffer.clear();
}
fileChannel.close();
}

使用 SocketChannel 发送大文件:

1
2
3
4
5
6
7
8
9
10
11
12
void sendFile(SocketChannel socketChannel) throws IOException {
Path path = Paths.get("d:\\src.pdf");
FileChannel inChannel = FileChannel.open(path);

ByteBuffer buffer = ByteBuffer.allocate(1024);
while (inChannel.read(buffer) > 0) {
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
socketChannel.close();
}

使用 AsynchrousFileChannelFuture 来读取大文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ByteBuffer buffer = ByteBuffer.allocate(20);
Path path = Paths.get(FILENAME);

AsynchronousFileChannel asyncChannel = AsynchronousFileChannel.open(path);
int pos = 0;

do {
// read many times until done
Future<Integer> fileResult = asyncChannel.read(buffer, pos);

// wait if neccessary for the computation to complete, and then retrieves its result
Integer blockread = fileResult.get();
if (blockread < 0) {
break;
}

// read a next block with the current position
pos += blockread;

buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
buffer.clear();
} while (true);

使用 AsynchonousFileChannelCompletionHandler 来读取大文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int capacity = 4 * 1024;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
String inFileName = "src.pdf";
String outFileName = "dst.pdf";

FileChannel fileChannel = FileChannel.open(Paths.get(inFileName));
AsynchronousFileChannel outAsynchChannel = AsynchronousFileChannel.open(Paths.get(outFileName),
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);

BlockingQueue<Boolean> done = new ArrayBlockingQueue<Boolean>(1);
long position = 0;
int receiveBytes;

while ((receiveBytes = inChannel.read(buffer)) >= 0) {
buffer.flip();
outAsynchChannel.write(buffer, position, done, new CompletionHandler<Integer, BlockingQueue<Boolean>>() {

@Override
public void completed(Integer result, BlockingQueue<Boolean> attachment) {
try {
attachment.put(true);
} catch (InterruptedException e) {
}
}

@Override
public void failed(Throwable exc, BlockingQueue<Boolean> attachment) {
}

});

done.take();
position += receiveBytes;
buffer.clear();
}

inChannel.close();
outAsynchChannel.close();

Transfer data using DatagramChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// ========Datagram Server==================

DatagramChannel server = DatagramChannel.open();
server.bind(new InetSocketAddress(9100));

// receive buffer from client
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketAddress remoteAddr = server.receive(buffer);

String s = ". I am server";
for (int i=0; i<s.length(); i++) {
buffer.put((byte) s.charAt(i));
}

buffer.flip();

// Send a response string to client
server.send(buffer, remoteAddr);
server.close();

// =============Datagram Client================
DatagramChannel client = DatagramChannel.open();
client.bind(null);

String msg = "hello";
ByteBuffer buffer = ByteBuffer.allocate(100);
buffer.put(msg.getBytes());
buffer.flip();

// send buffer to server
InetSocketAddress serverAddr = new InetSocketAddress("localhost", 9100);
client.send(buffer, serfverAddr);

// receive buffer from server
buffer.clear();
client.receive(buffer);
buffer.flip();

// Convert ByteBuffer to string
int limit = buffer.limit();
byte bytes[] = new byte[limit];
buffer.get(bytes, 0, limit);
String s = new String(bytes);

推荐文章