主頁 > 知識庫 > 解析Linux源碼之epoll

解析Linux源碼之epoll

熱門標(biāo)簽:硅谷的囚徒呼叫中心 智能手機 百度競價點擊價格的計算公式 美圖手機 檢查注冊表項 網(wǎng)站建設(shè) 阿里云 使用U盤裝系統(tǒng)

一、前言

在linux的高性能網(wǎng)絡(luò)編程中,繞不開的就是epoll。和select、poll等系統(tǒng)調(diào)用相比,epoll在需要監(jiān)視大量文件描述符并且其中只有少數(shù)活躍的時候,表現(xiàn)出無可比擬的優(yōu)勢。epoll能讓內(nèi)核記住所關(guān)注的描述符,并在對應(yīng)的描述符事件就緒的時候,在epoll的就緒鏈表中添加這些就緒元素,并喚醒對應(yīng)的epoll等待進(jìn)程。

二、簡單的epoll例子

下面的例子,是從筆者本人用c語言寫的dbproxy中的一段代碼。由于細(xì)節(jié)過多,所以做了一些刪減。

int init_reactor(int listen_fd,int worker_count){
	......
	// 創(chuàng)建多個epoll fd,以充分利用多核
	for(i=0;i<worker_count;i++){
		reactor->worker_fd = epoll_create(EPOLL_MAX_EVENTS);
	}
	/* epoll add listen_fd and accept */
	// 將accept后的事件加入到對應(yīng)的epoll fd中
	int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));
	// 將連接描述符注冊到對應(yīng)的worker里面
	epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);
}
// reactor的worker線程
static void* rw_thread_func(void* arg){
	......

	for(;;){
		  // epoll_wait等待事件觸發(fā)
        int retval = epoll_wait(epfd,events,EPOLL_MAX_EVENTS,500);
        if(retval > 0){
        	for(j=0; j < retval; j++){
        		// 處理讀事件
        	   if(event & EPOLLIN){
                 handle_ready_read_connection(conn);
                 continue;
             }
             /* 處理其它事件 */
        	}
        }
	}
	......
}

上述代碼事實上就是實現(xiàn)了一個reactor模式中的accept與read/write處理線程,如下圖所示:

2.1、epoll_create

Unix的萬物皆文件的思想在epoll里面也有體現(xiàn),epoll_create調(diào)用返回一個文件描述符,此描述符掛載在anon_inode_fs(匿名inode文件系統(tǒng))的根目錄下面。讓我們看下具體的epoll_create系統(tǒng)調(diào)用源碼:

SYSCALL_DEFINE1(epoll_create, int, size)
{
	if (size <= 0)
		return -EINVAL;

	return sys_epoll_create1(0);
}

由上述源碼可見,epoll_create的參數(shù)是基本沒有意義的,kernel簡單的判斷是否為0,然后就直接就調(diào)用了sys_epoll_create1。由于linux的系統(tǒng)調(diào)用是通過(SYSCALL_DEFINE1,SYSCALL_DEFINE2......SYSCALL_DEFINE6)定義的,那么sys_epoll_create1對應(yīng)的源碼即是SYSCALL_DEFINE(epoll_create1)。

(注:受限于寄存器數(shù)量的限制,(80x86下的)kernel限制系統(tǒng)調(diào)用最多有6個參數(shù)。據(jù)ulk3所述,這是由于32位80x86寄存器的限制)

接下來,我們就看下epoll_create1的源碼:

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
	// kzalloc(sizeof(*ep), GFP_KERNEL),用的是內(nèi)核空間
	error = ep_alloc(&ep);
	// 獲取尚未被使用的文件描述符,即描述符數(shù)組的槽位
	fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
	// 在匿名inode文件系統(tǒng)中分配一個inode,并得到其file結(jié)構(gòu)體
	// 且file->f_op = &eventpoll_fops
	// 且file->private_data = ep;
	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
				 O_RDWR | (flags & O_CLOEXEC));
	// 將file填入到對應(yīng)的文件描述符數(shù)組的槽里面
	fd_install(fd,file);			 
	ep->file = file;
	return fd;
}

最后epoll_create生成的文件描述符如下圖所示:

2.2、struct eventpoll

所有的epoll系統(tǒng)調(diào)用都是圍繞eventpoll結(jié)構(gòu)體做操作,現(xiàn)簡要描述下其中的成員:

