当前位置: 首页 > news >正文

区块链 和 一致性哈希的结合

怎么结合呢?

我们先来回顾一下一致性哈希代码实现里面的结构

// Consistent holds the information about the members of the consistent hash circle.
type Consistent struct {mu sync.RWMutex // 读写锁,用于保护并发访问共享数据config         Config         // 存储配置信息hasher         Hasher         // 存储哈希器实例,直接从 config 复制过来sortedSet      []uint64       // 存储所有虚拟节点(通过成员名称+副本索引哈希得到)的哈希值,并保持升序排列。这是哈希环的骨架。partitionCount uint64         // 逻辑分区的总数,从 config.PartitionCount 转换而来loads          map[string]float64 // 存储每个真实成员当前的负载(即它拥有的逻辑分区数量)members        map[string]*Member // 存储所有真实成员的映射,键是成员的 String() 返回值,值是指向 Member 接口的指针partitions     map[int]*Member    // 核心映射:存储每个逻辑分区ID(0到PartitionCount-1)对应的真实成员(指向 Member 接口的指针)ring           map[uint64]*Member // 存储虚拟节点哈希值到真实成员的映射。它是 sortedSet 的补充,sortedSet 提供有序列表,ring 提供哈希值到成员的查找。
}

现在就是加入一个新的服务器后,会修改什么内容

func (c *Consistent) add(member Member) {for i := 0; i < c.config.ReplicationFactor; i++ {key := []byte(fmt.Sprintf("%s%d", member.String(), i))h := c.hasher.Sum64(key)c.ring[h] = &memberc.sortedSet = append(c.sortedSet, h)}// sort hashes ascendinglysort.Slice(c.sortedSet, func(i int, j int) bool {return c.sortedSet[i] < c.sortedSet[j]})// Storing member at this map is useful to find backup members of a partition.c.members[member.String()] = &member
}

我们每次增加服务器都会修改sortedset以及ring(虚拟节点和服务器的映射),但是如果我们能够得到服务器列表,那么是可以在本地构建sortedset和ring

分区和服务器映射需要存储到区块链中吗?不需要

它是通过确定性算法计算出来的:partitions 的内容是根据成员列表、虚拟节点哈希环和有界负载等参数,通过 distributePartitions() 方法计算得出的。这个过程是确定性的,只要输入(成员列表、配置)相同,输出(partitions 映射)也一定相同。
它与 sortedSet 是强关联的:partitions 映射的构建依赖于 sortedSet 和 ring。distributePartitions() 函数正是通过遍历 sortedSet 来为每个逻辑分区寻找归属成员的。
它是高频查询的关键:Consistent.LocateKey 方法的最后一步就是查询 partitions 映射。如果这个映射也需要从区块链上获取,那么每次键的定位都会变成一个慢速的区块链查询,这会彻底破坏系统的性能。

下面开始构建
链码
我们先创建一个链码
找一个位置存下下面的代码

