raft, Raft协议的C 实现,BSD许可协议

分享于 

18分钟阅读

GitHub

  繁體 雙語
C implementation of the Raft Consensus protocol, BSD licensed
  • 源代码名称:raft
  • 源代码网址:http://www.github.com/willemt/raft
  • raft源代码文档
  • raft源代码下载
  • Git URL:
    git://www.github.com/willemt/raft.git
    Git Clone代码到本地:
    git clone http://www.github.com/willemt/raft
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/willemt/raft
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    
    https://travis-ci.org/willemt/raft.pnghttps://coveralls.io/repos/willemt/raft/badge.png

    Raft协议的C 实现,BSD许可协议。

    有关完整文档,请参阅 raft.h

    请参见 ticketd 了解这里库的实际使用情况。

    网络已经超出这里项目范围。 实现者将需要完成所有的管道。 库不假定具有排序或者重复检测的网络层。 这意味着你可以使用UDP进行传输。

    没有依赖项,但是 https://github.com/willemt/linked-List-queue 需要进行测试。

    建筑

    make tests

    产品质量保证

    我们使用以下方法确保库是安全的:

    单个文件合并

    源已经合并为单个 raft.h 头文件。 使用 clib 将源代码下载到项目文件夹 IE的deps 中:

    brew install clib
    clib install willemt/raft_amalgamation

    该文件存储在 deps 文件夹中,如下所示:

    deps/raft/raft.h

    如何与这里库集成

    有关如何集成这里库的示例,请参见 ticketd

    如果你不能访问协作程序,最容易使用两个单独的线程- 一个用于处理Raft通信,另一个用于处理客户端通信。

    请注意,这里库不是线程安全的。 你将需要确保库的函数是专门调用的。

    初始化Raft服务器

    使用 raft_new 实例化新的Raft服务器。

    void* raft = raft_new();

    我们告诉Raft服务器使用 raft_add_node 函数是什么集群配置。 例如如果集群中有 5服务器 ,我们将调用 raft_add_node 5 [2]。

    raft_add_node(raft, connection_user_data, node_id, peer_is_self);

    其中:

    • connection_user_data 是一个指向用户数据的指针。
    • peer_is_self 是布尔值,指示这是服务器索引的当前服务器。
    • node_id 是 node的唯一整数 ID。 对等点用于标识自身。 这应该是一个随机整数。
    [1]AKA raft peer"
    [2]我们还必须在raft_add_node调用中包括Raft服务器本身。 当我们为Raft服务器调用raft_add_node时,我们将peer_is_self设置为 1.

    定期调用 raft_periodic()

    我们需要定期调用 raft_periodic

    raft_periodic(raft, 1000);

    使用libuv计时器的示例:

    staticvoid__periodic(uv_timer_t* handle)
    {
     raft_periodic(sv->raft, PERIOD_MSEC);
    }uv_timer_t *periodic_req;
    periodic_req = malloc(sizeof(uv_timer_t));
    periodic_req->data = sv;uv_timer_init(&peer_loop, periodic_req);uv_timer_start(periodic_req, __periodic, 0, 1000);

    接收条目( IE。 客户机向Raft集群发送入口

    我们的Raft应用程序接收来自客户机的日志条目。

    当发生这种情况时,我们需要:

    • 将客户机重定向到Raft群集主机( 如有必要)
    • 将条目附加到我们的日志
    • 块,直到已经提交日志条目 [3]
    [3]当跨Raft群集中的大多数服务器复制日志条目时

    将条目附加到日志

    当我们想将条目附加到日志时,我们调用 raft_recv_entry

    msg_entry_response_t response;
    e = raft_recv_entry(raft, &entry, &response);

    你应该使用客户机已经发送的日志条目填充 entry 结构。 调用完成后,response 参数被填充,raft_msg_entry_response_committed 函数可用来检查日志条目是否已经提交或者未被提交。

    在提交日志条目之前一直阻塞

    当服务器从客户端接收日志条目时,它必须阻塞,直到提交该条目。 当我们的Raft服务器必须与Raft集群的其他节点复制日志条目时,这是必要的。

    raft_recv_entry 函数不阻止 ! 这意味着你需要自己实现阻塞功能。

    以下示例来自客户端线程。 这表明我们需要在客户端请求中阻止。 ticketd通过等待条件来执行阻塞,这由对等线程发出信号。 单独的线程负责处理Raft节点之间的通信。

    msg_entry_response_t response;
    e = raft_recv_entry(sv->raft, &entry, &response);if (0!= e)
     return h2oh_respond_with_error(req, 500, "BAD");/* block until the entry is committed */int done = 0;do {
     uv_cond_wait(&sv->appendentries_received, &sv->raft_lock);
     e = raft_msg_entry_response_committed(sv->raft, &r);
     switch (e)
     {
     case0:
     /* not committed yet */break;
     case1:
     done = 1;
     uv_mutex_unlock(&sv->raft_lock);
     break;
     case -1:
     uv_mutex_unlock(&sv->raft_lock);
     returnh2oh_respond_with_error(req, 400, "TRY AGAIN");
     }
    } while (!done);

    来自对等线程的ticketd的示例。 当接收到一个appendentries对等点的响应时,我们向客户端线程提交一个条目。

    e = raft_recv_appendentries_response(sv->raft, conn->node, &m.aer);uv_cond_signal(&sv->appendentries_received);

    将客户端重定向到 leader

    当我们收到来自客户的日志记录时,我们可能不是领导者。

    如果当前不是raft集群的领导者,我们必须向客户端发送重定向错误消息。 这样,客户端就可以在将来的连接中直接连接到领导者。 这使得将来的请求更快( IE。 第一次重定向后不需要重定向,直到引导更改为止。

    我们使用 raft_get_current_leader 函数来检查谁是当前的领导者。

    ticketd发送 301 HTTP重定向响应的示例:

    /* redirect to leader if needed */raft_node_t* leader = raft_get_current_leader_node(sv->raft);if (!leader)
    {
     returnh2oh_respond_with_error(req, 503, "Leader unavailable");
    }elseif (raft_node_get_id(leader)!= sv->node_id)
    {
     /* send redirect */peer_connection_t* conn = raft_node_get_udata(leader);
     char leader_url[LEADER_URL_LEN];
     statich2o_generator_t generator = { NULL, NULL };
     statich2o_iovec_t body = {. base = "",. len = 0 };
     req->res.status = 301;
     req->res.reason = "Moved Permanently";
     h2o_start_response(req, &generator);
     snprintf(leader_url, LEADER_URL_LEN, "http://%s:%d/",
     inet_ntoa(conn->addr.sin_addr), conn->http_port);
     h2o_add_header(&req->pool,
     &req->res.headers,
     H2O_TOKEN_LOCATION,
     leader_url,
     strlen(leader_url));
     h2o_send(req, &body, 1, 1);
     return0;
    }

    函数回调

    你可以使用 raft_set_callbacks 向Raft服务器提供回调。

    必须实现以下回调: send_requestvotesend_appendentriesapplylogpersist_votepersist_termlog_offerlog_pop

    设置函数回调的示例:

    raft_cbs_t raft_callbacks = {
    . send_requestvote = __send_requestvote,
    . send_appendentries = __send_appendentries,
    . applylog = __applylog,
    . persist_vote = __persist_vote,
    . persist_term = __persist_term,
    . log_offer = __raft_logentry_offer,
    . log_poll = __raft_logentry_poll,
    . log_pop = __raft_logentry_pop,
    . log = __raft_log,
    };char* user_data = "test";raft_set_callbacks(raft, &raft_callbacks, user_data);

    send_requestvote()

    对于这个回调,我们必须序列化一个 msg_requestvote_t 结构,然后将它发送到由 node 标识的对等点。

    ticketd示例展示了如何实现回调:

    staticint__send_requestvote(
     raft_server_t* raft,
     void *udata,
     raft_node_t* node,
     msg_requestvote_t* m
     )
    {
     peer_connection_t* conn = raft_node_get_udata(node);
     uv_buf_t bufs[1];
     char buf[RAFT_BUFLEN];
     msg_t msg = {
    . type = MSG_REQUESTVOTE,
    . rv = *m
     };
     __peer_msg_serialize(tpl_map("S(I$(IIII))", &msg), bufs, buf);
     int e = uv_try_write(conn->stream, bufs, 1);
     if (e <0)
     uv_fatal(e);
     return0;
    }

    send_appendentries()

    对于这个回调,我们必须序列化一个 msg_appendentries_t 结构,然后将它发送到由 node 标识的对等点。 这里结构更复杂,因为 m->entries array 可能被填充。

    ticketd示例展示了如何实现回调:

    staticint__send_appendentries(
     raft_server_t* raft,
     void *user_data,
     raft_node_t* node,
     msg_appendentries_t* m
     )
    {
     uv_buf_t bufs[3];
     peer_connection_t* conn = raft_node_get_udata(node);
     char buf[RAFT_BUFLEN], *ptr = buf;
     msg_t msg = {
    . type = MSG_APPENDENTRIES,
    . ae = {
    . term = m->term,
    . prev_log_idx = m->prev_log_idx,
    . prev_log_term = m->prev_log_term,
    . leader_commit = m->leader_commit,
    . n_entries = m->n_entries
     }
     };
     ptr += __peer_msg_serialize(tpl_map("S(I$(IIIII))", &msg), bufs, ptr);
     /* appendentries with payload */if (0 <m->n_entries)
     {
     tpl_bin tb = {
    . sz = m->entries[0].data.len,
    . addr = m->entries[0].data.buf };
     /* list of entries */ tpl_node *tn = tpl_map("IIIB",
     &m->entries[0].id,
     &m->entries[0].term,
     &m->entries[0].type,
     &tb);
     size_t sz;
     tpl_pack(tn, 0);
     tpl_dump(tn, TPL_GETSIZE, &sz);
     e = tpl_dump(tn, TPL_MEM | TPL_PREALLOCD, ptr, RAFT_BUFLEN);
     assert(0 == e);
     bufs[1].len = sz;
     bufs[1].base = ptr;
     e = uv_try_write(conn->stream, bufs, 2);
     if (e <0)
     uv_fatal(e);
     tpl_free(tn);
     }
     else {
     /* keep alive appendentries only */ e = uv_try_write(conn->stream, bufs, 1);
     if (e <0)
     uv_fatal(e);
     }
     return0;
    }

    applylog()

    这里回调是将FSM与Raft库接口所需的全部内容。 你可能希望将commit_idx保存到该回调中,具体取决于你的应用程序。

    persist_vote() & persist_term()

    这些回调只是将数据保存到磁盘,因此当Raft服务器重新引导时,它从一个有效状态开始。 这是确保安全的必要。

    log_offer()

    对于这里回调,用户需要添加日志条目。 必须将日志同步到磁盘才能返回这里回调。

    log_poll()

    对于这个回调,用户需要删除最旧的日志条目 [4]。 必须将日志同步到磁盘才能返回这里回调。

    只需要实现这里回调即可支持日志压缩。

    log_pop()

    对于这个回调,用户需要删除年轻的日志条目 [5]。 必须将日志同步到磁盘才能返回这里回调。

    [4]日志前面的日志条目
    [5]日志后面的日志条目

    来自节点的 Receving流量

    要接收 Append EntriesAppend Entries responseRequest VoteRequest Vote response 消息,需要将字节反序列化到消息结构的对应部分。

    下表显示了你需要反序列化或者反序列化的结构:

    消息类型结构函数
    附加条目msg_appendentries_traft_recv_appendentries
    追加条目响应msg_appendentries_response_traft_recv_appendentries_response
    请求投票msg_requestvote_traft_recv_requestvote
    请求投票响应msg_requestvote_response_traft_recv_requestvote_response

    下面是我们如何接收附加条目消息并对它的进行回复的示例:

    msg_appendentries_t ae;msg_appendentries_response_t response;char buf_in[1024], buf_out[1024];size_t len_in, len_out;read(socket, buf_in, &len_in);deserialize_appendentries(buf_in, len_in, &ae);
    e = raft_recv_requestvote(sv->raft, conn->node, &ae, &response);serialize_appendentries_response(&response, buf_out, &len_out);write(socket, buf_out, &len_out);

    成员资格更改

    在Raft日志上管理成员身份变更。 你需要两个日志条目才能将服务器添加到群集。 删除时只需要一个日志条目。 添加服务器有两个日志条目,因为我们需要确保新的服务器日志在进行投票之前。

    强烈建议将 node 添加到它的node ID为随机的群集中。 如果服务器曾经连接到群集,则这一点尤其重要。

    添加 node

    • 使用 raft_recv_entry 附加配置更改。 确保条目的类型设置为 RAFT_LOGTYPE_ADD_NONVOTING_NODE
    • log_offer 回调中,当检测到带有 RAFT_LOGTYPE_ADD_NONVOTING_NODE 类型的日志时,我们将通过调用 raft_add_non_voting_node 添加非投票 node。
    • 一旦 node_has_sufficient_logs 回调激发,使用 raft_recv_entry 附加配置终止日志条目。 确保条目的类型设置为 RAFT_LOGTYPE_ADD_NODE
    • log_offer 回调中,当你接收一个带有 RAFT_LOGTYPE_ADD_NODE 类型的日志时,我们将使用 raft_add_node 设置 node 投票

    删除 node的

    • 使用 raft_recv_entry 附加配置更改。 确保条目的类型设置为 RAFT_LOGTYPE_REMOVE_NODE
    • 如果检测到 log_offer 回调,当检测到类型为 RAFT_LOGTYPE_REMOVE_NODE的日志时,我们将通过调用 raft_remove_node 删除 node
    • applylog 回调中应用 RAFT_LOGTYPE_REMOVE_NODE 配置更改日志后,如果要删除它,我们将关闭服务器。

    日志压缩

    支持的日志压缩方法称为"基于内存的状态机快照快照"( Ongaro,2014 )

    这里库不发送快照( IE。 没有 send_snapshot,recv_snapshot回调可以实现)。 用户必须将快照发送到此库之外。 实现者必须序列化和反序列化快照。

    流程的工作方式如下:

    • 使用 raft_begin_snapshot 开始快照。
    • 将当前成员身份详细信息保存到快照。
    • 将有限状态机保存到快照。
    • 使用 raft_end_snapshot 结束快照。
    • send_snapshot 回调激发时,用户必须将快照发送到其他服务器。
    • 一旦对等方拥有快照,他们就调用 raft_begin_load_snapshot
    • 对等调用 raft_add_node 按快照信息的成员身份添加节点。
    • 对等点按快照信息的成员身份对节点调用 raft_node_set_voting
    • 对等点按快照信息的成员身份对节点调用 raft_node_set_active
    • 最后,对等方根据快照信息的成员身份将 raft_node_set_active 调用到节点。

    当 node 收到快照时,它可以为其他节点重用该快照。

    路线图

    • 批量友好接口- 我们可以通过添加支持批处理多个日志项的新api来加速 Raft
    • 实现可以线性化语义( Ongaro,2014 )
    • 更有效地处理只读查询( Ongaro,2014 )

    引用

    Ongaro,D。( 2014 )。一致:桥接理论和实践。 从 https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf 检索。


    相关文章