基于Elasticsearch的短视频平台个性化推荐系统设计与实现
在当今内容爆炸的时代,个性化推荐系统已成为短视频平台的核心竞争力之一。本文将详细介绍如何利用Elasticsearch(ES)构建一个高效、可扩展的短视频个性化推荐系统。
一、系统架构概述
我们的推荐系统将采用混合推荐策略,结合协同过滤、内容相似度和热度推荐等多种方法。Elasticsearch作为核心搜索引擎和数据存储,将承担以下职责:
- 用户画像存储与查询
- 视频内容索引与检索
- 实时行为日志分析
- 推荐结果计算与排序
二、数据模型设计
1. 用户数据模型
{
  "mappings": {
    "properties": {
      "user_id": {"type": "keyword"},
      "age": {"type": "integer"},
      "gender": {"type": "keyword"},
      "location": {"type": "geo_point"},
      "interests": {"type": "keyword"},
      "watch_history": {
        "type": "nested",
        "properties": {
          "video_id": {"type": "keyword"},
          "watch_time": {"type": "date"},
          "duration": {"type": "float"},
          "interaction": {
            "type": "nested",
            "properties": {
              "type": {"type": "keyword"},  // like, share, comment, etc.
              "timestamp": {"type": "date"}
            }
          }
        }
      },
      "followers": {"type": "keyword"},
      "following": {"type": "keyword"},
      "created_at": {"type": "date"}
    }
  }
}
2. 视频数据模型
{"mappings": {"properties": {"video_id": {"type": "keyword"},"title": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"description": {"type": "text","analyzer": "ik_max_word"},"tags": {"type": "keyword"},"category": {"type": "keyword"},"creator_id": {"type": "keyword"},"duration": {"type": "integer"},"created_at": {"type": "date"},"location": {"type": "geo_point"},"stats": {"properties": {"views": {"type": "integer"},"likes": {"type": "integer"},"shares": {"type": "integer"},"comments": {"type": "integer"},"watch_time_avg": {"type": "float"}}},"embedding": {"type": "dense_vector","dims": 512}}}
}
三、核心推荐算法实现
1. 基于用户画像的内容推荐
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta

es = Elasticsearch(["localhost:9200"])


def get_content_based_recommendations(user_id, size=10):
    """Recommend unwatched videos that match the user's stored profile.

    Videos are matched against the profile's ``interests`` (video tags),
    ``primary_interest`` (video category) and ``location`` (within 100km),
    then boosted by an exponential freshness bonus so newer videos rank
    higher.

    Args:
        user_id: Document id of the user in the ``user_profiles`` index.
        size: Maximum number of videos to return.

    Returns:
        A list of video ``_source`` dicts, best match first.
    """
    user_profile = es.get(index="user_profiles", id=user_id)["_source"]

    # Only emit a clause when the profile actually carries the value:
    # a term/geo_distance clause built from None is rejected by Elasticsearch.
    should = []
    interests = user_profile.get("interests") or []
    if interests:
        should.append({"terms": {"tags": interests}})
    primary_interest = user_profile.get("primary_interest")
    if primary_interest:
        should.append({"term": {"category": primary_interest}})
    location = user_profile.get("location")
    if location:
        should.append(
            {"geo_distance": {"distance": "100km", "location": location}}
        )

    query = {"bool": {"should": should}}
    watched_ids = [
        entry["video_id"]
        for entry in user_profile.get("watch_history") or []
        if entry.get("video_id")
    ]
    if watched_ids:
        query["bool"]["must_not"] = [{"terms": {"video_id": watched_ids}}]

    # Freshness bonus: decay * exp(-age_in_days / scale_days).
    # The age is (now - created_at); the reverse subtraction is negative for
    # every existing video and would clamp to 0, making the bonus constant.
    recency_function = {
        "script_score": {
            "script": {
                "source": """
                    if (doc['created_at'].size() == 0) {
                        return _score;
                    }
                    double ageMillis = params.now
                        - doc['created_at'].value.toInstant().toEpochMilli();
                    double ageDays = Math.max(ageMillis, 0) / 86400000.0;
                    return params.decay * Math.exp(-ageDays / params.scale_days) + _score;
                """,
                "params": {
                    "now": int(datetime.now().timestamp() * 1000),
                    "decay": 0.5,
                    "scale_days": 7,
                },
            }
        }
    }

    response = es.search(
        index="videos",
        body={
            "query": {
                "function_score": {
                    "query": query,
                    "functions": [recency_function],
                    "score_mode": "sum",
                    # The script already adds _score, so use its result as-is
                    # instead of multiplying it by _score again (the default).
                    "boost_mode": "replace",
                }
            },
            "size": size,
        },
    )
    return [hit["_source"] for hit in response["hits"]["hits"]]