package mainimport ("encoding/json""fmt""log"// "strconv"// "github.com/hyperledger/fabric-chaincode-go/shim""github.com/hyperledger/fabric-contract-api-go/contractapi"
)// ConsistentHashManager 链码的智能合约结构体
type ConsistentHashManager struct {contractapi.Contract
}// 成员列表键,用于在世界状态中存储成员列表
const membersKey = "all_members_key"// Member represents a member in the consistent hash ring.
// This is the same structure we used in our previous local Go program.
type Member struct {Name string `json:"name"`
}// InitLedger 初始化链码。
// 在这里,我们将创建一个空的成员列表并将其写入世界状态。
func (s *ConsistentHashManager) InitLedger(ctx contractapi.TransactionContextInterface) error {log.Printf("Initializing the ledger with an empty member list.")// 创建一个空成员列表members := []Member{}membersJSON, err := json.Marshal(members)if err != nil {return err}// 将空成员列表写入世界状态return ctx.GetStub().PutState(membersKey, membersJSON)
}// AddMember 添加一个新成员到哈希环中。
func (s *ConsistentHashManager) AddMember(ctx contractapi.TransactionContextInterface, memberName string) error {log.Printf("Attempting to add member: %s", memberName)// 从世界状态中读取当前成员列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return fmt.Errorf("failed to read from world state: %w", err)}// 如果列表不存在,创建一个空列表if membersJSON == nil {membersJSON = []byte("[]")}// 反序列化成员列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return fmt.Errorf("failed to unmarshal member list: %w", err)}// 检查成员是否已存在for _, member := range members {if member.Name == memberName {return fmt.Errorf("member '%s' already exists", memberName)}}// 添加新成员members = append(members, Member{Name: memberName})// 将更新后的列表序列化membersJSON, err = json.Marshal(members)if err != nil {return err}// 将更新后的列表写入世界状态return ctx.GetStub().PutState(membersKey, membersJSON)
}// RemoveMember 从哈希环中移除一个成员。
func (s *ConsistentHashManager) RemoveMember(ctx contractapi.TransactionContextInterface, memberName string) error {log.Printf("Attempting to remove member: %s", memberName)// 从世界状态中读取当前成员列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return fmt.Errorf("failed to read from world state: %w", err)}if membersJSON == nil {return fmt.Errorf("member list is empty, cannot remove '%s'", memberName)}// 反序列化成员列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return fmt.Errorf("failed to unmarshal member list: %w", err)}// 查找并移除成员found := falsefor i, member := range members {if member.Name == memberName {// 移除切片中的元素members = append(members[:i], members[i+1:]...)found = truebreak}}if !found {return fmt.Errorf("member '%s' not found", memberName)}// 将更新后的列表序列化membersJSON, err = json.Marshal(members)if err != nil {return err}// 将更新后的列表写入世界状态return ctx.GetStub().PutState(membersKey, membersJSON)
}// GetMembers 查询并返回当前所有成员。
func (s *ConsistentHashManager) GetMembers(ctx contractapi.TransactionContextInterface) ([]*Member, error) {log.Println("Querying for all members.")// 从世界状态中读取成员列表membersJSON, err := ctx.GetStub().GetState(membersKey)if err != nil {return nil, fmt.Errorf("failed to read from world state: %w", err)}if membersJSON == nil {log.Println("No members found, returning empty list.")return []*Member{}, nil}// 反序列化成员列表var members []Membererr = json.Unmarshal(membersJSON, &members)if err != nil {return nil, fmt.Errorf("failed to unmarshal member list: %w", err)}// 将 []Member 转换为 []*Member 以符合 contractapi 接口var result []*Memberfor i := range members {result = append(result, &members[i])}return result, nil
}func main() {chaincode, err := contractapi.NewChaincode(&ConsistentHashManager{})if err != nil {log.Panicf("Error creating consistent hash manager chaincode: %v", err)}if err := chaincode.Start(); err != nil {log.Panicf("Error starting consistent hash manager chaincode: %v", err)}
}

然后我们再运行

go mod init consistent-hash-manager # 你可以替换成你喜欢的模块名
go mod tidy

下面就可以去部署网络,部署链码

./network.sh deployCC -ccn consistent-hash-manager -ccp ../chaincode/ConsistentHashManager -ccv 1.0 -ccl go -c mychannel

接着我们运行我们的客户端代码

