Elasticsearch 集成详解
Go-ES 项目的一个核心特性是集成了 Elasticsearch 实现高效的全文搜索功能。本文将详细介绍项目中 Elasticsearch 的集成方式和关键实现点。
Elasticsearch 配置
配置文件设置
项目中 Elasticsearch 的配置在 config.yaml
中定义:
elasticsearch:
addresses: # ES地址,可以是多个
- http://localhost:9200
username: "" # ES用户名,如果有的话
password: "" # ES密码,如果有的话
api_key: "" # ES API密钥,如果有的话
index_prefix: go_es_ # 索引前缀
这种配置方式支持:
- 连接到单节点或多节点 Elasticsearch 集群
- 使用用户名密码或 API Key 认证
- 通过索引前缀隔离不同应用的索引
Elasticsearch 客户端封装
项目在 infrastructure/elasticsearch/es_client.go
中封装了 Elasticsearch 客户端:
// EsClient Elasticsearch客户端封装
type EsClient struct {
client *elasticsearch.Client
indexPrefix string
}
// NewEsClient 创建新的ES客户端
func NewEsClient(cfg *config.Config) (*EsClient, error) {
esCfg := elasticsearch.Config{
Addresses: cfg.Elasticsearch.Addresses,
}
// 如果设置了API Key,优先使用API Key认证
if cfg.Elasticsearch.APIKey != "" {
esCfg.APIKey = cfg.Elasticsearch.APIKey
} else if cfg.Elasticsearch.Username != "" && cfg.Elasticsearch.Password != "" {
// 否则使用用户名密码认证
esCfg.Username = cfg.Elasticsearch.Username
esCfg.Password = cfg.Elasticsearch.Password
}
client, err := elasticsearch.NewClient(esCfg)
if err != nil {
return nil, err
}
// 测试连接
_, err = client.Info()
if err != nil {
return nil, err
}
return &EsClient{
client: client,
indexPrefix: cfg.Elasticsearch.IndexPrefix,
}, nil
}
索引设计
产品索引映射
产品索引映射在 infrastructure/elasticsearch/product_index.go
中定义:
// ProductIndex 产品索引常量
const (
ProductIndexName = "products"
ProductIndexMapping = `
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": {
"analyzer": {
"text_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding"]
}
}
}
},
"mappings": {
"properties": {
"id": { "type": "keyword" },
"name": {
"type": "text",
"analyzer": "text_analyzer",
"fields": {
"keyword": { "type": "keyword" }
}
},
"description": {
"type": "text",
"analyzer": "text_analyzer"
},
"price": { "type": "double" },
"category": {
"type": "keyword",
"fields": {
"text": { "type": "text" }
}
},
"tags": {
"type": "keyword",
"fields": {
"text": { "type": "text" }
}
},
"created_at": { "type": "date" },
"updated_at": { "type": "date" }
}
}
}`
)
此索引映射包含以下设计要点:
-
自定义分析器:使用
text_analyzer
自定义分析器进行文本分词 -
多字段类型:为某些字段同时设置
text
和keyword
类型,以支持全文搜索和精确匹配 - 分片设置:使用单分片设计简化小型应用的部署
索引管理
ProductIndexer
提供了索引管理功能:
// Setup 设置产品索引
func (i *ProductIndexer) Setup(ctx context.Context) error {
return i.esClient.CreateIndex(ctx, ProductIndexName, ProductIndexMapping)
}
该方法在应用启动时自动创建索引(如果不存在)。
文档索引与删除
文档索引
产品文档的索引操作在 infrastructure/elasticsearch/product_index.go
中实现:
// IndexProduct 索引产品
func (i *ProductIndexer) IndexProduct(ctx context.Context, product *entity.Product) error {
return i.esClient.IndexDocument(ctx, ProductIndexName, product.ID, product.ToMap())
}
这里利用了产品实体的 ToMap()
方法将领域实体转换为适合 Elasticsearch 存储的文档。
文档删除
// DeleteProduct 从索引中删除产品
func (i *ProductIndexer) DeleteProduct(ctx context.Context, id string) error {
return i.esClient.DeleteDocument(ctx, ProductIndexName, id)
}
搜索实现
搜索查询构建
产品搜索在 infrastructure/elasticsearch/product_index.go
中实现:
// SearchProducts 搜索产品
func (i *ProductIndexer) SearchProducts(ctx context.Context, keyword, category string, page, pageSize int) ([]map[string]interface{}, int64, error) {
from := (page - 1) * pageSize
// 构建查询
query := map[string]interface{}{
"from": from,
"size": pageSize,
"sort": []map[string]interface{}{
{"_score": map[string]interface{}{"order": "desc"}},
{"created_at": map[string]interface{}{"order": "desc"}},
},
}
// 构建搜索条件
if keyword != "" || category != "" {
boolQuery := map[string]interface{}{}
var musts []interface{}
if keyword != "" {
musts = append(musts, map[string]interface{}{
"multi_match": map[string]interface{}{
"query": keyword,
"fields": []string{"name^3", "description", "category", "tags"},
"type": "best_fields",
},
})
}
if category != "" {
musts = append(musts, map[string]interface{}{
"term": map[string]interface{}{
"category": category,
},
})
}
if len(musts) > 0 {
boolQuery["must"] = musts
query["query"] = map[string]interface{}{
"bool": boolQuery,
}
}
} else {
// 如果没有关键词和类别,则查询所有
query["query"] = map[string]interface{}{
"match_all": map[string]interface{}{},
}
}
// 将查询转换为JSON
jsonQuery, err := json.Marshal(query)
if err != nil {
return nil, 0, err
}
log.Printf("ES查询: %s", string(jsonQuery))
return i.esClient.Search(ctx, ProductIndexName, string(jsonQuery))
}
搜索查询的主要特点包括:
-
多字段搜索:使用
multi_match
查询同时搜索多个字段 -
字段权重:为不同字段设置不同权重,例如
name^3
表示名称字段权重更高 - 分类过滤:支持按类别精确过滤
- 结果排序:按相关性分数(_score)和创建时间排序
- 分页支持:通过 from 和 size 参数实现分页
搜索执行
EsClient
的 Search
方法执行搜索请求并处理响应:
// Search 搜索文档
func (c *EsClient) Search(ctx context.Context, indexName, query string) ([]map[string]interface{}, int64, error) {
fullIndexName := c.getFullIndexName(indexName)
req := esapi.SearchRequest{
Index: []string{fullIndexName},
Body: strings.NewReader(query),
}
res, err := req.Do(ctx, c.client)
if err != nil {
return nil, 0, err
}
defer res.Body.Close()
if res.IsError() {
return nil, 0, errors.New("搜索失败: " + res.String())
}
var response map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return nil, 0, err
}
// 提取命中的文档和总数
hits, ok := response["hits"].(map[string]interface{})
if !ok {
return nil, 0, errors.New("解析搜索结果失败")
}
total, _ := hits["total"].(map[string]interface{})
totalValue := int64(0)
if value, ok := total["value"].(float64); ok {
totalValue = int64(value)
}
var documents []map[string]interface{}
if hitsArray, ok := hits["hits"].([]interface{}); ok {
for _, hit := range hitsArray {
hitMap, ok := hit.(map[string]interface{})
if !ok {
continue
}
source, ok := hitMap["_source"].(map[string]interface{})
if !ok {
continue
}
documents = append(documents, source)
}
}
return documents, totalValue, nil
}
索引重建
为了支持数据同步和恢复,项目提供了索引重建功能:
// ReindexAll 重建所有产品索引
func (r *ProductRepositoryImpl) ReindexAll(ctx context.Context) error {
// 分批处理,避免一次加载过多数据
batchSize := 100
page := 1
for {
var products []*entity.Product
if err := r.db.GetDB().Offset((page - 1) * batchSize).Limit(batchSize).Find(&products).Error; err != nil {
return fmt.Errorf("批量加载产品失败: %w", err)
}
if len(products) == 0 {
break
}
// 索引这一批产品
for _, product := range products {
if err := r.productIndexer.IndexProduct(ctx, product); err != nil {
log.Printf("索引产品 %s 失败: %v", product.ID, err)
}
}
log.Printf("已重建 %d 个产品的索引", page*batchSize)
page++
// 如果这一批数量少于批次大小,说明已经处理完所有数据
if len(products) < batchSize {
break
}
}
log.Println("所有产品索引重建完成")
return nil
}
重建索引的特点:
- 批量处理:分批加载数据,避免内存占用过多
- 进度日志:记录重建进度,便于监控
- 容错处理:单个产品索引失败不会影响整体流程
可靠性与性能优化
为确保 Elasticsearch 集成的可靠性和性能,项目采取了以下措施:
1. 连接检测
在客户端初始化时验证 Elasticsearch 连接:
// 测试连接
_, err = client.Info()
if err != nil {
return nil, err
}
2. 错误处理
所有 Elasticsearch 操作都有完善的错误处理,包括:
- 文档不存在时的处理
- 索引创建失败时的回滚
- 查询解析错误的处理
3. 查询日志
输出查询语句,便于调试和优化:
log.Printf("ES查询: %s", string(jsonQuery))
4. 索引前缀
使用索引前缀隔离不同应用的索引:
// 获取完整索引名称
func (c *EsClient) getFullIndexName(indexName string) string {
return c.indexPrefix + indexName
}
与领域层的整合
Elasticsearch 功能通过仓储接口与领域层整合:
// ProductRepository 产品仓储接口
type ProductRepository interface {
// ...其他方法...
// Elasticsearch 相关操作
IndexProduct(ctx context.Context, product *entity.Product) error
DeleteFromIndex(ctx context.Context, id string) error
ReindexAll(ctx context.Context) error
}
仓储实现确保数据库和 Elasticsearch 的一致性:
// Create 创建产品
func (r *ProductRepositoryImpl) Create(ctx context.Context, product *entity.Product) error {
// 保存到数据库
if err := r.db.GetDB().Create(product).Error; err != nil {
return err
}
// 同步到 Elasticsearch
return r.productIndexer.IndexProduct(ctx, product)
}
这种设计使得领域层不需要关心 Elasticsearch 的具体实现,同时保证了数据的一致性。