CppGuide社区 CppGuide社区
首页
  • 最新谷歌C++风格指南(含C++17/20)
  • C++17详解
  • C++20完全指南
  • C++23快速入门
  • C++语言面试问题集锦
  • 🔥C/C++后端开发常见面试题解析 (opens new window)
  • 网络编程面试题 (opens new window)
  • 网络编程面试题 答案详解 (opens new window)
  • 聊聊WebServer作面试项目那些事儿 (opens new window)
  • 字节跳动面试官现身说 (opens new window)
  • 技术简历指南 (opens new window)
  • 🔥交易系统开发岗位求职与面试指南 (opens new window)
  • 第1章 高频C++11重难点知识解析
  • 第2章 Linux GDB高级调试指南
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 高性能网络通信协议设计精要
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 后端服务重要模块设计探索
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 源码分析系列

    • leveldb源码分析
    • libevent源码分析
    • Memcached源码分析
    • TeamTalk源码分析
    • 优质源码分享 (opens new window)
    • 🔥远程控制软件gh0st源码分析
  • 从零手写C++项目系列

    • C++游戏编程入门(零基础学C++)
    • 🔥使用C++17从零开发一个调试器 (opens new window)
    • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
    • 🔥使用C++从零写一个C语言编译器 (opens new window)
    • 从零用C语言写一个Redis
  • Windows 10系统编程
  • Go语言特性

    • Go开发实用指南
    • Go系统接口编程
    • 高效Go并发编程
    • Go性能调优
    • Go项目架构设计
  • Go项目实战

    • 使用Go从零开发一个数据库
    • 🔥使用Go从零开发一个编译器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥用Go从零写一个编排器(类Kubernetes) (opens new window)
  • Rust编程

    • Rust编程指南
  • 数据库

    • SQL零基础指南
    • MySQL开发与调试指南
  • Linux内核

    • 心中的内核 —— 在阅读内核代码之前先理解内核
    • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
    • TCP源码实现超详细注释版.pdf (opens new window)
GitHub (opens new window)
首页
  • 最新谷歌C++风格指南(含C++17/20)
  • C++17详解
  • C++20完全指南
  • C++23快速入门
  • C++语言面试问题集锦
  • 🔥C/C++后端开发常见面试题解析 (opens new window)
  • 网络编程面试题 (opens new window)
  • 网络编程面试题 答案详解 (opens new window)
  • 聊聊WebServer作面试项目那些事儿 (opens new window)
  • 字节跳动面试官现身说 (opens new window)
  • 技术简历指南 (opens new window)
  • 🔥交易系统开发岗位求职与面试指南 (opens new window)
  • 第1章 高频C++11重难点知识解析
  • 第2章 Linux GDB高级调试指南
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 高性能网络通信协议设计精要
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 后端服务重要模块设计探索
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 源码分析系列

    • leveldb源码分析
    • libevent源码分析
    • Memcached源码分析
    • TeamTalk源码分析
    • 优质源码分享 (opens new window)
    • 🔥远程控制软件gh0st源码分析
  • 从零手写C++项目系列

    • C++游戏编程入门(零基础学C++)
    • 🔥使用C++17从零开发一个调试器 (opens new window)
    • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
    • 🔥使用C++从零写一个C语言编译器 (opens new window)
    • 从零用C语言写一个Redis
  • Windows 10系统编程
  • Go语言特性

    • Go开发实用指南
    • Go系统接口编程
    • 高效Go并发编程
    • Go性能调优
    • Go项目架构设计
  • Go项目实战

    • 使用Go从零开发一个数据库
    • 🔥使用Go从零开发一个编译器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥用Go从零写一个编排器(类Kubernetes) (opens new window)
  • Rust编程

    • Rust编程指南
  • 数据库

    • SQL零基础指南
    • MySQL开发与调试指南
  • Linux内核

    • 心中的内核 —— 在阅读内核代码之前先理解内核
    • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
    • TCP源码实现超详细注释版.pdf (opens new window)
