IO

IO - Input 和 Output

5 种 I/O 模型

  • 阻塞式 I/O

可能阻塞的套接字调用可分为如下几类:

  • 输入操作: read、readv、recv、recvfrom、recvmsg 5 个函数
  • 输出操作: write、writev、send、sendto、sendmsg 5 个函数
  • 接受外来连接: accept 函数
  • 发出外来连接: connect 函数

标准访问文的方式: 当应用程序调用 read() 接口时,操作系统检查在内核的告诉缓存中有没有需要的数据,如果已经缓存了,那么就直接从缓存中返回。写入的时候,从用户地址空间复制到内核地址空间的缓存中,什么时候写到磁盘由操作系统决定。

直接 I/O 的方式: 应用程序直接访问磁盘数据,不经过操作系统内核数据缓冲区,减少一次从内核缓冲区到用户数据缓存的数据复制。这种访问文件的方式通常是在对数据的缓存管理由应用程序的数据库管理系统中。

  • 非阻塞式 I/O

设置非阻塞 I/O:

1
2
int val = fcntl(STDOUT_FILENO, F_GETFL, 0);
fcntl(STDOUT_FILENO, F_SETFL, val | O_NONBLOCK);
  • I/O 复用 (selectpoll)

  • 信号驱动式 I/O (SIGIO)

  • 异步 I/O (POSIX 的 aio_ 系列函数)

信号驱动式 I/O 是由内核通知我们何时可以启动一个 I/O 操作,而异步 I/O 模型是由内核通知我们何时完成

5 种 I/O 模型比较:

select 指示内核等待多个事件中的任何一个发生,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它,函数原型:

1
2
3
#include <sys/select.h>
#include <time.h>
int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timespec *timeout);

我们可以在下列情况收到内核通知:

  • 集合 {1, 4, 5} 中的任何描述符准备好读
  • 集合 {2, 7} 中的任何描述符准备好写
  • 集合 {1, 4} 中的任何描述符有异常条件待处理
  • 已经历了 10.2 秒
1
2
3
4
5
6
7
8
9
10
fd_set rset; // 比特位
FD_ZERO(&rset);
FD_SET(1, &rset);
FD_SET(5, &rset);

select(maxfdp1, &rset, NULL, NULL, NULL);

if (FD_ISSET(sockfd, &rset)) {
// socket is readable
}

select 最大缺陷就是单个进程所打开的文件描述符是有一定限制的,它由 FD_SETSIZE 设置,默认值是 1024。可以选择修改这个宏然后重新编译内核,不过这会带来网络效率的下降。

pselect 函数:

1
2
3
4
#include <sys/select.h>
#include <signal.h>
#include <time.h>
int pselect(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timespec *timeout, const sigset_t *sigmask);

pselect 是能够处理信号阻塞并提供更高事件分辨率的 select 的增强版本。

poll 函数:

1
2
#include <poll.h>
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);

epoll 函数:

  • 一个进程打开的 socket 描述符不受限制 (仅受限于操作系统的最大文件句柄数 cat /proc/sys/fs/file-max )
  • I/O 效率不会随着文件描述符数目的增加而线性下降,select/poll 每次都会线性扫描全部的集合
  • 把内核和用户空间 mmap 到同一块内存来加速这两者之间的消息传递

文件 I/O

在 UNIX 系统上大多数 I/O 只需要通过这五个函数完成: open, read, write, lseek, and close。相对于标准 I/O,我们这里讨论的都是 unbuffered I/O (每一次 readwrite 都会产生一个系统调用)

通常,UNIX 系统 shell 把文件描述符 0 与进程的标准输入关联,文件描述符 1 与标准输出关联,文件描述符 2 与标准错误关联。

文件描述符的最大限制:

1
cat /proc/sys/fs/file-max # 599216

文件描述符的当前数量:

1
cat /proc/sys/fs/file-nr

函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <fcntl.h>
int open(const char *path, int oflag, ... /* mode_t mode */ );
int creat(const char *path, mode_t mode);
int fcntl(int fd, int cmd, ... /* int arg */); // 改变已经打开文件的属性

#include <unistd.h>
int close(int fd); // 成功: 返回 0
off_t lseek(int fd, off_t offset, int whence); // 成功: 返回新的文件偏移量
ssize_t read(int fd, void *buf, size_t nbytes); // 成功: 读到的字节数; 文件末尾: 返回 0
ssize_t write(int fd, const void *buf, size_t nbytes); // 成功: 已写的字节数
int dup(int fd); // 复制一个现有的文件描述符
void sync(void); // 将所有修改的块缓冲区排入写队列,然后返回,并不等待磁盘操作结束
int fsync(int fd); // 等待磁盘写操作结束

#include <sys/ioctl.h>
int ioctl(int fd, int request, ...); // I/O 操作的杂物箱,一般用于中断 I/O

调用 dup(fd) 等效于 fcntl(fd, F_DUPFD, 0); fcntl 函数可以:

  • 复制一个已有的文件描述符
  • 获取/设置文件描述符标志
  • 获取/设置文件状态标志
  • 获取/设置异步 I/O 所有权
  • 获取/设置记录锁

标准 I/O

文件 I/O 围绕文件描述符,标准 I/O 围绕流

Binary I/O

一次读或写一个 structure,下述两个函数提供了二进制 I/O:

1
2
3
4
5
#include <stdio.h>
size_t fread(void *restrict ptr, size_t size, size_t nobj,
FILE *restrict fp);
size_t fwrite(const void *restrict ptr, size_t size, size_t nobj,
FILE *restrict fp);

我们可以这样操作一个结构体:

1
2
3
4
5
6
7
8
struct {
short count;
long total;
char name[NAMESIZE];
} item;

if (fwrite(&item, sizeof(item), 1, fp) != 1)
err_sys("fwrite error");

基于字节的 Java I/O 操作接口:

基于字符的 Java I/O 操作接口:

字节与字符的转化接口: InputStreamReader 派生出 FileReader

高级 I/O

涵盖了一些 非阻塞 I/O、记录锁、I/O 多路复用 (selectpoll)、异步 I/O、readvwritev、内存映射 I/O (mmap)

readvwritev

这两个函数可以让我们从多个缓冲区里面读和写,这些操作称之为 scatter readgather write

1
2
3
4
5
6
7
8
#include <sys/uio.h>
ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
ssize_t writev(int fd, const struct iovec *iov, int iovcnt);

struct iovec {
void *iov_base; /* starting address of buffer */
size_t iov_len; /* size of buffer */
};

Memory-Mapped I/O 内存映射 I/O

Memory-Mapped I/O 让我们可以将位于磁盘上的一个文件映射到内存上的一个缓冲区中,当我们从缓冲区中读,我们就是从文件中读;当我们向缓冲区写,对象的文件也被写入了。Memory-Mapped I/O 已经被用于 virtual memory systems 很多年了。

1
2
#include <sys/mman.h>
void *mmap(void *addr, size_t len, int prot, int flag, int fd, off_t off );

内存映射文件:

Client/Server IO

BIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ServerSocket server = null;
try {
server = new ServerSocket(port);
Socket socket = null;
while (true) {
socket = server.accept();
new Thread(new Request(socket)).start();
}
} finally {
if (server != null) {
server.close();
server = null;
}
}

伪异步 IO

1
2
3
4
5
TimeServerHandlerExcutePool singleExecutor = new TimeServerHandlerExecutePool(50, 10000);
while (true) {
socket = server.accept();
singleExecutor.execute(new TimeServerHandler(socket));
}

当对 Socket 的输入流进行读取的操作的时候,它会一直阻塞下去,直到发生三种事件:

  • 有数据可读
  • 可用数据已经读取完毕
  • 发生空指针或 I/O 异常

这意味着当对方发送请求或者应答消息比较缓慢,或者网络传输缓慢时,读取输入流的一方的通信线程将被长时间阻塞,如果对方要 60s 才能够将数据发送完成,读取一方的 I/O 线程也将会被同步阻塞 60s,在此期间,其他接入消息只能在消息队列中排队。当调用 OutputStreamwrite 方法时,也面临同样的问题。当消息的接收方处理缓慢的时候,将不能及时地从 TCP 缓冲区中读取数据,这将会导致发送方的 TCP window size 不断减小,直到为 0,双方处于 Keep-Alive 状态,消息发送方将不能再向 TCP 缓冲区写入消息,这时如果采用的是同步阻塞 I/O,write 操作将会被无期限阻塞,直到 TCP window size 大于 0 或者发生 I/O 异常。

NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
}
}

@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectonKey> selectedKeys = selector.selectedKeys();
}
}
}
// 代码未完待续 ...

由于 SocketChannel 是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现 “写半包” 问题。我们需要注册写操作,不断轮询 Selector 将没有发送完的 ByteBuffer 发送完毕,然后可以通过 ByteBufferhasRemain() 方法判断消息是否发送完成。

相比于 BIO,NIO 的一个明显的好处是不需要为每一个 Socket 分配一个线程,而可以在一个线程中处理多个 Socket 套接字相关的工作。

通过 Channel 对象获取的 I/O 数据首先要经过操作系统的 Socket 缓冲区,再将数据复制到 Buffer 中,这个操作系统缓冲区就是底层的 TCP 所关联的 RecvQ 或者 SendQ 队列,从操作系统缓冲区到用户缓冲区复制数据比较消耗性能,Buffer 还提供了另外一种直接操作系统缓冲区的方式,即 ByteBuffer.allocateDirector(size),这个方法返回的 DirectByteBuffer 就是与底层存储空间关联的缓冲区,它通过 Native 代码操作非 JVM 堆的内存空间。每次创建或者释放的时候都会调用一次 System.gc()。一般在数据量比较大、生命周期比较长的情况下比较合适。

FileChannel.transferXXX 与传统的访问文件方式相比可以减少数据从内核到用户空间的复制,数据直接在内核空间中移动:

FileChannel.map 将文件按照一定大小块映射为内存区域,当程序访问这个内存区域时,将直接操作这个文件数据,这种方式省去了数据从内核空间向用户空间复制的损耗。这种方式适合对大文件的只读性操作,如大文件的 MD5 校验。

AIO 编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AsyncTimeServerHandler implements Runnable {

CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;

public AsyncTimeServerHandler(int port) {
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
}
}

@Override
public void run() {
latch = new CountDownLatch();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
}

@Override
public void completed(AsynchronousServerSocketChannel result, AcceptCompletionHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
}

}
// 代码未完继续 ...

I/O 调优

性能检测: 通过压力测试,看系统 I/O wait 指标是否异常。例如,测试机器有 4 个 CPU,那么理想的 I/O wait 参数不应该超过 25%,如果超过,则 I/O 很可能成为应用程序的性能瓶颈。在 Linux 下通过 iostat 命令查看。通常我们还会查看另外一个参数,就是 IOPS,即要查看应用程序需要的最低的 IOPS 是多少,磁盘的 IOPS 能不能达到要求。

1
(磁盘数 * 每块磁盘的 IOPS) / (磁盘块的吞吐量 + RAID 因子 * 磁盘写的吞吐量) = IOPS

TCP 连接查看主机可以使用的端口范围:

1
cat /proc/sys/net/ipv4/ip_local_port_range

可用端口数量: 60999 - 32768 = 28231,如果这个值偏小,则遇到大量并发请求时就会成为性能瓶颈。

