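Optimizing the Requirement EAV Model: Notes and Considerations
For the EAV (Entity-Attribute-Value) storage model and its Elasticsearch integration requirements, the following is a detailed design with implementation recommendations.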
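1. Storage Optimization Recommendations
Database and table sharding strategy
Vertical database split: separate the core business tables (Entity/Attr/Value) from non-core tables to reduce load on the primary database.
Horizontal table sharding: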
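-- Shard by project ID (example); ${project_id % 16} is a template placeholder that resolves to a suffix 0..15
CREATE TABLE requirement_attr_${project_id % 16} (
    entity_id NUMBER(20),
    attr_id   NUMBER(10),
    value     CLOB,
    PRIMARY KEY (entity_id, attr_id)
);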
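On the application side, something has to turn a project ID into the matching shard table name. A minimal sketch of that routing step follows; the class name `ShardRouter` and the method `tableFor` are hypothetical, and the shard count of 16 is taken from the `% 16` in the DDL template.

```java
/** Resolves the physical shard table for a project (hypothetical helper, not part of the original design). */
final class ShardRouter {
    static final long SHARD_COUNT = 16; // matches the "% 16" in the DDL template

    static String tableFor(long projectId) {
        // floorMod keeps the suffix in 0..15 even if an ID is negative
        long shard = Math.floorMod(projectId, SHARD_COUNT);
        return "requirement_attr_" + shard;
    }
}
```

All reads and writes for one project then hit a single table, which is what makes project-scoped queries cheap; cross-project queries would have to fan out over all 16 tables.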
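Partitioned table: range-partition the Value table by time (suits the baseline-release scenario).
-- The column list is an assumption: it mirrors the shard table and adds publish_time
CREATE TABLE requirement_value (
    entity_id    NUMBER(20),
    attr_id      NUMBER(10),
    value        CLOB,
    publish_time DATE
)
PARTITION BY RANGE (publish_time) (
    PARTITION p202401 VALUES LESS THAN (TO_DATE('2024-02-01', 'YYYY-MM-DD')),
    PARTITION p202402 VALUES LESS THAN (TO_DATE('2024-03-01', 'YYYY-MM-DD'))
);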
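Monthly partitions have to be added as time moves on. The sketch below derives the partition name and its exclusive upper bound for a given month, following the `p202401` / `2024-02-01` pattern used in the DDL. `MonthlyPartition` and its methods are hypothetical helpers, and the generated `ALTER TABLE ... ADD PARTITION` statement is only one possible way to maintain the partitions.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;

/** Derives monthly partition names and bounds (hypothetical helper). */
final class MonthlyPartition {
    private static final DateTimeFormatter NAME_FORMAT = DateTimeFormatter.ofPattern("yyyyMM");

    /** Partition name for a month, e.g. p202401. */
    static String name(YearMonth month) {
        return "p" + month.format(NAME_FORMAT);
    }

    /** Exclusive upper bound: the first day of the following month, in ISO format. */
    static String upperBound(YearMonth month) {
        return month.plusMonths(1).atDay(1).toString();
    }

    /** Builds the DDL that appends the partition for the given month. */
    static String addPartitionDdl(String table, YearMonth month) {
        return "ALTER TABLE " + table + " ADD PARTITION " + name(month)
                + " VALUES LESS THAN (TO_DATE('" + upperBound(month) + "', 'YYYY-MM-DD'))";
    }
}
```

A scheduled job could call `addPartitionDdl` for the upcoming month ahead of time, so that inserts never arrive for a month without a partition.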
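L1 cache: Redis Cluster (stores hot attributes and their value mappings)
Data structure: Hash
HSET attr:${attr_id} value_mapping ${value}->${entity_id}
L2 cache: Caffeine local cache (caches frequently accessed Entity metadata)
Cache refresh: triggered from Oracle materialized view logs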
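The read path through two cache tiers can be sketched with plain JDK types. This is an illustration only: the in-process map stands in for Caffeine, the injected map stands in for Redis, and the loader function stands in for the Oracle query. The class `TwoTierCache` is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Read-through lookup across a local tier and a shared tier (illustrative stand-ins only). */
final class TwoTierCache<K, V> {
    private final Map<K, V> local = new ConcurrentHashMap<>(); // stand-in for Caffeine
    private final Map<K, V> shared;                            // stand-in for Redis
    private final Function<K, V> loader;                       // stand-in for the database query

    TwoTierCache(Map<K, V> shared, Function<K, V> loader) {
        this.shared = shared;
        this.loader = loader;
    }

    V get(K key) {
        V value = local.get(key);
        if (value != null) {
            return value; // served from the in-process tier
        }
        value = shared.get(key);
        if (value == null) {
            value = loader.apply(key); // both tiers missed: go to the source of truth
            if (value == null) {
                return null;
            }
            shared.put(key, value);
        }
        local.put(key, value);
        return value;
    }

    /** Drops the key from both tiers, e.g. when a change notification arrives. */
    void invalidate(K key) {
        shared.remove(key);
        local.remove(key);
    }
}
```

Invalidation removes the shared copy first so that a concurrent reader cannot repopulate the local tier from a stale shared entry.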
2. Elasticsearch Design
Index structure design
PUT /requirement_baseline
{
  "mappings": {
    "properties": {
      "entity_id": { "type": "keyword" },
      "project_id": { "type": "keyword" },
      "attrs": {
        "type": "nested",
        "properties": {
          "attr_id": { "type": "keyword" },
          "attr_name": { "type": "text", "analyzer": "ik_max_word" },
          "value": {
            "type": "text",
            "fields": {
              "keyword": { "type": "keyword" },
              "number": { "type": "scaled_float", "scaling_factor": 100, "ignore_malformed": true }
            }
          }
        }
      },
      "publish_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
    }
  },
  "settings": { "number_of_shards": 6, "number_of_replicas": 1 }
}
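To make the nested structure concrete, here is a small sketch of how flat EAV rows could be grouped into one document per entity, with each row becoming an element of `attrs`. The `Row` type and `EavDocumentAssembler` are hypothetical; the map keys follow the field names in the mapping, and `project_id` and `publish_time` are left out for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Groups flat EAV rows into one nested document per entity (illustrative only). */
final class EavDocumentAssembler {
    /** One row of the EAV value table; the shape is an assumption. */
    static final class Row {
        final long entityId;
        final String attrId;
        final String attrName;
        final String value;

        Row(long entityId, String attrId, String attrName, String value) {
            this.entityId = entityId;
            this.attrId = attrId;
            this.attrName = attrName;
            this.value = value;
        }
    }

    static Map<Long, Map<String, Object>> toDocuments(List<Row> rows) {
        Map<Long, Map<String, Object>> docs = new LinkedHashMap<>();
        Map<Long, List<Map<String, Object>>> attrsByEntity = new LinkedHashMap<>();
        for (Row r : rows) {
            List<Map<String, Object>> attrs = attrsByEntity.get(r.entityId);
            if (attrs == null) {
                // First row for this entity: create the document shell
                attrs = new ArrayList<>();
                attrsByEntity.put(r.entityId, attrs);
                Map<String, Object> doc = new LinkedHashMap<>();
                doc.put("entity_id", String.valueOf(r.entityId));
                doc.put("attrs", attrs);
                docs.put(r.entityId, doc);
            }
            // Each EAV row becomes one nested object under "attrs"
            Map<String, Object> attr = new LinkedHashMap<>();
            attr.put("attr_id", r.attrId);
            attr.put("attr_name", r.attrName);
            attr.put("value", r.value);
            attrs.add(attr);
        }
        return docs;
    }
}
```

Because `attrs` is mapped as `nested`, a query can require that the `attr_id` and the `value` match within the same element, which a plain object array would not guarantee.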
Data synchronization flow
3. Java Implementation (Spring Cloud Alibaba)
POM dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>
Entity class definition
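import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.*;

@Data
@Document(indexName = "requirement_baseline")
public class RequirementDoc {
    @Id
    private String id; // ES document ID

    // Field names are set explicitly so they match the snake_case names in the index mapping
    @Field(name = "entity_id", type = FieldType.Keyword)
    private Long entityId;

    @Field(name = "publish_time", type = FieldType.Date, format = DateFormat.custom, pattern = "yyyy-MM-dd HH:mm:ss")
    private Date publishTime;

    @Field(type = FieldType.Nested)
    private List<AttrValue> attrs = new ArrayList<>();

    @Data
    @AllArgsConstructor
    public static class AttrValue {
        @Field(name = "attr_id", type = FieldType.Keyword)
        private String attrId;

        @Field(name = "attr_name", type = FieldType.Text, analyzer = "ik_max_word")
        private String attrName;

        @MultiField(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                @InnerField(suffix = "keyword", type = FieldType.Keyword),
                @InnerField(suffix = "number", type = FieldType.Double)
            })
        private Object value;
    }
}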
Dynamic ES query service
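import java.util.Map;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;

@Service
public class RequirementSearchService {
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    public SearchHits<RequirementDoc> searchByAttributes(Map<String, Object> attrQueries) {
        // Build one nested query per attribute and AND them together.
        // Calling withQuery() inside the loop would keep only the last attribute's condition.
        BoolQueryBuilder root = QueryBuilders.boolQuery();
        attrQueries.forEach((attrId, value) -> root.must(
            QueryBuilders.nestedQuery("attrs",
                QueryBuilders.boolQuery()
                    .must(QueryBuilders.termQuery("attrs.attr_id", attrId))
                    .must(buildValueQuery(value)),
                ScoreMode.Avg)));
        return elasticsearchTemplate.search(
            new NativeSearchQueryBuilder().withQuery(root).build(), RequirementDoc.class);
    }

    private QueryBuilder buildValueQuery(Object value) {
        if (value instanceof Number) {
            return QueryBuilders.termQuery("attrs.value.number", ((Number) value).doubleValue());
        } else if (value instanceof String) {
            String strVal = (String) value;
            // Numeric strings are matched against the numeric subfield
            if (strVal.matches("\\d+(\\.\\d+)?")) {
                return QueryBuilders.termQuery("attrs.value.number", Double.parseDouble(strVal));
            }
            return QueryBuilders.matchQuery("attrs.value", strVal);
        }
        throw new IllegalArgumentException("Unsupported value type");
    }
}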
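The value-type decision in the query service can be isolated and tested without Elasticsearch. The sketch below is one way to do that, using only the JDK. `ValueFieldResolver` is a hypothetical helper, and unlike the regex in the service it also accepts signed numbers and exponent notation, because it delegates parsing to `BigDecimal`.

```java
import java.math.BigDecimal;
import java.util.Optional;

/** Decides which subfield of attrs.value a query value should target (hypothetical helper). */
final class ValueFieldResolver {
    /** Returns the numeric interpretation of the value, if it has one. */
    static Optional<BigDecimal> asNumber(Object value) {
        if (!(value instanceof Number) && !(value instanceof String)) {
            return Optional.empty();
        }
        try {
            return Optional.of(new BigDecimal(value.toString().trim()));
        } catch (NumberFormatException e) {
            // Covers non-numeric strings as well as NaN and infinite doubles
            return Optional.empty();
        }
    }

    /** Numeric values go to the numeric subfield, everything else to the analyzed text field. */
    static String fieldFor(Object value) {
        return asNumber(value).isPresent() ? "attrs.value.number" : "attrs.value";
    }
}
```

With the numeric interpretation available as a `BigDecimal`, the same helper could feed a range query on `attrs.value.number` instead of an exact term query.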
Canal data synchronization service
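import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CanalSyncService {
    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    // AttrService is assumed: a project-specific bean that loads an entity's attributes from Oracle
    @Autowired
    private AttrService attrService;

    // ChangeRecord is assumed to be the project's own DTO for a deserialized change event
    @KafkaListener(topics = "oracle.requirement.change")
    public void syncData(ChangeRecord record) {
        switch (record.getEventType()) {
            case INSERT:
            case UPDATE:
                elasticsearchTemplate.save(buildDoc(record.getAfter()));
                break;
            case DELETE:
                elasticsearchTemplate.delete(String.valueOf(record.getEntityId()), RequirementDoc.class);
                break;
            default:
                break; // ignore other event types
        }
    }

    private RequirementDoc buildDoc(Map<String, Object> data) {
        RequirementDoc doc = new RequirementDoc();
        doc.setEntityId(Long.parseLong(data.get("ENTITY_ID").toString()));
        // Use the entity ID as the document ID so updates overwrite and deletes target the same document
        doc.setId(String.valueOf(doc.getEntityId()));
        // Load the entity's attributes dynamically
        List<RequirementDoc.AttrValue> attrs = attrService.loadAttributes(doc.getEntityId());
        doc.setAttrs(attrs);
        return doc;
    }
}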
Multi-level cache implementation
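@Service
@CacheConfig(cacheNames = "requirementCache")
public class AttributeCacheService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // The Caffeine cache sits in front; the method body runs only on a local miss
    @Cacheable(key = "'attr:' + #attrId", cacheManager = "caffeineCacheManager")
    @SuppressWarnings("unchecked")
    public Map<Object, Set<Long>> getValueMapping(String attrId) {
        // Local miss: fall through to Redis
        Map<Object, Set<Long>> mapping =
            (Map<Object, Set<Long>>) redisTemplate.opsForValue().get("attr:" + attrId);
        if (mapping == null) {
            mapping = loadFromDatabase(attrId);
            redisTemplate.opsForValue().set("attr:" + attrId, mapping, 1, TimeUnit.HOURS);
        }
        return mapping;
    }

    // Evict from the same Caffeine cache manager that @Cacheable populates, then drop the Redis copy
    @CacheEvict(key = "'attr:' + #attrId", cacheManager = "caffeineCacheManager")
    public void refreshCache(String attrId) {
        redisTemplate.delete("attr:" + attrId);
    }

    // Assumed helper: queries Oracle for the value -> entity IDs mapping of one attribute
    private Map<Object, Set<Long>> loadFromDatabase(String attrId) {
        throw new UnsupportedOperationException("database access is project-specific");
    }
}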
4. Performance Optimization Recommendations
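ES write optimization
# application.yml
# These keys are not built-in Spring Boot properties; they are assumed to be bound to a custom bulk-indexing configuration.
spring:
  data:
    elasticsearch:
      bulk-flush-interval: 500
      concurrent-requests: 16
      max-batch-size: 1000
Query acceleration strategies
Preloaded fields: set eager_global_ordinals on frequently accessed attributes
Hot/cold separation: create separate indices for historical baselines
Routing optimization: route by project_id so that each project's documents land on the same shard
@Document(indexName = "requirement")
@Routing("projectId") // Spring Data Elasticsearch 4.2+; assumes the document class exposes a projectId property
Hybrid query approach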
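// Composite query combining the cache and ES
public List<Long> hybridSearch(String attrId, Object value) {
    // 1. Exact matches from the cache; the mapping has no entry for unseen values, so default to an empty set
    Set<Long> cachedIds = attributeCacheService.getValueMapping(attrId)
            .getOrDefault(value, Collections.emptySet());
    // 2. Extended ES query (synonyms, etc.); fuzzySearch is assumed to exist on the search service
    SearchHits<RequirementDoc> hits = searchService.fuzzySearch(attrId, value);
    // 3. Merge the results and remove duplicates
    return Stream.concat(
            cachedIds.stream(),
            hits.stream().map(h -> h.getContent().getEntityId()))
        .distinct()
        .collect(Collectors.toList());
}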
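The merge step of the hybrid query can be written as a small, null-safe helper that is easy to test on its own. This is a sketch under the assumption that cache hits should be listed before ES hits; `ResultMerger` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Merges cache and ES results without duplicates, cache hits first (hypothetical helper). */
final class ResultMerger {
    static List<Long> merge(Collection<Long> cachedIds, Collection<Long> esIds) {
        // LinkedHashSet removes duplicates while preserving first-seen order
        Set<Long> merged = new LinkedHashSet<>();
        if (cachedIds != null) {
            merged.addAll(cachedIds);
        }
        if (esIds != null) {
            merged.addAll(esIds);
        }
        return new ArrayList<>(merged);
    }
}
```

Treating a missing cache entry as an empty result means a cold cache only narrows the exact-match part; the ES results still come back.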
5. Monitoring and Safeguards
Monitoring metrics
ES:
search_latency_99th_percentile
Oracle:
Logical reads and physical reads from the AWR report
Redis:
Keyspace hit rate
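Two of these metrics reduce to simple arithmetic. The sketch below computes a nearest-rank percentile over latency samples and a hit rate from hit and miss counters; for Redis, those counters correspond to `keyspace_hits` and `keyspace_misses` in the `INFO stats` output. The `Metrics` class is hypothetical, and in practice a monitoring system would usually compute both values.

```java
import java.util.Arrays;

/** Small metric helpers (illustrative; a monitoring stack would normally provide these). */
final class Metrics {
    /** Nearest-rank percentile, e.g. pct = 99 for the 99th percentile latency. */
    static long percentile(long[] samplesMillis, double pct) {
        if (samplesMillis.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = samplesMillis.clone();
        Arrays.sort(sorted);
        // Rank is 1-based: the smallest rank whose share of samples reaches pct percent
        int rank = (int) Math.ceil(pct * sorted.length / 100.0);
        return sorted[Math.max(rank, 1) - 1];
    }

    /** Hit rate = hits / (hits + misses); returns 0 when there has been no traffic. */
    static double hitRate(long hits, long misses) {
        long total = hits + misses;
        return total == 0 ? 0.0 : (double) hits / total;
    }
}
```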
Degradation plan
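// Hystrix declares the fallback on @HystrixCommand itself; it has no separate @Fallback annotation
@HystrixCommand(commandKey = "esSearch", fallbackMethod = "searchFallback")
public SearchResult search(QueryParam param) {
    // Normal query logic
}

private SearchResult searchFallback(QueryParam param) {
    // 1. Try to serve a cached result from Redis
    // 2. Degrade to an Oracle query against the materialized view
}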
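The degradation order (ES first, then Redis, then Oracle) can also be expressed without a circuit-breaker library, which may help when testing the fallback logic in isolation. The sketch below tries each source in order and returns the first non-null result; `FallbackChain` is a hypothetical helper and does not replace Hystrix features such as timeouts or circuit opening.

```java
import java.util.function.Supplier;

/** Tries data sources in order and returns the first usable result (hypothetical helper). */
final class FallbackChain {
    @SafeVarargs
    static <T> T firstSuccessful(Supplier<T>... sources) {
        RuntimeException last = null;
        for (Supplier<T> source : sources) {
            try {
                T result = source.get();
                if (result != null) {
                    return result; // this source answered, stop here
                }
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next source
            }
        }
        throw new IllegalStateException("all sources failed", last);
    }
}
```

A call could look like `FallbackChain.firstSuccessful(() -> searchEs(param), () -> readFromRedis(param), () -> queryOracle(param))`, where the three method names are placeholders for the project's own lookups.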
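Key characteristics of this design:
Dynamic schema handling: the nested type supports dynamic attributes
Hybrid storage: Oracle guarantees ACID, while ES provides search
Three-tier cache: Caffeine + Redis + Oracle materialized views
Near-real-time synchronization: a CDC-based data pipeline
Type detection: numeric attribute values are recognized automatically and indexed in a numeric subfield, which makes range queries on them possible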