/*
 * 此結(jié)構(gòu)體存儲在file->private_data中
 */
struct eventpoll {
	// 自旋鎖,在kernel內(nèi)部用自旋鎖加鎖,就可以同時多線(進(jìn))程對此結(jié)構(gòu)體進(jìn)行操作
	// 主要是保護(hù)ready_list
	spinlock_t lock;
	// 這個互斥鎖是為了保證在eventloop使用對應(yīng)的文件描述符的時候,文件描述符不會被移除掉
	struct mutex mtx;
	// epoll_wait使用的等待隊列,和進(jìn)程喚醒有關(guān)
	wait_queue_head_t wq;
	// file->poll使用的等待隊列,和進(jìn)程喚醒有關(guān)
	wait_queue_head_t poll_wait;
	// 就緒的描述符隊列
	struct list_head rdllist;
	// 通過紅黑樹來組織當(dāng)前epoll關(guān)注的文件描述符
	struct rb_root rbr;
	// 在向用戶空間傳輸就緒事件的時候,將同時發(fā)生事件的文件描述符鏈入到這個鏈表里面
	struct epitem *ovflist;
	// 對應(yīng)的user
	struct user_struct *user;
	// 對應(yīng)的文件描述符
	struct file *file;
	// 下面兩個是用于環(huán)路檢測的優(yōu)化
	int visited;
	struct list_head visited_list_link;
};

本文講述的是kernel是如何將就緒事件傳遞給epoll并喚醒對應(yīng)進(jìn)程上,因此在這里主要聚焦于(wait_queue_head_t wq)等成員。

2.3、epoll_ctl(add)

我們看下epoll_ctl(EPOLL_CTL_ADD)是如何將對應(yīng)的文件描述符插入到eventpoll中的。
借助于spin_lock(自旋鎖)和mutex(互斥鎖),epoll_ctl調(diào)用可以在多個KSE(內(nèi)核調(diào)度實體,即進(jìn)程/線程)中并發(fā)執(zhí)行。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	/* 校驗epfd是否是epoll的描述符 */
	// 此處的互斥鎖是為了防止并發(fā)調(diào)用epoll_ctl,即保護(hù)內(nèi)部數(shù)據(jù)結(jié)構(gòu)
	// 不會被并發(fā)的添加修改刪除破壞
	mutex_lock_nested(&ep->mtx, 0);
	switch (op) {
		case EPOLL_CTL_ADD:
			...
			// 插入到紅黑樹中
			error = ep_insert(ep, &epds, tfile, fd);
			...
			break;
		......
	}
	mutex_unlock(&ep->mtx);	
}		

上述過程如下圖所示:

2.4、ep_insert

在ep_insert中初始化了epitem,然后初始化了本文關(guān)注的焦點,即事件就緒時候的回調(diào)函數(shù),代碼如下所示:

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
	/* 初始化epitem */
	// &epq.pt->qproc = ep_ptable_queue_proc
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
	// 在這里將回調(diào)函數(shù)注入
	revents = tfile->f_op->poll(tfile, &epq.pt);
	// 如果當(dāng)前有事件已經(jīng)就緒,那么一開始就會被加入到ready list
	// 例如可寫事件
	// 另外,在tcp內(nèi)部ack之后調(diào)用tcp_check_space,最終調(diào)用sock_def_write_space來喚醒對應(yīng)的epoll_wait下的進(jìn)程
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);
		// wake_up ep對應(yīng)在epoll_wait下的進(jìn)程
		if (waitqueue_active(&ep->wq)){
			wake_up_locked(&ep->wq);
		}
		......
	}	
	// 將epitem插入紅黑樹
	ep_rbtree_insert(ep, epi);
	......
}

2.5、tfile->f_op->poll的實現(xiàn)

向kernel更底層注冊回調(diào)函數(shù)的是tfile->f_op->poll(tfile, &epq.pt)這一句,我們來看一下對于對應(yīng)的socket文件描述符,其fd=>file->f_op->poll的初始化過程:

// 將accept后的事件加入到對應(yīng)的epoll fd中
int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));
// 將連接描述符注冊到對應(yīng)的worker里面
epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);

回顧一下上述user space代碼,fd即client_fd是由tcp的listen_fd通過accept調(diào)用而來,那么我們看下accept調(diào)用鏈的關(guān)鍵路徑:

