文章目录
  1. 1. System V共享内存的api
    1. 1.1. 数据结构
    2. 1.2. int shmget(key_t key, size_t size, int oflag)
    3. 1.3. void* shmat(int shmid, const void* shmaddr, int flag)
    4. 1.4. int shmdt(const void* shmaddr)
    5. 1.5. int shmctl(int shmid, int cmd, struct shmid_ds* buff)
  2. 2. 使用互斥锁同步
  3. 3. 共享内存的扩容和缩容问题
    1. 3.1. 想法:
  4. 4. 另外一个偏移量的问题
  5. 5. 《消息队列与共享内存》系列文章总结:

《UNPv2》当中说了“共享内存是可用IPC形式当中最快的。一旦这样的内存区映射到共享它的进程的地址空间,这些进程间数据的传递就不再涉及内核。这里说的不涉及内核的含义是:进程不再通过执行任何进入内核的系统调用来彼此传递数据。显然,内核必须建立允许各个进程共享该内存区的内存映射关系,然后一直管理该内存区(处理页面故障等)。”

一般的IPC是这个样子
1,服务器通过read从输入文件读,read是进入内核的系统调用,它的作用就是先让内核将输入文件的数据读入到内核的内存空间,然后从内核的内存空间复制到服务器进程。
2,服务器通过IPC的系统调用将数据放入到IPC当中去。这些IPC的系统调用将数据从进程复制到内核当中。
3,客户端通过IPC的系统调用从IPC当中取出数据,那么系统调用又把数据从内核复制到进程
4,最后通过write输出文件,当然,writeread一样,也是进入内核的系统调用,也要经过内核。
一般的IPC

而共享内存则是:
共享内存
可以看到上述的IPC的第2步和第3步的数据从进程到内核的复制过程都被省略了。因为数据从内核到进程的复制过程往往开销很大,所以共享内存的速度是IPC当中最快的。

System V共享内存的api

数据结构

对于每一个共享内存区,内核维护如下的结构:

1
2
3
4
5
6
7
8
9
10
11
struct shmid_ds {
struct ipc_perm shm_perm; /* 允许操作的数据结构 */
size_t shm_segsz; /* 共享内存的大小 */
pid_t shm_lpid; /* 最后操作共享内存的进程id */
pid_t shm_cpid; /* 创建共享内存的进程id */
shmatt_t shm_nattch; /* 当前附接 */
shmat_t shm_cnattch; /* 内核附接 */
time_t shm_atime; /* 上次附接的时间 */
time_t shm_dtime; /* 上次断接的时间 */
time_t shm_ctime; /* 这个结构上次改变的时间 */
};

int shmget(key_t key, size_t size, int oflag)

根据key_t的值来创建一个共享内存区,返回一个整数标识符,就标识了这个内存区。size以字节为单位指定内存区的大小:如果实际操作是访问一个已经存在的内存区,那么size应该为0。如果实际操作是访问一个新的共享内存区时,必须指定一个不为0的size值,创建一个新的共享内存区,该内存区被初始化为size字节的0。oflag和消息队列的oflag的意思是一样的,也是读写权限值的组合。

void* shmat(int shmid, const void* shmaddr, int flag)

shmget只是打开一个内存区,shmat提供了访问这个内存区的手段。
shmat的返回值是所指定的共享内存区在调用进程内的起始地址,至于确定这个地址的规则我觉得只需要记住这么一条:

如果shmaddr是一个空指针,那么系统替调用者选择地址。这是推荐的(也是可移植性最好的)方法。

int shmdt(const void* shmaddr)

当一个进程完成某个共享内存区的使用时,它可以调用shmdt断接这个内存区,但是并不是删除这个内存区。

int shmctl(int shmid, int cmd, struct shmid_ds* buff)

设定cmd参数为IPC_RMID,就是从系统当中删除这个共享内存。
设定cmd参数为IPC_STAT,就是返回指定共享内存区当前的shmid_ds结构,返回给buff。通过buff.shm_segsz就得到内存区的长度,避免越界。

使用互斥锁同步

《UNPv2》当中提供了互斥锁的系统调用,但是这里我想使用C++11的mutex来做。

