DDR爱好者之家 Design By 杰米

前言

Redis的网络模型是基于I/O多路复用程序来实现的。源码中包含四种多路复用函数库epoll、select、evport、kqueue。在程序编译时会根据系统自动选择这四种库其中之一。下面以epoll为例,来分析Redis的I/O模块的源码。

epoll系统调用方法

Redis网络事件处理模块的代码都是围绕epoll那三个系统方法来写的。先把这三个方法弄清楚,后面就不难了。

epfd = epoll_create(1024);

创建epoll实例

参数:表示该 epoll 实例最多可监听的 socket fd(文件描述符)数量。

返回: epoll 专用的文件描述符。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

管理epoll中的事件,对事件进行注册、修改和删除。

参数:
epfd:epoll实例的文件描述符;
op:取值三种:EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 删除;
fd:socket的文件描述符;
epoll_event *event:事件

event代表一个事件,类似于Java NIO中的channel“通道”。epoll_event 的结构如下:

typedef union epoll_data {
void *ptr;
int fd; /* socket文件描述符 */
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events 就是各种待监听操作的操作码求与的结果,例如EPOLLIN(fd可读)、EPOLLOUT(fd可写) */
epoll_data_t data; /* User data variable */
};

int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);

等待事件是否就绪,类似于Java NIO中 select 方法。如果事件就绪,将就绪的event存入events数组中。

参数
epfd:epoll实例的文件描述符;
events:已就绪的事件数组;
intmaxevents:每次能处理的事件数;
timeout:阻塞时间,等待产生就绪事件的超时值。

源码分析

事件

Redis事件系统中将事件分为两种类型:

  • 文件事件;网络套接字对应的事件;
  • 时间事件:Redis中一些定时操作事件,例如 serverCron 函数。

下面从事件的注册、触发两个流程对源码进行分析

绑定事件

建立 eventLoop

在 initServer方法(由 redis.c 的 main 函数调用) 中,在建立 RedisDb 对象的同时,会初始化一个“eventLoop”对象,我称之为事件处理器对象。结构体的关键成员变量如下所示:

struct aeEventLoop{
aeFileEvent *events;//已注册的文件事件数组
aeFiredEvent *fired;//已就绪的文件事件数组
aeTimeEvent *timeEventHead;//时间事件数组
...
}

初始化 eventLoop 在 ae.c 的“aeCreateEventLoop”方法中执行。该方法中除了初始化 eventLoop 还调用如下方法初始化了一个 epoll 实例。

/*
 * ae_epoll.c
 * 创建一个新的 epoll 实例,并将它赋值给 eventLoop
 */
static int aeApiCreate(aeEventLoop *eventLoop) {

  aeApiState *state = zmalloc(sizeof(aeApiState));

  if (!state) return -1;

  // 初始化事件槽空间
  state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
  if (!state->events) {
    zfree(state);
    return -1;
  }

  // 创建 epoll 实例
  state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
  if (state->epfd == -1) {
    zfree(state->events);
    zfree(state);
    return -1;
  }

  // 赋值给 eventLoop
  eventLoop->apidata = state;
  return 0;
}

也正是在此处调用了系统方法“epoll_create”。这里的state是一个aeApiState结构,如下所示:

/*
 * 事件状态
 */
typedef struct aeApiState {

  // epoll 实例描述符
  int epfd;

  // 事件槽
  struct epoll_event *events;

} aeApiState;

这个 state 由 eventLoop->apidata 来记录。

绑定ip端口与句柄

通过 listenToPort 方法开启TCP端口,每个IP端口会对应一个文件描述符 ipfd(因为服务器可能会有多个ip地址)

// 打开 TCP 监听端口,用于等待客户端的命令请求
if (server.port != 0 &&
  listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
  exit(1);

注意:*eventLoop 和 ipfd 分别被 server.el 和 server.ipfd[] 引用。server 是结构体 RedisServer 的实例,是Redis的全局变量。

注册事件

如下所示代码,为每一个文件描述符绑定一个事件函数

// initServer方法:
for (j = 0; j < server.ipfd_count; j++) {
  if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)
    {
      redisPanic(
        "Unrecoverable error creating server.ipfd file event.");
    }
}
// ae.c 中的 aeCreateFileEvent 方法
/*
 * 根据 mask 参数的值,监听 fd 文件的状态,
 * 当 fd 可用时,执行 proc 函数
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    aeFileProc *proc, void *clientData)
{
  if (fd >= eventLoop->setsize) {
    errno = ERANGE;
    return AE_ERR;
  }

  if (fd >= eventLoop->setsize) return AE_ERR;

  // 取出文件事件结构
  aeFileEvent *fe = &eventLoop->events[fd];

  // 监听指定 fd 的指定事件
  if (aeApiAddEvent(eventLoop, fd, mask) == -1)
    return AE_ERR;

  // 设置文件事件类型,以及事件的处理器
  fe->mask |= mask;
  if (mask & AE_READABLE) fe->rfileProc = proc;
  if (mask & AE_WRITABLE) fe->wfileProc = proc;

  // 私有数据
  fe->clientData = clientData;

  // 如果有需要,更新事件处理器的最大 fd
  if (fd > eventLoop->maxfd)
    eventLoop->maxfd = fd;

  return AE_OK;
}

aeCreateFileEvent 函数中有一个方法调用:aeApiAddEvent,代码如下

/*
 * ae_epoll.c
 * 关联给定事件到 fd
 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  aeApiState *state = eventLoop->apidata;
  struct epoll_event ee;

  /* If the fd was already monitored for some event, we need a MOD
   * operation. Otherwise we need an ADD operation. 
   *
   * 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。
   *
   * 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。
   */
  int op = eventLoop->events[fd].mask == AE_NONE "htmlcode">
/*
 * 事件处理器的主循环
 */
void aeMain(aeEventLoop *eventLoop) {

  eventLoop->stop = 0;

  while (!eventLoop->stop) {

    // 如果有需要在事件处理前执行的函数,那么运行它
    if (eventLoop->beforesleep != NULL)
      eventLoop->beforesleep(eventLoop);

    // 开始处理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  }
}

上述代码会调用 aeProcessEvents 方法用于处理事件,方法如下所示

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 * 处理所有已到达的时间事件,以及所有已就绪的文件事件。
 * 函数的返回值为已处理事件的数量
 */
 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
  int processed = 0, numevents;

  /* Nothing to do"htmlcode">
/*
 * 获取可执行事件
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  aeApiState *state = eventLoop->apidata;
  int retval, numevents = 0;

  // 等待时间
  retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
      tvp "color: #ff0000">总结

Redis的网络模块其实是一个简易的Reactor模式。本文顺着“服务端注册事件——>接受客户端连接——>监听事件是否就绪——>执行事件”这样的路线,来分析Redis源码,描述了Redis接受客户端connect的过程。实际上NIO的思想都基本类似。

DDR爱好者之家 Design By 杰米
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
DDR爱好者之家 Design By 杰米

稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!

昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。

这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。

而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?