2. 基于协同过滤的相似用户推荐
def find_similar_users(user_id, size=5):
    """Find users whose recent watch history overlaps the target user's.

    A candidate's score is the number of its ``watch_history`` entries from
    the last 30 days that reference a video the target user also watched.

    Args:
        user_id: Document id of the target user in ``user_profiles``.
        size: Maximum number of similar users to return.

    Returns:
        A list of ``user_id`` values, most similar first; empty when the
        target user has no watch history.
    """
    target_user = es.get(index="user_profiles", id=user_id)["_source"]
    watched = [
        entry["video_id"]
        for entry in target_user.get("watch_history") or []
        if entry.get("video_id")
    ]
    if not watched:
        return []

    # De-duplicate while keeping order, then keep the last 100 to bound the
    # query size. NOTE(review): assumes history entries are stored oldest
    # first, so the tail is the most recent activity - confirm.
    target_videos = list(dict.fromkeys(watched))[-100:]

    # Both conditions must hold on the SAME nested entry, so they live inside
    # the nested query. Each matching entry scores a constant 1 and
    # score_mode=sum adds them up, giving the overlap count without a script
    # (nested objects are not reachable through doc values in a script).
    overlap_query = {
        "nested": {
            "path": "watch_history",
            "score_mode": "sum",
            "query": {
                "constant_score": {
                    "filter": {
                        "bool": {
                            "filter": [
                                {"terms": {"watch_history.video_id": target_videos}},
                                {"range": {"watch_history.watch_time": {"gte": "now-30d/d"}}},
                            ]
                        }
                    },
                    "boost": 1.0,
                }
            },
        }
    }
    query = {
        "bool": {
            "must": [overlap_query],
            "must_not": [{"term": {"user_id": user_id}}],
        }
    }

    response = es.search(
        index="user_profiles",
        body={"query": query, "size": size},
    )
    return [hit["_source"]["user_id"] for hit in response["hits"]["hits"]]


def get_collaborative_recommendations(user_id, size=10):
    """Recommend videos watched by similar users but not by the target user.

    Args:
        user_id: Document id of the target user in ``user_profiles``.
        size: Maximum number of videos to return.

    Returns:
        A list of video ``_source`` dicts ordered by likes, then by average
        watch time; empty when there are no similar users or no candidates.
    """
    similar_users = find_similar_users(user_id)
    if not similar_users:
        return []

    target_user = es.get(index="user_profiles", id=user_id)["_source"]
    already_watched = {
        entry.get("video_id")
        for entry in target_user.get("watch_history") or []
    }

    # Watch history lives on the user profiles, not on the video documents,
    # so candidates are collected from the similar users' profiles.
    profiles = es.mget(index="user_profiles", body={"ids": similar_users})
    watch_counts = {}
    for doc in profiles["docs"]:
        if not doc.get("found"):
            continue
        seen = {
            entry.get("video_id")
            for entry in doc["_source"].get("watch_history") or []
        }
        for video_id in seen:
            if video_id and video_id not in already_watched:
                watch_counts[video_id] = watch_counts.get(video_id, 0) + 1
    if not watch_counts:
        return []

    # Prefer videos shared by more similar users; cap the terms list so the
    # query stays well within Elasticsearch's terms-count limit.
    candidates = sorted(watch_counts, key=watch_counts.get, reverse=True)[:1000]

    response = es.search(
        index="videos",
        body={
            "query": {"terms": {"video_id": candidates}},
            "sort": [
                {"stats.likes": {"order": "desc"}},
                {"stats.watch_time_avg": {"order": "desc"}},
            ],
            "size": size,
        },
    )
    return [hit["_source"] for hit in response["hits"]["hits"]]
