目录
muduo学习笔记(二)Reactor关键结构
Reactor简述
什么是Reactor
Reactor模型的优缺点
poll简述
poll使用样例
muduo Reactor关键结构
Channel
Poller
EventLoop
Reactor时序图
测试程序-单次触发的定时器
muduo学习笔记(二)Reactor关键结构
Reactor简述
什么是Reactor
Reactor是一种基于事件驱动的设计模式,即通过回调机制,我们将事件的接口注册到Reactor上,当事件发生之后,就会回调注册的接口。
Reactor必要的几个组件:
Event Multiplexer事件分发器:即一些I/O复用机制select、poll、epoll等.程序将事件源注册到分发器上,等待事件的触发,做相应处理.
Handle事件源:用于标识一个事件,Linux上是文件描述符.
Reactor反应器:用于管理事件的调度及注册删除.当有激活的事件时,则调用回调函数处理,没有则继续事件循环.
event handler事件处理器:管理已注册事件和的调度,分成不同类型的事件(读/写,定时)当事件发生,调用对应的回调函数处理.
Reactor模型的优缺点
优点
1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;
缺点
Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。
poll简述
poll的使用方法与select相似,轮询多个文件描述符,有读写时设置相应的状态位,poll相比select优在没有最大文件描述符数量的限制.
# include 
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);
struct pollfd {
int fd;         /* 文件描述符 */
short events;         /* 等待的事件 */
short revents;       /* 实际发生了的事件 */
} ; 
  每一个pollfd结构体指定了一个被监视的文件描述符,可以传递多个结构体,指示poll()监视多个文件描述符。每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。events域中请求的任何事件都可能在revents域中返回。合法的事件如下:
  POLLIN         有数据可读。
  POLLRDNORM      有普通数据可读。
  POLLRDBAND      有优先数据可读。
  POLLPRI         有紧迫数据可读。
  POLLOUT       写数据不会导致阻塞。
  POLLWRNORM      写普通数据不会导致阻塞。
  POLLWRBAND      写优先数据不会导致阻塞。
  POLLMSGSIGPOLL     消息可用。
