linux 内核 发送数据,linux 内核tcp数据发送的实现
在分析之前先來看下SO_RCVTIMEO和SO_SNDTIMEO套接口吧,前面分析代碼時沒太注意這兩個.這里算是個補(bǔ)充.
SO_RCVTIMEO和SO_SNDTIMEO套接口選項(xiàng)可以給套接口的讀和寫,來設(shè)置超時時間,在unix網(wǎng)絡(luò)編程中,說是他們只能用于讀和
寫,而像accept和connect都不能用他們來設(shè)置.可是我在閱讀內(nèi)核源碼的過程中看到,在linux中,accept和connect可以分別用
SO_RCVTIMEO和SO_SNDTIMEO套接口來設(shè)置超時,這里他們的超時時間也就是sock的sk_rcvtimeo和sk_sndtimeo
域.accept和connect的相關(guān)代碼我前面都介紹過了,這里再提一下.其中accept的相關(guān)部分在inet_csk_accept中,會調(diào)用
sock_rcvtimeo來取得超時時間(如果是非阻塞則忽略超時間).而connect的相關(guān)代碼在inet_stream_connect中通過調(diào)
用sock_sndtimeo來取得超時時間(如果非阻塞則忽略超時時間).
---------------------------------------------------------------------------------
tcp發(fā)送數(shù)據(jù)最終都會調(diào)用到tcp_sendmsg,舉個例子吧,比如send系統(tǒng)調(diào)用.
send系統(tǒng)調(diào)用會z直接調(diào)用sys_sendto,然后填充msghdr數(shù)據(jù)結(jié)構(gòu),并調(diào)用sock_sendmsg,而在他中,則最終會調(diào)用__sock_sendmsg.在這個函數(shù)里面會初始化sock_iocb結(jié)構(gòu),然后調(diào)用tcp_sendmsg.
在sys_sendto中還會做和前面幾個系統(tǒng)調(diào)用差不多的操作,就是通過fd得到socket,在sock_sendmsg中則會設(shè)置aio所需的操作.
我們簡要的看下__sock_sendmsg的實(shí)現(xiàn).可以看到在內(nèi)核中數(shù)據(jù)都是用msghdr來表示的(也就是會將char *轉(zhuǎn)為msghdr),而這個結(jié)構(gòu)這里就不介紹了,unix網(wǎng)絡(luò)編程里面有詳細(xì)的介紹.而struct kiocb則是aio會用到的.
static inline int __sock_sendmsg(struct kiocb *iocb, struct socket *sock,
struct msghdr *msg, size_t size)
{
struct sock_iocb *si = kiocb_to_siocb(iocb);
int err;
si->sock = sock;
si->scm = NULL;
si->msg = msg;
si->size = size;
err = security_socket_sendmsg(sock, msg, size);
if (err)
return err;
///這里就會調(diào)用tcp_sendmsg.
return sock->ops->sendmsg(iocb, sock, msg, size);
}
我們在前面知道tcp將數(shù)據(jù)傳遞給ip層的時候調(diào)用ip_queue_xmit,而在這個函數(shù)沒有做任何切片的工作,切片的工作都在tcp層完成了.而udp則是需要在ip層進(jìn)行切片(通過ip_append_data). 而tcp的數(shù)據(jù)是字節(jié)流的,因此在
tcp_sendmsg中主要做的工作就是講字節(jié)流分段(根據(jù)mss),然后傳遞給ip層. 可以看到它的任務(wù)和ip_append_data很類似,流程其實(shí)也差不多. 所以有興趣的可以看下我前面的blog
而在tcp_sendmsg中也是要看網(wǎng)卡是否支持Scatter/Gather I/O,從而進(jìn)行相關(guān)操作.
下面我們來看它的實(shí)現(xiàn),我們分段來看:
///首先取出句柄的flag,主要是看是非阻塞還是阻塞模式.
flags = msg->msg_flags;
///這里取得發(fā)送超時時間.
timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
///如果connect還沒有完成則等待連接完成(如是非阻塞則直接返回).
if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
if ((err = sk_stream_wait_connect(sk, &timeo)) != 0)
goto out_err;
/* This should be in poll */
clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
///取出當(dāng)前的mss,在tcp_current_mss還會設(shè)置xmit_size_goal,這個值一般都是等于mss,除非有g(shù)so的情況下,有所不同.這里我們就認(rèn)為他是和mms相等的.
mss_now = tcp_current_mss(sk, !(flags&MSG_OOB));
size_goal = tp->xmit_size_goal;
在取得了相關(guān)的值之后我們進(jìn)入循環(huán)處理msg,我們知道m(xù)sghdr有可能是包含很多buffer的,因此這里我們分為兩層循環(huán),一層是遍歷msg的buffer,一層是對buffer進(jìn)行處理(切包或者組包)并發(fā)送給ip層.
首先來看當(dāng)buf空間不夠時的情況,它這里判斷buf空間是否足夠是通過
!tcp_send_head(sk) ||
(copy = size_goal - skb->len) <= 0
來判斷的,這里稍微解釋下這個:
這里tcp_send_head返回值為sk->sk_send_head,也就是指向當(dāng)前的將要發(fā)送的buf的位置.如果為空,則說明buf沒有空間,我們就需要alloc一個段來保存將要發(fā)送的msg.
而skb->len指的是當(dāng)前的skb的所包含的數(shù)據(jù)的大小(包含頭的大小).而這個值如果大于size_goal,則說明buf已滿,我
們需要重新alloc一個端.如果小于size_goal,則說明buf還有空間來容納一些數(shù)據(jù)來組成一個等于mss的數(shù)據(jù)包再發(fā)送給ip層.
/* Ok commence sending. */
iovlen = msg->msg_iovlen;
iov = msg->msg_iov;
///copy的大小
copied = 0;
err = -EPIPE;
///如果發(fā)送端已經(jīng)完全關(guān)閉則返回,并設(shè)置errno.
if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
goto do_error;
while (--iovlen >= 0) {
///取得當(dāng)前buf長度
int seglen = iov->iov_len;
///buf的基地址.
unsigned char __user *from = iov->iov_base;
iov++;
while (seglen > 0) {
int copy;
///我們知道sock的發(fā)送隊(duì)列sk_write_queue是一個雙向鏈表,而用tcp_write_queue_tail則是取得鏈表的最后一個元素.(如果鏈表為空則返回NULL).
skb = tcp_write_queue_tail(sk);
///上面介紹過了.主要是判斷buf是否有空閑空間.
if (!tcp_send_head(sk) ||
(copy = size_goal - skb->len) <= 0) {
new_segment:
///開始alloc一個新的段.
if (!sk_stream_memory_free(sk))
goto wait_for_sndbuf;
///alloc的大小一般都是等于mss的大小,這里是通過select_size得到的.
skb = sk_stream_alloc_skb(sk, select_size(sk),
sk->sk_allocation);
if (!skb)
goto wait_for_memory;
/*
* Check whether we can use HW checksum.
*/
if (sk->sk_route_caps & NETIF_F_ALL_CSUM)
skb->ip_summed = CHECKSUM_PARTIAL;
///將這個skb加入到sk_write_queue隊(duì)列中,并更新sk_send_head域.
skb_entail(sk, skb);
///將copy值更新.
copy = size_goal;
}
接下來如果走到這里,則說明 要么已經(jīng)alloc一個新的buf,要么當(dāng)前的buf中還有空閑空間.
這里先來分析alloc一個新的buf的情況.
這里先看下skb中的幾個域的含義:
head and end 指的是alloc了的buf的起始和終止位置,而data and tail 指的是數(shù)據(jù)段的起始和終止位置,因此經(jīng)過每一層tail和data都會變化的,而初始值這兩個是相等的.
我們來看skb_tailroom,它主要是用來判斷得到當(dāng)前的skb的tailroom的大小.tailroom也就是當(dāng)前buf的剩余數(shù)據(jù)段的大小,這里也就是用來判斷當(dāng)前buf是否能夠再添加數(shù)據(jù).
static inline int skb_is_nonlinear(const struct sk_buff *skb)
{
return skb->data_len;
}
static inline int skb_tailroom(const struct sk_buff *skb)
{
///如果是新alloc的skb則會返回tailroom否則返回0
return skb_is_nonlinear(skb) ? 0 : skb->end - skb->tail;
}
接下來來看代碼:
while (--iovlen >= 0) {
...........................
while (seglen > 0) {
///如果copy大于buf的大小,則縮小copy.
if (copy > seglen)
copy = seglen;
///這里查看skb的空間.如果大于0,則說明是新建的skb.
if (skb_tailroom(skb) > 0) {
///如果需要復(fù)制的數(shù)據(jù)大于所剩的空間,則先復(fù)制當(dāng)前skb所能容納的大小.
if (copy > skb_tailroom(skb))
copy = skb_tailroom(skb);
///復(fù)制數(shù)據(jù)到sk_buff.大小為copy.如果成功進(jìn)入do_fault,(我們下面會分析)
if ((err = skb_add_data(skb, from, copy)) != 0)
goto do_fault;
}
如果走到這一步,當(dāng)前的sk buff中有空閑空間 也分兩種情況,一種是 設(shè)備支持Scatter/Gather I/O(原理和udp的ip_append_data一樣,可以看我以前的blog).
另外一種情況是設(shè)備不支持S/G IO,可是mss變大了.這種情況下我們需要返回new_segment,新建一個段,然后再處理.
\我建議在看這段代碼前,可以看下我前面blog分析ip_append_data的那篇.因?yàn)槟抢飳/G
IO的設(shè)備處理切片的分析比較詳細(xì),而這里和那邊處理基本類似.這里我對frags的操作什么的都是很簡單的描述,詳細(xì)的在ip_append_data
那里已經(jīng)描述過.
然后再來了解下PSH標(biāo)記,這個標(biāo)記主要是用來使接收方將sk->receive_queue上緩存的skb提交給用戶進(jìn)程.詳細(xì)的介紹可
以看tcp協(xié)議的相關(guān)部分(推功能).在這里設(shè)置這個位會有兩種情況,第一種是我們寫了超過一半窗口大小的數(shù)據(jù),此時我們需要標(biāo)記最后一個段的PSH位.
或者我們有一個完整的tcp段發(fā)送出去,此時我們也需要標(biāo)記pSH位.
while (--iovlen >= 0) {
...........................
while (seglen > 0) {
...............................
else {
int merge = 0;
///取得nr_frags也就是保存物理頁的數(shù)組.
int i = skb_shinfo(skb)->nr_frags;
///從socket取得當(dāng)前的發(fā)送物理頁.
struct page *page = TCP_PAGE(sk);
///取得當(dāng)前頁的位移.
int off = TCP_OFF(sk);
///這里主要是判斷skb的發(fā)送頁是否已經(jīng)存在于nr_frags中,如果存在并且也沒有滿,則我們只需要將數(shù)據(jù)合并到這個頁就可以了,而不需要在frag再添加一個頁.
if (skb_can_coalesce(skb, i, page, off) &&
off != PAGE_SIZE) {
merge = 1;
} else if (i == MAX_SKB_FRAGS ||
(!i &&
!(sk->sk_route_caps & NETIF_F_SG))) {
///到這里說明要么設(shè)備不支持SG IO,要么頁已經(jīng)滿了.因?yàn)槲覀冎纍r_frags的大小是有限制的.此時調(diào)用tcp_mark_push來加一個PSH標(biāo)記.
tcp_mark_push(tp, skb);
goto new_segment;
} else if (page) {
if (off == PAGE_SIZE) {
///這里說明當(dāng)前的發(fā)送頁已滿.
put_page(page);
TCP_PAGE(sk) = page = NULL;
off = 0;
}
} else
off = 0;
if (copy > PAGE_SIZE - off)
copy = PAGE_SIZE - off;
.................................
///如果page為NULL則需要新alloc一個物理頁.
if (!page) {
/* Allocate new cache page. */
if (!(page = sk_stream_alloc_page(sk)))
goto wait_for_memory;
}
///開始復(fù)制數(shù)據(jù)到這個物理頁.
err = skb_copy_to_page(sk, from, skb, page,
off, copy);
if (err) {
///出錯的情況.
if (!TCP_PAGE(sk)) {
TCP_PAGE(sk) = page;
TCP_OFF(sk) = 0;
}
goto do_error;
}
///判斷是否為新建的物理頁.
if (merge) {
///如果只是在存在的物理頁添加數(shù)據(jù),則只需要更新size
skb_shinfo(skb)->frags[i - 1].size +=
copy;
} else {
///負(fù)責(zé)添加此物理頁到skb的frags.
skb_fill_page_desc(skb, i, page, off, copy);
if (TCP_PAGE(sk)) {
///設(shè)置物理頁的引用計數(shù).
get_page(page);
} else if (off + copy < PAGE_SIZE) {
get_page(page);
TCP_PAGE(sk) = page;
}
}
///設(shè)置位移.
TCP_OFF(sk) = off + copy;
}
數(shù)據(jù)復(fù)制完畢,接下來就該發(fā)送數(shù)據(jù)了.
這里我們要知道幾個tcp_push,tcp_one_push最終都會調(diào)用__tcp_push_pending_frames,而在它中間最
終會調(diào)用tcp_write_xmit,而tcp_write_xmit則會調(diào)用tcp_transmit_skb,這個函數(shù)最終會調(diào)用
ip_queue_xmit來講數(shù)據(jù)發(fā)送給ip層.這里要注意,我們這里的分析忽略掉了,tcp的一些管理以及信息交互的過程.
接下來看數(shù)據(jù)傳輸之前先來分析下TCP_PUSH幾個函數(shù)的實(shí)現(xiàn),tcp_push這幾個類似函數(shù)的最后一個參數(shù)都是一個控制nagle算法的參數(shù),來看下這幾個函數(shù)的原型:
static inline void tcp_push(struct sock *sk, int flags, int mss_now,
int nonagle)
void __tcp_push_pending_frames(struct sock *sk, unsigned int cur_mss,
int nonagle)
static int tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle)
我們還要知道tcp sock有一個nonagle域,這個域是會被tcp_cork套接口選項(xiàng)時被設(shè)置為TCP_NAGLE_CORK .先來看tcp_push的實(shí)現(xiàn):
static inline void tcp_push(struct sock *sk, int flags, int mss_now,
int nonagle)
{
struct tcp_sock *tp = tcp_sk(sk);
if (tcp_send_head(sk)) {
struct sk_buff *skb = tcp_write_queue_tail(sk);
///MSG_MORE這個參數(shù)我們在ip_append_data那里已經(jīng)介紹過了,就是告訴ip層,我這里主要是一些小的數(shù)據(jù)包,然后ip層就會提前劃分一個mtu大小的buf,然后等待數(shù)據(jù)的到來.因此如果沒有設(shè)置這個或者forced_push返回真(我們寫了超過最大窗口一般的數(shù)據(jù)),就標(biāo)記一個PSH.
if (!(flags & MSG_MORE) || forced_push(tp))
tcp_mark_push(tp, skb);
tcp_mark_urg(tp, flags, skb);
///這里還是根據(jù)是否有設(shè)置MSG_MORE來判斷使用哪個flags.因此可以看到如果我們設(shè)置了tcp_cork套接字選項(xiàng)和設(shè)置msg的MSG_MORE比較類似.最終調(diào)用tcp_push都會傳遞給__tcp_push_pending_frames的參數(shù)為TCP_NAGLE_CORK .
__tcp_push_pending_frames(sk, mss_now,
(flags & MSG_MORE) ? TCP_NAGLE_CORK : nonagle);
}
}
在看tcp_write_xmit之前,我們先來看下tcp_nagle_test,這個函數(shù)主要用來檢測nagle算法.如果當(dāng)前允許數(shù)據(jù)段立即被發(fā)送,則返回1,否則為0.
///這個函數(shù)就不介紹了,內(nèi)核的注釋很詳細(xì).
/* Return 0, if packet can be sent now without violation Nagle's rules:
* 1. It is full sized.
* 2. Or it contains FIN. (already checked by caller)
* 3. Or TCP_NODELAY was set.
* 4. Or TCP_CORK is not set, and all sent packets are ACKed.
* With Minshall's modification: all sent small packets are ACKed.
*/
static inline int tcp_nagle_check(const struct tcp_sock *tp,
const struct sk_buff *skb,
unsigned mss_now, int nonagle)
{
return (skb->len < mss_now &&
((nonagle & TCP_NAGLE_CORK) ||
(!nonagle && tp->packets_out && tcp_minshall_check(tp))));
}
static inline int tcp_nagle_test(struct tcp_sock *tp, struct sk_buff *skb,
unsigned int cur_mss, int nonagle)
{
///如果設(shè)置了TCP_NAGLE_PUSH則返回1,也就是數(shù)據(jù)可以立即發(fā)送
if (nonagle & TCP_NAGLE_PUSH)
return 1;
/* Don't use the nagle rule for urgent data (or for the final FIN).
* Nagle can be ignored during F-RTO too (see RFC4138).
*/
if (tcp_urg_mode(tp) || (tp->frto_counter == 2) ||
(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_FIN))
return 1;
///再次檢測 nonagle域,相關(guān)的檢測,上面已經(jīng)說明了.
if (!tcp_nagle_check(tp, skb, cur_mss, nonagle))
return 1;
return 0;
}
然后看下tcp_write_xmit的實(shí)現(xiàn),
static int tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle)
{
struct tcp_sock *tp = tcp_sk(sk);
struct sk_buff *skb;
unsigned int tso_segs, sent_pkts;
int cwnd_quota;
int result;
///檢測狀態(tài).
if (unlikely(sk->sk_state == TCP_CLOSE))
return 0;
sent_pkts = 0;
///探測mtu.
if ((result = tcp_mtu_probe(sk)) == 0) {
return 0;
} else if (result > 0) {
sent_pkts = 1;
}
///開始處理數(shù)據(jù)包.
while ((skb = tcp_send_head(sk))) {
unsigned int limit;
tso_segs = tcp_init_tso_segs(sk, skb, mss_now);
BUG_ON(!tso_segs);
///主要用來測試congestion window..
cwnd_quota = tcp_cwnd_test(tp, skb);
if (!cwnd_quota)
break;
if (unlikely(!tcp_snd_wnd_test(tp, skb, mss_now)))
break;
if (tso_segs == 1) {
///主要看這里,如果這個skb是寫隊(duì)列的最后一個buf,則傳輸TCP_NAGLE_PUSH給tcp_nagle_test,這個時侯直接返回1,于是接著往下面走,否則則說明數(shù)據(jù)包不要求理解發(fā)送,我們就跳出循環(huán)(這時數(shù)據(jù)段就不會被發(fā)送).比如設(shè)置了TCP_CORK.
if (unlikely(!tcp_nagle_test(tp, skb, mss_now,
(tcp_skb_is_last(sk, skb) ?
nonagle : TCP_NAGLE_PUSH))))
break;
} else {
if (tcp_tso_should_defer(sk, skb))
break;
}
limit = mss_now;
if (tso_segs > 1 && !tcp_urg_mode(tp))
limit = tcp_mss_split_point(sk, skb, mss_now,
cwnd_quota);
if (skb->len > limit &&
unlikely(tso_fragment(sk, skb, limit, mss_now)))
break;
TCP_SKB_CB(skb)->when = tcp_time_stamp;
///傳輸數(shù)據(jù)給3層.
if (unlikely(tcp_transmit_skb(sk, skb, 1, GFP_ATOMIC)))
break;
/* Advance the send_head. This one is sent out.
* This call will increment packets_out.
*/
tcp_event_new_data_sent(sk, skb);
tcp_minshall_update(tp, mss_now, skb);
sent_pkts++;
}
if (likely(sent_pkts)) {
tcp_cwnd_validate(sk);
return 0;
}
return !tp->packets_out && tcp_send_head(sk);
}
然后返回來,來看剛才緊接著的實(shí)現(xiàn):
while (--iovlen >= 0) {
...........................
while (seglen > 0) {
...............................
///如果第一次組完一個段,則設(shè)置PSH.
if (!copied)
TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
///然后設(shè)置寫隊(duì)列長度.
tp->write_seq += copy;
TCP_SKB_CB(skb)->end_seq += copy;
skb_shinfo(skb)->gso_segs = 0;
///更新buf基地址以及復(fù)制的buf大小.
from += copy;
copied += copy;
///buf已經(jīng)復(fù)制完則退出循環(huán).并發(fā)送這個段.
if ((seglen -= copy) == 0 && iovlen == 0)
goto out;
///如果skb的數(shù)據(jù)大小小于所需拷貝的數(shù)據(jù)大小或者存在帶外數(shù)據(jù),我們繼續(xù)循環(huán),而當(dāng)存在帶外數(shù)據(jù)時,我們接近著的循環(huán)會退出循環(huán),然后調(diào)用tcp_push將數(shù)據(jù)發(fā)出.
if (skb->len < size_goal || (flags & MSG_OOB))
continue;
///forced_push用來判斷我們是否已經(jīng)寫了多于一半窗口大小的數(shù)據(jù)到對端.如果是,我們則要發(fā)送一個推數(shù)據(jù)(PSH).
if (forced_push(tp)) {
tcp_mark_push(tp, skb);
///調(diào)用__tcp_push_pending_frames將開啟NAGLE算法的緩存的段全部發(fā)送出去.
__tcp_push_pending_frames(sk, mss_now, TCP_NAGLE_PUSH);
} else if (skb == tcp_send_head(sk))
///如果當(dāng)前將要發(fā)送的buf剛好為skb,則會傳發(fā)送當(dāng)前的buf
tcp_push_one(sk, mss_now);
continue;
wait_for_sndbuf:
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
wait_for_memory:
if (copied)
///內(nèi)存不夠,則盡量將本地的NAGLE算法所緩存的數(shù)據(jù)發(fā)送出去.
tcp_push(sk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
goto do_error;
///更新相關(guān)域.
mss_now = tcp_current_mss(sk, !(flags&MSG_OOB));
size_goal = tp->xmit_size_goal;
}
}
最后來看下出錯或者成功tcp_sendmsg所做的:
out:
///這里是成功返回所做的.
if (copied)
///這里可以看到最終的flag是tp->nonagle,而這個就是看套接口選項(xiàng)是否有開nagle算法,如果沒開的話,立即把數(shù)據(jù)發(fā)出去,否則則會村訊nagle算法,將小數(shù)據(jù)緩存起來.
tcp_push(sk, flags, mss_now, tp->nonagle);
TCP_CHECK_TIMER(sk);
release_sock(sk);
return copied;
do_fault:
if (!skb->len) {
///從write隊(duì)列unlink掉當(dāng)前的buf.
tcp_unlink_write_queue(skb, sk);
///更新send)head
tcp_check_send_head(sk, skb);
///釋放skb.
sk_wmem_free_skb(sk, skb);
}
do_error:
if (copied)
///如果copied不為0,則說明發(fā)送成功一部分?jǐn)?shù)據(jù),因此此時返回out.
goto out;
out_err:
///否則進(jìn)入錯誤處理.
err = sk_stream_error(sk, flags, err);
TCP_CHECK_TIMER(sk);
release_sock(sk);
return err;
總結(jié)
以上是生活随笔為你收集整理的linux 内核 发送数据,linux 内核tcp数据发送的实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux非root用户搭建docker
- 下一篇: linux 其他常用命令