同步当中的生产者—消费者问题,也叫做有界缓冲区问题。一个或者多个生产者(进程或线程)创建着一个个的数据条目,然后这些条目由一个或多个消费者(进程或线程)处理。System V消息队列是隐式同步的,意思就是说生产者和消费者感觉不到内核在执行同步:当生产者超前于消费者,内核就在生产者write之前把它投入到睡眠,直到消费者消费。如果消费者超前于生产者,内核就在消费者调用read之前把它投入睡眠,直到有生产者生产。

共享内存区是必须使用显式的同步

共享内存的扩容和缩容问题

因为在System V的系统调用当中,shmget一旦调用,那么生成的共享内存区的大小就固定了。但是扩容和缩容的实现并不需要一定使用系统调用,可以使用内存池的思想:先使用shmget分配出一大块固定大小的内存,然后使用者从大内存当中取出小内存。

这就是我的代码需要做的事情(简单地封装一下根本没有学习的意义)。

想法:

共享内存的扩容和缩容就是操作内存的trick,说到底就是如何高效地管理内存。这个问题我在读大学的时候研究过,想法是这样的:
1,在内存的头部分出一块内存,我把它命名为Head_t,这块头部内存区域保存着管理的信息如下:

  • 内存的总size
  • 单次分配内存的最小size
  • 单次分配内存的最大size
  • block的size
  • 已经分出去的子内存数量
  • 当前可用的偏移量
  • 回收的碎片空间的总size,可以二次使用

2,然后每次分配内存的时候都是按照block为单位来分配,没有达到block整数倍的内存空间就向上取整,让它达到block的内存空间的整数倍。

3,分配内存容易,释放内存需要一点算法。我的算法是这样的 :
首先,在Head_t当中建立起一个数组,名字取名为szFreeList。这个数组的下标值就是block的数量,而对应的数组的值就是偏移量。我举个例子:比如说我的block的size是32。然后我需要3个block大小的内存来存放我的数据,那么szFreeList这个数组的下标就是3,等到内存分配完毕,我是通过小内存对于总内存的偏移量来获取到小内存的头指针的,所以要能找到小内存,必须拿到偏移量。
好了,现在回到释放内存的问题上面。比如说我要释放这个内存,那么我就只需要把偏移量放入到szFreeList数组当中就可以了。这就变成了一个碎片空间,当下次还有需要3个block大小的内存空间需要分配的时候,首先查看szFreeList[3],如果它有值的话,那么就直接取到这个偏移量,根据这个偏移量来拿到小内存的指针,就算小内存当中还有以前的信息也没关系,反正把这块内存分配出去了就是要让新数据覆盖掉它的。
好了,现在的确解决了问题的一半。为什么说一半呢?因为还有一点需要考虑,比如说我现在分配的2块内存,A内存和B内存。2块内存都是3个block的size大小。分配的时候没问题,直接分配就好了,但是释放的时候,如果2块内存都释放了,但是szFreeList[3]只能有一个值,那么该怎么办?
解决剩下的这半个问题,我的方法是:在每一个分配的内存当中加入一个next,也就是说在A内存当中的头部,加入一个uint64_t类型的next类型的值,保存B内存的偏移量,在B内存当中的头部,也同样加入一个uint64_t类型的next类型的值,保存0。
好,现在看这样的场景,我要释放A内存和B内存,这个时候szFreeList[3]当中保存的是0。假设先释放的是A内存,那么步骤是这样的:先将szFreeList[3]当中的值赋值给A内存的next,此时是0,再将A内存的偏移量放入szFreeList[3]当中去,然后释放B内存的时候也是:现将szFreeList[3]当中的值赋值给B内存的next,此时是A内存的偏移量,然后再将B内存的偏移量放入szFreeList[3]当中去。那么通过了这个next就把两个偏移量连接了起来。
如果现在要分配3个block的内存,就直接查找szFreeList[3],得到B内存的偏移量,通过偏移量找到B内存,取出B内存的next,也就是A内存的地址,再放入szFreeList[3]当中。

共享内存的结构

我在Head_t当中写入szFreeList[0]的目的是为了方便数组越界处理。

