基于C语言实现的KV存储引擎(一)
基于C语言实现的KV存储引擎
- 项目简介
- 整体架构
- 网络模块的实现
- recator
- proactor
- Ntyco
项目简介
本文主要是基于 C 语言来实现一个简单的 KV 存储架构,目的就是将网络模块跟实际开发结合起来。
首先我们知道对于数据的存储可以分为两种方式,一种是在内存中进行存储,像 Redis 这种,是一种内存型数据库,主要也是以 KV 的形式进行存储,提升了对应的数据的访问效率;另一种就是在磁盘中进行存储,像 MySQL 这种,是一种关系型数据库,更多的是以一种表的形式去组织的数据库,也是一种主流的数据库,但是访问的速度就稍微慢一点儿。
那么当前已经有众多的数据库的存在了,我们又为什么需要去实现一个自己的 KV 存储引擎呢?
对于 Redis,MySQL 这些数据库来说,他们可以适用于多种数据类型,我们可以理解为是一个非常大类别的实现,如果我们当前只是需要存储某些类别的数据,比如我们就单纯的进行一些短链接映射存储,用户的一些信息存储,在某些时候需要快速进行读取,我们就没必要使用 Redis 和 MySQL 这种的数据库存储的方式,也就避免了使用它们需要去考虑的一些问题,同样,我们只是单纯的去进行某些数据的存储,我们就可以自己进行优化,将对应的性能提升到最优。
我们自己实现的 KV 存储其实跟 Redis 中的 KV 存储的思想很相像,都是以键值对的方式去进行存储,比如说我们需要访问某一张图片,知道链接,输入对应的链接,就可以访问这张图片,其实这就是一种 KV 存储的方式。
整体架构
基于上面的了解,我们就可以来梳理一下我们的 KV 存储的一个整体架构,其实整个流程也可以去理解为一个请求响应的流程,我么需要进行存储,首先就要发送需要存储的数据,然后服务端进行存储,返回给我们对应的信息(成功或者失败),我们后续访问就可以直接进行看到对应的信息,我们就可以简单的理解为下面这种架构:
网络模块的实现
在前面的文章当中我们介绍了几种网络高并发的实现方式,reactor
、io_uring
和 Ntyco(协程)
,在我们的实现当中这几种实现方式都会被使用到。
首先我们需要明白的一点就是,对于网络模块与 KV 引擎模块我们是要进行分开的,网络模块只进行网络模块的工作就可以了,而 KV 引擎模块在进行协议的解析工作。
recator
在前面的章节当中,我们已经实现过 reactor 了,reacor 本质上就是将对应的 IO 管理转化为对事件的管理,其实它的思想就是使用了 IO 多路复用模型,对读写事件进行监听,通过回调函数的调用,异步的去处理读写事件,不去过分的占用核心线程的工作。
首先就是对应的封装工作,对于一个事件来说,肯定有对应的 fd,回调函数,对应的接收和发送缓冲区,我们将其封装在一个 struct 中,方便后续进行调用。
#ifndef __SERVER_H__
#define __SERVER_H__#define BUFFER_LENGTH 1024#define ENABLE_KVS 1typedef int (*RCALLBACK)(int fd);struct conn {int fd;char rbuffer[BUFFER_LENGTH];int rlength;char wbuffer[BUFFER_LENGTH];int wlength;RCALLBACK send_callback;union {RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;int status;
};#if ENABLE_KVS
int kvs_request(struct conn *c);
int kvs_response(struct conn *c);
#endif#endif
我们在这一块儿也提供了两个接口kvs_request
和kvs_response
,从命名就可以可以看出,他其实就是接受服务端发送过来的请以及响应,那么我们接收到对应的请求以后,就需要进行协议的制定,如何对数据进行解析,很明显kvs_request
接口中就需要与我们的 KV 引擎模块相连接,接收到请求以后就需要调用到 KV 引擎模块进行协议的解析工作。
我们来看具体的实现代码:
reactor.c
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#include "server.h"#define CONNECTION_SIZE 1024#define MAX_PORTS 20#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)typedef int (*msg_handler)(char* msg, int length, char* response);
static msg_handler kvs_handler;// 请求处理
int kvs_request(struct conn *c) {// printf("recv: %d, %s\n", c->rlength, c->rbuffer);c->wlength = kvs_handler(c->rbuffer, c->rlength, c->wbuffer);return c->wlength;
}// 响应处理
int kvs_response(struct conn *c) {}int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);int epfd = 0;
struct timeval begin;struct conn conn_list[CONNECTION_SIZE] = {0};
// fdint set_event(int fd, int event, int flag) {if (flag) { // non-zero addstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else { // zero modstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}
}int event_register(int fd, int event) {if (fd < 0) return -1;conn_list[fd].fd = fd;conn_list[fd].r_action.recv_callback = recv_cb;conn_list[fd].send_callback = send_cb;memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);conn_list[fd].wlength = 0;set_event(fd, event, 1);
}// listenfd(sockfd) --> EPOLLIN --> accept_cb
int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);//printf("accept finshed: %d\n", clientfd);if (clientfd < 0) {printf("accept errno: %d --> %s\n", errno, strerror(errno));return -1;}event_register(clientfd, EPOLLIN); // | EPOLLETif ((clientfd % 1000) == 0) {struct timeval current;gettimeofday(¤t, NULL);int time_used = TIME_SUB_MS(current, begin);memcpy(&begin, ¤t, sizeof(struct timeval));printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);}return 0;
}int recv_cb(int fd) {memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH );int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);if (count == 0) { // disconnectprintf("client disconnect: %d\n", fd);close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // unfinishedreturn 0;} else if (count < 0) { // printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);return 0;}conn_list[fd].rlength = count;// printf("RECV: %s\n", conn_list[fd].rbuffer);#if ENABLE_KVSkvs_request(&conn_list[fd]);#endif set_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd) {
#if ENABLE_KVSkvs_response(&conn_list[fd]);#endifint count = 0;if (conn_list[fd].wlength != 0) {count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);}set_event(fd, EPOLLIN, 0);//set_event(fd, EPOLLOUT, 0);return count;
}int init_reactor_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0servaddr.sin_port = htons(port); // 0-1023, if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {printf("bind failed: %s\n", strerror(errno));}listen(sockfd, 10);//printf("listen finshed: %d\n", sockfd); // 3 return sockfd;
}int recator_entry(unsigned short port, msg_handler handler) {kvs_handler = handler;epfd = epoll_create(1);int i = 0;for (i = 0;i < MAX_PORTS; i++) {int sockfd = init_reactor_server(port + i);conn_list[sockfd].fd = sockfd;conn_list[sockfd].r_action.recv_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}gettimeofday(&begin, NULL);while (1) { // mainloopstruct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN) {conn_list[connfd].r_action.recv_callback(connfd);} if (events[i].events & EPOLLOUT) {conn_list[connfd].send_callback(connfd);}}}
}
kvstore.h
#ifndef __KV_STORE__
#define __KV_STORE__#define NETWORK_RECATOR 0
#define NETWORK_PROACTOR 1
#define NETWORK_NTYCO 2#define NETWORK_TYPE NETWORK_RECATORtypedef int (*msg_handler)(char* msg, int length, char* response);const char* command[] = {"SET", "GET", "DEL", "MOD", "EXIST"
};const char* response[] = {};#endif
kvstore.c
#include <stdio.h>
#include <stdlib.h>
#include "kvstore.h"extern int recator_entry(unsigned short port, msg_handler handler);
extern int ntyco_start(unsigned short port, msg_handler handler);
extern int proactor_entry(unsigned short port, msg_handler handler);
/*
* @brief 协议解析
* @param msg 消息体
* @param length 消息体长度
* @param response 响应体
* @return 0 成功 -1 失败
*/
// 协议解析
int kvs_protocal(char* msg, int length, char* response)
{printf("recv: %d, %s\n", length, msg);}int main(int argc, char* argv[])
{if (argc != 2) {printf("Usage: %s <port>\n", argv[0]);return -1;}if (NETWORK_TYPE == NETWORK_RECATOR) {recator_entry(atoi(argv[1]), kvs_protocal);} else if (NETWORK_TYPE == NETWORK_PROACTOR) {proactor_entry(atoi(argv[1]), kvs_protocal);} else if (NETWORK_TYPE == NETWORK_NTYCO) {ntyco_start(atoi(argv[1]), kvs_protocal);}return 0;
}
kvstore.c
文件当中主要就是对于 KV 引擎的一个实现,当然 main 函数也是用在这当中的,kvs_protocal
是我们的具体协议解析的函数,我们要实现其与网络模块的互联,就可以采用函数指针的方式,无论是reactor
、proactor
或者是协程
,其实都采用这种方式,C 语言并不想 C++ 那样有包装器,所以在这儿我们就使用函数指针的方法来操作。kvstore.h
其实就是定义与实现的分离,我们当前是仿照 Redis 协议进行制定的;recator.c
主要就是kvs_request
模块,实现了与 KV 引擎的互联,但是网络模块又是与 KV 引擎模块是解耦的。
proactor
procator.c
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024typedef int (*msg_handler)(char* msg, int length, char* response);
static msg_handler kvs_handler;struct conn_info {int fd;int event;
};int init_proactor_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10);return sockfd;
}int set_event_recv(struct io_uring *ring, int sockfd,void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_READ,};io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}int set_event_send(struct io_uring *ring, int sockfd,void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_WRITE,};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,socklen_t *addrlen, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_ACCEPT,};io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}int proactor_entry(unsigned short port, msg_handler handler) {kvs_handler = handler;int sockfd = init_proactor_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);char buffer[BUFFER_LENGTH] = {0};while (1) {io_uring_submit(&ring);struct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe);struct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_waitint i = 0;for (i = 0;i < nready;i ++) {struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == EVENT_ACCEPT) {set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);//printf("set_event_accept\n"); //int connfd = entries->res;set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);} else if (result.event == EVENT_READ) { //int ret = entries->res;if (ret == 0) {close(result.fd);} else if (ret > 0) {// printf("set_event_recv ret: %d, %s\n", ret, buffer); // 协议解析char response[BUFFER_LENGTH] = {0};ret = kvs_handler(buffer, ret, response);set_event_send(&ring, result.fd, response, ret, 0);}} else if (result.event == EVENT_WRITE) {int ret = entries->res;//printf("set_event_send ret: %d, %s\n", ret, buffer);set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);}}io_uring_cq_advance(&ring, nready);}
}
有了前面recator
模块的理解,我们现在就只需要在已经实现过的io_uring
代码中进行一些修改即可,将协议解析的内容添加进去。
Ntyco
当前协程实现是采用的一个 github 上开源的网络协程库组件来进行实现的,也是将对应的协议解析的内容添加进去即可:https://github.com/wangbojing/NtyCo,感兴趣的可以将对应的代码下载下来进行使用。
ntyco.c
#include "nty_coroutine.h"
#include <arpa/inet.h>typedef int (*msg_handler)(char* msg, int length, char* response);
static msg_handler kvs_handler;void server_reader(void *arg) {int fd = *(int *)arg;int ret = 0;while (1) {char buf[1024] = {0};ret = recv(fd, buf, 1024, 0);if (ret > 0) {// printf("read from server: %.*s\n", ret, buf);// 协议解析位置char response[1024] = {0};int slength = kvs_handler(buf, ret, response);ret = send(fd, response, slength, 0);if (ret == -1) {close(fd);break;}} else if (ret == 0) { close(fd);break;}}
}void server(void *arg) {unsigned short port = *(unsigned short *)arg;int fd = socket(AF_INET, SOCK_STREAM, 0);if (fd < 0) return ;struct sockaddr_in local, remote;local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;bind(fd, (struct sockaddr*)&local, sizeof(struct sockaddr_in));listen(fd, 20);printf("listen port : %d\n", port);while (1) {socklen_t len = sizeof(struct sockaddr_in);int cli_fd = accept(fd, (struct sockaddr*)&remote, &len);nty_coroutine *read_co;nty_coroutine_create(&read_co, server_reader, &cli_fd);}
}int ntyco_start(unsigned short port, msg_handler handler) {kvs_handler = handler;nty_coroutine *co = NULL;nty_coroutine_create(&co, server, &port);nty_schedule_run();
}
接下来可以看一下对应的实践成果:
当前与网络模块的关联已经实现完毕,后续更新请看下一篇文章。