muduo网络库学习笔记(五) 链接器Connector与监听器Acceptor

目录 muduo网络库学习笔记(五) 链接器Connector与监听器Acceptor Connector 系统函数connect 处理非阻塞connect的步骤: Connetor时序图 Acceptor 系统函数accept Socket的封装 Acceptor的封装 Acceptor时序图. 简单测试程序 Acceptor Connctor 运行日志 muduo网络库学习笔记(五) 链接器Connector与监听器Acceptor 标签: muduo Connector Acceptor 本篇继续为前面封装的EventLoop添加事件,到现在共给EventLoop添加了两个fd,Timerfd,EventFd分别用于处理定时任务和通知事件. 今天添加的Acceptor会增加另一个fd,此fd是是一个socket,用于监听套接字连接.同时封装非组赛网络编程中的connect(2)的使用Connector. Connector 在非阻塞网络编程中,发起连接的基本方式是调用connect(2),当socket变得可写时表明连接建立完毕,其中要处理各种类型的错误,我们把它封装为Connector class. Connector 和 Acceptor 设计思路基本一致,只是Acceptor通过判断套接字是否可读来执行回调,而Connector是判断套接字是否可写来执行回调. 还有一点就是错误处理,socket可写不一定就是连接建立好了 , 当连接建立出错时,套接口描述符变成既可读又可写,这时我们可以通过调用getsockopt来得到套接口上待处理的错误(SO_ERROR). 其次非阻塞网络编程中connect(2)的sockfd是一次性的,一旦出错(比如对方拒绝连接),就无法恢复,只能关闭重来。但Connector是可以反复使用的, 因此每次尝试连接都要使用新的socket文件描述符和新的Channel对象。要注意的就是Channel的生命期管理了. 系统函数connect #include /* See NOTES */ #include int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen); sockfd 试图制作的一个连接到被绑定到addr指定地址的套接字。 addr和addrlen 服务端地址和长度. retrun: 成功 返回0 , 失败 返回 -1. 处理非阻塞connect的步骤: 第一步:创建非阻塞socket,返回套接口描述符; 第二步:connect(2)开始建立连接; 第三步:判断连接是否成功建立: A:如果connect返回0,表示连接建立成功, 如果错误为EINPROGRESS 表示连接正在进行,可以等待select()变的可写,通过getsockopt()来来得到套接口上待处理的错误(SO_ERROR),连接是否建立成功.如果连接建立成功,这个错误值将是0,如果建立连接时遇到错误,则这个值是连接错误所对应的errno值(比如:ECONNREFUSED,ETIMEDOUT等). B: EAGAIN、EADDRINUSE、EADDRNOTAVAIL、ECONNREFUSED、ENETUNREACH 像EAGAIN 这类表明本机临时端口暂时用完的错误、可以尝试重连。 C: EACCES、EPERM、EAFNOSUPPORT、EALREADY、EBADF、EFAULT、ENOTSOCK 其他真错误像无权限,协议错误,等直接关闭套接字. Connector正是按这个步骤处理的连接. 暴露的接口只有start()和stop() start()执行上述connect的步骤. stop()关闭套接字,删除注册的通道,停止进行连接. class Connector { public: typedef std::function NewConnectionCallback; Connector(EventLoop* loop, const InetAddress& serverAddr); ~Connector(); void setNewConnectionCallback(const NewConnectionCallback& cb) { m_newConnectionCallBack = cb; } void start();// can be called in any thread void stop(); // can be called in any thread private: enum States { kDisconnected, kConnecting, kConnected }; static const int kMaxRetryDelayMs = 30*1000; static const int kInitRetryDelayMs = 500; void connect(); void connecting(int sockfd); void handleWrite(); void handleError(); void retry(int sockfd); int removeAndResetChannel(); void resetChannel(); void setState(States s) { m_state = s; } void startInLoop(); void stopInLoop(); EventLoop* p_loop; int m_retryDelayMs; InetAddress m_serverAddr; States m_state; std::unique_ptr p_channel; NewConnectionCallback m_newConnectionCallBack; }; Connetor时序图 Connector::Connector(EventLoop* loop, const InetAddress& serverAddr) :p_loop(loop), m_serverAddr(serverAddr), m_state(kDisconnected), m_retryDelayMs(kInitRetryDelayMs) { LOG_DEBUG << "ctor[" << this << "]"; } Connector::~Connector() { LOG_DEBUG << "dtor[" << this << "]"; assert(!p_channel); } void Connector::start() { p_loop->runInLoop(std::bind(&Connector::startInLoop, this)); } void Connector::startInLoop() { p_loop->assertInLoopThread(); assert(m_state == kDisconnected); connect(); } void Connector::stop() { p_loop->queueInLoop(std::bind(&Connector::stopInLoop, this)); } void Connector::stopInLoop() { p_loop->assertInLoopThread(); if(m_state == kConnecting) { int sockfd = removeAndResetChannel(); sockets::close(sockfd); setState(kDisconnected); } } void Connector::connect() { int sockfd = sockets::createNonblockingOrDie(m_serverAddr.family()); int ret = sockets::connect(sockfd, m_serverAddr.getSockAddr()); int savedErrno = (ret == 0) ? 0 : errno; if(ret != 0) LOG_TRACE << "connect error ("<< savedErrno << ") : " << strerror_tl(savedErrno); switch(savedErrno) { case 0: case EINPROGRESS: //Operation now in progress case EINTR: //Interrupted system call case EISCONN: //Transport endpoint is already connected connecting(sockfd); break; case EAGAIN: case EADDRINUSE: case EADDRNOTAVAIL: case ECONNREFUSED: case ENETUNREACH: retry(sockfd); LOG_SYSERR << "reSave Error. " << savedErrno; break; case EACCES: case EPERM: case EAFNOSUPPORT: case EALREADY: case EBADF: case EFAULT: case ENOTSOCK: LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno; sockets::close(sockfd); break; default: LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno; sockets::close(sockfd); // connectErrorCallback_(); break; } } void Connector::connecting(int sockfd) { LOG_TRACE << "Connector::connecting] sockfd : " << sockfd; setState(kConnecting); assert(!p_channel); p_channel.reset(new Channel(p_loop, sockfd)); p_channel->setWriteCallBack(std::bind(&Connector::handleWrite, this)); //p_channel->setErrorCallback() //enableWriting if Channel Writeable ,Connect Success. p_channel->enableWriting(); } void Connector::retry(int sockfd) { sockets::close(sockfd); setState(kDisconnected); LOG_INFO << "Connector::retry - Retry connecting to " << m_serverAddr.toIpPort() << " in " << m_retryDelayMs << " milliseconds. "; p_loop->runAfter(m_retryDelayMs/1000.0, std::bind(&Connector::startInLoop, this)); m_retryDelayMs = std::min(m_retryDelayMs * 2, kMaxRetryDelayMs); } int Connector::removeAndResetChannel() { p_channel->disableAll(); p_channel->remove(); int sockfd = p_channel->fd(); p_loop->queueInLoop(std::bind(&Connector::resetChannel, this)); return sockfd; } void Connector::resetChannel() { LOG_TRACE << "Connector::resetChannel()"; p_channel.reset(); } void Connector::handleWrite() { LOG_TRACE << "Connector::handleWrite "; if(m_state == kConnecting) { int sockfd = removeAndResetChannel(); int err = sockets::getSocketError(sockfd); if(err) { LOG_WARN << "Connector::handleWrite - SO_ERROR = " << err << " " << strerror_tl(err); retry(sockfd); } /*else if (sockets::isSelfConnect(sockfd)) { }*/ else { setState(kConnected); m_newConnectionCallBack(sockfd); } } else { assert(m_state == kDisconnected); } } void Connector::handleError() { LOG_ERROR << "Connector::handleError States " << m_state; if(m_state == kConnecting) { int sockfd = removeAndResetChannel(); int err = sockets::getSocketError(sockfd); LOG_TRACE << "SOCK_ERROR = " << err << " " << strerror_tl(err); retry(sockfd); } } Acceptor 相较于Connector更简单,只要有socket可读,即可确认连接建立. 系统函数accept #include /* See NOTES */ #include int accept(int sockfd, struct sockaddr addr, socklen_t addrlen); #define _GNU_SOURCE /* See feature_test_macros(7) */ #include int accept4(int sockfd, struct sockaddr addr, socklen_t addrlen, int flags); sockfd socket(2)创建的文件描述符, 且已被bind(2)绑定本地地址,listen(2)使能监听. addr 用于填充远端套接字地址, 如果不需要知道远端地址,可以添NULL. addrlen 用于填充远端地址大小. flags 如果flags为0 等同于 accept. SOCK_NONBLOCK 在新打开的文件描述符设置 O_NONBLOCK 标记。在 fcntl(2) 中保存这个标记可以得到相同的效果。 SOCK_CLOEXEC 在新打开的文件描述符里设置 close-on-exec (FD_CLOEXEC) 标记。参看在open(2)里关于 O_CLOEXEC标记的描述来了解这为什么有用。 int connfd = ::accept4(sockfd, (struct sockaddr *)(addr), &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); flags 会对返回的fd connfd 设置SOCK_NONBLOCK | SOCK_CLOEXEC 标记. 如果用于监听的文件描述符没有设置nonblocking标志,且监听队列上没有挂起的连接, accept()会阻塞直到有新的连接到来. 如果此socket设置了nonblocking标记,accept() 会立即返回失败并设置 error 为 EAGAIN or EWOULDBLOCK. Socket的封装 Socket类封装一个套接字 fd 析构的时候close 管理套接字的生命期. class Socket{ public: explicit Socket(int sockfd) : m_sockfd(sockfd) { } ~Socket(); int fd() const { return m_sockfd; } void bindAddress(const InetAddress& localaddr); void listen(); int accept(int sockfd, struct sockaddr_in6* addr); int accept(InetAddress* peeraddr); private: const int m_sockfd; }; Acceptor的封装 Acceptor的数据成员包含Socket和Channel,Acceptor的Socket是服务端的监听socket,Channel用于观察此socket上的readable事件.并回调Acceptor:: handleRead(),handleRead()会调用accept(2)来接受新连接, 并回调用户callback。 class Acceptor{ public: typedef std::function NewConnectionCallBack; Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport = true); ~Acceptor(); void listen(); bool listenning() const { return m_listenning; } // get listen status. void setNewConnectionCallBack(const NewConnectionCallBack& cb) { m_newConnectionCallBack = cb; } private: void handleRead(); //处理新到的连接. EventLoop* p_loop; Socket m_acceptSocket; Channel m_acceptChannel; NewConnectionCallBack m_newConnectionCallBack; bool m_listenning; int m_idleFd; }; Acceptor时序图. Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport) :p_loop(loop), m_acceptSocket(sockets::createNonblockingOrDie(listenAddr.family())), m_acceptChannel(loop, m_acceptSocket.fd()), m_listenning(false), m_idleFd(::open("/dev/null", O_RDONLY | O_CLOEXEC)) { assert(m_idleFd >= 0); m_acceptSocket.setReuseAddr(true); m_acceptSocket.setReuseAddr(reuseport); m_acceptSocket.bindAddress(listenAddr); m_acceptChannel.setReadCallBack( std::bind(&Acceptor::handleRead, this)); } Acceptor::~Acceptor() { m_acceptChannel.disableAll(); m_acceptChannel.remove(); ::close(m_idleFd); } void Acceptor::listen() { p_loop->assertInLoopThread(); m_listenning = true; m_acceptSocket.listen(); m_acceptChannel.enableReading(); } void Acceptor::handleRead() { p_loop->assertInLoopThread(); InetAddress peerAddr; int connfd = m_acceptSocket.accept(&peerAddr); if(connfd >= 0) { if(m_newConnectionCallBack) { m_newConnectionCallBack(connfd, peerAddr); } else { sockets::close(connfd); } } else { LOG_SYSERR << "in Acceptor::handleRead"; if(errno == EMFILE) { ::close(m_idleFd); m_idleFd = ::accept(m_acceptSocket.fd(), NULL, NULL); ::close(m_idleFd); m_idleFd = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } 简单测试程序 Acceptor void newConnetion(int sockfd, const InetAddress& peeraddr) { LOG_DEBUG << "newConnetion() : accepted a new connection from"; ::sockets::close(sockfd); } int main() { InetAddress listenAddr(8888); EventLoop loop; Acceptor acceptor(&loop, listenAddr); acceptor.setNewConnectionCallBack(newConnetion); acceptor.listen(); loop.loop(); } Connctor EventLoop* g_loop; void newConnetion(int sockfd) { LOG_DEBUG << "newConnetion() : Connected a new connection."; sockets::close(sockfd); g_loop->quit(); } int main() { EventLoop loop; g_loop = &loop; InetAddress serverAddr("127.0.0.1", 8888); Connector client(&loop, serverAddr); client.setNewConnectionCallback(newConnetion); client.start(); loop.loop(); } 运行日志 作者 —— 艾露米婭娜 出处:http://www.cnblogs.com/ailumiyana/ 除特别注明外,本站所有文章均为艾露米婭娜原创,欢迎转载分享,但请注明出处。https://www.cnblogs.com/ailumiyana/p/9973611.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信