另外一个偏移量的问题

我们知道,共享内存是用于进程间通信。比如进程A开辟了一块共享内存,直接在头部当中写入信息,然后进程B访问这个共享内存,从头部开始读数据,那么就能够获取到进程A写入到共享内存区当中的数据了。
但是,如果进程A不在头部写入呢,比如说在经过了一个offset之后再写入。那么除非B知道这个offset的值,否则无法读取。那么B如何得知这个offset呢,当然还是进程A通过进程间通信的方式把offset告诉进程B啦,那么选用何种进程间的通信方式呢?FIFO,消息队列,共享内存?当然还是用共享内存啦。
这就意味着需要2个共享内存。一个是作为存储信息的主内存。另外一个是作为管理offset的管理内存。

我把头文件的代码贴上来:

主内存的代码设计如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
class ShmAllocator {
public:
/*
* 当server为true的时候,执行的是创建一个共享内存区
* 当server为false的时候,执行的是获取已经存在的共享内存区
*/
explicit ShmAllocator(bool server);
explicit ShmAllocator(string shmFile, uint64_t shmSize, bool server);
ShmAllocator(const ShmAllocator&) = delete;
ShmAllocator& operator=(const ShmAllocator&) = delete;
virtual ~ShmAllocator();
void Dump() const;

int GetShmID() const;

string GetShmFile() const;

uint64_t GetShmSize() const;

/*
* 获取到的是最顶部的地址,要想从内存池当中分配内存,就使用Allocate函数
*/
void* GetShmAddr() const;

/*
* 获取剩余的大块内存空间
*/
uint64_t GetFreeSize() const;

/*
* 获取全部剩余空间:大块内存空间加上所有碎片空间
*/
uint64_t GetTotalFreeSize() const;

/*
* 如果是创建一个不存在的共享内存区,那么调用Attach()之后要使用InitPHead来初始化pHead
* 如果仅仅只是访问已经存在的内存区,就不要乱动InitPHead
*/
void Attach();

void InitPHead();

void Detach();

// 关键函数:从内存池当中取出size大小的内存, 通过引用的形式将偏移量返回
void* Allocate(uint64_t size, uint64_t& offset);
// 关键函数:将ptr指向的内存释放会内存池当中,通过引用的形式将偏移量返回
bool Deallocate(void *ptr, uint64_t& offset);

//关键函数:通过offset来获取到对应的内存块的地址
void* GetMemByOffset(uint64_t offset);

// 给共享内存上锁
bool Lock();
bool Unlock();

private:
//enum hack
enum { BLOCK_SIZE = 32 }; // 对齐的SIZE, 这里不能小于sizeof(uint64_t),否则InitPHead()的时候currentOffset将会越出MAX_BYTES
enum { MAX_BYTES = 4 * 1024 * 1024 }; // 最大分配的SIZE
enum { MIN_BYTES = 8 }; // 最小分配的SIZE,设置的时候不能小于BLOCK_SIZE
enum { READY_FLAG = 1 }; // 是否已经准备好的标志值, 其实这里可以直接使用1和0

private:
// 在共享内存当中的head,存储了共享内存的信息
typedef struct {
uint64_t mutex; // 共享内存上锁,0为被锁住了,1为处于解锁状态
uint64_t memorySize; // 共享内存SIZE
uint64_t minBytes; // 单次分配最小SIZE
uint64_t maxBytes; // 单次分配最大SIZE
uint64_t blockSize; // 对齐SIZE,每次分配内存空间就是它的整数倍
uint64_t memoryCount; // 已经分配出去的内存数量
uint64_t currentOffset; // 当前可分配的地址偏移量, shmAddr_ + currentOffset就是当前可用的地址
uint64_t managedSize; // 管理中的碎片空间
int iReady; // 是否已经准备好
uint64_t szFreeList[0]; // 管理各个大小的空闲Buffer列表,之所以把它写在Head_t当中是为了利用它来进行数组越界
} Head_t;

// 每次调用Allocate从内存池当中取出内存的时候,这个数据结构就表明了分配的内存大小以及下一个可用内存的偏移量
typedef struct {
uint64_t size;
uint64_t next;
} Pointer_t;

private:
// 将size向上取整,就是取到blockSize的最小整数倍
uint64_t RoundUp(uint64_t size) const;

private:
int shmid_;

//用于ftok的shmFile_
string shmFile_;

void* shmAddr_; // 使用shmat获取到的值
uint64_t shmSize_; // 初始化共享内存的时候指定的大小

Head_t* pHead;
};