网络调优参数 说明
echo "1024 65535" > /proc/sys/net/ipv4/ip_local_port_range 设置端口可用范围
echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse 设置 time_wait 连接重用
echo 1 > /proc/sys/net/ipv4/tcp_tw_recycle 设置快速回收 time_wait 连接
echo 180000 > /proc/sys/net/ipv4/tcp_max_tw_buckets 设置最大 time_wait 连接长度
echo 0 > /proc/sys/net/ipv4/tcp_timestamps 表示是否启用以一种比超时重发更精确的方法来启用对 RTT 的计算
echo 1 > /proc/sys/net/ipv4/tcp_window_scaling 设置 TCP/IP 会话的滑动窗口大小是否可变
echo 20000 > /proc/sys/net/ipv4/tcp_max_syn_backlog 设置最大等待处于客户端还没有应答回来的连接数
echo 10000 > /proc/sys/net/core/somaxconn 设置每一个处于监听状态的端口的监听队列的长度
echo 10000 > /proc/sys/net/core/netdev_max_backlog 设置最大等待 CPU 处理的包的数目
echo 2000000 > /proc/sys/fs/file-max 设置最大打开的文件数

以上设置都是临时性的,系统重新启动后就会丢失。

Buffered I/O

BufferedOutputStream 经常被用在避免频繁操作磁盘和网络的地方(参考):

1
2
3
BufferedOutputStream bos = new BufferedOutputStream(zip);
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(prices);

ByteArrayOutputStream 只是在内存里面做一层缓冲:

1
2
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream zip = new GZIPOutputStream(baos);

文件 I/O - CSAPP

文件描述符:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

int main() {
int fd1, fd2;
fd1 = open("data/foo.txt", O_RDONLY, 0);
close(fd1);
fd2 = open("data/baz.txt", O_RDONLY, 0);
// fd2 = 3
printf("fd2 = %d\n", fd2);
return 0;
}

Unix 进程生命周期开始时,打开的描述符赋给了 stdin (0)stdout (1)stderr (2)open 函数总是返回最低的未打开的描述符,所以第一次调用 open 会返回描述符 3调用 close 函数会释放描述符 3。最后依然会返回 3


一次一个字节从标准输入拷贝到标准输出:

1
2
3
4
5
6
7
8
9
10
11
12
#include <stdio.h>
#include <unistd.h>

int main() {
char c;

while (read(STDIN_FILENO, &c, 1) != 0) {
write(STDOUT_FILENO, &c, 1);
}

return 0;
}


读取文件元数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

int main(int argc, char **argv) {
struct stat stat_buf;
char *type, *readok;

stat(argv[1], &stat_buf);
if (S_ISREG(stat_buf.st_mode)) {
type = "regular";
} else if (S_ISDIR(stat_buf.st_mode)) {
type = "directory";
} else {
type = "other";
}

if ((stat_buf.st_mode & S_IRUSR)) {
readok = "yes";
} else {
readok = "no";
}

printf("type: %s, read: %s\n", type, readok);

return 0;
}

共享文件:

以同一个 filename 调用 open 函数两次,这个时候,每个描述符都有它自己的文件位置,所以对不同描述符的读操作可以从文件的不同位置获取数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int main() {
int fd1, fd2;
char c;

// foobar
fd1 = open("data/foobar.txt", O_RDONLY, 0);
fd2 = open("data/foobar.txt", O_RDONLY, 0);
read(fd1, &c, 1);
read(fd2, &c, 1);
// c = f
printf("c = %c\n", c);

return 0;
}

fork 之后,子进程有一个父进程的描述符表的副本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>

int main() {
int fd;
char c;

fd = open("data/foobar.txt", O_RDONLY, 0);
if (fork() == 0) {
read(fd, &c, 1);
return 0;
}

wait(NULL);
read(fd, &c, 1);
// c = o
printf("c = %c\n", c);

return 0;
}

I/O 重定向:

1
2
3
// 拷贝描述符表项 oldfd 到描述符表表项 newfd,覆盖描述符表表项 newfd 以前的内容
// 如果 newfd 已经打开,则会在拷贝 oldfd 之前关闭 newfd
int dup2(int oldfd, int newfd);

