etcd-cpp-apiv3 二次封装
接口介绍
头文件
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/SyncClient.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
下面从功能介绍几个类的概念
Value :保存键值对的 key 和 value
Event:记录键值对是否改变或者被删除的状态
Response :etcd 服务器向客户端的响应
KeepAlive : 客户端向 etcd 保活数据
Client : 客户端类
Watcher : 检测 etcd 服务器上键值对是否发生改变
下面是接口
namespace etcd {
class Value {bool is_dir();//判断是否是一个目录std::string const& key() //键值对的 key 值std::string const& as_string()//键值对的 val 值int64_t lease() //用于创建租约的响应中,返回租约 ID
}
//etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端
//在通知客户端的时候,会返回改变前的数据和改变后的数据
class Event {enum class EventType {PUT, //键值对新增或数据发生改变DELETE_,//键值对被删除INVALID,};enum EventType event_type() const Value& kv()const Value& prev_kv()
}
class Response {bool is_ok()std::string const& error_message()Value const& value()//当前的数值 或者 一个请求的处理结果Value const& prev_value()//之前的数值Value const& value(int index)//std::vector<Event> const& events();//触发的事件
}
class KeepAlive {KeepAlive(Client const& client, int ttl, int64_t lease_id =
0);//返回租约 IDint64_t Lease();//停止保活动作void Cancel();
}
class Client {// etcd_url: "http://127.0.0.1:2379"Client(std::string const& etcd_url,std::string const& load_balancer = "round_robin");//Put a new key-value pair 新增一个键值对pplx::task<Response> put(std::string const& key, std::string const& value);//新增带有租约的键值对 (一定时间后,如果没有续租,数据自动删除)pplx::task<Response> put(std::string const& key, std::string const& value,const int64_t leaseId);//获取一个指定 key 目录下的数据列表pplx::task<Response> ls(std::string const& key);
//创建并获取一个存活 ttl 时间的租约pplx::task<Response> leasegrant(int ttl);//获取一个租约保活对象,其参数 ttl 表示租约有效时间pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int
ttl);//撤销一个指定的租约pplx::task<Response> leaserevoke(int64_t lease_id);//数据锁pplx::task<Response> lock(std::string const& key);
}
class Watcher {Watcher(Client const& client, std::string const& key, //要监控的键值对 keystd::function<void(Response)> callback, //发生改变后的回调bool recursive = false); //是否递归监控目录下的所有数据改变Watcher(std::string const& address, std::string const& key,std::function<void(Response)> callback, bool recursive = false);//阻塞等待,直到监控任务被停止bool Wait();bool Cancel();
}
二次封装
设计两个类 Register Discover ,Register 用来向 etcd 服务器注册键值对,它会向 etcd 服务器申请一个租约ID,负责数据保活。Discover 查看服务器键值对,并检测键值对是否被改变或删除。
Register 类只需封装 Client KeepAlive 类即可,通过KeepAlive 得到租约ID 并且向服务器不断进行保活,Client ::put 向 etcd 服务器添加或更改键值对。代码如下
class Register
{public://构造,传入 etcd 服务器地址Register(const std::string& url = "http://127.0.0.1:2379"):_client_ptr(std::make_shared<etcd::Client>(url)),_keep_alive_ptr(std::make_shared<etcd::KeepAlive>(*(_client_ptr.get()), 3)),_lease_id(_keep_alive_ptr->Lease()){}//析构,撤销指定租约~Register(){_client_ptr->leaserevoke(_lease_id);}//向 etcd 添加服务void put(const std::string& key, const std::string& value){if(false == _client_ptr->put(key, value, _lease_id).get().is_ok()){LOG_ERROR("etcd 添加服务失败 {} : {}", key, value);}else{LOG_INFO("etcd 添加服务成功 {} : {}", key, value);}}private:std::shared_ptr<etcd::Client> _client_ptr; //客户端实体 std::shared_ptr<etcd::KeepAlive> _keep_alive_ptr; //保活int64_t _lease_id; //租约ID
};
Discover 封装 Client 和 Watcher ,检测 etcd 服务器键值对的变化。同时设置两个回调函数当键值对改变时,调用回调函数。
class Discover
{using notify_callback = std::function<void(std::string, std::string)>;public://四个参数为: etcd 服务器地址, 根目录, 服务添加回调,服务删除回调Discover(std::string url, std::string base_dir, notify_callback put_cb, notify_callback del_cd):_client_ptr(std::make_shared<etcd::Client>(url)),_watcher_ptr(std::make_shared<etcd::Watcher>(*(_client_ptr.get()), base_dir, std::bind(&Discover::callback, this, std::placeholders::_1), true)),_put_cb(put_cb),_del_cb(del_cd){//ls查看根目录下所有服务etcd::Response response = _client_ptr->ls(base_dir).get();if(false == response.is_ok()){LOG_ERROR("获取服务失败 {}", base_dir);}else{for(int i = 0; i < response.keys().size(); i++){if(_put_cb) _put_cb(response.value(i).key(), response.value(i).as_string());LOG_INFO("上线了 {} : {} 服务", response.value(i).key(), response.value(i).as_string());}}//监控根目录下服务_watcher_ptr->Wait();}~Discover(){_watcher_ptr->Cancel();}private:void callback(const etcd::Response& resp){if(false == resp.is_ok()){LOG_ERROR("监控服务失败" );return;}else{for(const etcd::Event& ev : resp.events()){if(ev.event_type() == etcd::Event::EventType::PUT){LOG_INFO("服务发生改变 {} : {} ---> {} : {}", ev.prev_kv().key(), ev.prev_kv().as_string(), ev.kv().key(), ev.kv().as_string());if(_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());}else if(ev.event_type() == etcd::Event::EventType::DELETE_){LOG_INFO("下线了服务 {} : {}", ev.prev_kv().key(), ev.prev_kv().as_string());if(_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());}} }}private:std::shared_ptr<etcd::Client> _client_ptr; //客户端实体 std::shared_ptr<etcd::Watcher> _watcher_ptr; //监控服务notify_callback _put_cb;notify_callback _del_cb;
};