当前位置: 首页 > news >正文

bws-rs:Rust 编写的 S3 协议网关框架,支持灵活后端接入

bws-rs:Rust 编写的 S3 协议网关框架,支持灵活后端接入

bws-rs介绍

bws-rs 是一个用 Rust 编写的轻量级 S3 协议服务端网关框架,旨在帮助开发者快速构建兼容 AWS S3 协议 的对象存储服务。该框架支持 S3 V4 签名校验,集成 Axum 作为 Web 框架,所有协议校验逻辑通过实现对应的 trait 并注册为 axum::Extension 实现非侵入式扩展,具有良好的可维护性与可插拔性。

bws-rs 可作为前端网关挂载在你已有的文件系统、对象存储系统甚至缓存引擎之前,为其提供标准化的 S3 协议兼容层,支持与 AWS CLI、MinIO Client 等主流 S3 SDK 的交互。

✅ 已支持的功能
📁 S3 协议支持列表

  • PutObject(上传对象)

  • GetObject(获取对象)

  • HeadObject(获取对象元信息)

  • DeleteObject(删除对象)

  • CreateBucket(创建桶)

  • HeadBucket(桶存在性检查)

  • ListBucket(列举所有桶)

  • DeleteBucket(删除桶)

  • GetBucketLocation(获取桶区域)

  • MultipartUpload(分片上传)

  • Range Get(部分下载)

  • Get/Put Object ACL(访问控制列表)

  • Get/Put Object Metadata(对象元数据)

  • Put Object Tagging(对象标签)

✅ MinIO SDK 兼容性验证
使用 MinIO Go SDK 进行功能验证,支持以下操作:

  • MakeBucket

  • DeleteBucket

  • ListBucket

  • ListObject

  • PutObject

  • DeleteObject

  • BucketExists

在项目中使用bws-rs: cargo add bws-rs

实现bws_rs::service::s3下对应的trait以支持对应的s3 功能

  • HeadHandler: 对应 s3 head object ,head bucket
  • GetObjectHandler: 对应s3 GetObject
  • PutObjectHandler: 对应s3 PutObject
  • DeleteObjectHandler: 对应s3 DeleteObject
  • ListObjectHandler: 对应s3 ListObject
  • CreateBucketHandler: 对应的s3 create bucket
  • ListBucketHandler: 对应s3 list bucket
  • DeleteBucketHandler: 对应s3 delete bucket
  • GetBucketLocationHandler: 对应s3 get bucket location
  • MultiUploadObjectHandler: 对应s3 MultiUpload系列操作

aceeskey 仓库需要实现bws_rs::authorization::AccesskeyStore 来提供对应accesskey的secretkey