accept

      |->accept4

            |->sock_attach_fd(newsock, newfile, flags & O_NONBLOCK);

                  |->init_file(file,...,&socket_file_ops);

                        |->file->f_op = fop;

                              /* file->f_op = &socket_file_ops */

            |->fd_install(newfd, newfile); // 安裝fd

那么,由accept獲得的client_fd的結(jié)構(gòu)如下圖所示:

(注:由于是tcp socket,所以這邊sock->ops=inet_stream_ops,既然知道了tfile->f_op->poll的實現(xiàn),我們就可以看下此poll是如何將安裝回調(diào)函數(shù)的。

2.6、回調(diào)函數(shù)的安裝

kernel的調(diào)用路徑如下:

sock_poll /*tfile->f_op->poll(tfile, &epq.pt)*/;

|->sock->ops->poll

|->tcp_poll

/* 這邊重要的是拿到了sk_sleep用于KSE(進(jìn)程/線程)的喚醒 */

|->sock_poll_wait(file, sk->sk_sleep, wait);

|->poll_wait

|->p->qproc(filp, wait_address, p);

/* p為&epq.pt,而且&epq.pt->qproc= ep_ptable_queue_proc*/

|-> ep_ptable_queue_proc(filp,wait_address,p);

繞了一大圈之后,我們的回調(diào)函數(shù)的安裝其實就是調(diào)用了eventpoll.c中的ep_ptable_queue_proc,而且向其中傳遞了sk->sk_sleep作為其waitqueue的head,其源碼如下所示:

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	// 取出當(dāng)前client_fd對應(yīng)的epitem
	struct epitem *epi = ep_item_from_epqueue(pt);
	// &pwq->wait->func=ep_poll_callback,用于回調(diào)喚醒
	// 注意,這邊不是init_waitqueue_entry,即沒有將當(dāng)前KSE(current,當(dāng)前進(jìn)程/線程)寫入到
	// wait_queue當(dāng)中,因為不一定是從當(dāng)前安裝的KSE喚醒,而應(yīng)該是喚醒epoll\_wait的KSE
	init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
	// 這邊的whead是sk->sk_sleep,將當(dāng)前的waitqueue鏈入到socket對應(yīng)的sleep列表
	add_wait_queue(whead, &pwq->wait);	
}	

這樣client_fd的結(jié)構(gòu)進(jìn)一步完善,如下圖所示:

ep_poll_callback函數(shù)是喚醒對應(yīng)epoll_wait的地方,我們將在后面一起講述。

2.7、epoll_wait

epoll_wait主要是調(diào)用了ep_poll:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	/* 檢查epfd是否是epoll\_create創(chuàng)建的fd */
	// 調(diào)用ep_poll
	error = ep_poll(ep, events, maxevents, timeout);
	...
}

緊接著,我們看下ep_poll函數(shù):

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	......
retry:
	// 獲取spinlock
	spin_lock_irqsave(&ep->lock, flags);
	// 將當(dāng)前task_struct寫入到waitqueue中以便喚醒
	// wq_entry->func = default_wake_function;
	init_waitqueue_entry(&wait, current);
	// WQ_FLAG_EXCLUSIVE,排他性喚醒,配合SO_REUSEPORT從而解決accept驚群問題
	wait.flags |= WQ_FLAG_EXCLUSIVE;
	// 鏈入到ep的waitqueue中
	__add_wait_queue(&ep->wq, &wait);
	for (;;) {
		// 設(shè)置當(dāng)前進(jìn)程狀態(tài)為可打斷
		set_current_state(TASK_INTERRUPTIBLE);
		// 檢查當(dāng)前線程是否有信號要處理,有則返回-EINTR
		if (signal_pending(current)) {
			res = -EINTR;
			break;
		}
		spin_unlock_irqrestore(&ep->lock, flags);
		// schedule調(diào)度,讓出CPU
		jtimeout = schedule_timeout(jtimeout);
		spin_lock_irqsave(&ep->lock, flags);
	}
	// 到這里,表明超時或者有事件觸發(fā)等動作導(dǎo)致進(jìn)程重新調(diào)度
	__remove_wait_queue(&ep->wq, &wait);
	// 設(shè)置進(jìn)程狀態(tài)為running
	set_current_state(TASK_RUNNING);
	......
	// 檢查是否有可用事件
	eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
	......
	// 向用戶空間拷貝就緒事件
	ep_send_events(ep, events, maxevents)
}		   

