利用异步I/O复制文件及详解
AIO 簡介
Linux 異步 I/O 是 Linux 內核中提供的一個相當新的增強。它是 2.6 版本內核的一個標準特性,但是我們在 2.4 版本內核的補丁中也可以找到它。AIO 背后的基本思想是允許進程發起很多 I/O 操作,而不用阻塞或等待任何操作完成。稍后或在接收到 I/O 操作完成的通知時,進程就可以檢索 I/O 操作的結果。
?
I/O 操作。這個 API 的展示顯示了如何使用它。
AIO API
AIO 接口的 API 非常簡單,但是它為數據傳輸提供了必需的功能,并給出了兩個不同的通知模型。表 1 給出了 AIO 的接口函數,本節稍后會更詳細進行介紹。
表 1. AIO 接口 API
| aio_read | 請求異步讀操作 |
| aio_error | 檢查異步請求的狀態 |
| aio_return | 獲得完成的異步請求的返回狀態 |
| aio_write | 請求異步寫操作 |
| aio_suspend | 掛起調用進程,直到一個或多個異步請求已經完成(或失敗) |
| aio_cancel | 取消異步 I/O 請求 |
| lio_listio | 發起一系列 I/O 操作 |
每個 API 函數都使用?aiocb?結構開始或檢查。這個結構有很多元素,但是清單 1 僅僅給出了需要(或可以)使用的元素。
清單 1. aiocb 結構中相關的域
| 1 2 3 4 5 6 7 8 9 10 11 12 | struct aiocb { ? ??int aio_fildes;?????????????? // File Descriptor ??int aio_lio_opcode;?????????? // Valid only for lio_listio (r/w/nop) ??volatile void *aio_buf;?????? // Data Buffer ??size_t aio_nbytes;??????????? // Number of Bytes in Data Buffer ??struct sigevent aio_sigevent; // Notification Structure ? ??/* Internal fields */ ??... ? }; |
sigevent?結構告訴 AIO 在 I/O 操作完成時應該執行什么操作。我們將在 AIO 的展示中對這個結構進行探索。現在我們將展示各個 AIO 的 API 函數是如何工作的,以及我們應該如何使用它們。?
aio_read
aio_read?函數請求對一個有效的文件描述符進行異步讀操作。這個文件描述符可以表示一個文件、套接字甚至管道。aio_read?函數的原型如下:
| 1 | int aio_read( struct aiocb *aiocbp ); |
aio_read?函數在請求進行排隊之后會立即返回。如果執行成功,返回值就為 0;如果出現錯誤,返回值就為 -1,并設置?errno?的值。
要執行讀操作,應用程序必須對?aiocb?結構進行初始化。下面這個簡短的例子就展示了如何填充?aiocb?請求結構,并使用?aio_read?來執行異步讀請求(現在暫時忽略通知)操作。它還展示了?aio_error?的用法,不過我們將稍后再作解釋。
清單 2. 使用 aio_read 進行異步讀操作的例子
#include <aio.h>...int fd, ret;struct aiocb my_aiocb;fd = open( "file.txt", O_RDONLY );if (fd < 0) perror("open");/* Zero out the aiocb structure (recommended) */bzero( (char *)&my_aiocb, sizeof(struct aiocb) );/* Allocate a data buffer for the aiocb request */my_aiocb.aio_buf = malloc(BUFSIZE+1);if (!my_aiocb.aio_buf) perror("malloc");/* Initialize the necessary fields in the aiocb */my_aiocb.aio_fildes = fd;my_aiocb.aio_nbytes = BUFSIZE;my_aiocb.aio_offset = 0;ret = aio_read( &my_aiocb );if (ret < 0) perror("aio_read");while ( aio_error( &my_aiocb ) == EINPROGRESS ) ;if ((ret = aio_return( &my_iocb )) > 0) {/* got ret bytes on the read */} else {/* read failed, consult errno */}在清單 2 中,在打開要從中讀取數據的文件之后,我們就清空了?aiocb?結構,然后分配一個數據緩沖區。并將對這個數據緩沖區的引用放到?aio_buf?中。然后,我們將?aio_nbytes?初始化成緩沖區的大小。并將?aio_offset?設置成 0(該文件中的第一個偏移量)。我們將?aio_fildes?設置為從中讀取數據的文件描述符。在設置這些域之后,就調用?aio_read?請求進行讀操作。我們然后可以調用?aio_error?來確定?aio_read?的狀態。只要狀態是?EINPROGRESS,就一直忙碌等待,直到狀態發生變化為止?,F在,請求可能成功,也可能失敗。
使用 AIO 接口來編譯程序
我們可以在?aio.h?頭文件中找到函數原型和其他需要的符號。在編譯使用這種接口的程序時,我們必須使用 POSIX 實時擴展庫(librt)。
注意使用這個 API 與標準的庫函數從文件中讀取內容是非常相似的。除了?aio_read?的一些異步特性之外,另外一個區別是讀操作偏移量的設置。在傳統的?read?調用中,偏移量是在文件描述符上下文中進行維護的。對于每個讀操作來說,偏移量都需要進行更新,這樣后續的讀操作才能對下一塊數據進行尋址。對于異步 I/O 操作來說這是不可能的,因為我們可以同時執行很多讀請求,因此必須為每個特定的讀請求都指定偏移量。
aio_error
aio_error?函數被用來確定請求的狀態。其原型如下:
| 1 | int aio_error( struct aiocb *aiocbp ); |
這個函數可以返回以下內容:
- EINPROGRESS,說明請求尚未完成
- ECANCELLED,說明請求被應用程序取消了
- -1,說明發生了錯誤,具體錯誤原因可以查閱?errno
aio_return
異步 I/O 和標準塊 I/O 之間的另外一個區別是我們不能立即訪問這個函數的返回狀態,因為我們并沒有阻塞在?read?調用上。在標準的?read?調用中,返回狀態是在該函數返回時提供的。但是在異步 I/O 中,我們要使用?aio_return?函數。這個函數的原型如下:
| 1 | ssize_t aio_return( struct aiocb *aiocbp ); |
只有在?aio_error?調用確定請求已經完成(可能成功,也可能發生了錯誤)之后,才會調用這個函數。aio_return?的返回值就等價于同步情況中?read?或?write?系統調用的返回值(所傳輸的字節數,如果發生錯誤,返回值就為?-1)。
aio_write
aio_write?函數用來請求一個異步寫操作。其函數原型如下:
| 1 | int aio_write( struct aiocb *aiocbp ); |
aio_write?函數會立即返回,說明請求已經進行排隊(成功時返回值為?0,失敗時返回值為?-1,并相應地設置?errno)。
這與?read?系統調用類似,但是有一點不一樣的行為需要注意?;叵胍幌聦τ?read?調用來說,要使用的偏移量是非常重要的。然而,對于?write?來說,這個偏移量只有在沒有設置?O_APPEND?選項的文件上下文中才會非常重要。如果設置了?O_APPEND,那么這個偏移量就會被忽略,數據都會被附加到文件的末尾。否則,aio_offset?域就確定了數據在要寫入的文件中的偏移量。
aio_suspend
我們可以使用?aio_suspend?函數來掛起(或阻塞)調用進程,直到異步請求完成為止,此時會產生一個信號,或者發生其他超時操作。調用者提供了一個?aiocb?引用列表,其中任何一個完成都會導致?aio_suspend?返回。?aio_suspend?的函數原型如下:
| 1 | int aio_suspend( const struct aiocb *const cblist[], int n, const struct timespec *timeout ); |
aio_suspend?的使用非常簡單。我們要提供一個?aiocb?引用列表。如果任何一個完成了,這個調用就會返回?0。否則就會返回?-1,說明發生了錯誤。請參看清單 3。
清單 3. 使用 aio_suspend 函數阻塞異步 I/O
struct aioct *cblist[MAX_LIST]/* Clear the list. */ bzero( (char *)cblist, sizeof(cblist) );/* Load one or more references into the list */ cblist[0] = &my_aiocb;ret = aio_read( &my_aiocb );ret = aio_suspend( cblist, MAX_LIST, NULL );注意,aio_suspend?的第二個參數是?cblist?中元素的個數,而不是?aiocb?引用的個數。cblist?中任何?NULL?元素都會被?aio_suspend?忽略。
如果為?aio_suspend?提供了超時,而超時情況的確發生了,那么它就會返回?-1,errno?中會包含?EAGAIN。
aio_cancel
aio_cancel?函數允許我們取消對某個文件描述符執行的一個或所有 I/O 請求。其原型如下:
| 1 | int aio_cancel( int fd, struct aiocb *aiocbp ); |
要取消一個請求,我們需要提供文件描述符和?aiocb?引用。如果這個請求被成功取消了,那么這個函數就會返回?AIO_CANCELED。如果請求完成了,這個函數就會返回?AIO_NOTCANCELED。
要取消對某個給定文件描述符的所有請求,我們需要提供這個文件的描述符,以及一個對?aiocbp?的?NULL?引用。如果所有的請求都取消了,這個函數就會返回?AIO_CANCELED;如果至少有一個請求沒有被取消,那么這個函數就會返回?AIO_NOT_CANCELED;如果沒有一個請求可以被取消,那么這個函數就會返回?AIO_ALLDONE。我們然后可以使用?aio_error?來驗證每個 AIO 請求。如果這個請求已經被取消了,那么?aio_error?就會返回?-1,并且?errno?會被設置為?ECANCELED。
lio_listio
最后,AIO 提供了一種方法使用?lio_listio?API 函數同時發起多個傳輸。這個函數非常重要,因為這意味著我們可以在一個系統調用(一次內核上下文切換)中啟動大量的 I/O 操作。從性能的角度來看,這非常重要,因此值得我們花點時間探索一下。lio_listio?API 函數的原型如下:
| 1 2 | int lio_listio( int mode, struct aiocb *list[], int nent, ???????????????????struct sigevent *sig ); |
mode?參數可以是?LIO_WAIT?或?LIO_NOWAIT。LIO_WAIT?會阻塞這個調用,直到所有的 I/O 都完成為止。在操作進行排隊之后,LIO_NOWAIT?就會返回。list?是一個?aiocb?引用的列表,最大元素的個數是由?nent?定義的。注意?list?的元素可以為?NULL,lio_listio?會將其忽略。sigevent?引用定義了在所有 I/O 操作都完成時產生信號的方法。
對于?lio_listio?的請求與傳統的?read?或?write?請求在必須指定的操作方面稍有不同,如清單 4 所示。
清單 4. 使用 lio_listio 函數發起一系列請求
struct aiocb aiocb1, aiocb2; struct aiocb *list[MAX_LIST];.../* Prepare the first aiocb */ aiocb1.aio_fildes = fd; aiocb1.aio_buf = malloc( BUFSIZE+1 ); aiocb1.aio_nbytes = BUFSIZE; aiocb1.aio_offset = next_offset; aiocb1.aio_lio_opcode = LIO_READ;...bzero( (char *)list, sizeof(list) ); list[0] = &aiocb1; list[1] = &aiocb2;ret = lio_listio( LIO_WAIT, list, MAX_LIST, NULL );對于讀操作來說,aio_lio_opcode?域的值為?LIO_READ。對于寫操作來說,我們要使用?LIO_WRITE,不過?LIO_NOP?對于不執行操作來說也是有效的。
AIO 通知
現在我們已經看過了可用的 AIO 函數,本節將深入介紹對異步通知可以使用的方法。我們將通過信號和函數回調來探索異步函數的通知機制。
使用信號進行異步通知
使用信號進行進程間通信(IPC)是 UNIX 中的一種傳統機制,AIO 也可以支持這種機制。在這種范例中,應用程序需要定義信號處理程序,在產生指定的信號時就會調用這個處理程序。應用程序然后配置一個異步請求將在請求完成時產生一個信號。作為信號上下文的一部分,特定的?aiocb?請求被提供用來記錄多個可能會出現的請求。清單 5 展示了這種通知方法。
清單 5. 使用信號作為 AIO 請求的通知
void setup_io( ... ) {int fd;struct sigaction sig_act;struct aiocb my_aiocb;.../* Set up the signal handler */sigemptyset(&sig_act.sa_mask);sig_act.sa_flags = SA_SIGINFO;sig_act.sa_sigaction = aio_completion_handler;/* Set up the AIO request */bzero( (char *)&my_aiocb, sizeof(struct aiocb) );my_aiocb.aio_fildes = fd;my_aiocb.aio_buf = malloc(BUF_SIZE+1);my_aiocb.aio_nbytes = BUF_SIZE;my_aiocb.aio_offset = next_offset;/* Link the AIO request with the Signal Handler */my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;my_aiocb.aio_sigevent.sigev_signo = SIGIO;my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;/* Map the Signal to the Signal Handler */ret = sigaction( SIGIO, &sig_act, NULL );...ret = aio_read( &my_aiocb );}void aio_completion_handler( int signo, siginfo_t *info, void *context ) {struct aiocb *req;/* Ensure it's our signal */if (info->si_signo == SIGIO) {req = (struct aiocb *)info->si_value.sival_ptr;/* Did the request complete? */if (aio_error( req ) == 0) {/* Request completed successfully, get the return status */ret = aio_return( req );}}return; }在清單 5 中,我們在?aio_completion_handler?函數中設置信號處理程序來捕獲?SIGIO?信號。然后初始化?aio_sigevent?結構產生?SIGIO?信號來進行通知(這是通過?sigev_notify?中的?SIGEV_SIGNAL?定義來指定的)。當讀操作完成時,信號處理程序就從該信號的?si_value?結構中提取出?aiocb,并檢查錯誤狀態和返回狀態來確定 I/O 操作是否完成。
對于性能來說,這個處理程序也是通過請求下一次異步傳輸而繼續進行 I/O 操作的理想地方。采用這種方式,在一次數據傳輸完成時,我們就可以立即開始下一次數據傳輸操作。
使用回調函數進行異步通知
另外一種通知方式是系統回調函數。這種機制不會為通知而產生一個信號,而是會調用用戶空間的一個函數來實現通知功能。我們在?sigevent?結構中設置了對?aiocb?的引用,從而可以惟一標識正在完成的特定請求。請參看清單 6。
清單 6. 對 AIO 請求使用線程回調通知
void setup_io( ... ) {int fd;struct aiocb my_aiocb;.../* Set up the AIO request */bzero( (char *)&my_aiocb, sizeof(struct aiocb) );my_aiocb.aio_fildes = fd;my_aiocb.aio_buf = malloc(BUF_SIZE+1);my_aiocb.aio_nbytes = BUF_SIZE;my_aiocb.aio_offset = next_offset;/* Link the AIO request with a thread callback */my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;my_aiocb.aio_sigevent.notify_function = aio_completion_handler;my_aiocb.aio_sigevent.notify_attributes = NULL;my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;...ret = aio_read( &my_aiocb );}void aio_completion_handler( sigval_t sigval ) {struct aiocb *req;req = (struct aiocb *)sigval.sival_ptr;/* Did the request complete? */if (aio_error( req ) == 0) {/* Request completed successfully, get the return status */ret = aio_return( req );}return; }在清單 6 中,在創建自己的?aiocb?請求之后,我們使用?SIGEV_THREAD?請求了一個線程回調函數來作為通知方法。然后我們將指定特定的通知處理程序,并將要傳輸的上下文加載到處理程序中(在這種情況中,是個對?aiocb?請求自己的引用)。在這個處理程序中,我們簡單地引用到達的?sigval?指針并使用 AIO 函數來驗證請求已經完成。
對 AIO 進行系統優化
proc 文件系統包含了兩個虛擬文件,它們可以用來對異步 I/O 的性能進行優化:
- /proc/sys/fs/aio-nr 文件提供了系統范圍異步 I/O 請求現在的數目。
- /proc/sys/fs/aio-max-nr 文件是所允許的并發請求的最大個數。最大個數通常是 64KB,這對于大部分應用程序來說都已經足夠了。
結束語
使用異步 I/O 可以幫助我們構建 I/O 速度更快、效率更高的應用程序。如果我們的應用程序可以對處理和 I/O 操作重疊進行,那么 AIO 就可以幫助我們構建可以更高效地使用可用 CPU 資源的應用程序。盡管這種 I/O 模型與在大部分 Linux 應用程序中使用的傳統阻塞模式都不同,但是異步通知模型在概念上來說卻非常簡單,可以簡化我們的設計。
?
實例:
#include "include/apue.h"
#include <ctype.h>
#include <fcntl.h>
#include <aio.h>
#include <errno.h>
#define BSZ 4096
#define NBUF 8
enum rwop{
?? ?UNUSED = 0,
?? ?READ_PENDING = 1,
?? ?WRITE_PENDING = 2
};
struct buf{
?? ?enum rwop op;
?? ?int last;
?? ?struct aiocb aiocb;
?? ?unsigned char data[BSZ];
};
struct buf bufs[NBUF];
unsigned char translate(unsigned char c)
{
?? ?return c;
}
int main(int argc, char *argv[])
{
?? ?int ifd, ofd, i, j, n, err, numop;
?? ?struct stat sbuf;
?? ?const struct aiocb *aiolist[NBUF];
?? ?off_t off = 0;
?? ?
?? ?if (argc != 3)
?? ??? ?err_quit("usage : rot13 infile outfile");
?? ?if ((ifd = open(argv[1], O_RDONLY)) < 0)
?? ??? ?err_sys("can not open %s", argv[1]);
?? ?if ((ofd = open(argv[2], O_RDWR|O_CREAT|O_TRUNC, FILE_MODE)) < 0)
?? ??? ?err_sys("can not create %s", argv[2]);
?? ?if (fstat(ifd, &sbuf) < 0)
?? ??? ?err_sys("fstat failed");
?? ?//初始化NBUF
?? ?for (i = 0; i < NBUF; ++i)
?? ?{
?? ??? ?bufs[i].op = UNUSED;
?? ??? ?bufs[i].aiocb.aio_buf = bufs[i].data;
?? ??? ?bufs[i].aiocb.aio_sigevent.sigev_notify = SIGEV_NONE;
?? ??? ?aiolist[i] = NULL;
?? ?}
?? ?
?? ?numop = 0;
?? ?for ( ; ; )
?? ?{
?? ??? ?for (i = 0; i < NBUF; ++i)
?? ??? ?{
?? ??? ??? ?switch (bufs[i].op)
?? ??? ??? ?{
?? ??? ??? ?case UNUSED:
?? ??? ??? ??? ?if (off < sbuf.st_size)
?? ??? ??? ??? ?{
?? ??? ??? ??? ??? ?//如果還有更多數據未讀,則從輸入文件讀取
?? ??? ??? ??? ??? ?bufs[i].op = READ_PENDING;
?? ??? ??? ??? ??? ?bufs[i].aiocb.aio_fildes = ifd;
?? ??? ??? ??? ??? ?bufs[i].aiocb.aio_offset = off;
?? ??? ??? ??? ??? ?off += BSZ;
?? ??? ??? ??? ??? ?if (off >= sbuf.st_size)
?? ??? ??? ??? ??? ?{
?? ??? ??? ??? ??? ??? ?bufs[i].last = 1;
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?bufs[i].aiocb.aio_nbytes = BSZ;
?? ??? ??? ??? ??? ?if (aio_read(&bufs[i].aiocb) < 0)
?? ??? ??? ??? ??? ??? ?err_sys("aio_read failed");
?? ??? ??? ??? ??? ?aiolist[i] = &bufs[i].aiocb;
?? ??? ??? ??? ??? ?numop++;
?? ??? ??? ??? ?}
?? ??? ??? ??? ?
?? ??? ??? ??? ?break;
?? ??? ??? ?case READ_PENDING:
?? ??? ??? ??? ?if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS)
?? ??? ??? ??? ??? ?continue;
?? ??? ??? ??? ?if (err != 0)
?? ??? ??? ??? ?{
?? ??? ??? ??? ??? ?if (err == -1)
?? ??? ??? ??? ??? ??? ?err_sys("aio_error failed");
?? ??? ??? ??? ??? ?else
?? ??? ??? ??? ??? ??? ?err_exit(err, "read failed");
?? ??? ??? ??? ?}
?? ??? ??? ??? ?
?? ??? ??? ??? ?//讀取完成;轉換緩沖區并寫入
?? ??? ??? ??? ?if ((n = aio_return(&bufs[i].aiocb)) < 0)
?? ??? ??? ??? ??? ?err_sys("aio_return failed");
?? ??? ??? ??? ?if (n != BSZ && !bufs[i].last)
?? ??? ??? ??? ??? ?err_quit("short read (%d%d)", n, BSZ);
?? ??? ??? ??? ?for (j = 0; j < n; ++j)
?? ??? ??? ??? ??? ?bufs[i].data[j] = translate(bufs[i].data[j]);
?? ??? ??? ??? ?bufs[i].op = WRITE_PENDING;
?? ??? ??? ??? ?bufs[i].aiocb.aio_fildes = ofd;
?? ??? ??? ??? ?bufs[i].aiocb.aio_nbytes = n;
?? ??? ??? ??? ?if (aio_write(&bufs[i].aiocb) < 0)
?? ??? ??? ??? ??? ?err_sys("aio_write failed");
?? ??? ??? ??? ?//保留我們在aiolist的位置
?? ??? ??? ??? ?break;
?? ??? ??? ?case WRITE_PENDING:
?? ??? ??? ??? ?if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS)
?? ??? ??? ??? ??? ?continue;
?? ??? ??? ??? ?if (err != 0)
?? ??? ??? ??? ?{
?? ??? ??? ??? ??? ?if (err == -1)
?? ??? ??? ??? ??? ??? ?err_sys("aio_error failed");
?? ??? ??? ??? ??? ?else
?? ??? ??? ??? ??? ??? ?err_exit(err, "write failed");
?? ??? ??? ??? ?}
?? ??? ??? ??? ?
?? ??? ??? ??? ?//寫入完成:將緩沖區標記為未使用
?? ??? ??? ??? ?if ((n = aio_return(&bufs[i].aiocb)) < 0)
?? ??? ??? ??? ??? ?err_sys("aio_return failed");
?? ??? ??? ??? ?if (n != bufs[i].aiocb.aio_nbytes)
?? ??? ??? ??? ??? ?err_quit("short write (%d%d)", n, BSZ);
?? ??? ??? ??? ?aiolist[i] = NULL;
?? ??? ??? ??? ?bufs[i].op = UNUSED;
?? ??? ??? ??? ?numop--;
?? ??? ??? ??? ?break;
?? ??? ??? ?}
?? ??? ?}
?? ??? ?
?? ??? ?if (numop == 0)
?? ??? ?{
?? ??? ??? ?if (off >= sbuf.st_size)
?? ??? ??? ??? ?break;
?? ??? ?}
?? ??? ?else
?? ??? ?{
?? ??? ??? ?if (aio_suspend(aiolist, NBUF, NULL) < 0)
?? ??? ??? ??? ?err_sys("aio_suspend failed");
?? ??? ?}
?? ?}
?? ?
?? ?bufs[0].aiocb.aio_fildes = ofd;
?? ?if (aio_fsync(O_SYNC, &bufs[0].aiocb) < 0)
?? ??? ?err_sys("aio_fsync failed");
?? ?
?? ?return 0;
}
?
編譯:?gcc sample.c -lrt?
總結
以上是生活随笔為你收集整理的利用异步I/O复制文件及详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Z-Stack Home Develop
- 下一篇: Docker安装(安装docker)