
Building a Distributed Microservice E-commerce Order System in Rust (Part 1)

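1 Overall Design and Project Setup

1.1 System Architecture

A. Microservice breakdown

Order service: creates, queries, and confirms orders and updates their status;

Payment service: simulates the payment flow and writes the result back to the order;

Inventory service: manages product stock and supports deduction and rollback;

Unified gateway: routes requests and hosts the front-end pages (HTML/CSS/JS);

Service registration and discovery: Nacos manages the service instances;

Data storage: MongoDB stores the data of every service.

B. Technology stack

Framework: Actix-web (HTTP services) in the initial design; the listings in this part are built on warp;

Database driver: mongodb (the official crate);

Service registration: reqwest (an HTTP client calling the Nacos API) in the initial design; the listings in this part use the nacos-sdk crate;

Async runtime: Tokio;

Configuration management: dotenv.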
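
The stack lists dotenv for configuration, while the listings in this part hardcode the Nacos address, the MongoDB URI, and the ports. The following is a minimal sketch, using only the standard library, of how those values could be read from the environment with the article's values as defaults. The type ServiceConfig and the variable names NACOS_ADDR, MONGO_URI, and SERVICE_PORT are hypothetical and are not taken from the project; a dotenv-style loader would only need to populate the process environment before from_env is called.

```rust
/// Settings that the listings hardcode. Field names are illustrative assumptions.
#[derive(Debug, PartialEq)]
pub struct ServiceConfig {
    pub nacos_addr: String,
    pub mongo_uri: String,
    pub port: u16,
}

impl ServiceConfig {
    /// Build a config from any key/value lookup, falling back to the
    /// addresses and port used in this article when a key is absent.
    pub fn from_lookup(lookup: impl Fn(&str) -> Option<String>) -> Result<Self, String> {
        // A present but malformed port is an error, not a silent default.
        let port = match lookup("SERVICE_PORT") {
            Some(raw) => raw
                .parse::<u16>()
                .map_err(|e| format!("invalid SERVICE_PORT {raw:?}: {e}"))?,
            None => 9001,
        };
        Ok(Self {
            nacos_addr: lookup("NACOS_ADDR").unwrap_or_else(|| "127.0.0.1:8848".to_string()),
            mongo_uri: lookup("MONGO_URI")
                .unwrap_or_else(|| "mongodb://localhost:27017".to_string()),
            port,
        })
    }

    /// Read the same keys from the process environment.
    pub fn from_env() -> Result<Self, String> {
        Self::from_lookup(|key| std::env::var(key).ok())
    }
}
```

Taking the lookup as a closure keeps the parsing logic testable without touching the real environment.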

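1.2 Obtaining the Project Architecture and Initial Code

The project design framework and the initial code of the main programs were generated with Tencent ima, backed by the Hunyuan/DeepSeek large language models, as shown in Figure 1.

Figure 1 Screenshot of Tencent ima generating the project design framework and the initial code of the main programs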

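1.3 Building the Main Project and Its Sub-projects

In the RustRover IDE, create the main project ecommerce-uservices, then create four sub-projects inside that directory: order_service, payment_service, inventory_service, and gateway. Configure the workspace in the main project's Cargo.toml as follows.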

[workspace]
resolver = "2"
members = ["order_service", "payment_service", "inventory_service", "gateway"]

1.4 Project Directory Structure

ecommerce-uservices/
├── gateway/                      # unified gateway service
│   ├── src/
│   │   ├── main.rs               # gateway entry point
│   │   └── routes.rs             # route configuration
│   ├── templates/                # HTML page templates
│   │   ├── index.html            # home page
│   │   ├── orders.html           # orders page
│   │   ├── payment.html          # payment page
│   │   └── inventory.html        # inventory page
│   ├── static/                   # static assets
│   │   ├── style.css             # global styles
│   │   └── script.js             # global script
│   └── Cargo.toml
├── order_service/                # order service
│   ├── src/
│   │   ├── main.rs               # service entry point
│   │   ├── handler.rs            # HTTP request handling
│   │   ├── model.rs              # data structures (order entity)
│   │   └── db.rs                 # MongoDB operations
│   └── Cargo.toml
├── payment_service/              # payment service (same layout as the order service)
│   ├── src/
│   │   ├── main.rs               # service entry point
│   │   ├── handler.rs            # HTTP request handling
│   │   ├── model.rs              # data structures (payment and order entities)
│   │   └── db.rs                 # MongoDB operations
│   └── Cargo.toml
├── inventory_service/            # inventory service (same layout as the order service)
│   ├── src/
│   │   ├── main.rs               # service entry point
│   │   ├── handler.rs            # HTTP request handling
│   │   ├── model.rs              # data structures (inventory entity)
│   │   └── db.rs                 # MongoDB operations
│   └── Cargo.toml
└── Cargo.toml                    # workspace configuration

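2 Order Service Implementation

2.1 Architecture Principles

  1. Layered design: request handling (handler), data access (db), and data models (model) are kept separate.
  2. Asynchronous and non-blocking: the service runs on the Tokio runtime, so handlers do not block threads while waiting on I/O under high concurrency.
  3. Safe serialization: Serde keeps the conversion between JSON and structs type-safe.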
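
The layering in principle 1 can be sketched without any framework. The example below is a simplified, synchronous illustration and not the project's code: the trait OrderStore and the type MemoryStore are hypothetical stand-ins for db.rs, and the Order struct is reduced to two fields. It shows how the handler layer depends only on a data-access interface, so the storage backend can be swapped (for instance, for an in-memory store in tests).

```rust
use std::collections::HashMap;

// Model layer: a reduced order entity (the article's struct has more fields).
#[derive(Debug, Clone, PartialEq)]
pub struct Order {
    pub order_id: String,
    pub status: String,
}

// Data-access layer: hypothetical interface standing in for db.rs.
pub trait OrderStore {
    fn insert(&mut self, order: Order);
    fn find(&self, order_id: &str) -> Option<Order>;
}

// An in-memory implementation, used here instead of MongoDB.
#[derive(Default)]
pub struct MemoryStore {
    orders: HashMap<String, Order>,
}

impl OrderStore for MemoryStore {
    fn insert(&mut self, order: Order) {
        self.orders.insert(order.order_id.clone(), order);
    }

    fn find(&self, order_id: &str) -> Option<Order> {
        self.orders.get(order_id).cloned()
    }
}

// Handler layer: business logic that only sees the OrderStore interface.
pub fn create_order(store: &mut impl OrderStore, order_id: &str) -> Order {
    let order = Order {
        order_id: order_id.to_string(),
        status: "created".to_string(),
    };
    store.insert(order.clone());
    order
}

pub fn get_order(store: &impl OrderStore, order_id: &str) -> Result<Order, String> {
    store
        .find(order_id)
        .ok_or_else(|| format!("order {order_id} not found"))
}
```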

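2.2 Order Service Features

  1. Create an order: POST /orders creates a new order.
  2. Query an order: GET /orders/{order_id} returns the order details.
  3. Confirm an order: PUT /orders/{order_id}/confirm confirms the order.
  4. Refresh an order: PUT /orders/{order_id}/refresh updates the order's timestamp.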
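
The order model in this article stores the status as a plain string ("created", "confirmed", "completed", "canceled"). As a hedged illustration of how the confirm endpoint could guard against invalid status changes, the sketch below models the four statuses as an enum. The transition rules in can_transition_to are an assumption made for this example; the article does not define which transitions are legal.

```rust
/// The four order statuses named in the article's model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderStatus {
    Created,
    Confirmed,
    Completed,
    Canceled,
}

impl OrderStatus {
    /// The string form stored in the `status` field.
    pub fn as_str(self) -> &'static str {
        match self {
            OrderStatus::Created => "created",
            OrderStatus::Confirmed => "confirmed",
            OrderStatus::Completed => "completed",
            OrderStatus::Canceled => "canceled",
        }
    }

    /// Parse a stored status; unknown strings yield None.
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "created" => Some(OrderStatus::Created),
            "confirmed" => Some(OrderStatus::Confirmed),
            "completed" => Some(OrderStatus::Completed),
            "canceled" => Some(OrderStatus::Canceled),
            _ => None,
        }
    }

    /// Assumed rules: created -> confirmed | canceled,
    /// confirmed -> completed | canceled, nothing leaves a final state.
    pub fn can_transition_to(self, next: OrderStatus) -> bool {
        use OrderStatus::*;
        matches!(
            (self, next),
            (Created, Confirmed) | (Created, Canceled) | (Confirmed, Completed) | (Confirmed, Canceled)
        )
    }
}
```

A handler could parse the stored status, check can_transition_to, and reject the request before touching the database.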

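2.3 main.rs--Service Entry Point and Route Configuration

Registers the service with Nacos and mounts the routes (create, query, confirm, and refresh an order). The MongoDB collection is initialized lazily in db.rs on first use.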

mod db;
mod handler;
mod model;

use std::collections::HashMap;
use std::net::SocketAddr;

use handler::order_routes;
use log::info;
use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
use nacos_sdk::api::props::ClientProps;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize logging
    env_logger::init();
    info!("Starting order service");

    // Service settings
    let service_name = "order_service".to_string();
    let group = "ecommerce".to_string();
    let ip = "127.0.0.1";
    let port = 9001;

    // Create the Nacos client
    let client = NamingServiceBuilder::new(
        ClientProps::new().server_addr("127.0.0.1:8848").namespace("public"),
    )
    .build()?;

    // Describe this service instance
    let instance = ServiceInstance {
        service_name: Some(service_name.clone()),
        ip: ip.to_string(),
        port,
        weight: 1.0,
        healthy: true,
        enabled: true,
        ephemeral: true,
        instance_id: None,
        cluster_name: Some("DEFAULT".to_string()),
        metadata: HashMap::from([
            ("version".into(), "1.0".into()),
            ("service".into(), "order".into()),
        ]),
    };

    // Register the instance with Nacos
    client.register_instance(service_name.clone(), Some(group.clone()), instance.clone()).await?;
    println!("Order service registered");

    /* Deregistration (takes the same arguments as registration)
    client.deregister_instance(service_name, Some(group), instance).await?; */

    // Start the warp server on the address registered above
    let routes = order_routes();
    let addr: SocketAddr = format!("{ip}:{port}").parse()?;
    println!("Order service running on {}", addr);
    warp::serve(routes).run(addr).await;
    Ok(())
}

2.4 model.rs--Data Model Definitions

use chrono::Utc; use mongodb::bson::oid::ObjectId; use mongodb::error::Error;

use serde::{Deserialize, Serialize}; use warp::reject::Reject;

#[derive(Debug, Serialize, Deserialize)]

pub struct Order {

    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]

    pub id: Option<ObjectId>, pub order_id: String, pub user_id: String,

    pub product_id: String, pub quantity: i32, pub total_price: f64,

    pub status: String, // "created", "confirmed", "completed", "canceled"

    pub created_at: String, pub updated_at: String, }

#[derive(Debug, Serialize, Deserialize)]

pub struct CreateOrderRequest { pub user_id: String,

    pub product_id: String, pub quantity: i32, pub total_price: f64, }

#[derive(Debug, Serialize, Deserialize)]

pub struct UpdateOrderStatusRequest { pub status: String, }

impl Order {

    pub fn new(req: CreateOrderRequest) -> Self {

        let now = Utc::now().to_rfc3339();

        Order { id: None, order_id: uuid::Uuid::new_v4().to_string(),

            user_id: req.user_id, product_id: req.product_id,

            quantity: req.quantity, total_price: req.total_price,

            status: "created".to_string(), created_at: now.clone(), updated_at: now,

        }

    }

}

#[derive(Debug, Serialize, Deserialize)]

pub struct OrderError { message: String, }

impl Reject for OrderError {}

impl OrderError {

    pub fn new(message: &Error) -> Self {

        OrderError { message: message.to_string(), }

    }

}

2.5 handler.rs--Request Handling Logic

Handles HTTP requests, calls the database operations, and returns the responses.

use serde_json::json;
use warp::{Filter, Rejection, Reply};

use crate::db::{create_order, get_order, refresh_order, update_order_status};
use crate::model::{CreateOrderRequest, Order, OrderError, UpdateOrderStatusRequest};

pub async fn health_check() -> Result<impl Reply, Rejection> {

    Ok(warp::reply::json(&json!({"status": "ok"})))

}

pub async fn handle_create_order(req: CreateOrderRequest) -> Result<impl Reply, Rejection> {

    let order = Order::new(req);

    match create_order(order).await {

        Ok(id) => Ok(warp::reply::json(&json!({"order_id": id}))),

        Err(e) => Err(warp::reject::custom(OrderError::new(&e))),

    }

}

pub async fn handle_get_order(order_id: String) -> Result<impl Reply, Rejection> {

    match get_order(&order_id).await {

        Ok(Some(order)) => Ok(warp::reply::json(&order)),

        Ok(None) => Err(warp::reject::not_found()),

        Err(e) => Err(warp::reject::custom(OrderError::new(&e))),

    }

}

pub async fn handle_confirm_order(order_id: String, req: UpdateOrderStatusRequest) -> Result<impl Reply, Rejection> {

    match update_order_status(&order_id, &req.status).await {

        Ok(_) => Ok(warp::reply::json(&json!({"status": "confirmed"}))),

        Err(e) => Err(warp::reject::custom(OrderError::new(&e))),

    }

}

pub async fn handle_refresh_order(order_id: String) -> Result<impl Reply, Rejection> {

    match refresh_order(&order_id).await {

        Ok(_) => Ok(warp::reply::json(&json!({"status": "refreshed"}))),

        Err(e) => Err(warp::reject::custom(OrderError::new(&e))),

    }

}

pub fn order_routes() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {

    let health = warp::path!("health").and(warp::get()).and_then(health_check);

    let create_order = warp::path!("orders").and(warp::post())

        .and(warp::body::json()).and_then(handle_create_order);

    let get_order = warp::path!("orders" / String).and(warp::get()).and_then(handle_get_order);

    let confirm_order = warp::path!("orders" / String / "confirm")

        .and(warp::put()).and(warp::body::json()).and_then(handle_confirm_order);

    let refresh_order = warp::path!("orders" / String / "refresh")

        .and(warp::put()).and_then(handle_refresh_order);

    health.or(create_order).or(get_order).or(confirm_order).or(refresh_order)

}

2.6 db.rs--Database Operations

use mongodb::bson::doc;
use mongodb::options::ClientOptions;
use mongodb::{Client, Collection};
use tokio::sync::OnceCell as AsyncOnceCell;

use crate::model::Order;

// Global collection handle, initialized lazily on first use
static ORDER_COLLECTION: AsyncOnceCell<Collection<Order>> = AsyncOnceCell::const_new();

// Connect to MongoDB and return the orders collection
async fn init_collection() -> Collection<Order> {
    let client_options = ClientOptions::parse("mongodb://localhost:27017")
        .await
        .expect("Failed to parse MongoDB connection string");
    let client = Client::with_options(client_options)
        .expect("Failed to create MongoDB client");
    client.database("ecommerce1").collection("orders")
}

// Return the shared collection, initializing it on the first call
async fn get_collection() -> &'static Collection<Order> {
    ORDER_COLLECTION.get_or_init(init_collection).await
}

// Insert the order and return its business order_id (the UUID that
// GET /orders/{order_id} filters on) rather than the MongoDB _id
pub async fn create_order(order: Order) -> Result<String, mongodb::error::Error> {
    let collection = get_collection().await;
    let order_id = order.order_id.clone();
    collection.insert_one(order, None).await?;
    Ok(order_id)
}

pub async fn get_order(order_id: &str) -> Result<Option<Order>, mongodb::error::Error> {

    let collection = get_collection().await;

    let filter = doc! { "order_id": order_id };

    collection.find_one(filter, None).await

}

pub async fn update_order_status(order_id: &str, status: &str) -> Result<(), mongodb::error::Error> {

    let collection = get_collection().await;

    let filter = doc! { "order_id": order_id };

    let update = doc! { "$set": { "status": status, "updated_at": chrono::Utc::now().to_rfc3339() } };

    collection.update_one(filter, update, None).await?; Ok(())

}

pub async fn refresh_order(order_id: &str) -> Result<(), mongodb::error::Error> {

    let collection = get_collection().await;

    let filter = doc! { "order_id": order_id };

    let update = doc! { "$set": { "updated_at": chrono::Utc::now().to_rfc3339() } };

    collection.update_one(filter, update, None).await?; Ok(())

}

2.7 Cargo.toml--Dependency Configuration

[package]
name = "order_service"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { version = "1.0", features = ["full"] }
warp = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
mongodb = "2.0"  # default features (Tokio runtime); the listings use the async API, not "sync"
lazy_static = "1.4"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "0.8", features = ["v4"] }
env_logger = "0.11"
log = "0.4"
nacos-sdk = "0.4.0"

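3 Payment Service Implementation

3.1 Functional Design

A. Payment flow

The user selects an order and specifies a payment method (WeChat Pay or Alipay);

The system simulates payment processing (80% success rate);

The order status and payment status are updated.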
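
The simulated 80% success rate can be isolated into a pure function, which makes the payment outcome easy to test. The sketch below is an illustration under stated assumptions: the payment service in this article draws its randomness from the rand crate, whereas this example substitutes a small linear congruential generator (the hypothetical type Lcg) so that it needs no external crate. The generator is not suitable for anything beyond a demonstration.

```rust
/// Final outcomes of the simulated payment (a subset of the article's PayStatus).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PayStatus {
    Paid,
    Failed,
}

/// Probability that a simulated payment succeeds.
pub const SUCCESS_RATE: f64 = 0.8;

/// Decide the outcome from a roll in the range [0, 1).
pub fn outcome_for_roll(roll: f64) -> PayStatus {
    if roll < SUCCESS_RATE {
        PayStatus::Paid
    } else {
        PayStatus::Failed
    }
}

/// Minimal linear congruential generator, a stand-in for a real RNG.
pub struct Lcg(u64);

impl Lcg {
    pub fn new(seed: u64) -> Self {
        Lcg(seed)
    }

    /// Advance the state and return a value in [0, 1) built from the top 53 bits.
    pub fn next_f64(&mut self) -> f64 {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        (self.0 >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Simulate one payment attempt.
pub fn simulate_payment(rng: &mut Lcg) -> PayStatus {
    outcome_for_roll(rng.next_f64())
}
```

Separating the roll from the decision means the boundary (a roll of exactly 0.8 fails) can be checked deterministically.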

B. Order queries

Query a single order by order ID;

Query a list of orders by combined conditions such as user ID and payment status;
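
Multi-condition queries boil down to "every criterion that is present must match". The sketch below shows that rule on an in-memory slice instead of a MongoDB collection; the types OrderRow and OrderQuery are simplified, hypothetical versions of the article's Order and QueryOrderRequest, with the payment status kept as a string for brevity.

```rust
/// Reduced order record for the example.
#[derive(Debug, Clone, PartialEq)]
pub struct OrderRow {
    pub order_id: String,
    pub user_id: String,
    pub pay_status: String,
}

/// Optional criteria; a None field places no constraint on the result.
#[derive(Debug, Default)]
pub struct OrderQuery {
    pub order_id: Option<String>,
    pub user_id: Option<String>,
    pub pay_status: Option<String>,
}

impl OrderQuery {
    /// True when the row satisfies every criterion that is set.
    pub fn matches(&self, row: &OrderRow) -> bool {
        fn ok(want: &Option<String>, have: &str) -> bool {
            want.as_deref().map_or(true, |w| w == have)
        }
        ok(&self.order_id, &row.order_id)
            && ok(&self.user_id, &row.user_id)
            && ok(&self.pay_status, &row.pay_status)
    }
}

/// Return references to all rows matching the query, in input order.
pub fn query_orders<'a>(rows: &'a [OrderRow], query: &OrderQuery) -> Vec<&'a OrderRow> {
    rows.iter().filter(|row| query.matches(row)).collect()
}
```

An empty query (all fields None) returns every row, which mirrors an empty filter document in the database layer.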

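C. Nacos service registration

The service registers itself with Nacos at startup;

Heartbeats are sent periodically to keep the instance marked healthy.

D. MongoDB integration

Uses the official MongoDB Rust driver;

Supports CRUD operations on orders;

Supports complex queries and updates.

E. Error handling

Custom error types;

A unified error-handling mechanism.

3.2 main.rs--Service Entry Point and Route Configuration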

mod db;
mod handler;
mod model;

use std::collections::HashMap;
use std::sync::Arc;

use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
use nacos_sdk::api::props::ClientProps;
use tokio::sync::Mutex;
use warp::Filter;

use crate::db::DbClient;
use crate::handler::payment_routes;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Service settings
    let service_name = "payment_service".to_string();
    let group = "ecommerce".to_string();
    let ip = "127.0.0.1";
    let port = 9002;

    // Create the Nacos client
    let client = NamingServiceBuilder::new(
        ClientProps::new().server_addr("127.0.0.1:8848").namespace("public"),
    )
    .build()?;

    // Describe this service instance
    let instance = ServiceInstance {
        service_name: Some(service_name.clone()),
        ip: ip.to_string(),
        port,
        weight: 1.0,
        healthy: true,
        enabled: true,
        ephemeral: true,
        instance_id: None,
        cluster_name: Some("DEFAULT".to_string()),
        metadata: HashMap::from([
            ("version".into(), "1.0".into()),
            ("service".into(), "payment".into()),
        ]),
    };

    // Register the instance with Nacos
    client.register_instance(service_name.clone(), Some(group.clone()), instance.clone()).await?;
    println!("Payment service registered");

    /* Deregistration (takes the same arguments as registration)
    client.deregister_instance(service_name, Some(group), instance).await?; */

    // Initialize the MongoDB client
    let db_client = match init_db_client().await {
        Ok(client) => client,
        Err(e) => {
            eprintln!("Failed to initialize database client: {}", e);
            std::process::exit(1);
        }
    };

    // Share the database client between request handlers
    let db = Arc::new(Mutex::new(db_client));

    // Build the routes
    let routes = payment_routes(db)
        .with(warp::cors().allow_any_origin())
        .with(warp::log("payment_service"));
    println!("Payment service started on port 9002");
    warp::serve(routes).run(([127, 0, 0, 1], 9002)).await;
    Ok(())
}

async fn init_db_client() -> Result<DbClient, Box<dyn std::error::Error>> {
    // MongoDB connection settings
    let uri = "mongodb://localhost:27017";
    let db_name = "ecommerce1"; // same database and collection as the order service
    let collection_name = "orders";

    // Create the database client
    let client = DbClient::new(uri, db_name, collection_name).await?;
    Ok(client)
}

3.3 model.rs--Data Model Definitions

use serde::{Deserialize, Serialize}; use mongodb::bson::oid::ObjectId;

#[derive(Debug, Serialize, Deserialize)]

pub enum Currency { CNY, USD, EUR, }

#[derive(Debug, Serialize, Deserialize)]

pub enum PayStatus { Pending, Processing, Paid, Failed, Refunded, } // awaiting payment, processing, paid, payment failed, refunded

#[derive(Debug, Serialize, Deserialize)]

pub enum PaymentMethod { WeChatPay, Alipay, BankTransfer, CreditCard, }

#[derive(Debug, Serialize, Deserialize)]

pub struct PaymentRequest { pub order_id: String, pub payment_method: PaymentMethod, pub currency: Currency, }

#[derive(Debug, Serialize, Deserialize)]

pub struct PaymentResponse { pub order_id: String, pub status: PayStatus, pub payment_method: PaymentMethod,

    pub amount: f64, pub transaction_id: Option<String>, pub payment_time: Option<String>, }

#[derive(Debug, Serialize, Deserialize)]

pub struct QueryOrderRequest { pub order_id: Option<String>, pub user_id: Option<String>,

    pub pay_status: Option<PayStatus>, }

#[derive(Debug, Serialize, Deserialize)]

pub struct Order {

    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]

    pub id: Option<ObjectId>,

    pub order_id: String,

    pub user_id: String,

    pub product_id: String,

    pub quantity: i32,

    pub total_price: f64,

    pub status: String,  // "created", "confirmed", "completed", "canceled"

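    // Orders written by the order service carry no pay_status field, so fall back to Pending when reading them
    #[serde(default = "default_pay_status")]

    pub pay_status: PayStatus,

    pub payment_method: Option<PaymentMethod>,

    pub currency: Option<Currency>,

    pub transaction_id: Option<String>,

    pub created_at: String,

    pub updated_at: String,

}

// Default used when a stored order has no pay_status yet
fn default_pay_status() -> PayStatus { PayStatus::Pending }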

impl From<Order> for PaymentResponse {

    fn from(order: Order) -> Self {

        PaymentResponse { order_id: order.order_id, status: order.pay_status,

            payment_method: order.payment_method.unwrap_or(PaymentMethod::WeChatPay),

            amount: order.total_price, transaction_id: order.transaction_id,

            payment_time: Some(order.updated_at),

        }

    }

}

3.4 handler.rs--Request Handling Logic

Handles HTTP requests, calls the database operations, and returns the responses.

use warp::{Filter, Rejection, Reply}; use serde_json::json; use std::sync::Arc;

use tokio::sync::Mutex; use thiserror::Error; use rand::Rng; use crate::db::{DbClient, DbError};

use crate::model::{PaymentRequest, PaymentResponse, QueryOrderRequest, PayStatus};

#[derive(Error, Debug)]

pub enum HandlerError {

    #[error("Database error: {0}")]

    DbError(#[from] DbError),

    #[error("Payment processing error")]

    PaymentError,

    #[error("Order not found")]

    OrderNotFound,

}

impl warp::reject::Reject for HandlerError {}

pub async fn handle_payment( order_id: String, req: PaymentRequest,

    db: Arc<Mutex<DbClient>>, ) -> Result<impl Reply, Rejection> {

    let db = db.lock().await;

    let order = db.get_order(&order_id).await

        .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;

    if order.is_none() { return Err(warp::reject::custom(HandlerError::OrderNotFound)); }

    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    db.process_payment(&order_id, &req.payment_method).await

        .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;

    let payment_successful = rand::thread_rng().gen_bool(0.8);

    if payment_successful {

        let order = db.complete_payment(&order_id).await

            .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;

        Ok(warp::reply::json(&json!({ "status": "success",

            "message": "Payment processed successfully", "order": PaymentResponse::from(order) })))

    } else {

        db.update_payment_status( &order_id, PayStatus::Failed, Some(&req.payment_method),

        ).await.map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;

        Err(warp::reject::custom(HandlerError::PaymentError))

    }

}

pub async fn handle_get_order( order_id: String, db: Arc<Mutex<DbClient>>, ) -> Result<impl Reply, Rejection> {

    let db = db.lock().await;

    let order = db.get_order(&order_id).await

        .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;

    match order { Some(order) => Ok(warp::reply::json(&PaymentResponse::from(order))),

        None => Err(warp::reject::custom(HandlerError::OrderNotFound)), }

}

pub async fn handle_query_orders( query: QueryOrderRequest,

    db: Arc<Mutex<DbClient>>, ) -> Result<impl Reply, Rejection> {

    let db = db.lock().await;

    let orders = db.query_orders(query).await

        .map_err(|e| warp::reject::custom(HandlerError::DbError(e)))?;

    let responses = orders.into_iter().map(PaymentResponse::from).collect::<Vec<_>>();

    Ok(warp::reply::json(&responses))

}

pub fn payment_routes( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {

    let db_filter = warp::any().map(move || Arc::clone(&db));

    let pay_order = warp::path!("orders" / String / "pay")

        .and(warp::post()).and(warp::body::json()).and(db_filter.clone())

        .and_then(|order_id, req, db| async move { handle_payment(order_id, req, db).await });

    let get_order = warp::path!("orders" / String).and(warp::get()).and(db_filter.clone())

        .and_then(|order_id, db| async move { handle_get_order(order_id, db).await });

    let query_orders = warp::path!("orders").and(warp::get()).and(warp::query::<QueryOrderRequest>())

        .and(db_filter.clone()).and_then(|query, db| async move { handle_query_orders(query, db).await });

    let health = warp::path!("health").and(warp::get())

        .map(|| warp::reply::json(&json!({"status": "ok"})));

    pay_order.or(get_order).or(query_orders).or(health)

}

3.5 db.rs--Database Operations

use mongodb::{Client, Collection}; use mongodb::bson::{doc, to_bson};

use mongodb::options::{ClientOptions, FindOptions};

use crate::model::{Order, PayStatus, QueryOrderRequest, PaymentMethod};

use thiserror::Error; use chrono::Utc; use rand::Rng;

#[derive(Error, Debug)]

pub enum DbError {

    #[error("MongoDB error: {0}")]

    MongoError(#[from] mongodb::error::Error),

    #[error("BSON serialization error: {0}")]

    BsonError(String),

    #[error("Order not found")]

    OrderNotFound,

}

impl From<bson::ser::Error> for DbError {

    fn from(error: bson::ser::Error) -> Self { DbError::BsonError(error.to_string()) }

}

pub struct DbClient { collection: Collection<Order>, }

impl DbClient {

    pub async fn new(uri: &str, db_name: &str, collection_name: &str) -> Result<Self, DbError> {

        let client_options = ClientOptions::parse(uri).await?;

        let client = Client::with_options(client_options)?;

        let db = client.database(db_name); let collection = db.collection::<Order>(collection_name);

        Ok(Self { collection })

    }

    pub async fn get_order(&self, order_id: &str) -> Result<Option<Order>, DbError> {

        let filter = doc! { "order_id": order_id };

        let order = self.collection.find_one(filter, None).await?; Ok(order)

    }

    pub async fn update_payment_status( &self, order_id: &str, pay_status: PayStatus,

        payment_method: Option<&PaymentMethod>, ) -> Result<(), DbError> {

        let filter = doc! { "order_id": order_id };

        let update = doc! {

            "$set": { "pay_status": to_bson(&pay_status)?,

                "payment_method": payment_method.map(to_bson).transpose()?,

                "updated_at": Utc::now().to_rfc3339()

            }

        };

        let result = self.collection.update_one(filter, update, None).await?;

        if result.matched_count == 0 { return Err(DbError::OrderNotFound); }

        Ok(())

    }

    pub async fn process_payment( &self, order_id: &str,

        payment_method: &PaymentMethod, ) -> Result<Order, DbError> {

        let filter = doc! { "order_id": order_id };

        let update = doc! {

            "$set": { "pay_status": to_bson(&PayStatus::Processing)?,

                "payment_method": to_bson(payment_method)?, "updated_at": Utc::now().to_rfc3339()

            }

        };

        let options = mongodb::options::FindOneAndUpdateOptions::builder()

            .return_document(mongodb::options::ReturnDocument::After).build();

        self.collection.find_one_and_update(filter, update, options).await?

            .ok_or(DbError::OrderNotFound)

    }

    pub async fn complete_payment(&self, order_id: &str) -> Result<Order, DbError> {

        let transaction_id = format!("TRX{:08}", rand::thread_rng().gen_range(0..99999999));

        let filter = doc! { "order_id": order_id };

        let update = doc! {

            "$set": { "pay_status": to_bson(&PayStatus::Paid)?,

                "transaction_id": transaction_id, "updated_at": Utc::now().to_rfc3339()

            }

        };

        let options = mongodb::options::FindOneAndUpdateOptions::builder()

            .return_document(mongodb::options::ReturnDocument::After).build();

        self.collection.find_one_and_update(filter, update, options).await?

            .ok_or(DbError::OrderNotFound)

    }

    pub async fn query_orders(&self, query: QueryOrderRequest) -> Result<Vec<Order>, DbError> {

        let mut filter = doc! {};

        if let Some(order_id) = query.order_id { filter.insert("order_id", order_id); }

        if let Some(user_id) = query.user_id { filter.insert("user_id", user_id); }

        if let Some(pay_status) = query.pay_status { filter.insert("pay_status", to_bson(&pay_status)?); }

        let find_options = FindOptions::builder().sort(doc! { "created_at": -1 }).build();

        let mut cursor = self.collection.find(filter, find_options).await?;

        let mut orders = Vec::new();

        while cursor.advance().await? {

            orders.push(cursor.deserialize_current()?);

        }

        Ok(orders)

    }

}

3.6 Cargo.toml--Dependency Configuration

[package]
name = "payment_service"
version = "0.1.0"
edition = "2024"
[dependencies]
warp = "0.3"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
mongodb = "2.0"  # default features (Tokio runtime); the listings use the async API, not "sync"
bson = "2.0"
chrono = { version = "0.4", features = ["serde"] }
rand = "0.8"
thiserror = "1.0"
nacos-sdk = "0.4.0"

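4 Inventory Service Implementation

4.1 Functional Design

A. Inventory model:

stock is the total stock quantity;

reserved is the stock that has already been reserved;

available is the stock still available (stock - reserved);

the version field supports optimistic locking for concurrent updates.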
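
The relationship between the three quantities can be made explicit in a small in-memory model. The sketch below is an illustration, not the service's code: unlike the article's Inventory struct, which stores available as its own field, this version derives it from stock and reserved so the invariant available = stock - reserved cannot drift. The method names reserve and release are assumptions for this example.

```rust
/// In-memory inventory record; `available` is derived rather than stored.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Inventory {
    pub stock: i32,
    pub reserved: i32,
    pub version: i32,
}

impl Inventory {
    /// A new record starts with nothing reserved and version 1.
    pub fn new(stock: i32) -> Self {
        Inventory { stock, reserved: 0, version: 1 }
    }

    /// Available stock, always stock - reserved.
    pub fn available(&self) -> i32 {
        self.stock - self.reserved
    }

    /// Reserve `qty` units if enough stock is available.
    pub fn reserve(&mut self, qty: i32) -> Result<(), String> {
        if qty <= 0 {
            return Err("quantity must be positive".to_string());
        }
        if self.available() < qty {
            return Err("insufficient stock".to_string());
        }
        self.reserved += qty;
        self.version += 1;
        Ok(())
    }

    /// Roll back a reservation of `qty` units.
    pub fn release(&mut self, qty: i32) -> Result<(), String> {
        if qty <= 0 || qty > self.reserved {
            return Err("cannot release more than is reserved".to_string());
        }
        self.reserved -= qty;
        self.version += 1;
        Ok(())
    }
}
```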

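B. Compatibility with the order system:

The same product_id field links inventory records to orders;

A reservation can carry an order_id so that it can be traced.

C. Error handling:

A custom DbError implements the Reject trait for unified error handling;

"Inventory not found" and "insufficient stock" are distinct error types.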
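
Keeping "not found" and "insufficient stock" as separate variants lets each one map to its own HTTP status. The following standard-library sketch mirrors the mapping used by this service (404, 400, and 500 as the fallback); the type name InventoryError and the method status_code are hypothetical and stand in for the article's DbError and its warp rejection handler.

```rust
use std::fmt;

/// Error variants corresponding to the cases the inventory service distinguishes.
#[derive(Debug, PartialEq)]
pub enum InventoryError {
    NotFound,
    InsufficientStock,
    Database(String),
}

impl fmt::Display for InventoryError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            InventoryError::NotFound => write!(f, "Inventory not found"),
            InventoryError::InsufficientStock => write!(f, "Insufficient stock"),
            InventoryError::Database(msg) => write!(f, "Database error: {msg}"),
        }
    }
}

impl std::error::Error for InventoryError {}

impl InventoryError {
    /// HTTP status code that a rejection handler would return for this error.
    pub fn status_code(&self) -> u16 {
        match self {
            InventoryError::NotFound => 404,
            InventoryError::InsufficientStock => 400,
            InventoryError::Database(_) => 500,
        }
    }
}
```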

D. Concurrency control:

MongoDB atomic operations keep stock updates correct;

An optimistic-locking mechanism prevents overselling.
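
Optimistic locking on a version field can be illustrated without a database. In the sketch below, a writer reads a snapshot, computes the new value, and commits only if the stored version is still the one it read; otherwise it retries. The types Store and StockDoc and the function reserve_with_retry are hypothetical, and a Mutex stands in for the atomicity that a single conditional update provides in MongoDB. This is a model of the technique, not the service's db.rs.

```rust
use std::sync::Mutex;

/// Reduced inventory document: available stock plus a version counter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StockDoc {
    pub available: i32,
    pub version: i32,
}

/// In-memory stand-in for a collection holding one document.
pub struct Store {
    doc: Mutex<StockDoc>,
}

impl Store {
    pub fn new(available: i32) -> Self {
        Store { doc: Mutex::new(StockDoc { available, version: 1 }) }
    }

    /// Read a snapshot of the current document.
    pub fn read(&self) -> StockDoc {
        self.doc.lock().unwrap().clone()
    }

    /// Commit only if nobody changed the document since `expected_version` was read.
    pub fn compare_and_update(&self, expected_version: i32, new_available: i32) -> bool {
        let mut doc = self.doc.lock().unwrap();
        if doc.version != expected_version {
            return false; // stale snapshot: another writer got there first
        }
        doc.available = new_available;
        doc.version += 1;
        true
    }
}

/// Reserve `qty` units, retrying when a concurrent update invalidates the snapshot.
pub fn reserve_with_retry(store: &Store, qty: i32, max_attempts: u32) -> Result<StockDoc, &'static str> {
    for _ in 0..max_attempts {
        let snapshot = store.read();
        if snapshot.available < qty {
            return Err("insufficient stock");
        }
        let remaining = snapshot.available - qty;
        if store.compare_and_update(snapshot.version, remaining) {
            return Ok(StockDoc { available: remaining, version: snapshot.version + 1 });
        }
    }
    Err("too many concurrent updates")
}
```

Because a commit fails only when another writer has succeeded, the available count can never go below zero, which is the property that prevents overselling.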

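E. API design:

RESTful API style;

A common /api prefix makes gateway routing easier;

CORS is enabled so the front end can call the API.

4.2 main.rs--Service Entry Point and Route Configuration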

mod db;
mod handler;
mod model;

use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;

use log::info;
use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
use nacos_sdk::api::props::ClientProps;
use tokio::sync::Mutex;
use warp::{Filter, Rejection, Reply};

use crate::{db::DbClient, handler::inventory_routes};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    info!("Starting inventory service");

    // Service settings
    let service_name = "inventory_service".to_string();
    let group = "ecommerce".to_string();
    let ip = "127.0.0.1";
    let port = 9003;

    // Create the Nacos client
    let client = NamingServiceBuilder::new(
        ClientProps::new().server_addr("127.0.0.1:8848").namespace("public"),
    )
    .build()?;

    // Describe this service instance
    let instance = ServiceInstance {
        service_name: Some(service_name.clone()),
        ip: ip.to_string(),
        port,
        weight: 1.0,
        healthy: true,
        enabled: true,
        ephemeral: true,
        instance_id: None,
        cluster_name: Some("DEFAULT".to_string()),
        metadata: HashMap::from([
            ("version".into(), "1.0".into()),
            ("service".into(), "inventory".into()),
        ]),
    };

    // Register the instance with Nacos
    client.register_instance(service_name.clone(), Some(group.clone()), instance.clone()).await?;
    println!("Inventory service registered");

    /* Deregistration (takes the same arguments as registration)
    client.deregister_instance(service_name, Some(group), instance).await?; */

    // Initialize the MongoDB connection
    let db_client = Arc::new(Mutex::new(
        DbClient::new("mongodb://localhost:27017", "ecommerce1")
            .await
            .map_err(|e| {
                log::error!("Failed to initialize database client: {}", e);
                e
            })?,
    ));

    // Define the API routes under the /api prefix
    let api = warp::path("api");
    let routes = api
        .and(inventory_routes(db_client.clone()).with(warp::log("inventory_service")))
        .with(
            warp::cors()
                .allow_any_origin()
                .allow_methods(vec!["GET", "POST", "PUT", "DELETE"])
                .allow_headers(vec!["Content-Type"]),
        )
        .recover(handle_rejection);

    // Start the server
    let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9003);
    info!("Starting inventory service on http://{}", socket_addr);
    warp::serve(routes).run(socket_addr).await;
    Ok(())
}

async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {

    let (code, message) = if err.is_not_found() {

        (warp::http::StatusCode::NOT_FOUND, "Not Found".to_string())

    } else if let Some(e) = err.find::<model::DbError>() {

        match e {

            model::DbError::InventoryNotFound =>

                (warp::http::StatusCode::NOT_FOUND, "Inventory not found".to_string()),

            model::DbError::InsufficientStock =>

                (warp::http::StatusCode::BAD_REQUEST, "Insufficient stock".to_string()),

            _ =>

                (warp::http::StatusCode::INTERNAL_SERVER_ERROR, "Database error".to_string()),

        }

    } else {

        (warp::http::StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error".to_string())

    };

    Ok(warp::reply::with_status(message, code))

}

4.3 model.rs--Data Model Definitions

use bson::oid::ObjectId; use serde::{Deserialize, Serialize}; use mongodb::bson::DateTime;

#[derive(Debug, Serialize, Deserialize)]

pub struct Inventory {

    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]

    pub id: Option<ObjectId>,

    pub product_id: String,

    pub stock: i32,                  // total stock

    pub reserved: i32,               // reserved stock

    pub available: i32,              // available stock (stock - reserved)

    pub last_updated: DateTime,

    pub version: i32,                // used for optimistic locking

}

#[derive(Debug, Serialize, Deserialize)]

pub struct InventoryUpdateRequest {

    pub product_id: String,

    pub delta: i32,                  // positive values add stock, negative values remove it

}

#[derive(Debug, Serialize, Deserialize)]

pub struct InventoryQueryRequest {

    pub product_id: String,

}

#[derive(Debug, Serialize, Deserialize)]

pub struct InventoryReserveRequest {

    pub product_id: String, pub quantity: i32,

    pub order_id: Option<String>,   // optional: the associated order ID

}

#[derive(Debug, Serialize, Deserialize)]

pub struct InventoryResponse { pub product_id: String, pub stock: i32, pub reserved: i32,

    pub available: i32, pub last_updated: String, }

#[derive(Debug, thiserror::Error)]

pub enum DbError {

    #[error("Inventory not found")]

    InventoryNotFound,

    #[error("Insufficient stock")]

    InsufficientStock,

    #[error("Database error: {0}")]

    MongoError(#[from] mongodb::error::Error),

    #[error("Serialization error: {0}")]

    BsonError(#[from] bson::ser::Error),

}

// Implement warp's Reject trait so DbError can be used as a rejection

impl warp::reject::Reject for DbError {}

4.4 handler.rs--Request Handling Logic

Handles HTTP requests, calls the database operations, and returns the responses.

use warp::{Filter, Rejection, Reply}; use std::sync::Arc; use tokio::sync::Mutex; use crate::db::DbClient;

use crate::model::{Inventory, InventoryUpdateRequest, InventoryQueryRequest, InventoryReserveRequest, InventoryResponse};

pub fn inventory_routes( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {

    create_inventory(db.clone()).or(get_inventory(db.clone()))

        .or(update_inventory(db.clone())).or(reserve_inventory(db.clone()))

}

fn create_inventory( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {

    warp::path!("inventory").and(warp::post()).and(with_db(db))

        .and(warp::body::json()).and_then(create_inventory_handler)

}

fn get_inventory( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {

    warp::path!("inventory").and(warp::get()).and(with_db(db))

        .and(warp::query::<InventoryQueryRequest>()).and_then(get_inventory_handler)

}

fn update_inventory( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {

    warp::path!("inventory" / "update").and(warp::post())

        .and(with_db(db)).and(warp::body::json()).and_then(update_inventory_handler)

}

fn reserve_inventory( db: Arc<Mutex<DbClient>>, ) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {

    warp::path!("inventory" / "reserve").and(warp::post()).and(with_db(db))

        .and(warp::body::json()).and_then(reserve_inventory_handler)

}

fn with_db(db: Arc<Mutex<DbClient>>) -> impl Filter<Extract = (Arc<Mutex<DbClient>>,),

     Error = std::convert::Infallible> + Clone {

    warp::any().map(move || db.clone())

}

async fn create_inventory_handler( db: Arc<Mutex<DbClient>>, req: InventoryQueryRequest,

    ) -> Result<impl Reply, Rejection> {

    let db = db.lock().await;

    let inventory = db.create_inventory(&req.product_id, 0).await.map_err(warp::reject::custom)?;

    Ok(warp::reply::json(&to_response(inventory)))

}

async fn get_inventory_handler( db: Arc<Mutex<DbClient>>, req: InventoryQueryRequest,

    ) -> Result<impl Reply, Rejection> {

    let db = db.lock().await;

    let inventory = db.get_inventory(&req.product_id).await.map_err(warp::reject::custom)?;

    Ok(warp::reply::json(&to_response(inventory)))

}

async fn update_inventory_handler( db: Arc<Mutex<DbClient>>, req: InventoryUpdateRequest,

    ) -> Result<impl Reply, Rejection> {

    let db = db.lock().await;

    let inventory = db.update_inventory(&req).await.map_err(warp::reject::custom)?;

    Ok(warp::reply::json(&to_response(inventory)))

}

async fn reserve_inventory_handler( db: Arc<Mutex<DbClient>>, req: InventoryReserveRequest,

    ) -> Result<impl Reply, Rejection> {

    let db = db.lock().await;

    let inventory = db.reserve_inventory(&req).await.map_err(warp::reject::custom)?;

    Ok(warp::reply::json(&to_response(inventory)))

}

fn to_response(inventory: Inventory) -> InventoryResponse {

    InventoryResponse { product_id: inventory.product_id, stock: inventory.stock,

        reserved: inventory.reserved, available: inventory.available,

        last_updated: inventory.last_updated.to_string(),

    }

}

4.5 db.rs--Database Operations

use mongodb::{Client, Collection, options::ClientOptions}; use mongodb::bson::doc;

use crate::model::{Inventory, InventoryUpdateRequest, InventoryReserveRequest, DbError};

pub struct DbClient { inventory_collection: Collection<Inventory> }

impl DbClient {
    // Connect to MongoDB and bind the `inventory` collection of the given database.
    pub async fn new(uri: &str, db_name: &str) -> Result<Self, DbError> {
        let client_options = ClientOptions::parse(uri).await?;
        let client = Client::with_options(client_options)?;
        let db = client.database(db_name);
        Ok(Self {
            inventory_collection: db.collection("inventory"),
        })
    }

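    // Insert a new inventory record with nothing reserved, then read it back
    // so the caller receives the stored document, including its `_id`.
    pub async fn create_inventory(&self, product_id: &str, initial_stock: i32) -> Result<Inventory, DbError> {
        let now = bson::DateTime::now();
        let inventory = Inventory {
            id: None,
            product_id: product_id.to_string(),
            stock: initial_stock,
            reserved: 0,
            available: initial_stock,
            last_updated: now,
            version: 1,
        };
        let result = self.inventory_collection.insert_one(inventory, None).await?;
        // Assumes `id: None` is skipped during serialization, so that an ObjectId is generated.
        let inserted_id = result
            .inserted_id
            .as_object_id()
            .expect("inserted _id should be an ObjectId");
        self.get_inventory_by_id(&inserted_id).await
    }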

    // Fetch the inventory record of a product by its product id.
    pub async fn get_inventory(&self, product_id: &str) -> Result<Inventory, DbError> {
        self.inventory_collection
            .find_one(doc! { "product_id": product_id }, None)
            .await?
            .ok_or(DbError::InventoryNotFound)
    }

    // Atomically add `delta` to both `stock` and `available`, bump the version,
    // and return the updated document.
    // Note: a negative delta is not checked here and can push the counters below zero.
    pub async fn update_inventory(&self, req: &InventoryUpdateRequest) -> Result<Inventory, DbError> {
        let filter = doc! { "product_id": &req.product_id };
        let update = doc! {
            "$inc": { "stock": req.delta, "available": req.delta, "version": 1 },
            "$currentDate": { "last_updated": true }
        };
        let options = mongodb::options::FindOneAndUpdateOptions::builder()
            .return_document(mongodb::options::ReturnDocument::After)
            .build();
        self.inventory_collection
            .find_one_and_update(filter, update, options)
            .await?
            .ok_or(DbError::InventoryNotFound)
    }

    // Move `quantity` units from `available` to `reserved`.
    // The filter only matches when enough stock is available, so the check and
    // the update happen in a single atomic operation on the server.
    // Note: an unknown product also yields no match and is reported as InsufficientStock.
    pub async fn reserve_inventory(&self, req: &InventoryReserveRequest) -> Result<Inventory, DbError> {
        let filter = doc! { "product_id": &req.product_id, "available": { "$gte": req.quantity } };
        let update = doc! {
            "$inc": { "reserved": req.quantity, "available": -req.quantity, "version": 1 },
            "$currentDate": { "last_updated": true }
        };
        let options = mongodb::options::FindOneAndUpdateOptions::builder()
            .return_document(mongodb::options::ReturnDocument::After)
            .build();
        self.inventory_collection
            .find_one_and_update(filter, update, options)
            .await?
            .ok_or(DbError::InsufficientStock)
    }

    // Fetch an inventory record by its MongoDB `_id`.
    async fn get_inventory_by_id(&self, id: &bson::oid::ObjectId) -> Result<Inventory, DbError> {
        self.inventory_collection
            .find_one(doc! { "_id": id }, None)
            .await?
            .ok_or(DbError::InventoryNotFound)
    }

}
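
`reserve_inventory` depends on one conditional update: the filter requires `available >= quantity`, and the `$inc` moves that quantity from `available` to `reserved`. The architecture in section 1.1 also lists rollback for the inventory service, but this file does not show a matching release operation. The sketch below models the same counters in memory to make the intended invariant (`stock == reserved + available`) explicit. `Stock`, `StockError`, and `release` are illustrative names and not part of the service. The guard against non-positive quantities is also an assumption: with a signed quantity field, a negative value would pass a `$gte` filter and move the counters the wrong way.

```rust
// In-memory model of the counters stored in the `inventory` collection.
// All names here are illustrative; the real service keeps this state in MongoDB.
#[derive(Debug, Clone, PartialEq)]
struct Stock {
    stock: i32,
    reserved: i32,
    available: i32,
    version: i32,
}

#[derive(Debug, PartialEq)]
enum StockError {
    InvalidQuantity,
    InsufficientStock,
    NothingToRelease,
}

impl Stock {
    fn new(initial: i32) -> Self {
        Stock { stock: initial, reserved: 0, available: initial, version: 1 }
    }

    /// Mirrors the filter `available >= quantity` followed by the `$inc` update.
    fn reserve(&mut self, quantity: i32) -> Result<(), StockError> {
        if quantity <= 0 {
            return Err(StockError::InvalidQuantity);
        }
        if self.available < quantity {
            return Err(StockError::InsufficientStock);
        }
        self.reserved += quantity;
        self.available -= quantity;
        self.version += 1;
        Ok(())
    }

    /// Hypothetical rollback: return a previously reserved quantity to `available`.
    fn release(&mut self, quantity: i32) -> Result<(), StockError> {
        if quantity <= 0 {
            return Err(StockError::InvalidQuantity);
        }
        if self.reserved < quantity {
            return Err(StockError::NothingToRelease);
        }
        self.reserved -= quantity;
        self.available += quantity;
        self.version += 1;
        Ok(())
    }
}

fn main() {
    let mut item = Stock::new(10);
    println!("reserve 4: {:?}", item.reserve(4));
    println!("reserve 7: {:?}", item.reserve(7)); // more than the 6 still available
    println!("release 4: {:?}", item.release(4));
    // The invariant holds after every successful or rejected operation.
    assert_eq!(item.stock, item.reserved + item.available);
    println!("final state: {:?}", item);
}
```

In MongoDB terms, a release along these lines would be another `find_one_and_update` whose filter requires `reserved >= quantity` and whose `$inc` reverses the signs used in `reserve_inventory`.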

4.6 Cargo.toml--Dependency Configuration

[package]
name = "inventory_service"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.0", features = ["full"] }
warp = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
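# db.rs uses the async API on the driver's default Tokio runtime, so no extra
# features are needed. The original `feature = ["sync"]` key was a typo for
# `features`, which Cargo ignores with a warning.
mongodb = "2.4"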
bson = "2.4"
futures = "0.3"
log = "0.4"
env_logger = "0.9"
thiserror = "1.0"
nacos-sdk = "0.4.0"