上述邏輯如下圖所示:

2.8、ep_send_events

ep_send_events函數(shù)主要就是調(diào)用了ep_scan_ready_list,顧名思義ep_scan_ready_list就是掃描就緒列表:

static int ep_scan_ready_list(struct eventpoll *ep,
			      int (*sproc)(struct eventpoll *,
					   struct list_head *, void *),
			      void *priv,
			      int depth)
{
	...
	// 將epfd的rdllist鏈入到txlist
	list_splice_init(&ep->rdllist, &txlist);
	...
	/* sproc = ep_send_events_proc */
	error = (*sproc)(ep, &txlist, priv);
	...
	// 處理ovflist,即在上面sproc過程中又到來的事件
	...
}

其主要調(diào)用了ep_send_events_proc:

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{
	for (eventcnt = 0, uevent = esed->events;
	     !list_empty(head) && eventcnt < esed->maxevents;) {
	   // 遍歷ready list 
		epi = list_first_entry(head, struct epitem, rdllink);
		list_del_init(&epi->rdllink);
		// readylist只是表明當(dāng)前epi有事件,具體的事件信息還是得調(diào)用對應(yīng)file的poll
		// 這邊的poll即是tcp_poll,根據(jù)tcp本身的信息設(shè)置掩碼(mask)等信息 & 上興趣事件掩碼,則可以得知當(dāng)前事件是否是epoll_wait感興趣的事件
		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
			epi->event.events;
		if(revents){
			/* 將event放入到用戶空間 */
			/* 處理ONESHOT邏輯 */
			// 如果不是邊緣觸發(fā),則將當(dāng)前的epi重新加回到可用列表中,這樣就可以下一次繼續(xù)觸發(fā)poll,如果下一次poll的revents不為0,那么用戶空間依舊能感知 */
			else if (!(epi->event.events & EPOLLET)){
				list_add_tail(&epi->rdllink, &ep->rdllist);
			}
			/* 如果是邊緣觸發(fā),那么就不加回可用列表,因此只能等到下一個可用事件觸發(fā)的時候才會將對應(yīng)的epi放到可用列表里面*/
			eventcnt++
		}
		/* 如poll出來的revents事件epoll_wait不感興趣(或者本來就沒有事件),那么也不會加回到可用列表 */
		......
	}
	return eventcnt;
}		

上述代碼邏輯如下所示:

三、事件到來添加到epoll就緒隊列(rdllist)的過程

經(jīng)過上述章節(jié)的詳述之后,我們終于可以闡述,tcp在數(shù)據(jù)到來時是怎么加入到epoll的就緒隊列的了。

3.1、可讀事件到來

首先我們看下tcp數(shù)據(jù)包從網(wǎng)卡驅(qū)動到kernel內(nèi)部tcp協(xié)議處理調(diào)用鏈:

step1:

網(wǎng)絡(luò)分組到來的內(nèi)核路徑,網(wǎng)卡發(fā)起中斷后調(diào)用netif_rx將事件掛入CPU的等待隊列,并喚起軟中斷(soft_irq),再通過linux的軟中斷機制調(diào)用net_rx_action,如下圖所示:

注:上圖來自PLKA(<<深入Linux內(nèi)核架構(gòu)>>)

step2:

緊接著跟蹤next_rx_action

next_rx_action

|-process_backlog

......

|->packet_type->func 在這里我們考慮ip_rcv

|->ipprot->handler 在這里ipprot重載為tcp_protocol

(handler 即為tcp_v4_rcv)

我們再看下對應(yīng)的tcp_v4_rcv

tcp_v4_rcv

      |->tcp_v4_do_rcv

            |->tcp_rcv_state_process

                  |->tcp_data_queue

                        |-> sk->sk_data_ready(sock_def_readable)

                              |->wake_up_interruptible_sync_poll(sk->sleep,...)

                                    |->__wake_up

                                          |->__wake_up_common

                                                |->curr->func

                                                /* 這里已經(jīng)被ep_insert添加為ep_poll_callback,而且設(shè)定了排它標(biāo)識WQ_FLAG_EXCLUSIVE*/

                                                      |->ep_poll_callback