3. 基于向量相似度的深度推荐
import numpy as np
from sentence_transformers import SentenceTransformer

# Initialise the sentence-embedding model.
# NOTE(review): this model is believed to emit 384-dimensional vectors, while
# the videos mapping declares "dims": 512 for `embedding` - confirm they match.
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')


def get_video_embeddings(video_ids):
    """Compute text embeddings for videos and store them back in the index.

    The text of each video is its title, description and tags joined by
    spaces. Videos that cannot be found are skipped.

    Args:
        video_ids: Document ids in the ``videos`` index.

    Returns:
        A dict mapping each *found* video id to its embedding vector.
    """
    if not video_ids:
        return {}

    response = es.mget(index="videos", body={"ids": video_ids})
    # mget answers in request order, so pairing by position keeps every
    # embedding attached to the id it was computed for even when some
    # documents are missing.
    found = [
        (video_id, doc["_source"])
        for video_id, doc in zip(video_ids, response["docs"])
        if doc.get("found")
    ]
    if not found:
        return {}

    texts = [
        f"{video.get('title', '')} {video.get('description', '')} "
        f"{' '.join(video.get('tags') or [])}"
        for _, video in found
    ]
    embeddings = model.encode(texts, convert_to_tensor=False)

    result = {}
    for (video_id, _), embedding in zip(found, embeddings):
        es.update(
            index="videos",
            id=video_id,
            body={"doc": {"embedding": embedding.tolist()}},
        )
        result[video_id] = embedding
    return result


def get_semantic_recommendations(user_id, size=10):
    """Recommend unwatched videos semantically close to recent watches.

    The user's interest vector is the mean embedding of the (up to) five
    most recently watched videos; candidates are ranked by cosine
    similarity to it.

    Args:
        user_id: Document id of the user in ``user_profiles``.
        size: Maximum number of videos to return.

    Returns:
        A list of video ``_source`` dicts, most similar first; empty when
        the user has no usable watch history.
    """
    user = es.get(index="user_profiles", id=user_id)["_source"]
    history = user.get("watch_history") or []

    # Entries may carry `watch_time` or `last_watched`; fall back to an empty
    # string and compare as text so a missing timestamp never mixes types.
    recent_watched = sorted(
        history,
        key=lambda h: str(h.get("watch_time") or h.get("last_watched") or ""),
        reverse=True,
    )[:5]
    video_ids = list(
        dict.fromkeys(h["video_id"] for h in recent_watched if h.get("video_id"))
    )
    if not video_ids:
        return []

    video_embeddings = get_video_embeddings(video_ids)
    vectors = [video_embeddings[vid] for vid in video_ids if vid in video_embeddings]
    if not vectors:
        return []
    user_vector = np.mean(vectors, axis=0)

    watched_ids = [h["video_id"] for h in history if h.get("video_id")]
    script_query = {
        "script_score": {
            "query": {
                "bool": {
                    # The script fails on documents without a vector.
                    "filter": [{"exists": {"field": "embedding"}}],
                    # Otherwise the videos the vector was built from win.
                    "must_not": [{"terms": {"video_id": watched_ids}}],
                }
            },
            "script": {
                # script_score must not return a negative value, and cosine
                # similarity ranges over [-1, 1], hence the +1.0 shift.
                "source": "cosineSimilarity(params.user_vector, 'embedding') + 1.0",
                "params": {"user_vector": user_vector.tolist()},
            },
        }
    }

    response = es.search(
        index="videos",
        body={
            "query": script_query,
            "size": size,
            # Includes the fields personalize_ranking reads when scoring.
            "_source": [
                "video_id",
                "title",
                "description",
                "tags",
                "category",
                "creator_id",
                "created_at",
                "stats",
            ],
        },
    )
    return [hit["_source"] for hit in response["hits"]["hits"]]
