
[Node.js] Hands-On Project


Author homepage: Guiat
Column: node.js


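Table of Contents

  • 1. Project Overview and Architecture Design
    • 1.1 The Project: An Enterprise E-Commerce Management System
    • 1.2 Technology Stack
  • 2. Project Initialization and Base Architecture
    • 2.1 Project Structure
    • 2.2 Base Configuration Management
  • 3. User Service
    • 3.1 User Service Architecture
    • 3.2 User Model
    • 3.3 User Service Controller
  • 4. Product Service
    • 4.1 Product Service Data Flow
    • 4.2 Product Model
    • 4.3 Product Service Controller
  • 5. Order Service
    • 5.1 Order Processing Flow
    • 5.2 Order State Machine
    • 5.3 Order Model
    • 5.4 Order Service Controller
  • 6. Payment Service
    • 6.1 Payment System Architecture
    • 6.2 Payment Flow
    • 6.3 Payment Service Implementation
  • 7. Real-Time Notification System
    • 7.1 Notification System Architecture
    • 7.2 Notification Flow
    • 7.3 Notification Service Implementation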


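1. Project Overview and Architecture Design

1.1 The Project: An Enterprise E-Commerce Management System

We will build a complete enterprise e-commerce management system covering the core functions: user management, product management, order processing, payments, and inventory management.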

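[Architecture diagram: five layers, from the client down to infrastructure]

  • 1. Frontend layer
    • Admin console (React)
    • Customer site (Vue.js)
    • Mobile app (React Native)
  • 2. API gateway layer
    • Nginx load balancing
    • API Gateway
    • Authentication middleware
  • 3. Microservice layer
    • User service
    • Product service
    • Order service
    • Payment service
    • Inventory service
    • Notification service
  • 4. Data layer
    • MongoDB cluster
    • Redis cache
    • Elasticsearch
    • MySQL primary/replica
  • 5. Infrastructure
    • Docker containers
    • Kubernetes orchestration
    • Monitoring system
    • Logging system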
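
To make the layering concrete, here is a minimal sketch of how the API gateway layer might map incoming paths to the microservices listed above. The path prefixes (`/api/users` and so on) and the `resolveUpstream` helper are assumptions made for illustration; the ports match the defaults used in the service configuration in section 2.2.

```javascript
// Hypothetical route table: path prefix -> upstream service base URL.
// The prefixes are illustrative; the article does not define them.
const routes = [
  { prefix: '/api/users', target: 'http://localhost:3001' },
  { prefix: '/api/products', target: 'http://localhost:3002' },
  { prefix: '/api/orders', target: 'http://localhost:3003' },
  { prefix: '/api/payments', target: 'http://localhost:3004' },
];

// Return the upstream URL for a request path, or null when no service matches.
function resolveUpstream(path) {
  const route = routes.find(
    (r) => path === r.prefix || path.startsWith(r.prefix + '/')
  );
  return route ? route.target + path.slice(route.prefix.length) : null;
}

console.log(resolveUpstream('/api/orders/42')); // http://localhost:3003/42
console.log(resolveUpstream('/health'));        // null
```

Matching on `prefix + '/'` rather than a bare `startsWith(prefix)` keeps a path such as `/api/usersx` from being routed to the user service by accident.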

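1.2 Technology Stack

[Technology stack diagram: candidate tools by category]

  • 1. Backend frameworks: Express.js, Koa.js, Fastify
  • 2. Databases: MongoDB, PostgreSQL, MySQL
  • 3. Caching: Redis, Memcached
  • 4. Message queues: RabbitMQ, Apache Kafka, Redis Pub/Sub
  • 5. Search engines: Elasticsearch, Solr
  • 6. Monitoring tools: Prometheus, Grafana, ELK Stack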

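2. Project Initialization and Base Architecture

2.1 Project Structure

ecommerce-system/
├── 1-services/                 # Microservices
│   ├── user-service/          # User service
│   ├── product-service/       # Product service
│   ├── order-service/         # Order service
│   ├── payment-service/       # Payment service
│   ├── inventory-service/     # Inventory service
│   └── notification-service/  # Notification service
├── 2-shared/                  # Shared modules
│   ├── database/             # Database configuration
│   ├── middleware/           # Middleware
│   ├── utils/               # Utility functions
│   └── types/               # Type definitions
├── 3-gateway/                # API gateway
├── 4-config/                 # Configuration files
├── 5-scripts/                # Scripts
├── 6-tests/                  # Tests
├── 7-docs/                   # Documentation
└── 8-deployment/             # Deployment configuration

2.2 Base Configuration Management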

// 2-shared/config/index.js
const path = require('path');
const dotenv = require('dotenv');

// 1. Load environment configuration.
// This runs first in module.exports below, so the .env file is loaded
// before any of the other getters read process.env.
const loadEnvironmentConfig = () => {
  const env = process.env.NODE_ENV || 'development';
  const envFile = path.join(__dirname, `../../4-config/.env.${env}`);
  dotenv.config({ path: envFile });

  return {
    env,
    port: parseInt(process.env.PORT, 10) || 3000,
    host: process.env.HOST || 'localhost'
  };
};

// 2. Database configuration
const getDatabaseConfig = () => {
  return {
    mongodb: {
      uri: process.env.MONGODB_URI || 'mongodb://localhost:27017/ecommerce',
      options: {
        // useNewUrlParser and useUnifiedTopology are only needed on Mongoose 5;
        // Mongoose 6 and later no longer support them.
        useNewUrlParser: true,
        useUnifiedTopology: true,
        maxPoolSize: 10,
        serverSelectionTimeoutMS: 5000,
        socketTimeoutMS: 45000
      }
    },
    redis: {
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT, 10) || 6379,
      password: process.env.REDIS_PASSWORD,
      db: parseInt(process.env.REDIS_DB, 10) || 0
    },
    mysql: {
      host: process.env.MYSQL_HOST || 'localhost',
      port: parseInt(process.env.MYSQL_PORT, 10) || 3306,
      user: process.env.MYSQL_USER || 'root',
      password: process.env.MYSQL_PASSWORD,
      database: process.env.MYSQL_DATABASE || 'ecommerce'
    }
  };
};

// 3. Service configuration (base URL and request timeout in milliseconds)
const getServiceConfig = () => {
  return {
    userService: { url: process.env.USER_SERVICE_URL || 'http://localhost:3001', timeout: 5000 },
    productService: { url: process.env.PRODUCT_SERVICE_URL || 'http://localhost:3002', timeout: 5000 },
    orderService: { url: process.env.ORDER_SERVICE_URL || 'http://localhost:3003', timeout: 10000 },
    paymentService: { url: process.env.PAYMENT_SERVICE_URL || 'http://localhost:3004', timeout: 15000 },
    inventoryService: { url: process.env.INVENTORY_SERVICE_URL || 'http://localhost:3005', timeout: 5000 },
    notificationService: { url: process.env.NOTIFICATION_SERVICE_URL || 'http://localhost:3006', timeout: 3000 }
  };
};

// 4. Security configuration
const getSecurityConfig = () => {
  return {
    jwt: {
      // The fallback secret is for local development only; always set JWT_SECRET in production.
      secret: process.env.JWT_SECRET || 'your-secret-key',
      expiresIn: process.env.JWT_EXPIRES_IN || '24h',
      refreshExpiresIn: process.env.JWT_REFRESH_EXPIRES_IN || '7d'
    },
    bcrypt: {
      saltRounds: parseInt(process.env.BCRYPT_SALT_ROUNDS, 10) || 12
    },
    cors: {
      origin: process.env.CORS_ORIGIN?.split(',') || ['http://localhost:3000'],
      credentials: true
    }
  };
};

// 5. Third-party service configuration
const getThirdPartyConfig = () => {
  return {
    email: {
      service: process.env.EMAIL_SERVICE || 'gmail',
      user: process.env.EMAIL_USER,
      password: process.env.EMAIL_PASSWORD
    },
    sms: {
      provider: process.env.SMS_PROVIDER || 'twilio',
      accountSid: process.env.TWILIO_ACCOUNT_SID,
      authToken: process.env.TWILIO_AUTH_TOKEN,
      phoneNumber: process.env.TWILIO_PHONE_NUMBER
    },
    payment: {
      stripe: {
        publicKey: process.env.STRIPE_PUBLIC_KEY,
        secretKey: process.env.STRIPE_SECRET_KEY,
        webhookSecret: process.env.STRIPE_WEBHOOK_SECRET
      },
      paypal: {
        clientId: process.env.PAYPAL_CLIENT_ID,
        clientSecret: process.env.PAYPAL_CLIENT_SECRET,
        mode: process.env.PAYPAL_MODE || 'sandbox'
      }
    },
    storage: {
      aws: {
        accessKeyId: process.env.AWS_ACCESS_KEY_ID,
        secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
        region: process.env.AWS_REGION || 'us-east-1',
        bucket: process.env.AWS_S3_BUCKET
      }
    }
  };
};

module.exports = {
  environment: loadEnvironmentConfig(),
  database: getDatabaseConfig(),
  services: getServiceConfig(),
  security: getSecurityConfig(),
  thirdParty: getThirdPartyConfig()
};
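
Because the configuration above falls back to development defaults (including a placeholder JWT secret), it can help to fail fast when a production deployment is missing critical variables. The following is a minimal sketch of that idea; `findMissingEnv` and `assertProductionEnv` are hypothetical helpers, and the list of required variables is an assumption rather than something the project defines.

```javascript
// Hypothetical helper: return the names of required variables that are unset or empty.
function findMissingEnv(env, names) {
  return names.filter((name) => !env[name]);
}

// Throw at startup instead of silently running with an insecure default.
// The required list below is an assumption chosen for illustration.
function assertProductionEnv(env) {
  if (env.NODE_ENV !== 'production') return;
  const missing = findMissingEnv(env, ['JWT_SECRET', 'MONGODB_URI']);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
}

// Usage: call once, after dotenv has loaded and before the config is used.
assertProductionEnv({ NODE_ENV: 'development' }); // no-op outside production
```

In a real service the argument would be `process.env`; passing the environment in as a parameter simply keeps the check easy to test.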

3. User Service

3.1 User Service Architecture

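[User service architecture diagram: four layers]

  • 1. Controller layer
    • User registration
    • User login
    • User profile management
    • Permission management
  • 2. Service layer
    • Business logic
    • Data validation
    • Password hashing
    • JWT token management
  • 3. Data access layer
    • User model
    • Role model
    • Permission model
    • Session model
  • 4. Middleware layer
    • Authentication
    • Permission checks
    • Rate limiting
    • Logging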
3.2 User Model

// 1-services/user-service/models/User.js
const mongoose = require('mongoose');
const bcrypt = require('bcryptjs');
const jwt = require('jsonwebtoken');
const { security } = require('../../../2-shared/config');

// 1. User schema
const userSchema = new mongoose.Schema({
  // Basic information
  username: {
    type: String,
    required: true,
    unique: true,
    trim: true,
    minlength: 3,
    maxlength: 30,
    match: /^[a-zA-Z0-9_]+$/
  },
  email: {
    type: String,
    required: true,
    unique: true,
    trim: true,
    lowercase: true,
    match: /^[^\s@]+@[^\s@]+\.[^\s@]+$/
  },
  password: {
    type: String,
    required: true,
    minlength: 8,
    select: false // exclude the password from query results by default
  },

  // Personal information
  profile: {
    firstName: { type: String, trim: true },
    lastName: { type: String, trim: true },
    avatar: { type: String },
    phone: { type: String, match: /^\+?[\d\s()-]+$/ },
    dateOfBirth: { type: Date },
    gender: { type: String, enum: ['male', 'female', 'other'] }
  },

  // Addresses
  addresses: [{
    type: { type: String, enum: ['home', 'work', 'other'], default: 'home' },
    street: { type: String, required: true },
    city: { type: String, required: true },
    state: { type: String, required: true },
    zipCode: { type: String, required: true },
    country: { type: String, required: true },
    isDefault: { type: Boolean, default: false }
  }],

  // Account status
  status: {
    type: String,
    enum: ['active', 'inactive', 'suspended', 'deleted'],
    default: 'active'
  },
  emailVerified: { type: Boolean, default: false },
  phoneVerified: { type: Boolean, default: false },

  // Roles and permissions
  roles: [{ type: mongoose.Schema.Types.ObjectId, ref: 'Role' }],
  permissions: [{ type: String }],

  // Security information
  lastLogin: { type: Date },
  loginAttempts: { type: Number, default: 0 },
  lockUntil: { type: Date },
  passwordResetToken: { type: String },
  passwordResetExpires: { type: Date },
  emailVerificationToken: { type: String },
  emailVerificationExpires: { type: Date },

  // Preferences
  preferences: {
    language: { type: String, default: 'en' },
    currency: { type: String, default: 'USD' },
    timezone: { type: String, default: 'UTC' },
    notifications: {
      email: { type: Boolean, default: true },
      sms: { type: Boolean, default: false },
      push: { type: Boolean, default: true }
    }
  },

  // Metadata
  metadata: {
    source: { type: String, default: 'web' }, // web, mobile, api
    referrer: { type: String },
    utmSource: { type: String },
    utmMedium: { type: String },
    utmCampaign: { type: String }
  }
}, {
  timestamps: true,
  toJSON: { virtuals: true },
  toObject: { virtuals: true }
});

// 2. Virtual fields
userSchema.virtual('fullName').get(function() {
  // Skip missing parts so an empty profile does not yield "undefined undefined".
  return [this.profile.firstName, this.profile.lastName].filter(Boolean).join(' ');
});

userSchema.virtual('isLocked').get(function() {
  return !!(this.lockUntil && this.lockUntil > Date.now());
});

// 3. Indexes
// email and username are already indexed through `unique: true` above;
// declaring them again here would trigger a duplicate-index warning.
userSchema.index({ 'profile.phone': 1 });
userSchema.index({ status: 1 });
userSchema.index({ createdAt: -1 });

// 4. Middleware
// Hash the password whenever it has been changed
userSchema.pre('save', async function(next) {
  if (!this.isModified('password')) return next();

  try {
    const salt = await bcrypt.genSalt(security.bcrypt.saltRounds);
    this.password = await bcrypt.hash(this.password, salt);
    next();
  } catch (error) {
    next(error);
  }
});

// 5. Instance methods
// Compare a candidate password with the stored hash
// (the document must have been loaded with .select('+password'))
userSchema.methods.comparePassword = async function(candidatePassword) {
  return bcrypt.compare(candidatePassword, this.password);
};

// Generate a JWT access token
userSchema.methods.generateAuthToken = function() {
  const payload = {
    id: this._id,
    username: this.username,
    email: this.email,
    roles: this.roles
  };
  return jwt.sign(payload, security.jwt.secret, {
    expiresIn: security.jwt.expiresIn
  });
};

// Generate a refresh token
userSchema.methods.generateRefreshToken = function() {
  const payload = { id: this._id, type: 'refresh' };
  return jwt.sign(payload, security.jwt.secret, {
    expiresIn: security.jwt.refreshExpiresIn
  });
};

// Record a failed login attempt
userSchema.methods.incLoginAttempts = function() {
  // If an earlier lock has expired, restart the counter
  if (this.lockUntil && this.lockUntil < Date.now()) {
    return this.updateOne({
      $unset: { lockUntil: 1 },
      $set: { loginAttempts: 1 }
    });
  }

  const updates = { $inc: { loginAttempts: 1 } };

  // Lock the account once the maximum number of attempts is reached
  if (this.loginAttempts + 1 >= 5 && !this.isLocked) {
    updates.$set = { lockUntil: Date.now() + 2 * 60 * 60 * 1000 }; // lock for 2 hours
  }

  return this.updateOne(updates);
};

// Reset the failed-login counter
userSchema.methods.resetLoginAttempts = function() {
  return this.updateOne({
    $unset: { loginAttempts: 1, lockUntil: 1 }
  });
};

// 6. Static methods
// Find a user by email or username and verify the password
userSchema.statics.findByCredentials = async function(identifier, password) {
  const user = await this.findOne({
    $or: [{ email: identifier }, { username: identifier }],
    status: 'active'
  }).select('+password').populate('roles');

  if (!user) {
    throw new Error('Invalid credentials');
  }

  if (user.isLocked) {
    throw new Error('Account is temporarily locked');
  }

  const isMatch = await user.comparePassword(password);
  if (!isMatch) {
    await user.incLoginAttempts();
    throw new Error('Invalid credentials');
  }

  // Reset the failed-login counter and record the login time
  if (user.loginAttempts > 0) {
    await user.resetLoginAttempts();
  }
  user.lastLogin = new Date();
  await user.save();

  return user;
};

module.exports = mongoose.model('User', userSchema);
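
The lockout rule in `incLoginAttempts` is easier to reason about when it is separated from the database update. The sketch below restates the same rule (five failed attempts, two-hour lock, counter restart after an expired lock) as a pure function; `registerFailedLogin` is a hypothetical name used only for this illustration.

```javascript
const MAX_ATTEMPTS = 5;             // same threshold as incLoginAttempts
const LOCK_MS = 2 * 60 * 60 * 1000; // same 2-hour lock

// Given the current counters, return the counters after one failed login.
function registerFailedLogin({ loginAttempts = 0, lockUntil = null }, now = Date.now()) {
  // A previous lock has expired: start counting again from 1.
  if (lockUntil && lockUntil < now) {
    return { loginAttempts: 1, lockUntil: null };
  }

  const attempts = loginAttempts + 1;
  const locked = Boolean(lockUntil && lockUntil > now);

  // Reaching the threshold while not already locked starts a new lock.
  if (attempts >= MAX_ATTEMPTS && !locked) {
    return { loginAttempts: attempts, lockUntil: now + LOCK_MS };
  }
  return { loginAttempts: attempts, lockUntil };
}

let state = { loginAttempts: 0, lockUntil: null };
for (let i = 0; i < 5; i += 1) {
  state = registerFailedLogin(state, 1000);
}
console.log(state); // { loginAttempts: 5, lockUntil: 7201000 }
```

A pure function like this can be unit-tested with a fixed clock, which is harder to do with the schema method because it reads `Date.now()` and writes to MongoDB.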

3.3 User Service Controller

// 1-services/user-service/controllers/userController.js
const User = require('../models/User');
const { validationResult } = require('express-validator');
const crypto = require('crypto');
const { sendEmail } = require('../../../2-shared/utils/email');

class UserController {
  constructor() {
    // register and login call other methods through `this`, so bind them;
    // otherwise `this` is undefined when they are passed directly to a router.
    this.register = this.register.bind(this);
    this.login = this.login.bind(this);
  }

  // 1. Register a user
  async register(req, res) {
    try {
      // Validate input
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        return res.status(400).json({
          success: false,
          message: 'Validation failed',
          errors: errors.array()
        });
      }

      const { username, email, password, profile } = req.body;

      // Check whether the user already exists
      const existingUser = await User.findOne({
        $or: [{ email }, { username }]
      });
      if (existingUser) {
        return res.status(409).json({
          success: false,
          message: 'User already exists'
        });
      }

      // Create the new user
      const user = new User({
        username,
        email,
        password,
        profile,
        emailVerificationToken: crypto.randomBytes(32).toString('hex'),
        emailVerificationExpires: Date.now() + 24 * 60 * 60 * 1000 // 24 hours
      });
      await user.save();

      // Send the verification email
      await this.sendVerificationEmail(user);

      // Generate tokens
      const token = user.generateAuthToken();
      const refreshToken = user.generateRefreshToken();

      res.status(201).json({
        success: true,
        message: 'User registered successfully',
        data: {
          user: {
            id: user._id,
            username: user.username,
            email: user.email,
            profile: user.profile,
            emailVerified: user.emailVerified
          },
          tokens: { accessToken: token, refreshToken }
        }
      });
    } catch (error) {
      console.error('Registration error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 2. Log a user in
  async login(req, res) {
    try {
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        return res.status(400).json({
          success: false,
          message: 'Validation failed',
          errors: errors.array()
        });
      }

      const { identifier, password } = req.body;

      // Verify the credentials
      const user = await User.findByCredentials(identifier, password);

      // Generate tokens
      const token = user.generateAuthToken();
      const refreshToken = user.generateRefreshToken();

      // Record the login
      await this.logUserActivity(user._id, 'login', req);

      res.json({
        success: true,
        message: 'Login successful',
        data: {
          user: {
            id: user._id,
            username: user.username,
            email: user.email,
            profile: user.profile,
            roles: user.roles,
            emailVerified: user.emailVerified
          },
          tokens: { accessToken: token, refreshToken }
        }
      });
    } catch (error) {
      console.error('Login error:', error);
      res.status(401).json({
        success: false,
        message: error.message || 'Login failed'
      });
    }
  }

  // 3. Get the current user's profile
  async getProfile(req, res) {
    try {
      const user = await User.findById(req.user.id)
        .populate('roles')
        .select('-password');

      if (!user) {
        return res.status(404).json({ success: false, message: 'User not found' });
      }

      res.json({ success: true, data: { user } });
    } catch (error) {
      console.error('Get profile error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 4. Update the current user's profile
  async updateProfile(req, res) {
    try {
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        return res.status(400).json({
          success: false,
          message: 'Validation failed',
          errors: errors.array()
        });
      }

      const updates = req.body;
      const allowedUpdates = ['profile', 'preferences', 'addresses'];
      const actualUpdates = {};

      // Keep only the fields a user is allowed to change
      Object.keys(updates).forEach(key => {
        if (allowedUpdates.includes(key)) {
          actualUpdates[key] = updates[key];
        }
      });

      const user = await User.findByIdAndUpdate(
        req.user.id,
        actualUpdates,
        { new: true, runValidators: true }
      ).select('-password');

      if (!user) {
        return res.status(404).json({ success: false, message: 'User not found' });
      }

      res.json({
        success: true,
        message: 'Profile updated successfully',
        data: { user }
      });
    } catch (error) {
      console.error('Update profile error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 5. Change the password
  async changePassword(req, res) {
    try {
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        return res.status(400).json({
          success: false,
          message: 'Validation failed',
          errors: errors.array()
        });
      }

      const { currentPassword, newPassword } = req.body;

      const user = await User.findById(req.user.id).select('+password');
      if (!user) {
        return res.status(404).json({ success: false, message: 'User not found' });
      }

      // Verify the current password
      const isCurrentPasswordValid = await user.comparePassword(currentPassword);
      if (!isCurrentPasswordValid) {
        return res.status(400).json({
          success: false,
          message: 'Current password is incorrect'
        });
      }

      // Set the new password; the pre-save hook hashes it
      user.password = newPassword;
      await user.save();

      res.json({ success: true, message: 'Password changed successfully' });
    } catch (error) {
      console.error('Change password error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 6. Verify an email address
  async verifyEmail(req, res) {
    try {
      const { token } = req.params;

      const user = await User.findOne({
        emailVerificationToken: token,
        emailVerificationExpires: { $gt: Date.now() }
      });

      if (!user) {
        return res.status(400).json({
          success: false,
          message: 'Invalid or expired verification token'
        });
      }

      user.emailVerified = true;
      user.emailVerificationToken = undefined;
      user.emailVerificationExpires = undefined;
      await user.save();

      res.json({ success: true, message: 'Email verified successfully' });
    } catch (error) {
      console.error('Email verification error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 7. Send the verification email
  async sendVerificationEmail(user) {
    const verificationUrl = `${process.env.FRONTEND_URL}/verify-email/${user.emailVerificationToken}`;

    await sendEmail({
      to: user.email,
      subject: 'Verify Your Email Address',
      template: 'email-verification',
      data: {
        username: user.username,
        verificationUrl
      }
    });
  }

  // 8. Record user activity
  async logUserActivity(userId, action, req) {
    // This could write to a database or a logging system instead
    console.log(`User ${userId} performed ${action} from ${req.ip}`);
  }
}

module.exports = new UserController();
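
The field whitelist in `updateProfile` is what prevents a client from changing `roles` or `status` through the profile endpoint. As a small illustration of that idea in isolation, the sketch below extracts it into a helper; `pickAllowed` is a hypothetical name and not part of the article's code.

```javascript
// Copy only whitelisted top-level keys from an untrusted request body.
function pickAllowed(body, allowedKeys) {
  const result = {};
  for (const key of allowedKeys) {
    // Own-property check so inherited keys are never copied.
    if (Object.prototype.hasOwnProperty.call(body, key)) {
      result[key] = body[key];
    }
  }
  return result;
}

const body = {
  profile: { firstName: 'Ada' },
  roles: ['admin'],   // not allowed: silently dropped
  status: 'active'    // not allowed: silently dropped
};

console.log(pickAllowed(body, ['profile', 'preferences', 'addresses']));
// { profile: { firstName: 'Ada' } }
```

Iterating over the allowed keys rather than the request keys means the output can only ever contain names from the whitelist, however the body is shaped.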

4. Product Service

4.1 Product Service Data Flow

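[Product service data flow diagram: five functional areas]

  • 1. Product management
    • Create product
    • Update product
    • Delete product
    • Query products
  • 2. Category management
    • Category tree
    • Category attributes
    • Category associations
  • 3. Inventory management
    • Stock lookup
    • Stock reservation
    • Stock release
    • Stock synchronization
  • 4. Price management
    • Base price
    • Promotional price
    • Member price
    • Bulk price
  • 5. Search service
    • Full-text search
    • Filters
    • Sort rules
    • Search suggestions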
4.2 Product Model

// 1-services/product-service/models/Product.js
const mongoose = require('mongoose');

// 1. Product specification schema
const specificationSchema = new mongoose.Schema({
  name: { type: String, required: true },
  value: { type: String, required: true },
  unit: { type: String },
  order: { type: Number, default: 0 }
});

// 2. Product variant schema
const variantSchema = new mongoose.Schema({
  sku: { type: String, required: true, unique: true },
  attributes: [{
    name: { type: String, required: true },
    value: { type: String, required: true }
  }],
  price: {
    base: { type: Number, required: true },
    sale: { type: Number },
    cost: { type: Number }
  },
  inventory: {
    quantity: { type: Number, default: 0 },
    reserved: { type: Number, default: 0 },
    available: { type: Number, default: 0 },
    lowStockThreshold: { type: Number, default: 10 }
  },
  images: [{ type: String }],
  weight: { type: Number },
  dimensions: {
    length: { type: Number },
    width: { type: Number },
    height: { type: Number }
  },
  status: {
    type: String,
    enum: ['active', 'inactive', 'out_of_stock'],
    default: 'active'
  }
});

// 3. Main product schema
const productSchema = new mongoose.Schema({
  // Basic information
  name: { type: String, required: true, trim: true },
  slug: { type: String, required: true, unique: true },
  description: { type: String, required: true },
  shortDescription: { type: String },

  // Category information
  category: {
    type: mongoose.Schema.Types.ObjectId,
    ref: 'Category',
    required: true
  },
  subcategories: [{ type: mongoose.Schema.Types.ObjectId, ref: 'Category' }],
  tags: [{ type: String }],

  // Brand
  brand: { type: mongoose.Schema.Types.ObjectId, ref: 'Brand' },

  // Media
  images: [{
    url: { type: String, required: true },
    alt: { type: String },
    order: { type: Number, default: 0 },
    isMain: { type: Boolean, default: false }
  }],
  videos: [{
    url: { type: String },
    title: { type: String },
    duration: { type: Number }
  }],

  // Specifications
  specifications: [specificationSchema],

  // Variants
  variants: [variantSchema],

  // Pricing (used when the product has no variants)
  price: {
    base: { type: Number },
    sale: { type: Number },
    cost: { type: Number },
    currency: { type: String, default: 'USD' }
  },

  // Inventory (used when the product has no variants)
  inventory: {
    quantity: { type: Number, default: 0 },
    reserved: { type: Number, default: 0 },
    available: { type: Number, default: 0 },
    lowStockThreshold: { type: Number, default: 10 },
    trackQuantity: { type: Boolean, default: true }
  },

  // Physical attributes
  weight: { type: Number },
  dimensions: {
    length: { type: Number },
    width: { type: Number },
    height: { type: Number },
    unit: { type: String, default: 'cm' }
  },

  // Status
  status: {
    type: String,
    enum: ['draft', 'active', 'inactive', 'archived'],
    default: 'draft'
  },
  visibility: {
    type: String,
    enum: ['public', 'private', 'hidden'],
    default: 'public'
  },

  // SEO
  seo: {
    title: { type: String },
    description: { type: String },
    keywords: [{ type: String }],
    canonicalUrl: { type: String }
  },

  // Sales statistics
  sales: {
    totalSold: { type: Number, default: 0 },
    totalRevenue: { type: Number, default: 0 },
    averageRating: { type: Number, default: 0 },
    reviewCount: { type: Number, default: 0 }
  },

  // Shipping
  shipping: {
    weight: { type: Number },
    requiresShipping: { type: Boolean, default: true },
    shippingClass: { type: String },
    freeShipping: { type: Boolean, default: false }
  },

  // Tax
  tax: {
    taxable: { type: Boolean, default: true },
    taxClass: { type: String, default: 'standard' }
  },

  // Metadata
  metadata: {
    vendor: { type: String },
    manufacturerPartNumber: { type: String },
    gtin: { type: String }, // Global Trade Item Number
    condition: {
      type: String,
      enum: ['new', 'used', 'refurbished'],
      default: 'new'
    }
  },

  // Timestamps
  publishedAt: { type: Date },
  featuredUntil: { type: Date }
}, {
  timestamps: true,
  toJSON: { virtuals: true },
  toObject: { virtuals: true }
});

// 4. Virtual fields
productSchema.virtual('isOnSale').get(function() {
  if (this.variants && this.variants.length > 0) {
    return this.variants.some(variant =>
      variant.price.sale && variant.price.sale < variant.price.base
    );
  }
  return Boolean(this.price.sale && this.price.sale < this.price.base);
});

productSchema.virtual('minPrice').get(function() {
  if (this.variants && this.variants.length > 0) {
    const prices = this.variants.map(v => v.price.sale || v.price.base);
    return Math.min(...prices);
  }
  return this.price.sale || this.price.base;
});

productSchema.virtual('maxPrice').get(function() {
  if (this.variants && this.variants.length > 0) {
    const prices = this.variants.map(v => v.price.sale || v.price.base);
    return Math.max(...prices);
  }
  return this.price.sale || this.price.base;
});

productSchema.virtual('totalInventory').get(function() {
  if (this.variants && this.variants.length > 0) {
    return this.variants.reduce((total, variant) => total + variant.inventory.available, 0);
  }
  return this.inventory.available;
});

// 5. Indexes
// slug is already indexed through `unique: true` above.
productSchema.index({ name: 'text', description: 'text', tags: 'text' });
productSchema.index({ category: 1 });
productSchema.index({ brand: 1 });
productSchema.index({ status: 1 });
productSchema.index({ 'price.base': 1 });
productSchema.index({ 'sales.totalSold': -1 });
productSchema.index({ 'sales.averageRating': -1 });
productSchema.index({ createdAt: -1 });
productSchema.index({ publishedAt: -1 });

// 6. Middleware
// Generate the slug. This must be a 'validate' hook: slug is required, and
// Mongoose runs validation before any pre('save') hook gets a chance to fill it in.
productSchema.pre('validate', function(next) {
  if (this.isModified('name') && !this.slug) {
    this.slug = this.name
      .toLowerCase()
      .replace(/[^a-z0-9]+/g, '-')
      .replace(/(^-|-$)/g, '');
  }
  next();
});

// Recompute available stock
productSchema.pre('save', function(next) {
  if (this.variants && this.variants.length > 0) {
    this.variants.forEach(variant => {
      variant.inventory.available = Math.max(0, variant.inventory.quantity - variant.inventory.reserved);
    });
  } else {
    this.inventory.available = Math.max(0, this.inventory.quantity - this.inventory.reserved);
  }
  next();
});

// 7. Instance methods
// Check stock
productSchema.methods.checkStock = function(variantId, quantity = 1) {
  if (variantId) {
    const variant = this.variants.id(variantId);
    return Boolean(variant && variant.inventory.available >= quantity);
  }
  return this.inventory.available >= quantity;
};

// Reserve stock.
// Note: this reads, modifies, and saves the document, so two concurrent
// requests can both pass the availability check.
productSchema.methods.reserveStock = async function(variantId, quantity) {
  if (variantId) {
    const variant = this.variants.id(variantId);
    if (variant && variant.inventory.available >= quantity) {
      variant.inventory.reserved += quantity;
      await this.save();
      return true;
    }
  } else if (this.inventory.available >= quantity) {
    this.inventory.reserved += quantity;
    await this.save();
    return true;
  }
  return false;
};

// Release stock
productSchema.methods.releaseStock = async function(variantId, quantity) {
  if (variantId) {
    const variant = this.variants.id(variantId);
    if (variant) {
      variant.inventory.reserved = Math.max(0, variant.inventory.reserved - quantity);
      await this.save();
    }
  } else {
    this.inventory.reserved = Math.max(0, this.inventory.reserved - quantity);
    await this.save();
  }
};

// 8. Static methods
// Search products
productSchema.statics.search = function(query, options = {}) {
  const {
    category,
    brand,
    minPrice,
    maxPrice,
    inStock,
    sort = '-createdAt',
    page = 1,
    limit = 20
  } = options;

  const filter = { status: 'active', visibility: 'public' };

  if (query) {
    filter.$text = { $search: query };
  }
  if (category) {
    filter.category = category;
  }
  if (brand) {
    filter.brand = brand;
  }
  if (minPrice || maxPrice) {
    filter['price.base'] = {};
    if (minPrice) filter['price.base'].$gte = minPrice;
    if (maxPrice) filter['price.base'].$lte = maxPrice;
  }
  if (inStock) {
    filter['inventory.available'] = { $gt: 0 };
  }

  const skip = (page - 1) * limit;

  return this.find(filter)
    .populate('category brand')
    .sort(sort)
    .skip(skip)
    .limit(limit);
};

module.exports = mongoose.model('Product', productSchema);
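
The filter that `Product.search` builds is plain data, so it can be produced by a standalone function and reused, for example for both the page query and a matching count. The sketch below is one way to do that; `buildProductFilter` is a hypothetical helper. It deliberately differs from the static method in one respect: it compares against `null` so that a `minPrice` of 0 is still applied, which is an assumption about the intended behavior.

```javascript
// Build the MongoDB filter object from search options.
// Mirrors Product.search, except that a price bound of 0 is kept.
function buildProductFilter(query, options = {}) {
  const { category, brand, minPrice, maxPrice, inStock } = options;
  const filter = { status: 'active', visibility: 'public' };

  if (query) filter.$text = { $search: query };
  if (category) filter.category = category;
  if (brand) filter.brand = brand;

  if (minPrice != null || maxPrice != null) {
    filter['price.base'] = {};
    if (minPrice != null) filter['price.base'].$gte = minPrice;
    if (maxPrice != null) filter['price.base'].$lte = maxPrice;
  }

  if (inStock) filter['inventory.available'] = { $gt: 0 };
  return filter;
}

console.log(buildProductFilter('laptop', { minPrice: 0, maxPrice: 1500, inStock: true }));
```

Passing the same filter object to `find` and to `countDocuments` would keep the reported total consistent with the rows actually returned.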

4.3 Product Service Controller

// 1-services/product-service/controllers/productController.js
const mongoose = require('mongoose');
const Product = require('../models/Product');
const Category = require('../models/Category');
const { validationResult } = require('express-validator');
const { uploadToS3, deleteFromS3 } = require('../../../2-shared/utils/storage');

class ProductController {
  // 1. Create a product
  async createProduct(req, res) {
    try {
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        return res.status(400).json({
          success: false,
          message: 'Validation failed',
          errors: errors.array()
        });
      }

      const productData = req.body;

      // Make sure the category exists
      const category = await Category.findById(productData.category);
      if (!category) {
        return res.status(400).json({ success: false, message: 'Invalid category' });
      }

      // Handle image uploads
      if (req.files && req.files.length > 0) {
        const imageUrls = await Promise.all(
          req.files.map(async (file, index) => {
            const url = await uploadToS3(file, 'products');
            return {
              url,
              alt: `${productData.name} image ${index + 1}`,
              order: index,
              isMain: index === 0
            };
          })
        );
        productData.images = imageUrls;
      }

      const product = new Product(productData);
      await product.save();
      await product.populate('category brand');

      res.status(201).json({
        success: true,
        message: 'Product created successfully',
        data: { product }
      });
    } catch (error) {
      console.error('Create product error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 2. List products
  async getProducts(req, res) {
    try {
      const {
        page = 1,
        limit = 20,
        category,
        brand,
        minPrice,
        maxPrice,
        inStock,
        sort = '-createdAt',
        search
      } = req.query;

      const options = {
        category,
        brand,
        minPrice: minPrice ? parseFloat(minPrice) : undefined,
        maxPrice: maxPrice ? parseFloat(maxPrice) : undefined,
        inStock: inStock === 'true',
        sort,
        page: parseInt(page, 10),
        limit: parseInt(limit, 10)
      };

      const products = await Product.search(search, options);

      // Note: this total counts every active public product and ignores the
      // search filters above, so `pages` can overstate a filtered result set.
      const total = await Product.countDocuments({
        status: 'active',
        visibility: 'public'
      });

      res.json({
        success: true,
        data: {
          products,
          pagination: {
            page: options.page,
            limit: options.limit,
            total,
            pages: Math.ceil(total / options.limit)
          }
        }
      });
    } catch (error) {
      console.error('Get products error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 3. Get a single product by id or slug
  async getProduct(req, res) {
    try {
      const { id } = req.params;

      // Only match on _id when the value can be cast to an ObjectId;
      // otherwise a slug would raise a CastError and end up as a 500.
      const conditions = [{ slug: id }];
      if (mongoose.isValidObjectId(id)) {
        conditions.push({ _id: id });
      }

      const product = await Product.findOne({
        $or: conditions,
        status: 'active',
        visibility: 'public'
      }).populate('category brand');

      if (!product) {
        return res.status(404).json({ success: false, message: 'Product not found' });
      }

      res.json({ success: true, data: { product } });
    } catch (error) {
      console.error('Get product error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 4. Update a product
  async updateProduct(req, res) {
    try {
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        return res.status(400).json({
          success: false,
          message: 'Validation failed',
          errors: errors.array()
        });
      }

      const { id } = req.params;
      const updates = req.body;

      // Handle newly uploaded images
      if (req.files && req.files.length > 0) {
        const newImages = await Promise.all(
          req.files.map(async (file, index) => {
            const url = await uploadToS3(file, 'products');
            return { url, alt: `Product image ${index + 1}`, order: index };
          })
        );

        // Merge existing and new images
        if (updates.images) {
          updates.images = [...updates.images, ...newImages];
        } else {
          updates.images = newImages;
        }
      }

      const product = await Product.findByIdAndUpdate(
        id,
        updates,
        { new: true, runValidators: true }
      ).populate('category brand');

      if (!product) {
        return res.status(404).json({ success: false, message: 'Product not found' });
      }

      res.json({
        success: true,
        message: 'Product updated successfully',
        data: { product }
      });
    } catch (error) {
      console.error('Update product error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 5. Delete a product
  async deleteProduct(req, res) {
    try {
      const { id } = req.params;

      const product = await Product.findById(id);
      if (!product) {
        return res.status(404).json({ success: false, message: 'Product not found' });
      }

      // Delete the associated images
      if (product.images && product.images.length > 0) {
        await Promise.all(product.images.map(image => deleteFromS3(image.url)));
      }

      await Product.findByIdAndDelete(id);

      res.json({ success: true, message: 'Product deleted successfully' });
    } catch (error) {
      console.error('Delete product error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 6. Check stock
  async checkStock(req, res) {
    try {
      const { id } = req.params;
      const { variantId, quantity = 1 } = req.query;

      const product = await Product.findById(id);
      if (!product) {
        return res.status(404).json({ success: false, message: 'Product not found' });
      }

      const inStock = product.checkStock(variantId, parseInt(quantity, 10));

      res.json({
        success: true,
        data: {
          inStock,
          availableQuantity: variantId
            ? product.variants.id(variantId)?.inventory.available || 0
            : product.inventory.available
        }
      });
    } catch (error) {
      console.error('Check stock error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 7. Reserve stock
  async reserveStock(req, res) {
    try {
      const { id } = req.params;
      const { variantId, quantity } = req.body;

      const product = await Product.findById(id);
      if (!product) {
        return res.status(404).json({ success: false, message: 'Product not found' });
      }

      const reserved = await product.reserveStock(variantId, quantity);
      if (!reserved) {
        return res.status(400).json({ success: false, message: 'Insufficient stock' });
      }

      res.json({ success: true, message: 'Stock reserved successfully' });
    } catch (error) {
      console.error('Reserve stock error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 8. Get product recommendations
  async getRecommendations(req, res) {
    try {
      const { id } = req.params;
      const { limit = 10 } = req.query;

      const product = await Product.findById(id);
      if (!product) {
        return res.status(404).json({ success: false, message: 'Product not found' });
      }

      // Recommend by shared category or tags, best sellers first
      const recommendations = await Product.find({
        _id: { $ne: id },
        $or: [
          { category: product.category },
          { tags: { $in: product.tags } }
        ],
        status: 'active',
        visibility: 'public'
      })
        .populate('category brand')
        .sort({ 'sales.totalSold': -1 })
        .limit(parseInt(limit, 10));

      res.json({ success: true, data: { recommendations } });
    } catch (error) {
      console.error('Get recommendations error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }
}

module.exports = new ProductController();
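
`getProducts` takes `page` and `limit` straight from the query string, so values such as `limit=0` or `page=abc` flow into the query unchecked. A small helper can normalize them before use. The sketch below is one possible shape; `buildPagination` and the cap of 100 items per page are assumptions, not part of the article's code.

```javascript
// Normalize raw page/limit values and derive pagination metadata.
// The maximum page size of 100 is an illustrative choice.
function buildPagination(total, page, limit) {
  const safeLimit = Math.min(Math.max(parseInt(limit, 10) || 20, 1), 100);
  const pages = Math.max(Math.ceil(total / safeLimit), 1);
  const safePage = Math.min(Math.max(parseInt(page, 10) || 1, 1), pages);

  return {
    page: safePage,
    limit: safeLimit,
    total,
    pages,
    skip: (safePage - 1) * safeLimit
  };
}

console.log(buildPagination(45, '3', '20'));
// { page: 3, limit: 20, total: 45, pages: 3, skip: 40 }
```

Clamping the page to the last available page is a design choice; an API could equally return an empty list for out-of-range pages.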

5. Order Service

5.1 Order Processing Flow

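[Order processing flow diagram: eight stages]

  • 1. Order creation
    • Cart validation
    • User information validation
    • Shipping address validation
    • Order number generation
  • 2. Stock verification
    • Product stock check
    • Stock reservation
    • Out-of-stock handling
  • 3. Price calculation
    • Item price calculation
    • Coupon application
    • Tax calculation
    • Shipping fee calculation
  • 4. Payment processing
    • Payment gateway call
    • Payment status verification
    • Payment failure handling
  • 5. Order confirmation
    • Order status update
    • Stock deduction
    • Notification delivery
  • 6. Fulfillment
    • Warehouse allocation
    • Picking and packing
    • Shipment preparation
  • 7. Delivery tracking
    • Logistics information entry
    • Delivery status updates
    • Customer notification
  • 8. Order completion
    • Delivery confirmation
    • Review invitation
    • Order archiving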
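
The price calculation stage (item prices, coupon, tax, shipping) can be sketched as a single pure function. The version below works in integer cents to avoid floating-point drift. The function name, the input shape, and the ordering of the steps (tax applied after the discount, shipping not taxed) are assumptions for illustration; the article does not specify them.

```javascript
// All amounts are integer cents. taxRate is a fraction, e.g. 0.08 for 8%.
function calculateOrderTotals(items, options = {}) {
  const { coupon = null, taxRate = 0, shippingCents = 0 } = options;

  const subtotal = items.reduce(
    (sum, item) => sum + item.unitPriceCents * item.quantity,
    0
  );

  // Coupon types match the order model: 'percentage' or 'fixed'.
  let discount = 0;
  if (coupon && coupon.type === 'percentage') {
    discount = Math.round((subtotal * coupon.value) / 100);
  } else if (coupon && coupon.type === 'fixed') {
    discount = coupon.value;
  }
  discount = Math.min(discount, subtotal); // never discount below zero

  const tax = Math.round((subtotal - discount) * taxRate);
  const total = subtotal - discount + tax + shippingCents;

  return { subtotal, discount, tax, shipping: shippingCents, total };
}

const totals = calculateOrderTotals(
  [
    { unitPriceCents: 1999, quantity: 2 },
    { unitPriceCents: 500, quantity: 1 }
  ],
  { coupon: { type: 'percentage', value: 10 }, taxRate: 0.08, shippingCents: 599 }
);
console.log(totals);
// { subtotal: 4498, discount: 450, tax: 324, shipping: 599, total: 4971 }
```

The returned fields line up with the `pricing` object in the order model of section 5.3, apart from the unit: the model stores plain numbers, so a real implementation would have to pick one representation and convert consistently.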

5.2 Order State Machine

[Order state machine diagram. States shown: pending, confirmed, processing, shipped, delivered, completed, cancelled, returned, refunded, archived]
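
A state machine like this is commonly implemented as a lookup table from each state to the states it may move to. The sketch below uses the transition table from the `updateStatus` method in section 5.3; `canTransition` and `isValidPath` are hypothetical helper names. The `archived` state from the diagram is left out because it does not appear in the order model's status enum.

```javascript
// Allowed transitions, copied from updateStatus in the order model.
const TRANSITIONS = {
  pending: ['confirmed', 'cancelled'],
  confirmed: ['processing', 'cancelled'],
  processing: ['shipped', 'cancelled'],
  shipped: ['delivered', 'returned'],
  delivered: ['completed', 'returned'],
  returned: ['refunded'],
  cancelled: [],
  completed: [],
  refunded: []
};

// True when an order may move directly from one status to another.
function canTransition(from, to) {
  return (TRANSITIONS[from] || []).includes(to);
}

// True when every consecutive pair in a status history is a legal step.
function isValidPath(statuses) {
  return statuses.every(
    (status, i) => i === 0 || canTransition(statuses[i - 1], status)
  );
}

console.log(canTransition('pending', 'confirmed')); // true
console.log(canTransition('shipped', 'cancelled')); // false
console.log(isValidPath(['pending', 'confirmed', 'processing', 'shipped'])); // true
```

Falling back to an empty array for unknown states means an unrecognized status is rejected rather than causing a runtime error.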

5.3 Order Model

// 1-services/order-service/models/Order.js
const mongoose = require('mongoose');

// 1. Order item schema
const orderItemSchema = new mongoose.Schema({
  product: {
    type: mongoose.Schema.Types.ObjectId,
    ref: 'Product',
    required: true
  },
  variant: {
    type: mongoose.Schema.Types.ObjectId,
    required: false
  },
  sku: { type: String, required: true },
  name: { type: String, required: true },
  description: { type: String },
  image: { type: String },

  // Pricing
  unitPrice: { type: Number, required: true },
  salePrice: { type: Number },
  quantity: { type: Number, required: true, min: 1 },
  totalPrice: { type: Number, required: true },

  // Discount
  discount: {
    amount: { type: Number, default: 0 },
    percentage: { type: Number, default: 0 },
    couponCode: { type: String }
  },

  // Tax
  tax: {
    rate: { type: Number, default: 0 },
    amount: { type: Number, default: 0 }
  },

  // Status
  status: {
    type: String,
    enum: ['pending', 'confirmed', 'processing', 'shipped', 'delivered', 'cancelled', 'returned'],
    default: 'pending'
  },

  // Fulfillment
  fulfillment: {
    warehouse: { type: String },
    trackingNumber: { type: String },
    carrier: { type: String },
    shippedAt: { type: Date },
    deliveredAt: { type: Date }
  }
});

// 2. Address schema
const addressSchema = new mongoose.Schema({
  firstName: { type: String, required: true },
  lastName: { type: String, required: true },
  company: { type: String },
  street1: { type: String, required: true },
  street2: { type: String },
  city: { type: String, required: true },
  state: { type: String, required: true },
  zipCode: { type: String, required: true },
  country: { type: String, required: true },
  phone: { type: String }
});

// 3. Payment schema
const paymentSchema = new mongoose.Schema({
  method: {
    type: String,
    enum: ['credit_card', 'debit_card', 'paypal', 'stripe', 'bank_transfer', 'cash_on_delivery'],
    required: true
  },
  status: {
    type: String,
    enum: ['pending', 'processing', 'completed', 'failed', 'cancelled', 'refunded'],
    default: 'pending'
  },
  transactionId: { type: String },
  gatewayResponse: { type: mongoose.Schema.Types.Mixed },
  amount: { type: Number, required: true },
  currency: { type: String, default: 'USD' },
  paidAt: { type: Date },
  refundedAt: { type: Date },
  refundAmount: { type: Number, default: 0 }
});

// 4. Main order schema
const orderSchema = new mongoose.Schema({
  // Basic information
  orderNumber: { type: String, required: true, unique: true },
  customer: {
    type: mongoose.Schema.Types.ObjectId,
    ref: 'User',
    required: true
  },

  // Order items
  items: [orderItemSchema],

  // Addresses
  shippingAddress: { type: addressSchema, required: true },
  billingAddress: { type: addressSchema, required: true },

  // Pricing
  pricing: {
    subtotal: { type: Number, required: true },
    discount: { type: Number, default: 0 },
    tax: { type: Number, default: 0 },
    shipping: { type: Number, default: 0 },
    total: { type: Number, required: true },
    currency: { type: String, default: 'USD' }
  },

  // Coupons
  coupons: [{
    code: { type: String, required: true },
    discount: { type: Number, required: true },
    type: { type: String, enum: ['percentage', 'fixed'], required: true }
  }],

  // Shipping
  shipping: {
    method: { type: String, required: true },
    carrier: { type: String },
    service: { type: String },
    cost: { type: Number, default: 0 },
    estimatedDelivery: { type: Date },
    trackingNumber: { type: String },
    trackingUrl: { type: String }
  },

  // Payment
  payment: paymentSchema,

  // Order status
  status: {
    type: String,
    enum: [
      'pending',      // awaiting confirmation
      'confirmed',    // confirmed
      'processing',   // being processed
      'shipped',      // shipped
      'delivered',    // delivered
      'completed',    // completed
      'cancelled',    // cancelled
      'returned',     // returned
      'refunded'      // refunded
    ],
    default: 'pending'
  },

  // Status history
  statusHistory: [{
    status: { type: String, required: true },
    timestamp: { type: Date, default: Date.now },
    note: { type: String },
    updatedBy: { type: mongoose.Schema.Types.ObjectId, ref: 'User' }
  }],

  // Notes
  notes: {
    customer: { type: String },
    internal: { type: String }
  },

  // Timestamps
  placedAt: { type: Date, default: Date.now },
  confirmedAt: { type: Date },
  shippedAt: { type: Date },
  deliveredAt: { type: Date },
  completedAt: { type: Date },
  cancelledAt: { type: Date },

  // Metadata
  metadata: {
    source: { type: String, default: 'web' }, // web, mobile, api
    userAgent: { type: String },
    ipAddress: { type: String },
    referrer: { type: String }
  }
}, {
  timestamps: true,
  toJSON: { virtuals: true },
  toObject: { virtuals: true }
});

// 5. Virtual fields
orderSchema.virtual('totalItems').get(function() {
  return this.items.reduce((total, item) => total + item.quantity, 0);
});

orderSchema.virtual('canCancel').get(function() {
  return ['pending', 'confirmed'].includes(this.status);
});

orderSchema.virtual('canReturn').get(function() {
  return ['delivered'].includes(this.status);
});

orderSchema.virtual('isCompleted').get(function() {
  return ['completed', 'cancelled', 'refunded'].includes(this.status);
});

// 6. Indexes
// orderNumber is already indexed through `unique: true` above.
orderSchema.index({ customer: 1 });
orderSchema.index({ status: 1 });
orderSchema.index({ placedAt: -1 });
orderSchema.index({ 'payment.status': 1 });
orderSchema.index({ 'shipping.trackingNumber': 1 });

// 7. Middleware
// Generate the order number. This must be a 'validate' hook: orderNumber is
// required, and Mongoose runs validation before any pre('save') hook.
orderSchema.pre('validate', async function(next) {
  if (this.isNew && !this.orderNumber) {
    this.orderNumber = await this.constructor.generateOrderNumber();
  }
  next();
});

// Record status changes in the history
orderSchema.pre('save', function(next) {
  if (this.isModified('status')) {
    // updateStatus already appends an entry (with note and updatedBy),
    // so only add one here when the latest entry is for a different status.
    const last = this.statusHistory[this.statusHistory.length - 1];
    if (!last || last.status !== this.status) {
      this.statusHistory.push({
        status: this.status,
        timestamp: new Date()
      });
    }
  }
  next();
});

// 8. Instance methods
// Update the order status
orderSchema.methods.updateStatus = async function(newStatus, note, updatedBy) {
  const validTransitions = {
    pending: ['confirmed', 'cancelled'],
    confirmed: ['processing', 'cancelled'],
    processing: ['shipped', 'cancelled'],
    shipped: ['delivered', 'returned'],
    delivered: ['completed', 'returned'],
    returned: ['refunded'],
    cancelled: [],
    completed: [],
    refunded: []
  };

  if (!validTransitions[this.status].includes(newStatus)) {
    throw new Error(`Invalid status transition from ${this.status} to ${newStatus}`);
  }

  this.status = newStatus;
  this.statusHistory.push({
    status: newStatus,
    timestamp: new Date(),
    note,
    updatedBy
  });

  // Update the matching timestamp
  switch (newStatus) {
    case 'confirmed':
      this.confirmedAt = new Date();
      break;
    case 'shipped':
      this.shippedAt = new Date();
      break;
    case 'delivered':
      this.deliveredAt = new Date();
      break;
    case 'completed':
      this.completedAt = new Date();
      break;
    case 'cancelled':
      this.cancelledAt = new Date();
      break;
  }

  await this.save();
};// 计算总价
orderSchema.methods.calculateTotal = function() {const subtotal = this.items.reduce((total, item) => total + item.totalPrice, 0);const discount = this.coupons.reduce((total, coupon) => total + coupon.discount, 0);const tax = this.pricing.tax || 0;const shipping = this.shipping.cost || 0;this.pricing.subtotal = subtotal;this.pricing.discount = discount;this.pricing.total = subtotal - discount + tax + shipping;return this.pricing.total;
};// 9. 静态方法
// 生成订单号
orderSchema.statics.generateOrderNumber = async function() {const date = new Date();const year = date.getFullYear().toString().slice(-2);const month = (date.getMonth() + 1).toString().padStart(2, '0');const day = date.getDate().toString().padStart(2, '0');const prefix = `ORD${year}${month}${day}`;// 查找当天最大的订单号const lastOrder = await this.findOne({orderNumber: { $regex: `^${prefix}` }}).sort({ orderNumber: -1 });let sequence = 1;if (lastOrder) {const lastSequence = parseInt(lastOrder.orderNumber.slice(-4));sequence = lastSequence + 1;}return `${prefix}${sequence.toString().padStart(4, '0')}`;
};// 按状态查询订单
orderSchema.statics.findByStatus = function(status, options = {}) {const { page = 1, limit = 20, sort = '-placedAt' } = options;const skip = (page - 1) * limit;return this.find({ status }).populate('customer', 'username email profile').sort(sort).skip(skip).limit(limit);
};// 按客户查询订单
orderSchema.statics.findByCustomer = function(customerId, options = {}) {const { page = 1, limit = 20, sort = '-placedAt' } = options;const skip = (page - 1) * limit;return this.find({ customer: customerId }).sort(sort).skip(skip).limit(limit);
};module.exports = mongoose.model('Order', orderSchema);
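The transition rules inside updateStatus can be tried out without a database. The sketch below copies the transition table from the model into a plain object; the helper names canTransition and terminalStatuses are hypothetical and exist only for this illustration.

```javascript
// Transition table copied from updateStatus in the order model.
const VALID_TRANSITIONS = Object.freeze({
  pending: ['confirmed', 'cancelled'],
  confirmed: ['processing', 'cancelled'],
  processing: ['shipped', 'cancelled'],
  shipped: ['delivered', 'returned'],
  delivered: ['completed', 'returned'],
  returned: ['refunded'],
  cancelled: [],
  completed: [],
  refunded: []
});

// Hypothetical helper: true when moving from `from` to `to` is allowed.
function canTransition(from, to) {
  const allowed = VALID_TRANSITIONS[from];
  return Array.isArray(allowed) && allowed.includes(to);
}

// Hypothetical helper: statuses that have no outgoing transitions.
function terminalStatuses() {
  return Object.keys(VALID_TRANSITIONS).filter(
    (status) => VALID_TRANSITIONS[status].length === 0
  );
}

console.log(canTransition('pending', 'confirmed'));
console.log(canTransition('shipped', 'cancelled'));
console.log(terminalStatuses());
```

Keeping the table in one shared constant would let the model and any admin UI agree on which buttons to show for a given order.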

5.4 Order Service Controller

// 1-services/order-service/controllers/orderController.js
const Order = require('../models/Order');
const { validationResult } = require('express-validator');
const { reserveStock, releaseStock } = require('../services/inventoryService');
const { processPayment } = require('../services/paymentService');
const { sendOrderConfirmation } = require('../services/notificationService');

class OrderController {
  constructor() {
    // Express calls route handlers as bare functions, so bind them here to
    // keep `this` pointing at the controller instance.
    for (const name of ['createOrder', 'getOrders', 'getOrder', 'updateOrderStatus', 'cancelOrder']) {
      this[name] = this[name].bind(this);
    }
  }

  // 1. Create an order
  async createOrder(req, res) {
    try {
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        return res.status(400).json({
          success: false,
          message: 'Validation failed',
          errors: errors.array()
        });
      }

      const orderData = req.body;
      orderData.customer = req.user.id;

      // Check stock availability
      const stockValidation = await this.validateStock(orderData.items);
      if (!stockValidation.valid) {
        return res.status(400).json({
          success: false,
          message: 'Insufficient stock',
          details: stockValidation.errors
        });
      }

      // Reserve stock
      const reservationResults = await this.reserveOrderStock(orderData.items);
      if (!reservationResults.success) {
        return res.status(400).json({
          success: false,
          message: 'Failed to reserve stock',
          details: reservationResults.errors
        });
      }

      try {
        // Create the order
        const order = new Order(orderData);
        order.calculateTotal();
        await order.save();

        // Process the payment
        if (orderData.payment.method !== 'cash_on_delivery') {
          const paymentResult = await processPayment({
            amount: order.pricing.total,
            currency: order.pricing.currency,
            method: orderData.payment.method,
            orderId: order._id,
            customer: req.user
          });

          if (!paymentResult.success) {
            // Payment failed: release the stock and cancel the order
            await this.releaseOrderStock(orderData.items);
            await order.updateStatus('cancelled', 'Payment failed');
            return res.status(400).json({
              success: false,
              message: 'Payment failed',
              details: paymentResult.error
            });
          }

          // Record the payment details
          order.payment.status = 'completed';
          order.payment.transactionId = paymentResult.transactionId;
          order.payment.paidAt = new Date();
          await order.updateStatus('confirmed', 'Payment completed');
        } else {
          await order.updateStatus('confirmed', 'Cash on delivery order');
        }

        // Send the order confirmation. A notification failure should not
        // roll back an order that has already been paid and confirmed.
        try {
          await sendOrderConfirmation(order);
        } catch (notifyError) {
          console.error('Order confirmation notification failed:', notifyError);
        }

        await order.populate('customer', 'username email profile');
        res.status(201).json({
          success: true,
          message: 'Order created successfully',
          data: { order }
        });
      } catch (error) {
        // Order creation failed: release the reserved stock
        await this.releaseOrderStock(orderData.items);
        throw error;
      }
    } catch (error) {
      console.error('Create order error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 2. List orders
  async getOrders(req, res) {
    try {
      const { status, customer, startDate, endDate, sort = '-placedAt' } = req.query;
      const page = parseInt(req.query.page, 10) || 1;
      const limit = parseInt(req.query.limit, 10) || 20;

      const filter = {};

      // Non-admin users can only see their own orders
      if (!req.user.roles.includes('admin')) {
        filter.customer = req.user.id;
      } else if (customer) {
        filter.customer = customer;
      }

      if (status) {
        filter.status = status;
      }

      if (startDate || endDate) {
        filter.placedAt = {};
        if (startDate) filter.placedAt.$gte = new Date(startDate);
        if (endDate) filter.placedAt.$lte = new Date(endDate);
      }

      const skip = (page - 1) * limit;
      const orders = await Order.find(filter)
        .populate('customer', 'username email profile')
        .sort(sort)
        .skip(skip)
        .limit(limit);
      const total = await Order.countDocuments(filter);

      res.json({
        success: true,
        data: {
          orders,
          pagination: { page, limit, total, pages: Math.ceil(total / limit) }
        }
      });
    } catch (error) {
      console.error('Get orders error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 3. Get a single order
  async getOrder(req, res) {
    try {
      const { id } = req.params;
      const filter = { _id: id };

      // Non-admin users can only see their own orders
      if (!req.user.roles.includes('admin')) {
        filter.customer = req.user.id;
      }

      const order = await Order.findOne(filter)
        .populate('customer', 'username email profile')
        .populate('items.product', 'name images');

      if (!order) {
        return res.status(404).json({ success: false, message: 'Order not found' });
      }

      res.json({ success: true, data: { order } });
    } catch (error) {
      console.error('Get order error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 4. Update the order status
  async updateOrderStatus(req, res) {
    try {
      const { id } = req.params;
      const { status, note } = req.body;

      const order = await Order.findById(id);
      if (!order) {
        return res.status(404).json({ success: false, message: 'Order not found' });
      }

      await order.updateStatus(status, note, req.user.id);

      res.json({
        success: true,
        message: 'Order status updated successfully',
        data: { order }
      });
    } catch (error) {
      console.error('Update order status error:', error);
      // An invalid transition is a client error; anything else is a server fault
      const isTransitionError = /^Invalid status transition/.test(error.message || '');
      res.status(isTransitionError ? 400 : 500).json({
        success: false,
        message: isTransitionError ? error.message : 'Internal server error'
      });
    }
  }

  // 5. Cancel an order
  async cancelOrder(req, res) {
    try {
      const { id } = req.params;
      const { reason } = req.body;
      const filter = { _id: id };

      // Non-admin users can only cancel their own orders
      if (!req.user.roles.includes('admin')) {
        filter.customer = req.user.id;
      }

      const order = await Order.findOne(filter);
      if (!order) {
        return res.status(404).json({ success: false, message: 'Order not found' });
      }

      if (!order.canCancel) {
        return res.status(400).json({ success: false, message: 'Order cannot be cancelled' });
      }

      // Release the stock
      await this.releaseOrderStock(order.items);

      // Handle the refund if the order was already paid
      if (order.payment.status === 'completed') {
        // TODO: call the payment service to issue the refund; this block only
        // records the refund on the order.
        order.payment.status = 'refunded';
        order.payment.refundedAt = new Date();
        order.payment.refundAmount = order.pricing.total;
      }

      await order.updateStatus('cancelled', reason, req.user.id);

      res.json({
        success: true,
        message: 'Order cancelled successfully',
        data: { order }
      });
    } catch (error) {
      console.error('Cancel order error:', error);
      res.status(500).json({ success: false, message: 'Internal server error' });
    }
  }

  // 6. Validate stock
  async validateStock(items) {
    const errors = [];

    for (const item of items) {
      const stockCheck = await reserveStock(
        item.product,
        item.variant,
        item.quantity,
        true // check only, do not actually reserve
      );

      if (!stockCheck.success) {
        errors.push({
          product: item.product,
          variant: item.variant,
          requested: item.quantity,
          available: stockCheck.available,
          message: stockCheck.message
        });
      }
    }

    return { valid: errors.length === 0, errors };
  }

  // 7. Reserve stock for every item in the order
  async reserveOrderStock(items) {
    const errors = [];
    const reservations = [];

    for (const item of items) {
      try {
        const result = await reserveStock(item.product, item.variant, item.quantity);
        if (result.success) {
          reservations.push({
            product: item.product,
            variant: item.variant,
            quantity: item.quantity
          });
        } else {
          errors.push({ product: item.product, variant: item.variant, message: result.message });
        }
      } catch (error) {
        errors.push({ product: item.product, variant: item.variant, message: error.message });
      }
    }

    // If anything failed, roll back every reservation made so far
    if (errors.length > 0) {
      for (const reservation of reservations) {
        await releaseStock(reservation.product, reservation.variant, reservation.quantity);
      }
    }

    return { success: errors.length === 0, errors, reservations };
  }

  // 8. Release the stock held by an order
  async releaseOrderStock(items) {
    for (const item of items) {
      try {
        await releaseStock(item.product, item.variant, item.quantity);
      } catch (error) {
        console.error('Failed to release stock:', error);
      }
    }
  }
}

module.exports = new OrderController();
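The all-or-nothing behaviour of reserveOrderStock is easier to see with the network removed. The following is a minimal sketch, assuming a synchronous in-memory inventory in place of the real inventory service; createInventory and reserveAll are hypothetical names used only here, and variants are left out for brevity.

```javascript
// In-memory stand-in for the inventory service (hypothetical, for illustration).
function createInventory(initialStock) {
  const stock = new Map(Object.entries(initialStock));
  return {
    reserve(productId, quantity) {
      const available = stock.get(productId) || 0;
      if (available < quantity) {
        return { success: false, message: `Only ${available} left of ${productId}` };
      }
      stock.set(productId, available - quantity);
      return { success: true };
    },
    release(productId, quantity) {
      stock.set(productId, (stock.get(productId) || 0) + quantity);
    },
    available(productId) {
      return stock.get(productId) || 0;
    }
  };
}

// Reserve every item or none: on any failure, undo the reservations made so far.
function reserveAll(inventory, items) {
  const reserved = [];
  const errors = [];
  for (const item of items) {
    const result = inventory.reserve(item.product, item.quantity);
    if (result.success) {
      reserved.push(item);
    } else {
      errors.push({ product: item.product, message: result.message });
    }
  }
  if (errors.length > 0) {
    for (const item of reserved) {
      inventory.release(item.product, item.quantity);
    }
    return { success: false, errors, reservations: [] };
  }
  return { success: true, errors, reservations: reserved };
}

const inventory = createInventory({ 'sku-1': 5, 'sku-2': 1 });
const outcome = reserveAll(inventory, [
  { product: 'sku-1', quantity: 2 },
  { product: 'sku-2', quantity: 3 } // more than is in stock, so the whole order fails
]);
console.log(outcome.success, inventory.available('sku-1'));
```

After the failed attempt the stock of sku-1 is back at its starting value, which is the property the controller relies on when it rolls back a partially reserved order.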

6. Payment Service Implementation

6.1 Payment System Architecture

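Payment system architecture (summary of the original diagram):
1. Payment gateways: Stripe gateway, PayPal gateway, bank gateway, third-party payment providers
2. Payment methods: credit card, debit card, digital wallet, bank transfer, cash on delivery
3. Risk control: fraud detection, risk scoring, blacklist checks, limit control
4. Reconciliation: transaction records, reconciliation files, discrepancy handling, financial reports
5. Refund handling: full refund, partial refund, refund review, refund notification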
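The link between payment methods and gateways in the list above can be sketched as a small registry. This is only an illustration: the method names follow the order model's payment enum, while createGatewayRegistry and the stub gateway objects are hypothetical.

```javascript
// Which gateway handles which payment method (cash on delivery needs no gateway).
const METHOD_TO_GATEWAY = Object.freeze({
  credit_card: 'stripe',
  debit_card: 'stripe',
  paypal: 'paypal',
  bank_transfer: 'bank'
});

// Hypothetical registry: resolves a payment method to a gateway object or throws.
function createGatewayRegistry(gateways) {
  return {
    resolve(method) {
      const known = Object.prototype.hasOwnProperty.call(METHOD_TO_GATEWAY, method);
      const gateway = known ? gateways[METHOD_TO_GATEWAY[method]] : undefined;
      if (!gateway) {
        throw new Error(`Unsupported payment method: ${method}`);
      }
      return gateway;
    }
  };
}

// Stub gateways standing in for real integrations.
const registry = createGatewayRegistry({
  stripe: { name: 'stripe' },
  paypal: { name: 'paypal' },
  bank: { name: 'bank' }
});

console.log(registry.resolve('credit_card').name);
```

Failing loudly on an unknown method keeps an unsupported payment from silently falling through to a default gateway.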

6.2 Payment Flow

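Payment flow (summary of the original flowchart):
1. Payment request
2. Parameter validation
3. Risk check
4. Gateway selection
5. Payment processing
6. Result handling
7. Status update
8. Notification dispatch

Decision points in the flowchart:
Risk assessment: a low-risk payment continues through the flow; a high-risk payment goes to manual review, where approval lets it continue and rejection ends in a rejected payment.
Payment result: success leads to order confirmation, failure leads to order cancellation, and a pending result leads to status monitoring.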
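The risk check step can be expressed as a pure scoring function. The sketch below reuses the weights and thresholds of the assessRisk method in the payment service in section 6.3; the function name scoreRisk and its input shape are assumptions made for this example, and the inputs that the service looks up in the database are passed in directly.

```javascript
// Pure version of the risk score: weights and thresholds mirror assessRisk in section 6.3.
function scoreRisk({ amount, highRiskCountry = false, failureRate = 0, failedDevicePayments = 0 }) {
  let score = 0;
  const factors = [];

  if (amount > 1000) {
    score += 20;
    factors.push('high_amount');
  }
  if (highRiskCountry) {
    score += 30;
    factors.push('high_risk_country');
  }
  if (failureRate > 0.3) {
    score += 25;
    factors.push('high_failure_rate');
  }
  if (failedDevicePayments > 2) {
    score += 15;
    factors.push('suspicious_device');
  }

  let risk = 'low';
  if (score >= 50) {
    risk = 'high';
  } else if (score >= 25) {
    risk = 'medium';
  }
  return { score, risk, factors };
}

console.log(scoreRisk({ amount: 1500, highRiskCountry: true }));
console.log(scoreRisk({ amount: 50 }));
```

Separating the arithmetic from the database lookups makes the thresholds easy to unit test and to tune.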

6.3 Payment Service Implementation

// 1-services/payment-service/services/paymentService.js
const stripe = require('stripe')(process.env.STRIPE_SECRET_KEY);
const paypal = require('@paypal/checkout-server-sdk');
const Payment = require('../models/Payment');
const { sendPaymentNotification } = require('./notificationService');

class PaymentService {
  constructor() {
    this.gateways = {
      stripe: new StripeGateway(),
      paypal: new PayPalGateway(),
      bank: new BankGateway()
    };
  }

  // 1. Process a payment
  async processPayment(paymentData) {
    try {
      // Create the payment record
      const payment = new Payment({
        orderId: paymentData.orderId,
        customerId: paymentData.customerId,
        amount: paymentData.amount,
        currency: paymentData.currency,
        method: paymentData.method,
        status: 'pending',
        metadata: paymentData.metadata
      });
      await payment.save();

      // Risk check
      const riskAssessment = await this.assessRisk(payment);
      if (riskAssessment.risk === 'high') {
        payment.status = 'requires_review';
        payment.riskScore = riskAssessment.score;
        await payment.save();
        return {
          success: false,
          payment,
          message: 'Payment requires manual review'
        };
      }

      // Select the payment gateway
      const gateway = this.selectGateway(paymentData.method);
      if (!gateway) {
        throw new Error(`Unsupported payment method: ${paymentData.method}`);
      }

      // Process the payment
      const result = await gateway.processPayment(payment, paymentData);

      // Update the payment status
      payment.status = result.status;
      payment.transactionId = result.transactionId;
      payment.gatewayResponse = result.response;
      payment.processedAt = new Date();
      if (result.status === 'completed') {
        payment.completedAt = new Date();
      }
      await payment.save();

      // Send the notification
      await sendPaymentNotification(payment);

      return {
        success: result.status === 'completed',
        payment,
        transactionId: result.transactionId
      };
    } catch (error) {
      console.error('Payment processing error:', error);
      throw error;
    }
  }

  // 2. Process a refund
  async processRefund(paymentId, refundData) {
    try {
      const payment = await Payment.findById(paymentId);
      if (!payment) {
        throw new Error('Payment not found');
      }
      // Allow further refunds on a payment that was already partially refunded
      if (!['completed', 'partially_refunded'].includes(payment.status)) {
        throw new Error('Payment is not completed');
      }

      // Default to whatever is still refundable, not the original amount
      const alreadyRefunded = payment.refundedAmount || 0;
      const refundable = payment.amount - alreadyRefunded;
      const refundAmount = refundData.amount || refundable;
      if (refundAmount > refundable) {
        throw new Error('Refund amount exceeds available amount');
      }

      // Select the payment gateway
      const gateway = this.selectGateway(payment.method);
      if (!gateway) {
        throw new Error(`Unsupported payment method: ${payment.method}`);
      }
      const result = await gateway.processRefund(payment, refundAmount);

      // Create the refund record
      const refund = {
        amount: refundAmount,
        reason: refundData.reason,
        transactionId: result.transactionId,
        status: result.status,
        processedAt: new Date()
      };
      payment.refunds.push(refund);
      payment.refundedAmount = alreadyRefunded + refundAmount;
      payment.status = payment.refundedAmount >= payment.amount ? 'refunded' : 'partially_refunded';
      await payment.save();

      return { success: true, refund, payment };
    } catch (error) {
      console.error('Refund processing error:', error);
      throw error;
    }
  }

  // 3. Risk assessment
  async assessRisk(payment) {
    let score = 0;
    const factors = [];
    const metadata = payment.metadata || {};

    // Amount risk
    if (payment.amount > 1000) {
      score += 20;
      factors.push('high_amount');
    }

    // Location risk
    if (metadata.country && this.isHighRiskCountry(metadata.country)) {
      score += 30;
      factors.push('high_risk_country');
    }

    // Customer history
    const userPayments = await Payment.find({
      customerId: payment.customerId,
      status: { $in: ['completed', 'failed'] }
    }).limit(10);
    const failedCount = userPayments.filter(p => p.status === 'failed').length;
    // Guard against dividing by zero for customers with no history
    const failureRate = userPayments.length > 0 ? failedCount / userPayments.length : 0;
    if (failureRate > 0.3) {
      score += 25;
      factors.push('high_failure_rate');
    }

    // Device fingerprint
    if (metadata.deviceFingerprint) {
      const devicePayments = await Payment.find({
        'metadata.deviceFingerprint': metadata.deviceFingerprint,
        status: 'failed'
      }).limit(5);
      if (devicePayments.length > 2) {
        score += 15;
        factors.push('suspicious_device');
      }
    }

    let risk = 'low';
    if (score >= 50) {
      risk = 'high';
    } else if (score >= 25) {
      risk = 'medium';
    }

    return { score, risk, factors };
  }

  // 4. Select the payment gateway
  selectGateway(method) {
    const gatewayMap = {
      'credit_card': 'stripe',
      'debit_card': 'stripe',
      'stripe': 'stripe', // the order model also accepts 'stripe' as a method
      'paypal': 'paypal',
      'bank_transfer': 'bank'
    };
    const gatewayName = gatewayMap[method];
    return this.gateways[gatewayName];
  }

  // 5. Check for high-risk countries
  isHighRiskCountry(country) {
    const highRiskCountries = ['XX', 'YY', 'ZZ']; // placeholder values
    return highRiskCountries.includes(country);
  }

  // 6. Handle a webhook
  async handleWebhook(provider, payload, signature) {
    try {
      const gateway = this.gateways[provider];
      if (!gateway) {
        throw new Error(`Unknown payment provider: ${provider}`);
      }

      // Each gateway is expected to return an event normalized to
      // { type, data: { transactionId, ... } }; raw provider events use their
      // own type names and payload shapes and need mapping first.
      const event = await gateway.verifyWebhook(payload, signature);
      await this.processWebhookEvent(event);

      return { success: true };
    } catch (error) {
      console.error('Webhook processing error:', error);
      throw error;
    }
  }

  // 7. Process a webhook event
  async processWebhookEvent(event) {
    switch (event.type) {
      case 'payment.completed':
        await this.handlePaymentCompleted(event.data);
        break;
      case 'payment.failed':
        await this.handlePaymentFailed(event.data);
        break;
      case 'refund.completed':
        // handleRefundCompleted is not shown in this article
        await this.handleRefundCompleted(event.data);
        break;
      default:
        console.log(`Unhandled webhook event: ${event.type}`);
    }
  }

  // 8. Handle a completed payment
  async handlePaymentCompleted(data) {
    const payment = await Payment.findOne({ transactionId: data.transactionId });
    if (payment && payment.status !== 'completed') {
      payment.status = 'completed';
      payment.completedAt = new Date();
      await payment.save();

      // Notify the order service
      await this.notifyOrderService(payment.orderId, 'payment_completed');
    }
  }

  // 9. Handle a failed payment
  async handlePaymentFailed(data) {
    const payment = await Payment.findOne({ transactionId: data.transactionId });
    if (payment && payment.status !== 'failed') {
      payment.status = 'failed';
      payment.failureReason = data.reason;
      await payment.save();

      // Notify the order service
      await this.notifyOrderService(payment.orderId, 'payment_failed');
    }
  }

  // 10. Notify the order service
  async notifyOrderService(orderId, event) {
    // A real implementation would publish a message to the order service
    console.log(`Notifying order service: ${orderId} - ${event}`);
  }
}

// Stripe gateway
class StripeGateway {
  async processPayment(payment, paymentData) {
    try {
      const paymentIntent = await stripe.paymentIntents.create({
        // Stripe expects the smallest currency unit (cents for USD).
        // Multiplying by 100 is only correct for two-decimal currencies.
        amount: Math.round(payment.amount * 100),
        currency: payment.currency.toLowerCase(),
        payment_method: paymentData.paymentMethodId,
        confirmation_method: 'manual',
        confirm: true,
        metadata: {
          orderId: payment.orderId.toString(),
          paymentId: payment._id.toString()
        }
      });

      return {
        status: this.mapStripeStatus(paymentIntent.status),
        transactionId: paymentIntent.id,
        response: paymentIntent
      };
    } catch (error) {
      return {
        status: 'failed',
        transactionId: null,
        response: { error: error.message }
      };
    }
  }

  async processRefund(payment, amount) {
    try {
      const refund = await stripe.refunds.create({
        payment_intent: payment.transactionId,
        amount: Math.round(amount * 100)
      });
      return { status: 'completed', transactionId: refund.id };
    } catch (error) {
      throw new Error(`Stripe refund failed: ${error.message}`);
    }
  }

  async verifyWebhook(payload, signature) {
    try {
      // `payload` must be the raw request body, not the parsed JSON
      return stripe.webhooks.constructEvent(
        payload,
        signature,
        process.env.STRIPE_WEBHOOK_SECRET
      );
    } catch (error) {
      throw new Error(`Webhook verification failed: ${error.message}`);
    }
  }

  mapStripeStatus(stripeStatus) {
    const statusMap = {
      'succeeded': 'completed',
      'requires_payment_method': 'failed',
      'requires_confirmation': 'pending',
      'requires_action': 'pending',
      'processing': 'processing',
      'canceled': 'cancelled'
    };
    return statusMap[stripeStatus] || 'pending';
  }
}

// PayPal gateway
class PayPalGateway {
  constructor() {
    const environment = process.env.PAYPAL_MODE === 'live'
      ? new paypal.core.LiveEnvironment(
          process.env.PAYPAL_CLIENT_ID,
          process.env.PAYPAL_CLIENT_SECRET
        )
      : new paypal.core.SandboxEnvironment(
          process.env.PAYPAL_CLIENT_ID,
          process.env.PAYPAL_CLIENT_SECRET
        );
    this.client = new paypal.core.PayPalHttpClient(environment);
  }

  async processPayment(payment, paymentData) {
    try {
      // Note: the capture request needs the PayPal order ID created during
      // checkout, which is not the same as this system's internal order ID.
      const request = new paypal.orders.OrdersCaptureRequest(paymentData.orderId);
      const response = await this.client.execute(request);

      return {
        status: this.mapPayPalStatus(response.result.status),
        transactionId: response.result.id,
        response: response.result
      };
    } catch (error) {
      return {
        status: 'failed',
        transactionId: null,
        response: { error: error.message }
      };
    }
  }

  async processRefund(payment, amount) {
    // PayPal refunds are left unimplemented here and must be built on the PayPal API
    throw new Error('PayPal refund not implemented');
  }

  async verifyWebhook(payload, signature) {
    // PayPal webhook verification is left unimplemented here
    throw new Error('PayPal webhook verification not implemented');
  }

  mapPayPalStatus(paypalStatus) {
    const statusMap = {
      'COMPLETED': 'completed',
      'PENDING': 'pending',
      'DECLINED': 'failed',
      'VOIDED': 'cancelled'
    };
    return statusMap[paypalStatus] || 'pending';
  }
}

// Bank gateway
class BankGateway {
  async processPayment(payment, paymentData) {
    // Bank transfers are usually asynchronous
    return {
      status: 'pending',
      transactionId: `bank_${Date.now()}`,
      response: { message: 'Bank transfer initiated' }
    };
  }

  async processRefund(payment, amount) {
    // Bank refunds are handled manually
    throw new Error('Bank refund requires manual processing');
  }

  async verifyWebhook(payload, signature) {
    // Bank webhook verification is left unimplemented here
    throw new Error('Bank webhook verification not implemented');
  }
}

module.exports = PaymentService;
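The Stripe gateway above converts amounts with Math.round(amount * 100), which assumes a two-decimal currency. A hedged sketch of a more careful conversion follows; toMinorUnits is a hypothetical helper, the zero-decimal set is a partial list for illustration, and three-decimal currencies are not handled.

```javascript
// Partial list of zero-decimal currencies; consult the gateway's documentation for the full set.
const ZERO_DECIMAL_CURRENCIES = new Set(['jpy', 'krw', 'vnd', 'clp']);

// Hypothetical helper: convert a decimal amount to the smallest currency unit.
function toMinorUnits(amount, currency) {
  const code = String(currency).toLowerCase();
  const factor = ZERO_DECIMAL_CURRENCIES.has(code) ? 1 : 100;
  // Math.round absorbs floating-point noise such as 19.99 * 100 = 1998.9999999999998
  return Math.round(amount * factor);
}

console.log(toMinorUnits(19.99, 'USD'));
console.log(toMinorUnits(500, 'JPY'));
```

Storing prices as integers in minor units from the start would avoid the conversion altogether, at the cost of changing the order and payment schemas.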

7. Real-Time Notification System

7.1 Notification System Architecture

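Notification system architecture (summary of the original diagram):
1. Message queues: Redis queue, RabbitMQ, Apache Kafka, AWS SQS
2. Notification channels: email, SMS, push, in-app messages, real-time WebSocket
3. Template engine: email templates, SMS templates, push templates, multi-language support
4. User preferences: notification preference settings, do-not-disturb hours, rate limits, channel priority
5. Delivery status: send records, delivery status, retry on failure, statistics and analysis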
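For the template engine part of the architecture, a placeholder substitution function is enough to show the idea. This is a minimal sketch, not a replacement for a real engine such as Handlebars: renderTemplate here is a standalone hypothetical function, and it does no HTML escaping.

```javascript
// Replace {{ path.to.value }} placeholders with values from `data`.
// Unknown placeholders render as an empty string. No HTML escaping is performed.
function renderTemplate(template, data) {
  return template.replace(/\{\{\s*([\w.]+)\s*\}\}/g, (match, path) => {
    const value = path.split('.').reduce(
      (current, key) => (current == null ? undefined : current[key]),
      data
    );
    return value == null ? '' : String(value);
  });
}

const message = renderTemplate(
  'Hi {{ user.name }}, your order {{orderNumber}} has shipped.',
  { user: { name: 'Ada' }, orderNumber: 'ORD2501010001' }
);
console.log(message);
```

Per-channel templates (email, SMS, push) can then share the same data object and differ only in the template string.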

7.2 Notification Flow

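Notification flow (summary of the original flowchart):
1. Event received
2. User preference check
3. Template selection
4. Content rendering
5. Channel selection
6. Message queue
7. Send processing
8. Status update

Decision points in the flowchart:
User preferences: if the notification is allowed, the flow continues; if it is not allowed, sending is skipped.
Channel selection: messages go to the email, SMS, push, or WebSocket queue.
Send result: success marks the notification as sent; failure enters the retry mechanism, which retries while the retry limit has not been reached and marks the notification as failed once it has.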
7.3 Notification Service Implementation

// 1-services/notification-service/services/notificationService.js
const Queue = require('bull');
const nodemailer = require('nodemailer');
const twilio = require('twilio');
const webpush = require('web-push');
const Notification = require('../models/Notification');
const NotificationTemplate = require('../models/NotificationTemplate');
const UserPreference = require('../models/UserPreference');

class NotificationService {
  constructor() {
    // Initialize the queues
    this.emailQueue = new Queue('email notifications', process.env.REDIS_URL);
    this.smsQueue = new Queue('sms notifications', process.env.REDIS_URL);
    this.pushQueue = new Queue('push notifications', process.env.REDIS_URL);
    this.websocketQueue = new Queue('websocket notifications', process.env.REDIS_URL);

    // Initialize the providers
    this.emailTransporter = this.initEmailTransporter();
    this.smsClient = this.initSMSClient();
    this.initPushService();

    // Register the queue processors
    this.setupQueueProcessors();
  }

  // 1. Send a notification (main entry point)
  async sendNotification(notificationData) {
    try {
      const {
        userId,
        type,
        title,
        content,
        data = {},
        channels = ['email', 'push'],
        priority = 'normal',
        scheduledAt = null
      } = notificationData;

      // Check the user's preferences
      const userPreferences = await this.getUserPreferences(userId);
      const allowedChannels = this.filterChannelsByPreferences(channels, userPreferences, type);
      if (allowedChannels.length === 0) {
        console.log(`No allowed channels for user ${userId} and notification type ${type}`);
        return { success: true, message: 'Notification skipped due to user preferences' };
      }

      // Create the notification record
      const notification = new Notification({
        userId,
        type,
        title,
        content,
        data,
        channels: allowedChannels,
        priority,
        status: 'pending',
        scheduledAt: scheduledAt || new Date()
      });
      await notification.save();

      // Dispatch to each channel
      const results = await Promise.allSettled(
        allowedChannels.map(channel => this.dispatchToChannel(channel, notification))
      );

      // Update the notification status. At this point the jobs are only queued;
      // actual delivery is recorded later by handleJobSuccess and handleJobFailure.
      const hasFailures = results.some(result => result.status === 'rejected');
      notification.status = hasFailures ? 'partial_failure' : 'sent';
      notification.sentAt = new Date();
      await notification.save();

      return { success: true, notificationId: notification._id, results };
    } catch (error) {
      console.error('Send notification error:', error);
      throw error;
    }
  }

  // 2. Dispatch to a specific channel
  async dispatchToChannel(channel, notification) {
    const jobData = {
      notificationId: notification._id,
      userId: notification.userId,
      type: notification.type,
      title: notification.title,
      content: notification.content,
      data: notification.data
    };

    const options = {
      priority: this.getPriorityValue(notification.priority),
      delay: notification.scheduledAt > new Date()
        ? notification.scheduledAt.getTime() - Date.now()
        : 0,
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 }
    };

    switch (channel) {
      case 'email':
        return this.emailQueue.add('send-email', jobData, options);
      case 'sms':
        return this.smsQueue.add('send-sms', jobData, options);
      case 'push':
        return this.pushQueue.add('send-push', jobData, options);
      case 'websocket':
        return this.websocketQueue.add('send-websocket', jobData, options);
      default:
        throw new Error(`Unknown notification channel: ${channel}`);
    }
  }

  // 3. Register the queue processors
  setupQueueProcessors() {
    // Email processor
    this.emailQueue.process('send-email', async (job) => {
      return this.processEmailNotification(job.data);
    });

    // SMS processor
    this.smsQueue.process('send-sms', async (job) => {
      return this.processSMSNotification(job.data);
    });

    // Push processor
    this.pushQueue.process('send-push', async (job) => {
      return this.processPushNotification(job.data);
    });

    // WebSocket processor
    this.websocketQueue.process('send-websocket', async (job) => {
      return this.processWebSocketNotification(job.data);
    });

    // Success and failure handling
    [this.emailQueue, this.smsQueue, this.pushQueue, this.websocketQueue].forEach(queue => {
      queue.on('failed', (job, err) => {
        console.error(`Job ${job.id} failed:`, err);
        this.handleJobFailure(job, err);
      });
      queue.on('completed', (job) => {
        console.log(`Job ${job.id} completed successfully`);
        this.handleJobSuccess(job);
      });
    });
  }

  // 4. Process an email notification
  async processEmailNotification(data) {
    try {
      const { userId, type, title, content, data: notificationData } = data;

      // Look up the user's email address
      const user = await this.getUserById(userId);
      if (!user || !user.email) {
        throw new Error('User email not found');
      }

      // Load the email template
      const template = await this.getTemplate(type, 'email');
      if (!template) {
        throw new Error(`Email template not found for type: ${type}`);
      }

      // Render the email content
      const renderedContent = await this.renderTemplate(template, {
        user,
        title,
        content,
        ...notificationData
      });

      // Send the email
      const mailOptions = {
        from: process.env.EMAIL_FROM,
        to: user.email,
        subject: renderedContent.subject,
        html: renderedContent.html,
        text: renderedContent.text
      };
      const result = await this.emailTransporter.sendMail(mailOptions);

      return { success: true, messageId: result.messageId, channel: 'email' };
    } catch (error) {
      console.error('Email notification error:', error);
      throw error;
    }
  }

  // 5. Process an SMS notification
  async processSMSNotification(data) {
    try {
      const { userId, type, content } = data;

      // Look up the user's phone number
      const user = await this.getUserById(userId);
      if (!user || !user.phone) {
        throw new Error('User phone number not found');
      }

      // Load the SMS template
      const template = await this.getTemplate(type, 'sms');
      if (!template) {
        throw new Error(`SMS template not found for type: ${type}`);
      }

      // Render the SMS content
      const renderedContent = await this.renderTemplate(template, {
        user,
        content,
        ...data.data
      });

      // Send the SMS
      const result = await this.smsClient.messages.create({
        body: renderedContent.text,
        from: process.env.TWILIO_PHONE_NUMBER,
        to: user.phone
      });

      return { success: true, messageId: result.sid, channel: 'sms' };
    } catch (error) {
      console.error('SMS notification error:', error);
      throw error;
    }
  }

  // 6. Process a push notification
  async processPushNotification(data) {
    try {
      const { userId, title, content, data: notificationData } = data;

      // Load the user's push subscriptions
      const subscriptions = await this.getUserPushSubscriptions(userId);
      if (!subscriptions || subscriptions.length === 0) {
        throw new Error('No push subscriptions found for user');
      }

      const payload = JSON.stringify({
        title,
        body: content,
        icon: '/icon-192x192.png',
        badge: '/badge-72x72.png',
        data: notificationData
      });

      // Send to every subscription
      const results = await Promise.allSettled(
        subscriptions.map(subscription => webpush.sendNotification(subscription, payload))
      );
      const successCount = results.filter(r => r.status === 'fulfilled').length;

      return {
        success: successCount > 0,
        successCount,
        totalCount: subscriptions.length,
        channel: 'push'
      };
    } catch (error) {
      console.error('Push notification error:', error);
      throw error;
    }
  }

  // 7. Process a WebSocket notification
  async processWebSocketNotification(data) {
    try {
      const { userId, title, content, data: notificationData } = data;

      // Load the user's WebSocket connections
      const connections = await this.getUserWebSocketConnections(userId);
      if (!connections || connections.length === 0) {
        throw new Error('No WebSocket connections found for user');
      }

      const message = {
        type: 'notification',
        title,
        content,
        data: notificationData,
        timestamp: new Date().toISOString()
      };

      // Send to every open connection
      let successCount = 0;
      connections.forEach(connection => {
        try {
          if (connection.readyState === 1) { // WebSocket.OPEN
            connection.send(JSON.stringify(message));
            successCount++;
          }
        } catch (error) {
          console.error('WebSocket send error:', error);
        }
      });

      return {
        success: successCount > 0,
        successCount,
        totalCount: connections.length,
        channel: 'websocket'
      };
    } catch (error) {
      console.error('WebSocket notification error:', error);
      throw error;
    }
  }

  // 8. Load the user's preferences
  async getUserPreferences(userId) {
    try {
      let preferences = await UserPreference.findOne({ userId });
      if (!preferences) {
        // Create default preferences
        preferences = new UserPreference({
          userId,
          channels: {
            email: { enabled: true, types: ['all'] },
            sms: { enabled: false, types: [] },
            push: { enabled: true, types: ['all'] },
            websocket: { enabled: true, types: ['all'] }
          },
          doNotDisturb: { enabled: false, startTime: '22:00', endTime: '08:00' },
          frequency: { maxPerHour: 10, maxPerDay: 50 }
        });
        await preferences.save();
      }
      return preferences;
    } catch (error) {
      console.error('Get user preferences error:', error);
      return null;
    }
  }

  // 9. Filter channels by the user's preferences
  filterChannelsByPreferences(channels, preferences, notificationType) {
    if (!preferences) return channels;

    // Do-not-disturb applies to every channel, so check it once
    if (this.isInDoNotDisturbTime(preferences.doNotDisturb)) {
      return [];
    }

    return channels.filter(channel => {
      const channelPref = preferences.channels[channel];
      if (!channelPref || !channelPref.enabled) return false;
      // Check whether this notification type is allowed on the channel
      return channelPref.types.includes('all') || channelPref.types.includes(notificationType);
    });
  }

  // 10. Check whether the current time falls in the do-not-disturb window
  isInDoNotDisturbTime(doNotDisturb) {
    if (!doNotDisturb || !doNotDisturb.enabled) return false;

    const now = new Date();
    const currentTime = now.getHours() * 60 + now.getMinutes();
    const [startHour, startMin] = doNotDisturb.startTime.split(':').map(Number);
    const [endHour, endMin] = doNotDisturb.endTime.split(':').map(Number);
    const startTime = startHour * 60 + startMin;
    const endTime = endHour * 60 + endMin;

    if (startTime <= endTime) {
      return currentTime >= startTime && currentTime <= endTime;
    }
    // The window crosses midnight
    return currentTime >= startTime || currentTime <= endTime;
  }

  // 11. Initialize the email transporter
  initEmailTransporter() {
    // The Nodemailer factory is createTransport (not createTransporter)
    return nodemailer.createTransport({
      service: process.env.EMAIL_SERVICE,
      auth: {
        user: process.env.EMAIL_USER,
        pass: process.env.EMAIL_PASSWORD
      }
    });
  }

  // 12. Initialize the SMS client
  initSMSClient() {
    return twilio(process.env.TWILIO_ACCOUNT_SID, process.env.TWILIO_AUTH_TOKEN);
  }

  // 13. Initialize the push service
  initPushService() {
    webpush.setVapidDetails(
      'mailto:' + process.env.EMAIL_USER,
      process.env.VAPID_PUBLIC_KEY,
      process.env.VAPID_PRIVATE_KEY
    );
  }

  // 14. Map a priority name to a queue priority value
  getPriorityValue(priority) {
    // In Bull, a lower number means a higher priority (1 is the highest)
    const priorities = {
      'urgent': 1,
      'high': 2,
      'normal': 5,
      'low': 10
    };
    return priorities[priority] || 5;
  }

  // 15. Record a failed job
  async handleJobFailure(job, error) {
    try {
      await Notification.findByIdAndUpdate(job.data.notificationId, {
        $push: {
          failures: {
            channel: job.queue.name.split(' ')[0],
            error: error.message,
            timestamp: new Date()
          }
        }
      });
    } catch (err) {
      console.error('Handle job failure error:', err);
    }
  }

  // 16. Record a successful job
  async handleJobSuccess(job) {
    try {
      await Notification.findByIdAndUpdate(job.data.notificationId, {
        $push: {
          deliveries: {
            channel: job.queue.name.split(' ')[0],
            timestamp: new Date()
          }
        }
      });
    } catch (err) {
      console.error('Handle job success error:', err);
    }
  }

  // Helper methods (placeholders to be implemented for a real deployment)
  async getUserById(userId) {
    // Should call the user service; returns fixed sample data for now
    return { id: userId, email: 'user@example.com', phone: '+1234567890' };
  }

  async getTemplate(type, channel) {
    return NotificationTemplate.findOne({ type, channel });
  }

  async renderTemplate(template, data) {
    // Should render with a template engine such as Handlebars or Mustache;
    // for now the stored template content is returned unchanged
    return {
      subject: template.subject,
      html: template.htmlContent,
      text: template.textContent
    };
  }

  async getUserPushSubscriptions(userId) {
    // Should load the user's push subscriptions
    return [];
  }

  async getUserWebSocketConnections(userId) {
    // Should load the user's open WebSocket connections
    return [];
  }
}

module.exports = NotificationService;
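The do-not-disturb check reads the system clock directly, which makes the overnight case awkward to test. A sketch of the same logic with the current time passed in as a parameter is shown below; isQuietTime and toMinutes are hypothetical names, and times are interpreted in the server's local time zone, as in the service.

```javascript
// Convert "HH:MM" to minutes since midnight.
function toMinutes(hhmm) {
  const [hours, minutes] = hhmm.split(':').map(Number);
  return hours * 60 + minutes;
}

// Same rule as isInDoNotDisturbTime, but `now` is injected so it can be tested.
function isQuietTime(window, now) {
  if (!window || !window.enabled) return false;
  const current = now.getHours() * 60 + now.getMinutes();
  const start = toMinutes(window.startTime);
  const end = toMinutes(window.endTime);
  if (start <= end) {
    return current >= start && current <= end;
  }
  // The window crosses midnight, for example 22:00 to 08:00
  return current >= start || current <= end;
}

const overnight = { enabled: true, startTime: '22:00', endTime: '08:00' };
console.log(isQuietTime(overnight, new Date(2025, 0, 1, 23, 30)));
console.log(isQuietTime(overnight, new Date(2025, 0, 1, 12, 0)));
```

A production version would likely need the user's own time zone rather than the server's, which the preference model shown here does not store.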

Conclusion
Thanks for reading! Likes, favorites, and shares are appreciated, and corrections are welcome.