poll使用样例
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#define MAX_BUFFER_SIZE 1024
#define IN_FILES 1
#define MAX(a,b) ((a>b)?(a):(b))
int main(int argc ,char **argv)
{
  struct pollfd fds[3];
  char buf[1024];
  int i,res,real_read, maxfd;
  if((fds[0].fd=open("/dev/stdin",O_RDONLY|O_NONBLOCK)) < 0)
  {
    fprintf(stderr,"open data1 error:%s",strerror(errno));
    return 1;
  }
  for (i = 0; i < IN_FILES; i++)
  {
    fds[i].events = POLLIN | POLLPRI;
  }
  while(1) //|| fds[1].events || fds[2].events)
  {
    int ret = poll(fds, 1, 1000);
    if (ret < 0)
    {
      printf("Poll error : %s\n",strerror(errno));
      return 1;
    }
    if(ret == 0){
      printf("Poll timeout\n");
      continue;
    }
    for (i = 0; i< 1; i++)
    {
      if (fds[i].revents)
      {
        memset(buf, 0, MAX_BUFFER_SIZE);
        real_read = read(fds[i].fd, buf, MAX_BUFFER_SIZE);
        if (real_read < 0)
        {
          if (errno != EAGAIN)
          {
            printf("read eror : %s\n",strerror(errno));
            continue;
          }
        }
        else if (!real_read)
        {
          close(fds[i].fd);
          fds[i].events = 0;
        }
        else
        {
          if (i == 0)
          {
            buf[real_read] = '\0';
            printf("%s", buf);
            if ((buf[0] == 'q') || (buf[0] == 'Q'))
            {
              printf("quit\n");
              return 1;
            }
          }
          else
          {
            buf[real_read] = '\0';
            printf("%s", buf);
          }
        }
      }
    }
  }
  exit(0);
}
muduo Reactor关键结构
muduo Reactor最核心的事件分发机制, 即将IO multiplexing拿到的IO事件分发给各个文件描述符(fd)的事件处理函数。
Channel
Chanel目前我对它的理解是,它负责管理一个文件描述符(file descript)IO事件.
Channel会封装C的poll,把不同的IO事件分发到不同的回调:ReadCallBack、WriteCallBack等
每个Channel对象自始至终只属于一个EventLoop,因此每个Channel对象都只属于某一个IO线程。 每个Channel对象自始至终只负责一个文件描述符(fd) 的IO事件分发
#ifndef NET_CHANNEL_H
#define NET_CHANNEL_H
#include 
#include "EventLoop.hh"
class Channel {
public:
  typedef std::function EventCallBack;
  Channel(EventLoop* loop, int fd);
  ~Channel();
  void handleEvent();
  void setReadCallBack(const EventCallBack& cb) { m_readCallBack = cb; }
  void setWriteCallBack(const EventCallBack& cb) { m_writeCallBack = cb; }
  void setErrorCallBack(const EventCallBack& cb) { m_errorCallBack = cb; }
  int fd() const { return m_fd; }
  int events() const { return m_events; }
  void set_revents(int revt) { m_revents = revt; }
  bool isNoneEvent() const { return m_events == kNoneEvent; }
  void eableReading() { m_events |=  kReadEvent; update(); }
  int index() { return m_index; }
  void set_index(int idx) { m_index =idx; }
  EventLoop* ownerLoop() { return m_pLoop; }
private:
  Channel& operator=(const Channel&);
  Channel(const Channel&);
  void update();
  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;
  EventLoop* m_pLoop;
  const int m_fd;
  int m_events;    // 等待的事件
  int m_revents;   // 实际发生了的事件
  int m_index;
  EventCallBack m_readCallBack;
  EventCallBack m_writeCallBack;
  EventCallBack m_errorCallBack;
};
#endif
//Channel.cpp
#include 
#include "Channel.hh"
#include "Logger.hh"
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;
Channel::Channel(EventLoop* loop, int fd)
  : m_pLoop(loop),
    m_fd(fd),
    m_events(0),
    m_revents(0),
    m_index(-1)
{
}
Channel::~Channel()
{
}
void Channel::update()
{
  m_pLoop->updateChannel(this);
}
void Channel::handleEvent()
{
  if(m_revents & POLLNVAL)
  {
    LOG_WARN << "Channel::handleEvent() POLLNVAL";
  }
  if(m_revents & (POLLERR | POLLNVAL)){
    if(m_errorCallBack) m_errorCallBack();
  }
  if(m_revents & (POLLIN | POLLPRI | POLLRDHUP)){
    if(m_readCallBack) m_readCallBack();
  }
  if(m_revents & POLLOUT){
    if(m_writeCallBack) m_writeCallBack();
  }
}
值得一提的就是 Channel::update()它会调用EventLoop::updateChannel(), 后者会转而调
用Poller::updateChannel()。Poller对象下面会讲,通过Poller::updateChannel()注册IO事件(即file descript).
Channel::handleEvent()是Channel的核心, 它由EventLoop::loop()调
用, 它的功能是根据revents发生事件的的值分别调用不同的用户回调。 这个函数以后还会扩充。
Poller
Poller class是IO multiplexing的封装。 它现在是个具体类,而在muduo中是个抽象基类,因为muduo同时支持poll(2)和epoll(4)两种IOmultiplexing机制。
Poller是EventLoop的间接成员,只供其自己在EventLoop的IO线程中调用,因此无须加锁。其生命期与EventLoop相等。
Poller并不拥有管理文件描述符事件的Channel, Channel在析构之前必须自己
unregister(EventLoop::removeChannel()) , 避免空悬指针
#ifndef _NET_POLLER_HH
#define _NET_POLLER_HH
#include 
#include