以命令

1
ls > foo.txt

为例,描述符 1 (标准输出) 原来对应于文件 A (一个终端),描述符 4 原来对应于 B (foo.txt),经过 dup2(4, 1) 之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

int main() {
int fd1, fd2;
char c;

fd1 = open("data/foobar.txt", O_RDONLY, 0);
fd2 = open("data/foobar.txt", O_RDONLY, 0);
read(fd2, &c, 1);
// int dup2(int oldfd, int newfd);
dup2(fd2, fd1);
read(fd1, &c, 1);
// c = o
printf("c = %c\n", c);
return 0;
}

当我们试图对网络输入使用标准 I/O 时,它会带来一些令人讨厌的问题:

  • 标准 I/O 是全双工的,程序能够在同一个流上执行输入和输出。
  • 跟在输出函数之后的输入函数。如果中间没有插入 fflushfseekfsetpos 或者 rewind 的调用,一个输入函数不能跟随在一个输出函数之后。fflush 清空与流相关的缓冲区。后三个函数调用使用 Unix IOlseek 函数来重置当前的文件位置。
1
2
3
write();
fflush();
read();
  • 跟在输入函数之后的输出函数。如果中间没有插入 fseekfsetpos 或者 rewind 的调用,一个输出函数不能跟随在一个输入函数之后,除非该输入函数遇到了一个 EOF
1
2
3
4
5
6
7
8
9
10
// 一种可能解决方案
// 更正确的解决方案: 建议使用 RIO 函数
FILE *fpin, *fpout;

fpin = fdopen(sockfd, "r");
fpout = fdopen(sockfd, "w");

fclose(fpin);
// 第二个 close 操作可能会失败
fclose(fpout);

对套接字执行 lseek 函数是非法的。

InputStream#available()

InputStream 变成 byte[] 的正确操作:

1
2
3
4
5
6
7
8
9
10
11
public static byte[] inputStream2ByteArray(InputStream inputStream) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

byte[] buf = new byte[4096];
int c;
while ((c = inputStream.read(buf)) >= 0) {
byteArrayOutputStream.write(buf, 0, c);
}

return byteArrayOutputStream.toByteArray();
}

或者这样:

1
2
InputStream is;
byte[] bytes = IOUtils.toByteArray(is);

千万不要像下面这样写 (尤其是网络流):

每次返回的 available 的值都不同:

1
2
3
4
5
public static byte[] inputStream2ByteArray(InputStream inputStream) throws IOException {
byte[] buf = inputStream.available();
inputStream.read(buf);
return buf;
}

The available() method tells you 能够读取多少字节直到 the read() call will block the execution flow of your program. On most of the input streams, all call to read() are blocking, that’s why available returns 0 by default.

However, on some streams (such as BufferedInputStream, that have an 内部缓冲区), some bytes are read and kept in 内存, so you can read them without blocking the program flow. In this case, the available() method tells you how many bytes are kept in the buffer.

从文件末尾读取最多 30000 个字节的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int SHOW_LOG_LENGTH = 30000;

FileInputStream fis = new FileInputStream(file);
FileChannel channel = fis.getChannel();
size = channel.size();

ByteBuffer bb;
if (size <= SHOW_LOG_LENGTH) {
bb = ByteBuffer.allocate((int) size);
channel.read(bb, 0);
} else {
int pos = (int) (size - SHOW_LOG_LENGTH);
bb = ByteBuffer.allocate(SHOW_LOG_LENGTH);
channel.read(bb, pos);
}

bb.flip();
content = new String(bb.array());

IO 重定向

1
2
3
4
5
6
7
// 摘自 Tomcat 9 源代码
// Catalina.java
protected void initStreams() {
// Replace System.out and System.err with a custom PrintStream
System.setOut(new SystemLogHandler(System.out));
System.setErr(new SystemLogHandler(System.err));
}

参考

推荐文章