有关管理内存的设计,我的前期设计就是头部保存节点的数量,节点的数据结构就是非常简单的数组,但是这样一来遍历所有的节点很耗时,我后期打算将它改为红黑树。

管理内存的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class ManagerMem {
public:
/*
* 当server为true的时候,执行的是创建一个共享内存区
* 当server为false的时候,执行的是获取已经存在的共享内存区
*/
explicit ManagerMem(bool server);
explicit ManagerMem(string shmFile, uint64_t shmSize, bool server);
ManagerMem(const ManagerMem&) = delete;
ManagerMem& operator=(const ManagerMem&) = delete;
virtual ~ManagerMem();
void Dump() const;

public:
void Attach();

void InitPHead();

void Detach();

/*
* 获取到的是最顶部的地址
*/
void* GetShmAddr() const;

bool AddIdOffsetMapping(uint64_t id, uint64_t offset);
/*
* 返回id对应的第一个offset
*/
uint64_t GetOffsetById(uint64_t id);

bool EraseOffset(uint64_t offset);

// 给共享内存上锁
bool Lock();
bool Unlock();

private:
enum { MANAGER_MEM_BYTES = 4 * 1024 * 1024 }; // 管理内存的总长度

typedef struct {
uint64_t overwriteFlag; // 这个标记为0的时候,表示这个节点已经无效,应该被覆盖掉。标记为1的时候有效
uint64_t id; // protobuf当中的id
uint64_t offset; // 对应的offset
} Map_t; // 这就是在管理内存当中存储的节点,管理内存的ManagerMemHead_t之后,这些节点是按照普通的链表来排序
// 要想遍历这些节点效率较低,我正在思考如何使用红黑树的数据结构来存储节点
typedef struct {
uint64_t mutex; // 共享内存上锁,0为被锁住了,1为处于解锁状态
uint64_t memorySize; // 管理内存的SIZE
uint64_t nodeNum; // Map_t类型节点的数量
Map_t nodeList[0]; // 利用它来执行数组越界
} ManagerMemHead_t;

private:
int shmid_;

//用于ftok的shmFile_
string shmFile_;

void* shmAddr_; // 使用shmat获取到的值
uint64_t shmSize_; // 初始化共享内存的时候指定的大小

ManagerMemHead_t* pHead;
};

实现文件的代码请参见:https://github.com/adairjun/MQueue/blob/master/util/shm_allocator.cpp

测试文件shm_server.cppshm_client.cpp,先启动shm_server,再启动shm_client

《消息队列与共享内存》系列文章总结:

我的上个系列的文章《socket连接池SocketPool分析》系列 算是对《UNPv1》的复习,这个系列的文章 就是对UNPv2的复习。在博文视点的传世经典书丛的出版说明当中有这样一句话:孔子云:“取乎其上,得乎其中;取乎其中,得乎其下;取乎其下,则无所得矣。” Richard Stevens 大神何其高明,他的书当然是上上之选,但是我学习期间所写的博客也难免有疏漏之处。这两本书应该放在我的案头,时时刻刻准备重读。

文章目录
  1. 1. System V共享内存的api
    1. 1.1. 数据结构
    2. 1.2. int shmget(key_t key, size_t size, int oflag)
    3. 1.3. void* shmat(int shmid, const void* shmaddr, int flag)
    4. 1.4. int shmdt(const void* shmaddr)
    5. 1.5. int shmctl(int shmid, int cmd, struct shmid_ds* buff)
  2. 2. 使用互斥锁同步
  3. 3. 共享内存的扩容和缩容问题
    1. 3.1. 想法:
  4. 4. 另外一个偏移量的问题
  5. 5. 《消息队列与共享内存》系列文章总结: