【node.js】实战项目
个人主页:Guiat
归属专栏:node.js
文章目录
- 1. 项目概览与架构设计
- 1.1 实战项目:企业级电商管理系统
- 1.2 技术栈选择
- 2. 项目初始化与基础架构
- 2.1 项目结构设计
- 2.2 基础配置管理
- 3. 用户服务实现
- 3.1 用户服务架构
- 3.2 用户模型设计
- 3.3 用户服务控制器
- 4. 商品服务实现
- 4.1 商品服务数据流
- 4.2 商品模型设计
- 4.3 商品服务控制器
- 5. 订单服务实现
- 5.1 订单处理流程
- 5.2 订单状态机
- 5.3 订单模型设计
- 5.4 订单服务控制器
- 6. 支付服务实现
- 6.1 支付系统架构
- 6.2 支付流程图
- 6.3 支付服务实现
- 7. 实时通知系统
- 7.1 通知系统架构
- 7.2 通知流程图
- 7.3 通知服务实现
正文
1. 项目概览与架构设计
1.1 实战项目:企业级电商管理系统
我们将构建一个完整的企业级电商管理系统,包含用户管理、商品管理、订单处理、支付系统、库存管理等核心功能。
1.2 技术栈选择
2. 项目初始化与基础架构
2.1 项目结构设计
ecommerce-system/
├── 1-services/ # 微服务目录
│ ├── user-service/ # 用户服务
│ ├── product-service/ # 商品服务
│ ├── order-service/ # 订单服务
│ ├── payment-service/ # 支付服务
│ ├── inventory-service/ # 库存服务
│ └── notification-service/ # 通知服务
├── 2-shared/ # 共享模块
│ ├── database/ # 数据库配置
│ ├── middleware/ # 中间件
│ ├── utils/ # 工具函数
│ └── types/ # 类型定义
├── 3-gateway/ # API网关
├── 4-config/ # 配置文件
├── 5-scripts/ # 脚本文件
├── 6-tests/ # 测试文件
├── 7-docs/ # 文档
└── 8-deployment/ # 部署配置
2.2 基础配置管理
// 2-shared/config/index.js
const path = require('path');
const dotenv = require('dotenv');// 1. 环境配置加载
const loadEnvironmentConfig = () => {const env = process.env.NODE_ENV || 'development';const envFile = path.join(__dirname, `../../4-config/.env.${env}`);dotenv.config({ path: envFile });return {env,port: parseInt(process.env.PORT) || 3000,host: process.env.HOST || 'localhost'};
};// 2. 数据库配置
const getDatabaseConfig = () => {return {mongodb: {uri: process.env.MONGODB_URI || 'mongodb://localhost:27017/ecommerce',options: {useNewUrlParser: true,useUnifiedTopology: true,maxPoolSize: 10,serverSelectionTimeoutMS: 5000,socketTimeoutMS: 45000,}},redis: {host: process.env.REDIS_HOST || 'localhost',port: parseInt(process.env.REDIS_PORT) || 6379,password: process.env.REDIS_PASSWORD,db: parseInt(process.env.REDIS_DB) || 0},mysql: {host: process.env.MYSQL_HOST || 'localhost',port: parseInt(process.env.MYSQL_PORT) || 3306,user: process.env.MYSQL_USER || 'root',password: process.env.MYSQL_PASSWORD,database: process.env.MYSQL_DATABASE || 'ecommerce'}};
};// 3. 服务配置
const getServiceConfig = () => {return {userService: {url: process.env.USER_SERVICE_URL || 'http://localhost:3001',timeout: 5000},productService: {url: process.env.PRODUCT_SERVICE_URL || 'http://localhost:3002',timeout: 5000},orderService: {url: process.env.ORDER_SERVICE_URL || 'http://localhost:3003',timeout: 10000},paymentService: {url: process.env.PAYMENT_SERVICE_URL || 'http://localhost:3004',timeout: 15000},inventoryService: {url: process.env.INVENTORY_SERVICE_URL || 'http://localhost:3005',timeout: 5000},notificationService: {url: process.env.NOTIFICATION_SERVICE_URL || 'http://localhost:3006',timeout: 3000}};
};// 4. 安全配置
const getSecurityConfig = () => {return {jwt: {secret: process.env.JWT_SECRET || 'your-secret-key',expiresIn: process.env.JWT_EXPIRES_IN || '24h',refreshExpiresIn: process.env.JWT_REFRESH_EXPIRES_IN || '7d'},bcrypt: {saltRounds: parseInt(process.env.BCRYPT_SALT_ROUNDS) || 12},cors: {origin: process.env.CORS_ORIGIN?.split(',') || ['http://localhost:3000'],credentials: true}};
};// 5. 第三方服务配置
const getThirdPartyConfig = () => {return {email: {service: process.env.EMAIL_SERVICE || 'gmail',user: process.env.EMAIL_USER,password: process.env.EMAIL_PASSWORD},sms: {provider: process.env.SMS_PROVIDER || 'twilio',accountSid: process.env.TWILIO_ACCOUNT_SID,authToken: process.env.TWILIO_AUTH_TOKEN,phoneNumber: process.env.TWILIO_PHONE_NUMBER},payment: {stripe: {publicKey: process.env.STRIPE_PUBLIC_KEY,secretKey: process.env.STRIPE_SECRET_KEY,webhookSecret: process.env.STRIPE_WEBHOOK_SECRET},paypal: {clientId: process.env.PAYPAL_CLIENT_ID,clientSecret: process.env.PAYPAL_CLIENT_SECRET,mode: process.env.PAYPAL_MODE || 'sandbox'}},storage: {aws: {accessKeyId: process.env.AWS_ACCESS_KEY_ID,secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,region: process.env.AWS_REGION || 'us-east-1',bucket: process.env.AWS_S3_BUCKET}}};
};module.exports = {environment: loadEnvironmentConfig(),database: getDatabaseConfig(),services: getServiceConfig(),security: getSecurityConfig(),thirdParty: getThirdPartyConfig()
};
3. 用户服务实现
3.1 用户服务架构
3.2 用户模型设计
// 1-services/user-service/models/User.js
const mongoose = require('mongoose');
const bcrypt = require('bcryptjs');
const jwt = require('jsonwebtoken');
const { security } = require('../../../2-shared/config');// 1. 用户基础信息模式
const userSchema = new mongoose.Schema({// 基本信息username: {type: String,required: true,unique: true,trim: true,minlength: 3,maxlength: 30,match: /^[a-zA-Z0-9_]+$/},email: {type: String,required: true,unique: true,trim: true,lowercase: true,match: /^[^\s@]+@[^\s@]+\.[^\s@]+$/},password: {type: String,required: true,minlength: 8,select: false // 默认不返回密码字段},// 个人信息profile: {firstName: { type: String, trim: true },lastName: { type: String, trim: true },avatar: { type: String },phone: { type: String, match: /^\+?[\d\s-()]+$/ },dateOfBirth: { type: Date },gender: { type: String, enum: ['male', 'female', 'other'] }},// 地址信息addresses: [{type: { type: String, enum: ['home', 'work', 'other'], default: 'home' },street: { type: String, required: true },city: { type: String, required: true },state: { type: String, required: true },zipCode: { type: String, required: true },country: { type: String, required: true },isDefault: { type: Boolean, default: false }}],// 账户状态status: {type: String,enum: ['active', 'inactive', 'suspended', 'deleted'],default: 'active'},emailVerified: { type: Boolean, default: false },phoneVerified: { type: Boolean, default: false },// 角色和权限roles: [{type: mongoose.Schema.Types.ObjectId,ref: 'Role'}],permissions: [{type: String}],// 安全信息lastLogin: { type: Date },loginAttempts: { type: Number, default: 0 },lockUntil: { type: Date },passwordResetToken: { type: String },passwordResetExpires: { type: Date },emailVerificationToken: { type: String },emailVerificationExpires: { type: Date },// 偏好设置preferences: {language: { type: String, default: 'en' },currency: { type: String, default: 'USD' },timezone: { type: String, default: 'UTC' },notifications: {email: { type: Boolean, default: true },sms: { type: Boolean, default: false },push: { type: Boolean, default: true }}},// 元数据metadata: {source: { type: String, default: 'web' }, // web, mobile, apireferrer: { type: String },utmSource: { type: String },utmMedium: { type: String },utmCampaign: { type: String }}
}, {timestamps: true,toJSON: { virtuals: true },toObject: { virtuals: true }
});// 2. 虚拟字段
userSchema.virtual('fullName').get(function() {return `${this.profile.firstName} ${this.profile.lastName}`.trim();
});userSchema.virtual('isLocked').get(function() {return !!(this.lockUntil && this.lockUntil > Date.now());
});// 3. 索引
userSchema.index({ email: 1 });
userSchema.index({ username: 1 });
userSchema.index({ 'profile.phone': 1 });
userSchema.index({ status: 1 });
userSchema.index({ createdAt: -1 });// 4. 中间件
// 密码加密中间件
userSchema.pre('save', async function(next) {if (!this.isModified('password')) return next();try {const salt = await bcrypt.genSalt(security.bcrypt.saltRounds);this.password = await bcrypt.hash(this.password, salt);next();} catch (error) {next(error);}
});// 5. 实例方法
// 密码验证
userSchema.methods.comparePassword = async function(candidatePassword) {return bcrypt.compare(candidatePassword, this.password);
};// 生成JWT令牌
userSchema.methods.generateAuthToken = function() {const payload = {id: this._id,username: this.username,email: this.email,roles: this.roles};return jwt.sign(payload, security.jwt.secret, {expiresIn: security.jwt.expiresIn});
};// 生成刷新令牌
userSchema.methods.generateRefreshToken = function() {const payload = {id: this._id,type: 'refresh'};return jwt.sign(payload, security.jwt.secret, {expiresIn: security.jwt.refreshExpiresIn});
};// 增加登录尝试次数
userSchema.methods.incLoginAttempts = function() {// 如果之前有锁定且已过期,重置尝试次数if (this.lockUntil && this.lockUntil < Date.now()) {return this.updateOne({$unset: { lockUntil: 1 },$set: { loginAttempts: 1 }});}const updates = { $inc: { loginAttempts: 1 } };// 如果达到最大尝试次数且未锁定,则锁定账户if (this.loginAttempts + 1 >= 5 && !this.isLocked) {updates.$set = { lockUntil: Date.now() + 2 * 60 * 60 * 1000 }; // 锁定2小时}return this.updateOne(updates);
};// 重置登录尝试
userSchema.methods.resetLoginAttempts = function() {return this.updateOne({$unset: { loginAttempts: 1, lockUntil: 1 }});
};// 6. 静态方法
// 查找用户(支持邮箱或用户名)
userSchema.statics.findByCredentials = async function(identifier, password) {const user = await this.findOne({$or: [{ email: identifier },{ username: identifier }],status: 'active'}).select('+password').populate('roles');if (!user) {throw new Error('Invalid credentials');}if (user.isLocked) {throw new Error('Account is temporarily locked');}const isMatch = await user.comparePassword(password);if (!isMatch) {await user.incLoginAttempts();throw new Error('Invalid credentials');}// 重置登录尝试并更新最后登录时间if (user.loginAttempts > 0) {await user.resetLoginAttempts();}user.lastLogin = new Date();await user.save();return user;
};module.exports = mongoose.model('User', userSchema);
3.3 用户服务控制器
// 1-services/user-service/controllers/userController.js
const User = require('../models/User');
const { validationResult } = require('express-validator');
const crypto = require('crypto');
const { sendEmail } = require('../../../2-shared/utils/email');class UserController {// 1. 用户注册async register(req, res) {try {// 验证输入const errors = validationResult(req);if (!errors.isEmpty()) {return res.status(400).json({success: false,message: 'Validation failed',errors: errors.array()});}const { username, email, password, profile } = req.body;// 检查用户是否已存在const existingUser = await User.findOne({$or: [{ email }, { username }]});if (existingUser) {return res.status(409).json({success: false,message: 'User already exists'});}// 创建新用户const user = new User({username,email,password,profile,emailVerificationToken: crypto.randomBytes(32).toString('hex'),emailVerificationExpires: Date.now() + 24 * 60 * 60 * 1000 // 24小时});await user.save();// 发送验证邮件await this.sendVerificationEmail(user);// 生成令牌const token = user.generateAuthToken();const refreshToken = user.generateRefreshToken();res.status(201).json({success: true,message: 'User registered successfully',data: {user: {id: user._id,username: user.username,email: user.email,profile: user.profile,emailVerified: user.emailVerified},tokens: {accessToken: token,refreshToken}}});} catch (error) {console.error('Registration error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 2. 用户登录async login(req, res) {try {const errors = validationResult(req);if (!errors.isEmpty()) {return res.status(400).json({success: false,message: 'Validation failed',errors: errors.array()});}const { identifier, password } = req.body;// 验证用户凭据const user = await User.findByCredentials(identifier, password);// 生成令牌const token = user.generateAuthToken();const refreshToken = user.generateRefreshToken();// 记录登录日志await this.logUserActivity(user._id, 'login', req);res.json({success: true,message: 'Login successful',data: {user: {id: user._id,username: user.username,email: user.email,profile: user.profile,roles: user.roles,emailVerified: user.emailVerified},tokens: {accessToken: token,refreshToken}}});} catch (error) {console.error('Login error:', error);res.status(401).json({success: false,message: error.message || 'Login failed'});}}// 3. 获取用户信息async getProfile(req, res) {try {const user = await User.findById(req.user.id).populate('roles').select('-password');if (!user) {return res.status(404).json({success: false,message: 'User not found'});}res.json({success: true,data: { user }});} catch (error) {console.error('Get profile error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 4. 更新用户信息async updateProfile(req, res) {try {const errors = validationResult(req);if (!errors.isEmpty()) {return res.status(400).json({success: false,message: 'Validation failed',errors: errors.array()});}const updates = req.body;const allowedUpdates = ['profile', 'preferences', 'addresses'];const actualUpdates = {};// 过滤允许的更新字段Object.keys(updates).forEach(key => {if (allowedUpdates.includes(key)) {actualUpdates[key] = updates[key];}});const user = await User.findByIdAndUpdate(req.user.id,actualUpdates,{ new: true, runValidators: true }).select('-password');if (!user) {return res.status(404).json({success: false,message: 'User not found'});}res.json({success: true,message: 'Profile updated successfully',data: { user }});} catch (error) {console.error('Update profile error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 5. 修改密码async changePassword(req, res) {try {const errors = validationResult(req);if (!errors.isEmpty()) {return res.status(400).json({success: false,message: 'Validation failed',errors: errors.array()});}const { currentPassword, newPassword } = req.body;const user = await User.findById(req.user.id).select('+password');if (!user) {return res.status(404).json({success: false,message: 'User not found'});}// 验证当前密码const isCurrentPasswordValid = await user.comparePassword(currentPassword);if (!isCurrentPasswordValid) {return res.status(400).json({success: false,message: 'Current password is incorrect'});}// 更新密码user.password = newPassword;await user.save();res.json({success: true,message: 'Password changed successfully'});} catch (error) {console.error('Change password error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 6. 邮箱验证async verifyEmail(req, res) {try {const { token } = req.params;const user = await User.findOne({emailVerificationToken: token,emailVerificationExpires: { $gt: Date.now() }});if (!user) {return res.status(400).json({success: false,message: 'Invalid or expired verification token'});}user.emailVerified = true;user.emailVerificationToken = undefined;user.emailVerificationExpires = undefined;await user.save();res.json({success: true,message: 'Email verified successfully'});} catch (error) {console.error('Email verification error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 7. 发送验证邮件async sendVerificationEmail(user) {const verificationUrl = `${process.env.FRONTEND_URL}/verify-email/${user.emailVerificationToken}`;await sendEmail({to: user.email,subject: 'Verify Your Email Address',template: 'email-verification',data: {username: user.username,verificationUrl}});}// 8. 记录用户活动async logUserActivity(userId, action, req) {// 这里可以记录到数据库或日志系统console.log(`User ${userId} performed ${action} from ${req.ip}`);}
}module.exports = new UserController();
4. 商品服务实现
4.1 商品服务数据流
4.2 商品模型设计
// 1-services/product-service/models/Product.js
const mongoose = require('mongoose');// 1. 商品规格模式
const specificationSchema = new mongoose.Schema({name: { type: String, required: true },value: { type: String, required: true },unit: { type: String },order: { type: Number, default: 0 }
});// 2. 商品变体模式
const variantSchema = new mongoose.Schema({sku: { type: String, required: true, unique: true },attributes: [{name: { type: String, required: true },value: { type: String, required: true }}],price: {base: { type: Number, required: true },sale: { type: Number },cost: { type: Number }},inventory: {quantity: { type: Number, default: 0 },reserved: { type: Number, default: 0 },available: { type: Number, default: 0 },lowStockThreshold: { type: Number, default: 10 }},images: [{ type: String }],weight: { type: Number },dimensions: {length: { type: Number },width: { type: Number },height: { type: Number }},status: {type: String,enum: ['active', 'inactive', 'out_of_stock'],default: 'active'}
});// 3. 主商品模式
const productSchema = new mongoose.Schema({// 基本信息name: { type: String, required: true, trim: true },slug: { type: String, required: true, unique: true },description: { type: String, required: true },shortDescription: { type: String },// 分类信息category: {type: mongoose.Schema.Types.ObjectId,ref: 'Category',required: true},subcategories: [{type: mongoose.Schema.Types.ObjectId,ref: 'Category'}],tags: [{ type: String }],// 品牌信息brand: {type: mongoose.Schema.Types.ObjectId,ref: 'Brand'},// 媒体资源images: [{url: { type: String, required: true },alt: { type: String },order: { type: Number, default: 0 },isMain: { type: Boolean, default: false }}],videos: [{url: { type: String },title: { type: String },duration: { type: Number }}],// 商品规格specifications: [specificationSchema],// 商品变体variants: [variantSchema],// 价格信息(基础价格,如果没有变体)price: {base: { type: Number },sale: { type: Number },cost: { type: Number },currency: { type: String, default: 'USD' }},// 库存信息(如果没有变体)inventory: {quantity: { type: Number, default: 0 },reserved: { type: Number, default: 0 },available: { type: Number, default: 0 },lowStockThreshold: { type: Number, default: 10 },trackQuantity: { type: Boolean, default: true }},// 物理属性weight: { type: Number },dimensions: {length: { type: Number },width: { type: Number },height: { type: Number },unit: { type: String, default: 'cm' }},// 状态信息status: {type: String,enum: ['draft', 'active', 'inactive', 'archived'],default: 'draft'},visibility: {type: String,enum: ['public', 'private', 'hidden'],default: 'public'},// SEO信息seo: {title: { type: String },description: { type: String },keywords: [{ type: String }],canonicalUrl: { type: String }},// 销售信息sales: {totalSold: { type: Number, default: 0 },totalRevenue: { type: Number, default: 0 },averageRating: { type: Number, default: 0 },reviewCount: { type: Number, default: 0 }},// 配送信息shipping: {weight: { type: Number },requiresShipping: { type: Boolean, default: true },shippingClass: { type: String },freeShipping: { type: Boolean, default: false }},// 税务信息tax: {taxable: { type: Boolean, default: true },taxClass: { type: String, default: 'standard' }},// 元数据metadata: {vendor: { type: String },manufacturerPartNumber: { type: String },gtin: { type: String }, // Global Trade Item Numbercondition: {type: String,enum: ['new', 'used', 'refurbished'],default: 'new'}},// 时间戳publishedAt: { type: Date },featuredUntil: { type: Date }
}, {timestamps: true,toJSON: { virtuals: true },toObject: { virtuals: true }
});// 4. 虚拟字段
productSchema.virtual('isOnSale').get(function() {if (this.variants && this.variants.length > 0) {return this.variants.some(variant => variant.price.sale && variant.price.sale < variant.price.base);}return this.price.sale && this.price.sale < this.price.base;
});productSchema.virtual('minPrice').get(function() {if (this.variants && this.variants.length > 0) {const prices = this.variants.map(v => v.price.sale || v.price.base);return Math.min(...prices);}return this.price.sale || this.price.base;
});productSchema.virtual('maxPrice').get(function() {if (this.variants && this.variants.length > 0) {const prices = this.variants.map(v => v.price.sale || v.price.base);return Math.max(...prices);}return this.price.sale || this.price.base;
});productSchema.virtual('totalInventory').get(function() {if (this.variants && this.variants.length > 0) {return this.variants.reduce((total, variant) => total + variant.inventory.available, 0);}return this.inventory.available;
});// 5. 索引
productSchema.index({ name: 'text', description: 'text', tags: 'text' });
productSchema.index({ slug: 1 });
productSchema.index({ category: 1 });
productSchema.index({ brand: 1 });
productSchema.index({ status: 1 });
productSchema.index({ 'price.base': 1 });
productSchema.index({ 'sales.totalSold': -1 });
productSchema.index({ 'sales.averageRating': -1 });
productSchema.index({ createdAt: -1 });
productSchema.index({ publishedAt: -1 });// 6. 中间件
// 生成slug
productSchema.pre('save', function(next) {if (this.isModified('name') && !this.slug) {this.slug = this.name.toLowerCase().replace(/[^a-z0-9]+/g, '-').replace(/(^-|-$)/g, '');}next();
});// 更新可用库存
productSchema.pre('save', function(next) {if (this.variants && this.variants.length > 0) {this.variants.forEach(variant => {variant.inventory.available = Math.max(0, variant.inventory.quantity - variant.inventory.reserved);});} else {this.inventory.available = Math.max(0, this.inventory.quantity - this.inventory.reserved);}next();
});// 7. 实例方法
// 检查库存
productSchema.methods.checkStock = function(variantId, quantity = 1) {if (variantId) {const variant = this.variants.id(variantId);return variant && variant.inventory.available >= quantity;}return this.inventory.available >= quantity;
};// 预留库存
productSchema.methods.reserveStock = async function(variantId, quantity) {if (variantId) {const variant = this.variants.id(variantId);if (variant && variant.inventory.available >= quantity) {variant.inventory.reserved += quantity;await this.save();return true;}} else if (this.inventory.available >= quantity) {this.inventory.reserved += quantity;await this.save();return true;}return false;
};// 释放库存
productSchema.methods.releaseStock = async function(variantId, quantity) {if (variantId) {const variant = this.variants.id(variantId);if (variant) {variant.inventory.reserved = Math.max(0, variant.inventory.reserved - quantity);await this.save();}} else {this.inventory.reserved = Math.max(0, this.inventory.reserved - quantity);await this.save();}
};// 8. 静态方法
// 搜索商品
productSchema.statics.search = function(query, options = {}) {const {category,brand,minPrice,maxPrice,inStock,sort = '-createdAt',page = 1,limit = 20} = options;const filter = {status: 'active',visibility: 'public'};if (query) {filter.$text = { $search: query };}if (category) {filter.category = category;}if (brand) {filter.brand = brand;}if (minPrice || maxPrice) {filter['price.base'] = {};if (minPrice) filter['price.base'].$gte = minPrice;if (maxPrice) filter['price.base'].$lte = maxPrice;}if (inStock) {filter['inventory.available'] = { $gt: 0 };}const skip = (page - 1) * limit;return this.find(filter).populate('category brand').sort(sort).skip(skip).limit(limit);
};module.exports = mongoose.model('Product', productSchema);
4.3 商品服务控制器
// 1-services/product-service/controllers/productController.js
const Product = require('../models/Product');
const Category = require('../models/Category');
const { validationResult } = require('express-validator');
const { uploadToS3, deleteFromS3 } = require('../../../2-shared/utils/storage');class ProductController {// 1. 创建商品async createProduct(req, res) {try {const errors = validationResult(req);if (!errors.isEmpty()) {return res.status(400).json({success: false,message: 'Validation failed',errors: errors.array()});}const productData = req.body;// 验证分类是否存在const category = await Category.findById(productData.category);if (!category) {return res.status(400).json({success: false,message: 'Invalid category'});}// 处理图片上传if (req.files && req.files.length > 0) {const imageUrls = await Promise.all(req.files.map(async (file, index) => {const url = await uploadToS3(file, 'products');return {url,alt: `${productData.name} image ${index + 1}`,order: index,isMain: index === 0};}));productData.images = imageUrls;}const product = new Product(productData);await product.save();await product.populate('category brand');res.status(201).json({success: true,message: 'Product created successfully',data: { product }});} catch (error) {console.error('Create product error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 2. 获取商品列表async getProducts(req, res) {try {const {page = 1,limit = 20,category,brand,minPrice,maxPrice,inStock,sort = '-createdAt',search} = req.query;const options = {category,brand,minPrice: minPrice ? parseFloat(minPrice) : undefined,maxPrice: maxPrice ? parseFloat(maxPrice) : undefined,inStock: inStock === 'true',sort,page: parseInt(page),limit: parseInt(limit)};const products = await Product.search(search, options);const total = await Product.countDocuments({status: 'active',visibility: 'public'});res.json({success: true,data: {products,pagination: {page: parseInt(page),limit: parseInt(limit),total,pages: Math.ceil(total / limit)}}});} catch (error) {console.error('Get products error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 3. 获取单个商品async getProduct(req, res) {try {const { id } = req.params;const product = await Product.findOne({$or: [{ _id: id }, { slug: id }],status: 'active',visibility: 'public'}).populate('category brand');if (!product) {return res.status(404).json({success: false,message: 'Product not found'});}res.json({success: true,data: { product }});} catch (error) {console.error('Get product error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 4. 更新商品async updateProduct(req, res) {try {const errors = validationResult(req);if (!errors.isEmpty()) {return res.status(400).json({success: false,message: 'Validation failed',errors: errors.array()});}const { id } = req.params;const updates = req.body;// 处理新上传的图片if (req.files && req.files.length > 0) {const newImages = await Promise.all(req.files.map(async (file, index) => {const url = await uploadToS3(file, 'products');return {url,alt: `Product image ${index + 1}`,order: index};}));// 合并现有图片和新图片if (updates.images) {updates.images = [...updates.images, ...newImages];} else {updates.images = newImages;}}const product = await Product.findByIdAndUpdate(id,updates,{ new: true, runValidators: true }).populate('category brand');if (!product) {return res.status(404).json({success: false,message: 'Product not found'});}res.json({success: true,message: 'Product updated successfully',data: { product }});} catch (error) {console.error('Update product error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 5. 删除商品async deleteProduct(req, res) {try {const { id } = req.params;const product = await Product.findById(id);if (!product) {return res.status(404).json({success: false,message: 'Product not found'});}// 删除关联的图片if (product.images && product.images.length > 0) {await Promise.all(product.images.map(image => deleteFromS3(image.url)));}await Product.findByIdAndDelete(id);res.json({success: true,message: 'Product deleted successfully'});} catch (error) {console.error('Delete product error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 6. 检查库存async checkStock(req, res) {try {const { id } = req.params;const { variantId, quantity = 1 } = req.query;const product = await Product.findById(id);if (!product) {return res.status(404).json({success: false,message: 'Product not found'});}const inStock = product.checkStock(variantId, parseInt(quantity));res.json({success: true,data: {inStock,availableQuantity: variantId ? product.variants.id(variantId)?.inventory.available || 0: product.inventory.available}});} catch (error) {console.error('Check stock error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 7. 预留库存async reserveStock(req, res) {try {const { id } = req.params;const { variantId, quantity } = req.body;const product = await Product.findById(id);if (!product) {return res.status(404).json({success: false,message: 'Product not found'});}const reserved = await product.reserveStock(variantId, quantity);if (!reserved) {return res.status(400).json({success: false,message: 'Insufficient stock'});}res.json({success: true,message: 'Stock reserved successfully'});} catch (error) {console.error('Reserve stock error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 8. 获取商品推荐async getRecommendations(req, res) {try {const { id } = req.params;const { limit = 10 } = req.query;const product = await Product.findById(id);if (!product) {return res.status(404).json({success: false,message: 'Product not found'});}// 基于分类和标签的推荐const recommendations = await Product.find({_id: { $ne: id },$or: [{ category: product.category },{ tags: { $in: product.tags } }],status: 'active',visibility: 'public'}).populate('category brand').sort({ 'sales.totalSold': -1 }).limit(parseInt(limit));res.json({success: true,data: { recommendations }});} catch (error) {console.error('Get recommendations error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}
}module.exports = new ProductController();
5. 订单服务实现
5.1 订单处理流程
5.2 订单状态机
5.3 订单模型设计
// 1-services/order-service/models/Order.js
const mongoose = require('mongoose');// 1. 订单项模式
const orderItemSchema = new mongoose.Schema({product: {type: mongoose.Schema.Types.ObjectId,ref: 'Product',required: true},variant: {type: mongoose.Schema.Types.ObjectId,required: false},sku: { type: String, required: true },name: { type: String, required: true },description: { type: String },image: { type: String },// 价格信息unitPrice: { type: Number, required: true },salePrice: { type: Number },quantity: { type: Number, required: true, min: 1 },totalPrice: { type: Number, required: true },// 折扣信息discount: {amount: { type: Number, default: 0 },percentage: { type: Number, default: 0 },couponCode: { type: String }},// 税费信息tax: {rate: { type: Number, default: 0 },amount: { type: Number, default: 0 }},// 状态信息status: {type: String,enum: ['pending', 'confirmed', 'processing', 'shipped', 'delivered', 'cancelled', 'returned'],default: 'pending'},// 履行信息fulfillment: {warehouse: { type: String },trackingNumber: { type: String },carrier: { type: String },shippedAt: { type: Date },deliveredAt: { type: Date }}
});// 2. 地址模式
const addressSchema = new mongoose.Schema({firstName: { type: String, required: true },lastName: { type: String, required: true },company: { type: String },street1: { type: String, required: true },street2: { type: String },city: { type: String, required: true },state: { type: String, required: true },zipCode: { type: String, required: true },country: { type: String, required: true },phone: { type: String }
});// 3. 支付信息模式
const paymentSchema = new mongoose.Schema({method: {type: String,enum: ['credit_card', 'debit_card', 'paypal', 'stripe', 'bank_transfer', 'cash_on_delivery'],required: true},status: {type: String,enum: ['pending', 'processing', 'completed', 'failed', 'cancelled', 'refunded'],default: 'pending'},transactionId: { type: String },gatewayResponse: { type: mongoose.Schema.Types.Mixed },amount: { type: Number, required: true },currency: { type: String, default: 'USD' },paidAt: { type: Date },refundedAt: { type: Date },refundAmount: { type: Number, default: 0 }
});// 4. 主订单模式
const orderSchema = new mongoose.Schema({// 订单基本信息orderNumber: { type: String, required: true, unique: true },customer: {type: mongoose.Schema.Types.ObjectId,ref: 'User',required: true},// 订单项items: [orderItemSchema],// 地址信息shippingAddress: { type: addressSchema, required: true },billingAddress: { type: addressSchema, required: true },// 价格信息pricing: {subtotal: { type: Number, required: true },discount: { type: Number, default: 0 },tax: { type: Number, default: 0 },shipping: { type: Number, default: 0 },total: { type: Number, required: true },currency: { type: String, default: 'USD' }},// 优惠券信息coupons: [{code: { type: String, required: true },discount: { type: Number, required: true },type: { type: String, enum: ['percentage', 'fixed'], required: true }}],// 配送信息shipping: {method: { type: String, required: true },carrier: { type: String },service: { type: String },cost: { type: Number, default: 0 },estimatedDelivery: { type: Date },trackingNumber: { type: String },trackingUrl: { type: String }},// 支付信息payment: paymentSchema,// 订单状态status: {type: String,enum: ['pending', // 待确认'confirmed', // 已确认'processing', // 处理中'shipped', // 已发货'delivered', // 已送达'completed', // 已完成'cancelled', // 已取消'returned', // 已退货'refunded' // 已退款],default: 'pending'},// 状态历史statusHistory: [{status: { type: String, required: true },timestamp: { type: Date, default: Date.now },note: { type: String },updatedBy: { type: mongoose.Schema.Types.ObjectId, ref: 'User' }}],// 备注信息notes: {customer: { type: String },internal: { type: String }},// 时间戳placedAt: { type: Date, default: Date.now },confirmedAt: { type: Date },shippedAt: { type: Date },deliveredAt: { type: Date },completedAt: { type: Date },cancelledAt: { type: Date },// 元数据metadata: {source: { type: String, default: 'web' }, // web, mobile, apiuserAgent: { type: String },ipAddress: { type: String },referrer: { type: String }}
}, {timestamps: true,toJSON: { virtuals: true },toObject: { virtuals: true }
});// 5. 虚拟字段
orderSchema.virtual('totalItems').get(function() {return this.items.reduce((total, item) => total + item.quantity, 0);
});orderSchema.virtual('canCancel').get(function() {return ['pending', 'confirmed'].includes(this.status);
});orderSchema.virtual('canReturn').get(function() {return ['delivered'].includes(this.status);
});orderSchema.virtual('isCompleted').get(function() {return ['completed', 'cancelled', 'refunded'].includes(this.status);
});// 6. 索引
orderSchema.index({ orderNumber: 1 });
orderSchema.index({ customer: 1 });
orderSchema.index({ status: 1 });
orderSchema.index({ placedAt: -1 });
orderSchema.index({ 'payment.status': 1 });
orderSchema.index({ 'shipping.trackingNumber': 1 });// 7. 中间件
// 生成订单号
orderSchema.pre('save', async function(next) {if (this.isNew && !this.orderNumber) {this.orderNumber = await this.constructor.generateOrderNumber();}next();
});// 更新状态历史
orderSchema.pre('save', function(next) {if (this.isModified('status')) {this.statusHistory.push({status: this.status,timestamp: new Date()});}next();
});// 8. 实例方法
// 更新订单状态
orderSchema.methods.updateStatus = async function(newStatus, note, updatedBy) {const validTransitions = {pending: ['confirmed', 'cancelled'],confirmed: ['processing', 'cancelled'],processing: ['shipped', 'cancelled'],shipped: ['delivered', 'returned'],delivered: ['completed', 'returned'],returned: ['refunded'],cancelled: [],completed: [],refunded: []};if (!validTransitions[this.status].includes(newStatus)) {throw new Error(`Invalid status transition from ${this.status} to ${newStatus}`);}this.status = newStatus;this.statusHistory.push({status: newStatus,timestamp: new Date(),note,updatedBy});// 更新相关时间戳switch (newStatus) {case 'confirmed':this.confirmedAt = new Date();break;case 'shipped':this.shippedAt = new Date();break;case 'delivered':this.deliveredAt = new Date();break;case 'completed':this.completedAt = new Date();break;case 'cancelled':this.cancelledAt = new Date();break;}await this.save();
};// 计算总价
orderSchema.methods.calculateTotal = function() {const subtotal = this.items.reduce((total, item) => total + item.totalPrice, 0);const discount = this.coupons.reduce((total, coupon) => total + coupon.discount, 0);const tax = this.pricing.tax || 0;const shipping = this.shipping.cost || 0;this.pricing.subtotal = subtotal;this.pricing.discount = discount;this.pricing.total = subtotal - discount + tax + shipping;return this.pricing.total;
};// 9. 静态方法
// 生成订单号
orderSchema.statics.generateOrderNumber = async function() {const date = new Date();const year = date.getFullYear().toString().slice(-2);const month = (date.getMonth() + 1).toString().padStart(2, '0');const day = date.getDate().toString().padStart(2, '0');const prefix = `ORD${year}${month}${day}`;// 查找当天最大的订单号const lastOrder = await this.findOne({orderNumber: { $regex: `^${prefix}` }}).sort({ orderNumber: -1 });let sequence = 1;if (lastOrder) {const lastSequence = parseInt(lastOrder.orderNumber.slice(-4));sequence = lastSequence + 1;}return `${prefix}${sequence.toString().padStart(4, '0')}`;
};// 按状态查询订单
orderSchema.statics.findByStatus = function(status, options = {}) {const { page = 1, limit = 20, sort = '-placedAt' } = options;const skip = (page - 1) * limit;return this.find({ status }).populate('customer', 'username email profile').sort(sort).skip(skip).limit(limit);
};// 按客户查询订单
orderSchema.statics.findByCustomer = function(customerId, options = {}) {const { page = 1, limit = 20, sort = '-placedAt' } = options;const skip = (page - 1) * limit;return this.find({ customer: customerId }).sort(sort).skip(skip).limit(limit);
};module.exports = mongoose.model('Order', orderSchema);
5.4 订单服务控制器
// 1-services/order-service/controllers/orderController.js
const Order = require('../models/Order');
const { validationResult } = require('express-validator');
const { reserveStock, releaseStock } = require('../services/inventoryService');
const { processPayment } = require('../services/paymentService');
const { sendOrderConfirmation } = require('../services/notificationService');class OrderController {// 1. 创建订单async createOrder(req, res) {try {const errors = validationResult(req);if (!errors.isEmpty()) {return res.status(400).json({success: false,message: 'Validation failed',errors: errors.array()});}const orderData = req.body;orderData.customer = req.user.id;// 验证库存const stockValidation = await this.validateStock(orderData.items);if (!stockValidation.valid) {return res.status(400).json({success: false,message: 'Insufficient stock',details: stockValidation.errors});}// 预留库存const reservationResults = await this.reserveOrderStock(orderData.items);if (!reservationResults.success) {return res.status(400).json({success: false,message: 'Failed to reserve stock',details: reservationResults.errors});}try {// 创建订单const order = new Order(orderData);order.calculateTotal();await order.save();// 处理支付if (orderData.payment.method !== 'cash_on_delivery') {const paymentResult = await processPayment({amount: order.pricing.total,currency: order.pricing.currency,method: orderData.payment.method,orderId: order._id,customer: req.user});if (!paymentResult.success) {// 支付失败,释放库存await this.releaseOrderStock(orderData.items);await order.updateStatus('cancelled', 'Payment failed');return res.status(400).json({success: false,message: 'Payment failed',details: paymentResult.error});}// 更新支付信息order.payment.status = 'completed';order.payment.transactionId = paymentResult.transactionId;order.payment.paidAt = new Date();await order.updateStatus('confirmed', 'Payment completed');} else {await order.updateStatus('confirmed', 'Cash on delivery order');}// 发送订单确认通知await sendOrderConfirmation(order);await order.populate('customer', 'username email profile');res.status(201).json({success: true,message: 'Order created successfully',data: { order }});} catch (error) {// 创建订单失败,释放库存await this.releaseOrderStock(orderData.items);throw error;}} catch (error) {console.error('Create order error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 2. 获取订单列表async getOrders(req, res) {try {const {page = 1,limit = 20,status,customer,startDate,endDate,sort = '-placedAt'} = req.query;const filter = {};// 如果不是管理员,只能查看自己的订单if (!req.user.roles.includes('admin')) {filter.customer = req.user.id;} else if (customer) {filter.customer = customer;}if (status) {filter.status = status;}if (startDate || endDate) {filter.placedAt = {};if (startDate) filter.placedAt.$gte = new Date(startDate);if (endDate) filter.placedAt.$lte = new Date(endDate);}const skip = (page - 1) * limit;const orders = await Order.find(filter).populate('customer', 'username email profile').sort(sort).skip(skip).limit(parseInt(limit));const total = await Order.countDocuments(filter);res.json({success: true,data: {orders,pagination: {page: parseInt(page),limit: parseInt(limit),total,pages: Math.ceil(total / limit)}}});} catch (error) {console.error('Get orders error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 3. 获取单个订单async getOrder(req, res) {try {const { id } = req.params;const filter = { _id: id };// 如果不是管理员,只能查看自己的订单if (!req.user.roles.includes('admin')) {filter.customer = req.user.id;}const order = await Order.findOne(filter).populate('customer', 'username email profile').populate('items.product', 'name images');if (!order) {return res.status(404).json({success: false,message: 'Order not found'});}res.json({success: true,data: { order }});} catch (error) {console.error('Get order error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 4. 更新订单状态async updateOrderStatus(req, res) {try {const { id } = req.params;const { status, note } = req.body;const order = await Order.findById(id);if (!order) {return res.status(404).json({success: false,message: 'Order not found'});}await order.updateStatus(status, note, req.user.id);res.json({success: true,message: 'Order status updated successfully',data: { order }});} catch (error) {console.error('Update order status error:', error);res.status(500).json({success: false,message: error.message || 'Internal server error'});}}// 5. 取消订单async cancelOrder(req, res) {try {const { id } = req.params;const { reason } = req.body;const filter = { _id: id };// 如果不是管理员,只能取消自己的订单if (!req.user.roles.includes('admin')) {filter.customer = req.user.id;}const order = await Order.findOne(filter);if (!order) {return res.status(404).json({success: false,message: 'Order not found'});}if (!order.canCancel) {return res.status(400).json({success: false,message: 'Order cannot be cancelled'});}// 释放库存await this.releaseOrderStock(order.items);// 处理退款(如果已支付)if (order.payment.status === 'completed') {// 这里应该调用支付服务进行退款order.payment.status = 'refunded';order.payment.refundedAt = new Date();order.payment.refundAmount = order.pricing.total;}await order.updateStatus('cancelled', reason, req.user.id);res.json({success: true,message: 'Order cancelled successfully',data: { order }});} catch (error) {console.error('Cancel order error:', error);res.status(500).json({success: false,message: 'Internal server error'});}}// 6. 验证库存async validateStock(items) {const errors = [];for (const item of items) {const stockCheck = await reserveStock(item.product,item.variant,item.quantity,true // 仅检查,不实际预留);if (!stockCheck.success) {errors.push({product: item.product,variant: item.variant,requested: item.quantity,available: stockCheck.available,message: stockCheck.message});}}return {valid: errors.length === 0,errors};}// 7. 预留订单库存async reserveOrderStock(items) {const errors = [];const reservations = [];for (const item of items) {try {const result = await reserveStock(item.product,item.variant,item.quantity);if (result.success) {reservations.push({product: item.product,variant: item.variant,quantity: item.quantity});} else {errors.push({product: item.product,variant: item.variant,message: result.message});}} catch (error) {errors.push({product: item.product,variant: item.variant,message: error.message});}}// 如果有任何失败,回滚所有预留if (errors.length > 0) {for (const reservation of reservations) {await releaseStock(reservation.product,reservation.variant,reservation.quantity);}}return {success: errors.length === 0,errors,reservations};}// 8. 释放订单库存async releaseOrderStock(items) {for (const item of items) {try {await releaseStock(item.product,item.variant,item.quantity);} catch (error) {console.error('Failed to release stock:', error);}}}
}module.exports = new OrderController();
6. 支付服务实现
6.1 支付系统架构
6.2 支付流程图
6.3 支付服务实现
// 1-services/payment-service/services/paymentService.js
const stripe = require('stripe')(process.env.STRIPE_SECRET_KEY);
const paypal = require('@paypal/checkout-server-sdk');
const Payment = require('../models/Payment');
const { sendPaymentNotification } = require('./notificationService');class PaymentService {constructor() {this.gateways = {stripe: new StripeGateway(),paypal: new PayPalGateway(),bank: new BankGateway()};}// 1. 处理支付async processPayment(paymentData) {try {// 创建支付记录const payment = new Payment({orderId: paymentData.orderId,customerId: paymentData.customerId,amount: paymentData.amount,currency: paymentData.currency,method: paymentData.method,status: 'pending',metadata: paymentData.metadata});await payment.save();// 风险检查const riskAssessment = await this.assessRisk(payment);if (riskAssessment.risk === 'high') {payment.status = 'requires_review';payment.riskScore = riskAssessment.score;await payment.save();return {success: false,payment,message: 'Payment requires manual review'};}// 选择支付网关const gateway = this.selectGateway(paymentData.method);if (!gateway) {throw new Error(`Unsupported payment method: ${paymentData.method}`);}// 处理支付const result = await gateway.processPayment(payment, paymentData);// 更新支付状态payment.status = result.status;payment.transactionId = result.transactionId;payment.gatewayResponse = result.response;payment.processedAt = new Date();if (result.status === 'completed') {payment.completedAt = new Date();}await payment.save();// 发送通知await sendPaymentNotification(payment);return {success: result.status === 'completed',payment,transactionId: result.transactionId};} catch (error) {console.error('Payment processing error:', error);throw error;}}// 2. 处理退款async processRefund(paymentId, refundData) {try {const payment = await Payment.findById(paymentId);if (!payment) {throw new Error('Payment not found');}if (payment.status !== 'completed') {throw new Error('Payment is not completed');}const refundAmount = refundData.amount || payment.amount;if (refundAmount > payment.amount - payment.refundedAmount) {throw new Error('Refund amount exceeds available amount');}// 选择支付网关const gateway = this.selectGateway(payment.method);const result = await gateway.processRefund(payment, refundAmount);// 创建退款记录const refund = {amount: refundAmount,reason: refundData.reason,transactionId: result.transactionId,status: result.status,processedAt: new Date()};payment.refunds.push(refund);payment.refundedAmount += refundAmount;if (payment.refundedAmount >= payment.amount) {payment.status = 'refunded';} else {payment.status = 'partially_refunded';}await payment.save();return {success: true,refund,payment};} catch (error) {console.error('Refund processing error:', error);throw error;}}// 3. 风险评估async assessRisk(payment) {let score = 0;const factors = [];// 金额风险if (payment.amount > 1000) {score += 20;factors.push('high_amount');}// 地理位置风险if (payment.metadata.country && this.isHighRiskCountry(payment.metadata.country)) {score += 30;factors.push('high_risk_country');}// 用户历史const userPayments = await Payment.find({customerId: payment.customerId,status: { $in: ['completed', 'failed'] }}).limit(10);const failureRate = userPayments.filter(p => p.status === 'failed').length / userPayments.length;if (failureRate > 0.3) {score += 25;factors.push('high_failure_rate');}// 设备指纹if (payment.metadata.deviceFingerprint) {const devicePayments = await Payment.find({'metadata.deviceFingerprint': payment.metadata.deviceFingerprint,status: 'failed'}).limit(5);if (devicePayments.length > 2) {score += 15;factors.push('suspicious_device');}}let risk = 'low';if (score >= 50) {risk = 'high';} else if (score >= 25) {risk = 'medium';}return {score,risk,factors};}// 4. 选择支付网关selectGateway(method) {const gatewayMap = {'credit_card': 'stripe','debit_card': 'stripe','paypal': 'paypal','bank_transfer': 'bank'};const gatewayName = gatewayMap[method];return this.gateways[gatewayName];}// 5. 检查高风险国家isHighRiskCountry(country) {const highRiskCountries = ['XX', 'YY', 'ZZ']; // 示例return highRiskCountries.includes(country);}// 6. 处理 Webhookasync handleWebhook(provider, payload, signature) {try {const gateway = this.gateways[provider];if (!gateway) {throw new Error(`Unknown payment provider: ${provider}`);}const event = await gateway.verifyWebhook(payload, signature);await this.processWebhookEvent(event);return { success: true };} catch (error) {console.error('Webhook processing error:', error);throw error;}}// 7. 处理 Webhook 事件async processWebhookEvent(event) {switch (event.type) {case 'payment.completed':await this.handlePaymentCompleted(event.data);break;case 'payment.failed':await this.handlePaymentFailed(event.data);break;case 'refund.completed':await this.handleRefundCompleted(event.data);break;default:console.log(`Unhandled webhook event: ${event.type}`);}}// 8. 处理支付完成事件async handlePaymentCompleted(data) {const payment = await Payment.findOne({transactionId: data.transactionId});if (payment && payment.status !== 'completed') {payment.status = 'completed';payment.completedAt = new Date();await payment.save();// 通知订单服务await this.notifyOrderService(payment.orderId, 'payment_completed');}}// 9. 处理支付失败事件async handlePaymentFailed(data) {const payment = await Payment.findOne({transactionId: data.transactionId});if (payment && payment.status !== 'failed') {payment.status = 'failed';payment.failureReason = data.reason;await payment.save();// 通知订单服务await this.notifyOrderService(payment.orderId, 'payment_failed');}}// 10. 通知订单服务async notifyOrderService(orderId, event) {// 这里应该发送消息到订单服务console.log(`Notifying order service: ${orderId} - ${event}`);}
}// Stripe 网关实现
class StripeGateway {async processPayment(payment, paymentData) {try {const paymentIntent = await stripe.paymentIntents.create({amount: Math.round(payment.amount * 100), // Stripe 使用分为单位currency: payment.currency,payment_method: paymentData.paymentMethodId,confirmation_method: 'manual',confirm: true,metadata: {orderId: payment.orderId.toString(),paymentId: payment._id.toString()}});return {status: this.mapStripeStatus(paymentIntent.status),transactionId: paymentIntent.id,response: paymentIntent};} catch (error) {return {status: 'failed',transactionId: null,response: { error: error.message }};}}async processRefund(payment, amount) {try {const refund = await stripe.refunds.create({payment_intent: payment.transactionId,amount: Math.round(amount * 100)});return {status: 'completed',transactionId: refund.id};} catch (error) {throw new Error(`Stripe refund failed: ${error.message}`);}}async verifyWebhook(payload, signature) {try {return stripe.webhooks.constructEvent(payload,signature,process.env.STRIPE_WEBHOOK_SECRET);} catch (error) {throw new Error(`Webhook verification failed: ${error.message}`);}}mapStripeStatus(stripeStatus) {const statusMap = {'succeeded': 'completed','requires_payment_method': 'failed','requires_confirmation': 'pending','requires_action': 'pending','processing': 'processing','canceled': 'cancelled'};return statusMap[stripeStatus] || 'pending';}
}// PayPal 网关实现
class PayPalGateway {constructor() {const environment = process.env.PAYPAL_MODE === 'live' ? new paypal.core.LiveEnvironment(process.env.PAYPAL_CLIENT_ID,process.env.PAYPAL_CLIENT_SECRET): new paypal.core.SandboxEnvironment(process.env.PAYPAL_CLIENT_ID,process.env.PAYPAL_CLIENT_SECRET);this.client = new paypal.core.PayPalHttpClient(environment);}async processPayment(payment, paymentData) {try {const request = new paypal.orders.OrdersCaptureRequest(paymentData.orderId);const response = await this.client.execute(request);return {status: this.mapPayPalStatus(response.result.status),transactionId: response.result.id,response: response.result};} catch (error) {return {status: 'failed',transactionId: null,response: { error: error.message }};}}async processRefund(payment, amount) {// PayPal 退款实现// 这里需要根据 PayPal API 实现退款逻辑throw new Error('PayPal refund not implemented');}async verifyWebhook(payload, signature) {// PayPal Webhook 验证实现throw new Error('PayPal webhook verification not implemented');}mapPayPalStatus(paypalStatus) {const statusMap = {'COMPLETED': 'completed','PENDING': 'pending','DECLINED': 'failed','VOIDED': 'cancelled'};return statusMap[paypalStatus] || 'pending';}
}// 银行网关实现
class BankGateway {async processPayment(payment, paymentData) {// 银行转账通常是异步的return {status: 'pending',transactionId: `bank_${Date.now()}`,response: { message: 'Bank transfer initiated' }};}async processRefund(payment, amount) {// 银行退款实现throw new Error('Bank refund requires manual processing');}async verifyWebhook(payload, signature) {// 银行 Webhook 验证实现throw new Error('Bank webhook verification not implemented');}
}module.exports = PaymentService;
7. 实时通知系统
7.1 通知系统架构
7.2 通知流程图
7.3 通知服务实现
// 1-services/notification-service/services/notificationService.js
const Queue = require('bull');
const nodemailer = require('nodemailer');
const twilio = require('twilio');
const webpush = require('web-push');
const Notification = require('../models/Notification');
const NotificationTemplate = require('../models/NotificationTemplate');
const UserPreference = require('../models/UserPreference');class NotificationService {constructor() {// 初始化队列this.emailQueue = new Queue('email notifications', process.env.REDIS_URL);this.smsQueue = new Queue('sms notifications', process.env.REDIS_URL);this.pushQueue = new Queue('push notifications', process.env.REDIS_URL);this.websocketQueue = new Queue('websocket notifications', process.env.REDIS_URL);// 初始化服务提供商this.emailTransporter = this.initEmailTransporter();this.smsClient = this.initSMSClient();this.initPushService();// 设置队列处理器this.setupQueueProcessors();}// 1. 发送通知(主入口)async sendNotification(notificationData) {try {const {userId,type,title,content,data = {},channels = ['email', 'push'],priority = 'normal',scheduledAt = null} = notificationData;// 检查用户偏好const userPreferences = await this.getUserPreferences(userId);const allowedChannels = this.filterChannelsByPreferences(channels, userPreferences, type);if (allowedChannels.length === 0) {console.log(`No allowed channels for user ${userId} and notification type ${type}`);return { success: true, message: 'Notification skipped due to user preferences' };}// 创建通知记录const notification = new Notification({userId,type,title,content,data,channels: allowedChannels,priority,status: 'pending',scheduledAt: scheduledAt || new Date()});await notification.save();// 根据渠道分发通知const results = await Promise.allSettled(allowedChannels.map(channel => this.dispatchToChannel(channel, notification)));// 更新通知状态const hasFailures = results.some(result => result.status === 'rejected');notification.status = hasFailures ? 'partial_failure' : 'sent';notification.sentAt = new Date();await notification.save();return {success: true,notificationId: notification._id,results};} catch (error) {console.error('Send notification error:', error);throw error;}}// 2. 分发到指定渠道async dispatchToChannel(channel, notification) {const jobData = {notificationId: notification._id,userId: notification.userId,type: notification.type,title: notification.title,content: notification.content,data: notification.data};const options = {priority: this.getPriorityValue(notification.priority),delay: notification.scheduledAt > new Date() ? notification.scheduledAt.getTime() - Date.now() : 0,attempts: 3,backoff: {type: 'exponential',delay: 2000}};switch (channel) {case 'email':return this.emailQueue.add('send-email', jobData, options);case 'sms':return this.smsQueue.add('send-sms', jobData, options);case 'push':return this.pushQueue.add('send-push', jobData, options);case 'websocket':return this.websocketQueue.add('send-websocket', jobData, options);default:throw new Error(`Unknown notification channel: ${channel}`);}}// 3. 设置队列处理器setupQueueProcessors() {// 邮件处理器this.emailQueue.process('send-email', async (job) => {return this.processEmailNotification(job.data);});// 短信处理器this.smsQueue.process('send-sms', async (job) => {return this.processSMSNotification(job.data);});// 推送处理器this.pushQueue.process('send-push', async (job) => {return this.processPushNotification(job.data);});// WebSocket处理器this.websocketQueue.process('send-websocket', async (job) => {return this.processWebSocketNotification(job.data);});// 错误处理[this.emailQueue, this.smsQueue, this.pushQueue, this.websocketQueue].forEach(queue => {queue.on('failed', (job, err) => {console.error(`Job ${job.id} failed:`, err);this.handleJobFailure(job, err);});queue.on('completed', (job) => {console.log(`Job ${job.id} completed successfully`);this.handleJobSuccess(job);});});}// 4. 处理邮件通知async processEmailNotification(data) {try {const { userId, type, title, content, data: notificationData } = data;// 获取用户邮箱const user = await this.getUserById(userId);if (!user || !user.email) {throw new Error('User email not found');}// 获取邮件模板const template = await this.getTemplate(type, 'email');if (!template) {throw new Error(`Email template not found for type: ${type}`);}// 渲染邮件内容const renderedContent = await this.renderTemplate(template, {user,title,content,...notificationData});// 发送邮件const mailOptions = {from: process.env.EMAIL_FROM,to: user.email,subject: renderedContent.subject,html: renderedContent.html,text: renderedContent.text};const result = await this.emailTransporter.sendMail(mailOptions);return {success: true,messageId: result.messageId,channel: 'email'};} catch (error) {console.error('Email notification error:', error);throw error;}}// 5. 处理短信通知async processSMSNotification(data) {try {const { userId, type, content } = data;// 获取用户手机号const user = await this.getUserById(userId);if (!user || !user.phone) {throw new Error('User phone number not found');}// 获取短信模板const template = await this.getTemplate(type, 'sms');if (!template) {throw new Error(`SMS template not found for type: ${type}`);}// 渲染短信内容const renderedContent = await this.renderTemplate(template, {user,content,...data.data});// 发送短信const result = await this.smsClient.messages.create({body: renderedContent.text,from: process.env.TWILIO_PHONE_NUMBER,to: user.phone});return {success: true,messageId: result.sid,channel: 'sms'};} catch (error) {console.error('SMS notification error:', error);throw error;}}// 6. 处理推送通知async processPushNotification(data) {try {const { userId, title, content, data: notificationData } = data;// 获取用户推送订阅const subscriptions = await this.getUserPushSubscriptions(userId);if (!subscriptions || subscriptions.length === 0) {throw new Error('No push subscriptions found for user');}const payload = JSON.stringify({title,body: content,icon: '/icon-192x192.png',badge: '/badge-72x72.png',data: notificationData});// 发送到所有订阅const results = await Promise.allSettled(subscriptions.map(subscription =>webpush.sendNotification(subscription, payload)));const successCount = results.filter(r => r.status === 'fulfilled').length;return {success: successCount > 0,successCount,totalCount: subscriptions.length,channel: 'push'};} catch (error) {console.error('Push notification error:', error);throw error;}}// 7. 处理WebSocket通知async processWebSocketNotification(data) {try {const { userId, title, content, data: notificationData } = data;// 获取WebSocket连接const connections = await this.getUserWebSocketConnections(userId);if (!connections || connections.length === 0) {throw new Error('No WebSocket connections found for user');}const message = {type: 'notification',title,content,data: notificationData,timestamp: new Date().toISOString()};// 发送到所有连接let successCount = 0;connections.forEach(connection => {try {if (connection.readyState === 1) { // WebSocket.OPENconnection.send(JSON.stringify(message));successCount++;}} catch (error) {console.error('WebSocket send error:', error);}});return {success: successCount > 0,successCount,totalCount: connections.length,channel: 'websocket'};} catch (error) {console.error('WebSocket notification error:', error);throw error;}}// 8. 获取用户偏好async getUserPreferences(userId) {try {let preferences = await UserPreference.findOne({ userId });if (!preferences) {// 创建默认偏好preferences = new UserPreference({userId,channels: {email: { enabled: true, types: ['all'] },sms: { enabled: false, types: [] },push: { enabled: true, types: ['all'] },websocket: { enabled: true, types: ['all'] }},doNotDisturb: {enabled: false,startTime: '22:00',endTime: '08:00'},frequency: {maxPerHour: 10,maxPerDay: 50}});await preferences.save();}return preferences;} catch (error) {console.error('Get user preferences error:', error);return null;}}// 9. 根据偏好过滤渠道filterChannelsByPreferences(channels, preferences, notificationType) {if (!preferences) return channels;const allowedChannels = [];channels.forEach(channel => {const channelPref = preferences.channels[channel];if (channelPref && channelPref.enabled) {// 检查通知类型是否允许if (channelPref.types.includes('all') || channelPref.types.includes(notificationType)) {// 检查免打扰时间if (!this.isInDoNotDisturbTime(preferences.doNotDisturb)) {allowedChannels.push(channel);}}}});return allowedChannels;}// 10. 检查是否在免打扰时间isInDoNotDisturbTime(doNotDisturb) {if (!doNotDisturb.enabled) return false;const now = new Date();const currentTime = now.getHours() * 60 + now.getMinutes();const [startHour, startMin] = doNotDisturb.startTime.split(':').map(Number);const [endHour, endMin] = doNotDisturb.endTime.split(':').map(Number);const startTime = startHour * 60 + startMin;const endTime = endHour * 60 + endMin;if (startTime <= endTime) {return currentTime >= startTime && currentTime <= endTime;} else {// 跨天的情况return currentTime >= startTime || currentTime <= endTime;}}// 11. 初始化邮件传输器initEmailTransporter() {return nodemailer.createTransporter({service: process.env.EMAIL_SERVICE,auth: {user: process.env.EMAIL_USER,pass: process.env.EMAIL_PASSWORD}});}// 12. 初始化短信客户端initSMSClient() {return twilio(process.env.TWILIO_ACCOUNT_SID,process.env.TWILIO_AUTH_TOKEN);}// 13. 初始化推送服务initPushService() {webpush.setVapidDetails('mailto:' + process.env.EMAIL_USER,process.env.VAPID_PUBLIC_KEY,process.env.VAPID_PRIVATE_KEY);}// 14. 获取优先级数值getPriorityValue(priority) {const priorities = {'low': 1,'normal': 5,'high': 10,'urgent': 15};return priorities[priority] || 5;}// 15. 处理任务失败async handleJobFailure(job, error) {try {await Notification.findByIdAndUpdate(job.data.notificationId, {$push: {failures: {channel: job.queue.name.split(' ')[0],error: error.message,timestamp: new Date()}}});} catch (err) {console.error('Handle job failure error:', err);}}// 16. 处理任务成功async handleJobSuccess(job) {try {await Notification.findByIdAndUpdate(job.data.notificationId, {$push: {deliveries: {channel: job.queue.name.split(' ')[0],timestamp: new Date()}}});} catch (err) {console.error('Handle job success error:', err);}}// 辅助方法(需要根据实际情况实现)async getUserById(userId) {// 调用用户服务获取用户信息// 这里应该是实际的用户服务调用return { id: userId, email: 'user@example.com', phone: '+1234567890' };}async getTemplate(type, channel) {return NotificationTemplate.findOne({ type, channel });}async renderTemplate(template, data) {// 使用模板引擎渲染内容// 这里可以使用 Handlebars, Mustache 等模板引擎return {subject: template.subject,html: template.htmlContent,text: template.textContent};}async getUserPushSubscriptions(userId) {// 获取用户的推送订阅return [];}async getUserWebSocketConnections(userId) {// 获取用户的WebSocket连接return [];}
}module.exports = NotificationService;
结语
感谢您的阅读!期待您的一键三连!欢迎指正!