package mainimport ("bytes""crypto/x509""encoding/json""fmt""log""os""path"// "time""github.com/hyperledger/fabric-gateway/pkg/client""github.com/hyperledger/fabric-gateway/pkg/identity""google.golang.org/grpc""google.golang.org/grpc/credentials"
)// 定义链码名称、通道名称和网络配置路径
const (mspID       = "Org1MSP"cryptoPath  = "../../test-network/organizations/peerOrganizations/org1.example.com"certPath    = cryptoPath + "/users/User1@org1.example.com/msp/signcerts"keyPath     = cryptoPath + "/users/User1@org1.example.com/msp/keystore"tlsCertPath = cryptoPath + "/peers/peer0.org1.example.com/tls/ca.crt"peerEndpoint = "localhost:7051"         // Fabric Peer 的 Gateway 服务地址gatewayPeer  = "peer0.org1.example.com" // Gateway Peer 的主机名,用于 TLS 验证channelName   = "mychannel"chaincodeName = "consistent-hash-manager" // 使用我们自己的链码名称
)// Member 结构体,用于解析链码返回的成员列表
type Member struct {Name string `json:"name"`
}// newGrpcConnection 创建与 Gateway 服务器的 gRPC 连接
func newGrpcConnection() *grpc.ClientConn {log.Println("--> Creating gRPC client connection...")certificatePEM, err := os.ReadFile(tlsCertPath)if err != nil {log.Fatalf("Failed to read TLS certificate file at %s: %v", tlsCertPath, err)}certPool := x509.NewCertPool()if !certPool.AppendCertsFromPEM(certificatePEM) {log.Fatalf("Failed to append TLS CA certificate to pool")}transportCredentials := credentials.NewClientTLSFromCert(certPool, gatewayPeer)connection, err := grpc.NewClient(peerEndpoint, grpc.WithTransportCredentials(transportCredentials), grpc.WithBlock())if err != nil {log.Fatalf("Failed to create gRPC connection: %v", err)}log.Println("--> gRPC client connection created successfully.")return connection
}// newIdentity 为 Gateway 连接创建一个客户端身份 (X.509 证书)
func newIdentity() *identity.X509Identity {log.Println("--> Creating new client identity...")certificatePEM, err := readFirstFile(certPath)if err != nil {log.Fatalf("Failed to read certificate file from %s: %v", certPath, err)}certificate, err := identity.CertificateFromPEM(certificatePEM)if err != nil {log.Fatalf("Failed to parse identity certificate: %v", err)}id, err := identity.NewX509Identity(mspID, certificate)if err != nil {log.Fatalf("Failed to create X.509 identity: %v", err)}log.Println("--> Client identity created successfully.")return id
}// newSign 创建一个函数,该函数使用私钥从消息摘要生成数字签名。
func newSign() identity.Sign {log.Println("--> Creating new private key signer...")privateKeyPEM, err := readFirstFile(keyPath)if err != nil {log.Fatalf("Failed to read private key file from %s: %v", keyPath, err)}privateKey, err := identity.PrivateKeyFromPEM(privateKeyPEM)if err != nil {log.Fatalf("Failed to parse private key: %v", err)}sign, err := identity.NewPrivateKeySign(privateKey)if err != nil {log.Fatalf("Failed to create private key signer: %v", err)}log.Println("--> Private key signer created successfully.")return sign
}// readFirstFile 从指定目录中读取第一个文件
func readFirstFile(dirPath string) ([]byte, error) {dir, err := os.Open(dirPath)if err != nil {return nil, fmt.Errorf("failed to open directory %s: %w", dirPath, err)}defer dir.Close()fileNames, err := dir.Readdirnames(1)if err != nil {return nil, fmt.Errorf("failed to read file names from directory %s: %w", dirPath, err)}if len(fileNames) == 0 {return nil, fmt.Errorf("no files found in directory: %s", dirPath)}filePath := path.Join(dirPath, fileNames[0])fileContent, err := os.ReadFile(filePath)if err != nil {return nil, fmt.Errorf("failed to read file %s: %w", filePath, err)}return fileContent, nil
}// formatJSON 格式化 JSON 数据,使其更易读
func formatJSON(data []byte) string {if len(data) == 0 {return "[]" // 如果数据为空,返回一个空的 JSON 数组字符串}var prettyJSON bytes.Bufferif err := json.Indent(&prettyJSON, data, "", "  "); err != nil {log.Printf("Warning: Failed to parse JSON, returning raw data. Error: %v", err)return string(data)}return prettyJSON.String()
}func main() {// 1. 创建 gRPC 客户端连接clientConnection := newGrpcConnection()defer clientConnection.Close()// 2. 创建客户端身份和签名函数id := newIdentity()sign := newSign()// 3. 连接 Fabric Gatewaygw, err := client.Connect(id,client.WithSign(sign),client.WithClientConnection(clientConnection),)if err != nil {log.Fatalf("Failed to connect to Gateway: %v", err)}defer gw.Close()// 获取网络和合约对象network := gw.GetNetwork(channelName)contract := network.GetContract(chaincodeName)// memberNameToAdd := "node-a"// // 4. 调用 AddMember 提交事务// log.Printf("\n--> 提交 AddMember 事务以添加 '%s'...", memberNameToAdd)// _, err = contract.SubmitTransaction("AddMember", memberNameToAdd)// if err != nil {// 	log.Fatalf("Failed to submit AddMember transaction: %v", err)// }// log.Printf("成员 '%s' 添加成功。", memberNameToAdd)// memberNameToAdd2 := "node-b"// // 4. 调用 AddMember 提交事务// log.Printf("\n--> 提交 AddMember 事务以添加 '%s'...", memberNameToAdd2)// _, err = contract.SubmitTransaction("AddMember", memberNameToAdd2)// if err != nil {// 	log.Fatalf("Failed to submit AddMember transaction: %v", err)// }// log.Printf("成员 '%s' 添加成功。", memberNameToAdd2)// 5. 调用 GetMembers 查询当前成员列表log.Println("\n--> 查询所有成员...")result, err := contract.EvaluateTransaction("GetMembers")if err != nil {log.Fatalf("Failed to evaluate GetMembers transaction: %v", err)}log.Printf("当前成员列表:\n%s", formatJSON(result))// // 6. 调用 RemoveMember 提交事务// log.Printf("\n--> 提交 RemoveMember 事务以移除 '%s'...", memberNameToAdd)// _, err = contract.SubmitTransaction("RemoveMember", memberNameToAdd)// if err != nil {// 	log.Fatalf("Failed to submit RemoveMember transaction: %v", err)// }// log.Printf("成员 '%s' 移除成功。", memberNameToAdd)// // 7. 再次调用 GetMembers 查询以确认移除// log.Println("\n--> 再次查询所有成员以确认移除...")// result, err = contract.EvaluateTransaction("GetMembers")// if err != nil {// 	log.Fatalf("Failed to evaluate GetMembers transaction: %v", err)// }// log.Printf("移除后成员列表:\n%s", formatJSON(result))// log.Println("\n所有操作已完成。")
}
http://www.lryc.cn/news/610466.html

相关文章:

  • SpringBoot+SpringMVC常用注解
  • 可视化图解算法57:字符串的排列
  • 简要探讨大型语言模型(LLMs)的发展历史
  • AI编程助手:终结996的新希望
  • [激光原理与应用-134]:光学器件 - 图解透镜原理和元件
  • 实现三通道转单通道(灰度图)的两种加权方法
  • Pixel 4D 3.4.4.0 | 支持丰富的壁纸资源,高清画质,高度的个性化设置能力,智能推荐功能
  • Coze Loop:开源智能体自动化流程编排平台原理与实践
  • 可重复读(Repeatable Read)能解决幻读吗?
  • 【unitrix】 7.1 二进制位加法(bit_add.rs)
  • Minio部署和客户端使用 - 版本 2025-05-24T17-08-30Z
  • 县级融媒体中心备份与恢复策略(精简版3-2-1架构)
  • Javascript面试题及详细答案150道(046-060)
  • Linux 交换空间管理
  • 15个命令上手Linux!
  • 力扣top100--哈希
  • PandasAI连接LLM对MySQL数据库进行数据分析
  • 【笔记】重学单片机(51)(下)
  • ArcGIS的字段计算器生成随机数
  • 数据库提权
  • 并发编程常用工具类(下):CyclicBarrier 与 Phaser 的协同应用
  • (论文速读)RMT:Retentive+ViT的视觉新骨干
  • Hadoop HDFS 3.3.4 讲解~
  • 嵌入式知识篇---闪存
  • mysql 数据库系统坏了,物理拷贝出数据怎么读取
  • Deepoc 赋能送餐机器人:从机械执行到具身智能的革命性跨越
  • JavaScript 中的流程控制语句详解
  • 机器学习实战:逻辑回归深度解析与欺诈检测评估指标详解(二)
  • Redis缓存详解及常见问题解决方案
  • MySQL 基本操作入门指南