這樣,我們就看下最終喚醒epoll_wait的ep_poll_callback函數(shù):

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	// 獲取wait對應(yīng)的epitem	
	struct epitem *epi = ep_item_from_wait(wait);
	// epitem對應(yīng)的eventpoll結(jié)構(gòu)體
	struct eventpoll *ep = epi->ep;
	// 獲取自旋鎖,保護(hù)ready_list等結(jié)構(gòu)
	spin_lock_irqsave(&ep->lock, flags);
	// 如果當(dāng)前epi沒有被鏈入ep的ready list,則鏈入
	// 這樣,就把當(dāng)前的可用事件加入到epoll的可用列表了
	if (!ep_is_linked(&epi->rdllink))
		list_add_tail(&epi->rdllink, &ep->rdllist);
	// 如果有epoll_wait在等待的話,則喚醒這個epoll_wait進(jìn)程
	// 對應(yīng)的&ep->wq是在epoll_wait調(diào)用的時候通過init_waitqueue_entry(&wait, current)而生成的
	// 其中的current即是對應(yīng)調(diào)用epoll_wait的進(jìn)程信息task_struct
	if (waitqueue_active(&ep->wq))
		wake_up_locked(&ep->wq);
}

上述過程如下圖所示:

最后wake_up_locked調(diào)用__wake_up_common,然后調(diào)用了在init_waitqueue_entry注冊的default_wake_function,調(diào)用路徑為:

wake_up_locked

|->__wake_up_common

|->default_wake_function

|->try_wake_up (wake up a thread)

|->activate_task

|->enqueue_task    running

將epoll_wait進(jìn)程推入可運行隊列,等待內(nèi)核重新調(diào)度進(jìn)程,然后epoll_wait對應(yīng)的這個進(jìn)程重新運行后,就從schedule恢復(fù),繼續(xù)下面的ep_send_events(向用戶空間拷貝事件并返回)。

wake_up過程如下圖所示:

3.2、可寫事件到來

可寫事件的運行過程和可讀事件大同小異:

首先,在epoll_ctl_add的時候預(yù)先會調(diào)用一次對應(yīng)文件描述符的poll,如果返回事件里有可寫掩碼的時候直接調(diào)用wake_up_locked以喚醒對應(yīng)的epoll_wait進(jìn)程。

然后,在tcp在底層驅(qū)動有數(shù)據(jù)到來的時候可能攜帶了ack從而可以釋放部分已經(jīng)被對端接收的數(shù)據(jù),于是觸發(fā)可寫事件,這一部分的調(diào)用鏈為:

tcp_input.c

tcp_v4_rcv

|-tcp_v4_do_rcv

|-tcp_rcv_state_process

|-tcp_data_snd_check

|->tcp_check_space

|->tcp_new_space

|->sk->sk_write_space

/* tcp下即是sk_stream_write_space*/

最后在此函數(shù)里面sk_stream_write_space喚醒對應(yīng)的epoll_wait進(jìn)程

void sk_stream_write_space(struct sock *sk)
{
	// 即有1/3可寫空間的時候才觸發(fā)可寫事件
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);

		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
			wake_up_interruptible_poll(sk->sk_sleep, POLLOUT |
						POLLWRNORM | POLLWRBAND)
		......
	}
}

四、關(guān)閉描述符(close fd)

值得注意的是,我們在close對應(yīng)的文件描述符的時候,會自動調(diào)用eventpoll_release將對應(yīng)的file從其關(guān)聯(lián)的epoll_fd中刪除,kernel關(guān)鍵路徑如下:

close fd

      |->filp_close

            |->fput

                  |->__fput

                        |->eventpoll_release

                              |->ep_remove

所以我們在關(guān)閉對應(yīng)的文件描述符后,并不需要通過epoll_ctl_del來刪掉對應(yīng)epoll中相應(yīng)的描述符。

五、總結(jié)

epoll作為linux下非常優(yōu)秀的事件觸發(fā)機制得到了廣泛的運用。其源碼還是比較復(fù)雜的,本文只是闡述了epoll讀寫事件的觸發(fā)機制。

以上就是解析Linux源碼之epoll的詳細(xì)內(nèi)容,更多關(guān)于Linux源碼 epoll的資料請關(guān)注腳本之家其它相關(guān)文章!

標(biāo)簽:煙臺 懷化 山南 湖北 黃山 賀州 湘潭 通遼

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《解析Linux源碼之epoll》,本文關(guān)鍵詞  ;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 收縮
    • 微信客服
    • 微信二維碼
    • 電話咨詢

    • 400-1100-266