GitHub (opens new window)
  • 服务器资源调整
  • 初始化参数解析
  • 网络监听的建立
  • 网络连接建立
  • 内存初始化
  • 资源初始化
  • get过程
  • cas属性
  • 内存池
  • 连接队列
  • Hash表操作
  • LRU操作
  • set操作
  • do_item_alloc操作
  • item结构
  • Hash表扩容
  • 线程交互
  • 状态机
  • Memcached源码分析
zhangxf
2023-04-02

线程交互

# Memcached源码阅读十六 线程交互

Memcached按之前的分析可以知道,其是典型的Master-Worker线程模型,这种模型很典型,其工作模型是Master绑定端口,监听网络连接,接受网络连接之后,通过线程间通信来唤醒Worker线程,Worker线程已经连接的描述符执行读写操作,这种模型简化了整个通信模型,下面分析下这个过程。

case conn_listening:
    addrlen = sizeof(addr);
    //Master线程(main)进入状态机之后执行accept操作,这个操作也是非阻塞的。
    if ((sfd = accept(c->sfd, (struct sockaddr *) &addr, &addrlen)) == -1)
    {
        //非阻塞模型,这个错误码继续等待
        if (errno == EAGAIN || errno == EWOULDBLOCK)
        {
            stop = true;
        }
        //连接超载
        else if (errno == EMFILE)
        {
            if (settings.verbose > 0)
                 fprintf(stderr, "Too many open connections\n");
            accept_new_conns(false);
            stop = true;
        }
        else
        {
            perror("accept()");
            stop = true;
        }
        break;
    }
    //已经accept成功,将accept之后的描述符设置为非阻塞的
    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0
            || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)
    {
        perror("setting O_NONBLOCK");
        close(sfd);
        break;
    }
    //判断是否超过最大连接数
    if (settings.maxconns_fast
           && stats.curr_conns + stats.reserved_fd >= settings.maxconns - 1)
    {
        str = "ERROR Too many open connections\r\n";
        res = write(sfd, str, strlen(str));
        close(sfd);
        STATS_LOCK();
        stats.rejected_conns++;
        STATS_UNLOCK();
    }
    else
    {       

        //直线连接分发   
        dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                   DATA_BUFFER_SIZE, tcp_transport);
     }

     stop = true;
     break;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

这个是TCP的连接建立过程,由于UDP不需要建立连接,所以直接分发给Worker线程,让Worker线程进行读写操作,而TCP在建立连接之后,也执行连接分发(和UDP的一样),下面看看dispatch_conn_new内部是如何进行链接分发的。

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) 
{
    //创建一个连接队列
    CQ_ITEM *item = cqi_new();
    char buf[1];
    //通过round-robin算法选择一个线程
    int tid = (last_thread + 1) % settings.num_threads;

    //thread数组存储了所有的工作线程
    LIBEVENT_THREAD *thread = threads + tid;

    //缓存这次的线程编号,下次待用
    last_thread = tid;

    //sfd表示accept之后的描述符
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    //投递item信息到Worker线程的工作队列中
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    //在Worker线程的notify_send_fd写入字符c,表示有连接   
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

投递到子线程的连接队列之后,同时,通过往子线程的PIPE管道写入字符c来,下面我们看看子线程是如何处理的?

//子线程会在PIPE管道读上面建立libevent事件,事件回调函数是thread_libevent_process
event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //PIPE管道读取一个字节的数据
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) 
    {
    case 'c':
    //从连接队列读出Master线程投递的消息
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);//创建连接
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

之前分析过conn_new的执行流程,conn_new里面会建立sfd的网络监听libevent事件,事件回调函数为event_handler。

event_set(&c->event, sfd, event_flags, event_handler, (void *) c);
event_base_set(base, &c->event);
1
2

event_handler的执行流程最终会进入到业务处理的状态机中,关于状态机,后续分析。

编辑 (opens new window)
上次更新: 2023/12/11, 22:32:09
Hash表扩容
状态机

← Hash表扩容 状态机→

最近更新
01
第二章 关键字static及其不同用法
03-27
02
第一章 auto与类型推导
03-27
03
第四章 Lambda函数
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式