使用示范

    use std::sync::Arc;use tokio::io::AsyncReadExt;#[derive(Default)]struct Target {}use crate::service::s3::*;impl CreateBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a CreateBucketOption,_bucket: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("create bucket {_bucket}");Ok(())})}}impl ListBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a ListBucketsOption,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<Vec<Bucket>, String>>>,> {Box::pin(async move {let datetime = chrono::Utc::now().to_rfc3339();Ok(vec![Bucket {name: "test1".to_string(),creation_date: datetime,bucket_region: "us-east-1".to_string(),}])})}}impl HeadHandler for Target {fn lookup<'a>(&self,_bucket: &str,_object: &str,) -> std::pin::Pin<Box<dyn 'a+ Send+ Sync+ std::future::Future<Output = Result<Option<HeadObjectResult>, Error>>,>,> {Box::pin(async move {let mut ret: HeadObjectResult = Default::default();ret.checksum_sha256 = Some("2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824".to_string(),);ret.content_length = Some(5);ret.etag = Some("5d41402abc4b2a76b9719d911017c592".to_string());ret.last_modified = Some(chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string(),);Ok(Some(ret))})}}impl PutObjectHandler for Target {fn handle<'a>(&'a self,opt: &PutObjectOption,bucket: &'a str,object: &'a str,body: &'a mut (dyn tokio::io::AsyncRead + Unpin + Send),) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("put bucket {bucket} object {object}");let mut buff = vec![];match body.read_to_end(&mut buff).await {Ok(size) => {log::info!("get {}", unsafe {std::str::from_utf8_unchecked(&buff[..size])});}Err(err) => {log::error!("read error {err}");}}Ok(())})}}impl DeleteBucketHandler for Target {fn handle<'a>(&'a self,_opt: &'a DeleteBucketOption,_bucket: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("delete bucket {_bucket}");Ok(())})}}impl DeleteObjectHandler for Target {fn handle<'a>(&'a self,_opt: &'a DeleteObjectOption,_object: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {log::info!("delete object {_object}");Ok(())})}}impl crate::authorization::AccesskeyStore for Target {fn get<'a>(&'a self,_accesskey: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + Sync + std::future::Future<Output = Result<Option<String>, String>>,>,> {Box::pin(async move { Ok(Some(format!("{_accesskey}12345"))) })}}impl crate::service::s3::GetObjectHandler for Target {fn handle<'a>(&'a self,bucket: &str,object: &str,opt: crate::service::s3::GetObjectOption,mut out: tokio::sync::Mutex<std::pin::Pin<std::boxed::Box<(dyn crate::utils::io::PollWrite + Send + Unpin + 'a)>,>,>,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), String>>>>{Box::pin(async move {let mut l = out.lock().await;let _ = l.poll_write(b"hello").await.map_err(|err| {log::error!("write error {err}");});Ok(())})}}impl crate::service::s3::GetBucketLocationHandler for Target {}impl MultiUploadObjectHandler for Target {fn handle_create_session<'a>(&'a self,bucket: &'a str,key: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move { Ok("ffffff".to_string()) })}fn handle_upload_part<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,part_number: u32,body: &'a mut (dyn tokio::io::AsyncRead + Unpin + Send),) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move {let mut buff = Vec::new();let size = body.read_to_end(&mut buff).await.map_err(|err| log::error!("read body error {err}"))?;println!("upload part upload_id={upload_id} part_number={part_number} bucket={bucket} key={key}\n{}",unsafe { std::str::from_boxed_utf8_unchecked((&buff[..size]).into()) });Ok("5d41402abc4b2a76b9719d911017c592".to_string())})}fn handle_complete<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,//(etag,part number)data: &'a [(&'a str, u32)],opts: MultiUploadObjectCompleteOption,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<String, ()>>>>{Box::pin(async move { Ok("69a329523ce1ec88bf63061863d9cb14".to_string()) })}fn handle_abort<'a>(&'a self,bucket: &'a str,key: &'a str,upload_id: &'a str,) -> std::pin::Pin<Box<dyn 'a + Send + std::future::Future<Output = Result<(), ()>>>>{todo!()}}#[tokio::test]async fn test_server() -> Result<(), Box<dyn std::error::Error>> {let _ = tokio::fs::create_dir_all(".sys_bws").await;env_logger::builder().filter_level(log::LevelFilter::Info).init();let target = Arc::new(Target::default());let r = axum::Router::new().layer(axum::middleware::from_fn(super::handle_fn)).layer(axum::middleware::from_fn(super::handle_authorization_middleware,)).layer(axum::Extension(target.clone() as Arc<dyn PutObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn HeadHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn ListBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn CreateBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn DeleteBucketHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn DeleteObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn crate::authorization::AccesskeyStore + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn GetObjectHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn GetBucketLocationHandler + Send + Sync>)).layer(axum::Extension(target.clone() as Arc<dyn MultiUploadObjectHandler + Send + Sync>));let l = tokio::net::TcpListener::bind("0.0.0.0:9900").await?;axum::serve(l, r).await?;Ok(())}

golang 客户端

package testsimport ("context""io""os""testing""github.com/minio/minio-go/v7""github.com/minio/minio-go/v7/pkg/credentials"
)func TestCreateBucket(t *testing.T) {creds, err := minio.New("127.0.0.1:9900", &minio.Options{Secure: false, Creds: credentials.NewStaticV4("root", "root12345", ""),Region: "us-east-1",})if err != nil {t.Fatal(err)}_, err = creds.BucketExists(context.Background(), "test")if err != nil {t.Fatal(err)}err = creds.MakeBucket(context.Background(), "itest", minio.MakeBucketOptions{})if err != nil {t.Fatal(err)}bkts, err := creds.ListBuckets(context.Background())if err != nil {t.Fatal(err)}t.Log(bkts)err = creds.RemoveBucket(context.Background(), "test")if err != nil {t.Fatal(err)}err = creds.RemoveObject(context.Background(), "test", "test", minio.RemoveObjectOptions{})if err != nil {t.Fatal(err)}err = os.WriteFile("test.txt", []byte("hello"), 0o644)if err != nil {t.Fatal(err)}fd, err := os.OpenFile("test.txt", os.O_RDONLY, 0)if err != nil {t.Fatal(err)}defer fd.Close()_, err = creds.PutObject(context.Background(), "test", "hello/world", fd, 5, minio.PutObjectOptions{})if err != nil {t.Fatal(err)}resp, err := creds.GetObject(context.Background(), "test", "test", minio.GetObjectOptions{})if err != nil {t.Fatal(err)}content, err := io.ReadAll(resp)if err != nil {t.Fatal(err)}if string(content) != "hello" {t.Fatal("expect hello got [" + string(content) + "]")}
}