四、混合推荐策略
def hybrid_recommendation(user_id, size=20):
    """Blend the four recommendation strategies into one ranked list.

    Each strategy contributes about a quarter of ``size``; results are
    de-duplicated by ``video_id`` (first occurrence wins) and re-ranked with
    ``personalize_ranking``.

    Args:
        user_id: Document id of the user in ``user_profiles``.
        size: Maximum number of videos to return.

    Returns:
        A list of at most ``size`` video dicts, each with ``personal_score``.
    """
    if size <= 0:
        return []

    # At least one per strategy, otherwise size < 4 would request nothing.
    per_source = max(1, size // 4)
    content_based = get_content_based_recommendations(user_id, per_source)
    collaborative = get_collaborative_recommendations(user_id, per_source)
    semantic = get_semantic_recommendations(user_id, per_source)
    # Popular videos act as a non-personalised filler.
    popular = get_popular_videos(per_source)

    merged = {}
    for rec_list in (content_based, collaborative, semantic, popular):
        for rec in rec_list:
            video_id = rec.get("video_id")
            if video_id is not None:
                merged.setdefault(video_id, rec)

    ranked = personalize_ranking(user_id, list(merged.values()))
    return ranked[:size]


def _parse_created_at(value):
    """Return *value* as an aware UTC datetime, or None if it is unusable."""
    from datetime import timezone

    if value is None:
        return None
    if isinstance(value, (int, float)):
        # Elasticsearch date fields may also hold epoch milliseconds.
        return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
    try:
        # fromisoformat() rejects a trailing "Z" before Python 3.11.
        parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC - confirm.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def personalize_ranking(user_id, recommendations, user_profile=None):
    """Score and sort recommendations for one user.

    The score is ``0.4 * content + 0.3 * creator + 0.2 * popularity +
    0.1 * recency`` where content is 0.2 per tag shared with the user's
    interests, creator is 1 when the user follows the creator, popularity is
    likes / 1000 capped at 5, and recency falls linearly from 1 to 0 over
    30 days. Missing fields contribute 0 instead of raising.

    Args:
        user_id: Document id of the user in ``user_profiles``.
        recommendations: Video dicts; each gets a ``personal_score`` key
            added in place.
        user_profile: Optional pre-fetched profile dict. When omitted the
            profile is loaded from Elasticsearch.

    Returns:
        A new list with the same dicts, highest ``personal_score`` first.
    """
    from datetime import timezone

    if user_profile is None:
        user_profile = es.get(index="user_profiles", id=user_id)["_source"]

    interests = set(user_profile.get("interests") or [])
    following = set(user_profile.get("following") or [])
    now = datetime.now(timezone.utc)

    for rec in recommendations:
        content_score = len(interests & set(rec.get("tags") or [])) * 0.2
        creator_score = 1 if rec.get("creator_id") in following else 0

        likes = (rec.get("stats") or {}).get("likes") or 0
        popularity_score = min(likes / 1000, 5)

        created_at = _parse_created_at(rec.get("created_at"))
        if created_at is None:
            recency_score = 0
        else:
            age_days = (now - created_at).days
            # Clamp so a future-dated video cannot score above 1.
            recency_score = min(1, max(0, 1 - age_days / 30))

        rec["personal_score"] = (
            0.4 * content_score
            + 0.3 * creator_score
            + 0.2 * popularity_score
            + 0.1 * recency_score
        )

    return sorted(recommendations, key=lambda rec: rec["personal_score"], reverse=True)


def get_popular_videos(size=5, time_range="7d"):
    """Return the most liked (then most viewed) recently created videos.

    Args:
        size: Maximum number of videos to return.
        time_range: Elasticsearch date-math duration, e.g. ``"7d"``; only
            videos created within it are considered.

    Returns:
        A list of video ``_source`` dicts.
    """
    response = es.search(
        index="videos",
        body={
            "query": {"range": {"created_at": {"gte": f"now-{time_range}/d"}}},
            "sort": [
                {"stats.likes": {"order": "desc"}},
                {"stats.views": {"order": "desc"}},
            ],
            "size": size,
        },
    )
    return [hit["_source"] for hit in response["hits"]["hits"]]
五、实时反馈与模型更新
def log_user_interaction(user_id, video_id, interaction_type):
    """Record a user interaction and update the video's counters.

    The user's ``watch_history`` entry for the video is created if needed and
    its timestamps refreshed. A ``'watch'`` increments ``watch_count``; any
    other type is appended to the entry's ``interactions``. For ``'like'``,
    ``'share'`` and ``'comment'`` the matching ``stats`` counter on the video
    (``likes`` / ``shares`` / ``comments``) is incremented as well.

    Args:
        user_id: Document id of the user in ``user_profiles``.
        video_id: Document id of the video in ``videos``.
        interaction_type: ``'watch'``, ``'like'``, ``'share'``, ``'comment'``
            or any other label to store on the history entry.
    """
    # Kept as naive UTC so new timestamps have the same textual format as
    # the ones this function has already written.
    timestamp = datetime.utcnow().isoformat()

    # Painless notes: list/map literals use brackets ([] and ['k': v]);
    # brace literals do not compile. Counters are null-guarded because
    # older entries may lack them.
    history_script = """
        if (ctx._source.watch_history == null) {
            ctx._source.watch_history = [];
        }
        def entry = null;
        for (def item : ctx._source.watch_history) {
            if (item.video_id == params.video_id) {
                entry = item;
                break;
            }
        }
        if (entry == null) {
            entry = [
                'video_id': params.video_id,
                'first_watched': params.timestamp,
                'watch_count': 0,
                'interactions': []
            ];
            ctx._source.watch_history.add(entry);
        }
        entry.last_watched = params.timestamp;
        entry.watch_time = params.timestamp;
        if (params.interaction_type == 'watch') {
            entry.watch_count = (entry.watch_count == null ? 0 : entry.watch_count) + 1;
        } else {
            if (entry.interactions == null) {
                entry.interactions = [];
            }
            entry.interactions.add([
                'type': params.interaction_type,
                'timestamp': params.timestamp
            ]);
        }
    """
    es.update(
        index="user_profiles",
        id=user_id,
        body={
            "script": {
                "source": history_script,
                "lang": "painless",
                "params": {
                    "video_id": video_id,
                    "interaction_type": interaction_type,
                    "timestamp": timestamp,
                },
            }
        },
        # Read-modify-write scripts can collide under concurrent updates.
        retry_on_conflict=3,
    )

    if interaction_type in ("like", "share", "comment"):
        # The counter name is passed as a parameter so one compiled script
        # serves all three interaction types.
        stats_script = """
            if (ctx._source.stats == null) {
                ctx._source.stats = [:];
            }
            def current = ctx._source.stats[params.field];
            ctx._source.stats[params.field] = (current == null ? 0 : current) + 1;
        """
        es.update(
            index="videos",
            id=video_id,
            body={
                "script": {
                    "source": stats_script,
                    "lang": "painless",
                    "params": {"field": f"{interaction_type}s"},
                }
            },
            retry_on_conflict=3,
        )
六、性能优化与扩展
- 索引优化:
  - 为常用查询字段设置合适的mapping类型
  - 使用index sorting预排序
  - 合理设置分片数和副本数
- 查询优化:
  - 使用filter context缓存常用过滤条件
  - 合理使用bool查询的must/should/filter组合
  - 限制返回字段数量
- 缓存策略:
  - 使用Redis缓存热门推荐结果
  - 实现用户推荐结果的短期缓存
  - 对向量相似度计算实现近似最近邻(ANN)搜索
- 扩展性考虑:
  - 实现AB测试框架评估不同推荐策略
  - 设计插件式架构便于添加新的推荐算法
  - 考虑使用Elasticsearch的机器学习功能进行异常检测
七、总结
本文详细介绍了基于Elasticsearch构建短视频平台个性化推荐系统的完整方案。通过结合内容推荐、协同过滤和语义向量相似度等多种技术,我们能够为用户提供精准的个性化内容推荐。Elasticsearch的强大搜索和分析能力使其成为构建推荐系统的理想选择。
实际应用中,还需要考虑以下方面:
- 冷启动问题的解决方案
- 推荐多样性与惊喜度的平衡
- 实时推荐与批量推荐的结合
- 推荐结果的解释性