消息队列与共享内存(六):System V共享内存
《UNPv2》当中说了“共享内存是可用IPC形式当中最快的。一旦这样的内存区映射到共享它的进程的地址空间,这些进程间数据的传递就不再涉及内核。这里说的不涉及内核的含义是:进程不再通过执行任何进入内核的系统调用来彼此传递数据。显然,内核必须建立允许各个进程共享该内存区的内存映射关系,然后一直管理该内存区(处理页面故障等)。”
一般的IPC是这个样子
1,服务器通过read
从输入文件读,read
是进入内核的系统调用,它的作用就是先让内核将输入文件的数据读入到内核的内存空间,然后从内核的内存空间复制到服务器进程。
2,服务器通过IPC的系统调用将数据放入到IPC当中去。这些IPC的系统调用将数据从进程复制到内核当中。
3,客户端通过IPC的系统调用从IPC当中取出数据,那么系统调用又把数据从内核复制到进程
4,最后通过write
输出文件,当然,write
和read
一样,也是进入内核的系统调用,也要经过内核。
而共享内存则是:
可以看到上述的IPC的第2步和第3步的数据从进程到内核的复制过程都被省略了。因为数据从内核到进程的复制过程往往开销很大,所以共享内存的速度是IPC当中最快的。
System V共享内存的api
数据结构
对于每一个共享内存区,内核维护如下的结构:
1 | struct shmid_ds { |
int shmget(key_t key, size_t size, int oflag)
根据key_t
的值来创建一个共享内存区,返回一个整数标识符,就标识了这个内存区。size以字节为单位指定内存区的大小:如果实际操作是访问一个已经存在的内存区,那么size应该为0。如果实际操作是访问一个新的共享内存区时,必须指定一个不为0的size值,创建一个新的共享内存区,该内存区被初始化为size字节的0。oflag和消息队列的oflag的意思是一样的,也是读写权限值的组合。
void* shmat(int shmid, const void* shmaddr, int flag)
shmget
只是打开一个内存区,shmat
提供了访问这个内存区的手段。shmat
的返回值是所指定的共享内存区在调用进程内的起始地址,至于确定这个地址的规则我觉得只需要记住这么一条:
如果shmaddr是一个空指针,那么系统替调用者选择地址。这是推荐的(也是可移植性最好的)方法。
int shmdt(const void* shmaddr)
当一个进程完成某个共享内存区的使用时,它可以调用shmdt
断接这个内存区,但是并不是删除这个内存区。
int shmctl(int shmid, int cmd, struct shmid_ds* buff)
设定cmd参数为IPC_RMID
,就是从系统当中删除这个共享内存。
设定cmd参数为IPC_STAT
,就是返回指定共享内存区当前的shmid_ds结构,返回给buff。通过buff.shm_segsz就得到内存区的长度,避免越界。
使用互斥锁同步
《UNPv2》当中提供了互斥锁的系统调用,但是这里我想使用C++11的mutex来做。
同步当中的生产者—消费者问题,也叫做有界缓冲区问题。一个或者多个生产者(进程或线程)创建着一个个的数据条目,然后这些条目由一个或多个消费者(进程或线程)处理。System V消息队列是隐式同步的,意思就是说生产者和消费者感觉不到内核在执行同步:当生产者超前于消费者,内核就在生产者write之前把它投入到睡眠,直到消费者消费。如果消费者超前于生产者,内核就在消费者调用read之前把它投入睡眠,直到有生产者生产。
共享内存区是必须使用显式的同步
共享内存的扩容和缩容问题
因为在System V的系统调用当中,shmget
一旦调用,那么生成的共享内存区的大小就固定了。但是扩容和缩容的实现并不需要一定使用系统调用,可以使用内存池的思想:先使用shmget
分配出一大块固定大小的内存,然后使用者从大内存当中取出小内存。
这就是我的代码需要做的事情(简单地封装一下根本没有学习的意义)。
想法:
共享内存的扩容和缩容就是操作内存的trick,说到底就是如何高效地管理内存。这个问题我在读大学的时候研究过,想法是这样的:
1,在内存的头部分出一块内存,我把它命名为Head_t
,这块头部内存区域保存着管理的信息如下:
- 内存的总size
- 单次分配内存的最小size
- 单次分配内存的最大size
- block的size
- 已经分出去的子内存数量
- 当前可用的偏移量
- 回收的碎片空间的总size,可以二次使用
2,然后每次分配内存的时候都是按照block为单位来分配,没有达到block整数倍的内存空间就向上取整,让它达到block的内存空间的整数倍。
3,分配内存容易,释放内存需要一点算法。我的算法是这样的 :
首先,在Head_t
当中建立起一个数组,名字取名为szFreeList
。这个数组的下标值就是block的数量,而对应的数组的值就是偏移量。我举个例子:比如说我的block的size是32。然后我需要3个block大小的内存来存放我的数据,那么szFreeList
这个数组的下标就是3,等到内存分配完毕,我是通过小内存对于总内存的偏移量来获取到小内存的头指针的,所以要能找到小内存,必须拿到偏移量。
好了,现在回到释放内存的问题上面。比如说我要释放这个内存,那么我就只需要把偏移量放入到szFreeList
数组当中就可以了。这就变成了一个碎片空间,当下次还有需要3个block大小的内存空间需要分配的时候,首先查看szFreeList[3]
,如果它有值的话,那么就直接取到这个偏移量,根据这个偏移量来拿到小内存的指针,就算小内存当中还有以前的信息也没关系,反正把这块内存分配出去了就是要让新数据覆盖掉它的。
好了,现在的确解决了问题的一半。为什么说一半呢?因为还有一点需要考虑,比如说我现在分配的2块内存,A内存和B内存。2块内存都是3个block的size大小。分配的时候没问题,直接分配就好了,但是释放的时候,如果2块内存都释放了,但是szFreeList[3]
只能有一个值,那么该怎么办?
解决剩下的这半个问题,我的方法是:在每一个分配的内存当中加入一个next
,也就是说在A内存当中的头部,加入一个uint64_t
类型的next
类型的值,保存B内存的偏移量,在B内存当中的头部,也同样加入一个uint64_t
类型的next
类型的值,保存0。
好,现在看这样的场景,我要释放A内存和B内存,这个时候szFreeList[3]
当中保存的是0。假设先释放的是A内存,那么步骤是这样的:先将szFreeList[3]
当中的值赋值给A内存的next
,此时是0,再将A内存的偏移量放入szFreeList[3]
当中去,然后释放B内存的时候也是:现将szFreeList[3]
当中的值赋值给B内存的next
,此时是A内存的偏移量,然后再将B内存的偏移量放入szFreeList[3]
当中去。那么通过了这个next
就把两个偏移量连接了起来。
如果现在要分配3个block的内存,就直接查找szFreeList[3]
,得到B内存的偏移量,通过偏移量找到B内存,取出B内存的next
,也就是A内存的地址,再放入szFreeList[3]
当中。
我在Head_t
当中写入szFreeList[0]
的目的是为了方便数组越界处理。
另外一个偏移量的问题
我们知道,共享内存是用于进程间通信。比如进程A开辟了一块共享内存,直接在头部当中写入信息,然后进程B访问这个共享内存,从头部开始读数据,那么就能够获取到进程A写入到共享内存区当中的数据了。
但是,如果进程A不在头部写入呢,比如说在经过了一个offset之后再写入。那么除非B知道这个offset的值,否则无法读取。那么B如何得知这个offset呢,当然还是进程A通过进程间通信的方式把offset告诉进程B啦,那么选用何种进程间的通信方式呢?FIFO,消息队列,共享内存?当然还是用共享内存啦。
这就意味着需要2个共享内存。一个是作为存储信息的主内存。另外一个是作为管理offset的管理内存。
我把头文件的代码贴上来:
主内存的代码设计如下:
1 | class ShmAllocator { |
有关管理内存的设计,我的前期设计就是头部保存节点的数量,节点的数据结构就是非常简单的数组,但是这样一来遍历所有的节点很耗时,我后期打算将它改为红黑树。
管理内存的代码:
1 | class ManagerMem { |
实现文件的代码请参见:https://github.com/adairjun/MQueue/blob/master/util/shm_allocator.cpp
测试文件shm_server.cpp和shm_client.cpp,先启动shm_server
,再启动shm_client
。
《消息队列与共享内存》系列文章总结:
我的上个系列的文章《socket连接池SocketPool分析》系列 算是对《UNPv1》的复习,这个系列的文章 就是对UNPv2的复习。在博文视点的传世经典书丛的出版说明当中有这样一句话:孔子云:“取乎其上,得乎其中;取乎其中,得乎其下;取乎其下,则无所得矣。” Richard Stevens 大神何其高明,他的书当然是上上之选,但是我学习期间所写的博客也难免有疏漏之处。这两本书应该放在我的案头,时时刻刻准备重读。