s3 multipart 验证

package testsimport ("context""crypto/tls""fmt""log""net/http""os""testing""github.com/aws/aws-sdk-go-v2/aws""github.com/aws/aws-sdk-go-v2/config""github.com/aws/aws-sdk-go-v2/credentials""github.com/aws/aws-sdk-go-v2/service/s3""github.com/aws/aws-sdk-go-v2/service/s3/types"
)func TestS3Sdk(t *testing.T) {var (host      = "127.0.0.1"port      = 9900accesskey = "root"secretkey = "root12345"region    = "us-east-1")customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {if service == s3.ServiceID {return aws.Endpoint{URL:           fmt.Sprintf("http://%s:%d", host, port),SigningRegion: "us-east-1",}, nil}return aws.Endpoint{}, &aws.EndpointNotFoundError{}})// 加载 AWS 配置,指定自定义端点解析器cfg, err := config.LoadDefaultConfig(context.TODO(),config.WithEndpointResolverWithOptions(customResolver),config.WithHTTPClient(&http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true},},}),config.WithRegion(region),config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accesskey, secretkey, "")),)if err != nil {log.Fatalf("无法加载 AWS 配置: %v", err)}// 创建 S3 客户端cli := s3.NewFromConfig(cfg, func(o *s3.Options) {o.UsePathStyle = true})var (bucket = "itest"key    = "test.txt")fd, err := os.OpenFile("./test.txt", os.O_RDONLY, 0)if err != nil {t.Fatal(err)}defer fd.Close()out, err := cli.CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{Bucket: &bucket,Key:    &key,})if err != nil {t.Fatal(err)}var upNo int32 = 1resp, err := cli.UploadPart(context.Background(), &s3.UploadPartInput{Bucket: &bucket, Key: &key, PartNumber: &upNo, UploadId: out.UploadId, Body: fd,})if err != nil {t.Fatal(err)}_, err = cli.CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{Bucket: &bucket, Key: &key, UploadId: out.UploadId, MultipartUpload: &types.CompletedMultipartUpload{Parts: []types.CompletedPart{{ETag: resp.ETag, PartNumber: &upNo,},},},})if err != nil {t.Fatal(err)}
}
http://www.lryc.cn/news/593005.html

相关文章:

  • VBA 运用LISTBOX插件,选择多个选项,并将选中的选项回车录入当前选中的单元格
  • 关于NUC+雷达+倍福组网交换机是否完全足够的问题(是否需要一个路由器)
  • 软考 系统架构设计师系列知识点之杂项集萃(113)
  • WPF为启动界面(Splash Screen)添加背景音乐
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - snowNLP库实现中文情感分析
  • 标准文件和系统文件I/O
  • 车载刷写框架 --- 关于私有节点刷写失败未报引起的反思
  • 《命令行参数与环境变量:从使用到原理的全方位解析》
  • 移除debian升级后没用的垃圾
  • laravel RedisException: Connection refused优雅草PMS项目管理系统报错解决-以及Redis 详细指南-优雅草卓伊凡
  • 2025第15届上海国际生物发酵展:聚焦合成生物与绿色制造,共启生物经济新时代
  • Rust Web 全栈开发(十):编写服务器端 Web 应用
  • 医疗AI与融合数据库的整合:挑战、架构与未来展望(下)
  • 【C# in .NET】19. 探秘抽象类:具体实现与抽象契约的桥梁
  • xss的利用
  • CS231n-2017 Lecture2图像分类笔记
  • Kafka深度解析:架构、原理与应用实践
  • [论文阅读] 人工智能 + 软件工程 | 强化学习在软件工程中的全景扫描:从应用到未来
  • windows docker-02-docker 最常用的命令汇总
  • GEO营销:AI时代的搜索优化新赛道——从DeepSeek爆火看生成式引擎优化的崛起
  • Elasticsearch 重命名索引
  • LVS 集群技术实践:NAT 与 DR 模式的配置与对比
  • 牛客-倒置字符串
  • Go语言中的类型转换与类型推断解析
  • 用 Numpy 手动实现矩阵卷积运算
  • 我们使用 Blender 和 Godot 的工作流程
  • 从车险理赔到快递签收:打通区块链与现实世界的“最后一公里”——解密预言机(Oracle)
  • 【Linux服务器】-mysql数据库数据目录迁移
  • Linux系统环境下 Node.js 20 安装实践:glibc 2.17 兼容方案与工具链优化
  • 正向代理与反向代理理解