gin + es 实践 03

时间:2025-05-09 08:32:27

Elasticsearch 集成详解

Go-ES 项目的一个核心特性是集成了 Elasticsearch 实现高效的全文搜索功能。本文将详细介绍项目中 Elasticsearch 的集成方式和关键实现点。

Elasticsearch 配置

配置文件设置

项目中 Elasticsearch 的配置在 config.yaml 中定义:

elasticsearch:
  addresses:         # ES地址,可以是多个
    - http://localhost:9200
  username: ""       # ES用户名,如果有的话
  password: ""       # ES密码,如果有的话
  api_key: ""        # ES API密钥,如果有的话
  index_prefix: go_es_  # 索引前缀

这种配置方式支持:

  • 连接到单节点或多节点 Elasticsearch 集群
  • 使用用户名密码或 API Key 认证
  • 通过索引前缀隔离不同应用的索引

Elasticsearch 客户端封装

项目在 infrastructure/elasticsearch/es_client.go 中封装了 Elasticsearch 客户端:

// EsClient Elasticsearch客户端封装
type EsClient struct {
    client      *elasticsearch.Client
    indexPrefix string
}

// NewEsClient 创建新的ES客户端
func NewEsClient(cfg *config.Config) (*EsClient, error) {
    esCfg := elasticsearch.Config{
        Addresses: cfg.Elasticsearch.Addresses,
    }

    // 如果设置了API Key,优先使用API Key认证
    if cfg.Elasticsearch.APIKey != "" {
        esCfg.APIKey = cfg.Elasticsearch.APIKey
    } else if cfg.Elasticsearch.Username != "" && cfg.Elasticsearch.Password != "" {
        // 否则使用用户名密码认证
        esCfg.Username = cfg.Elasticsearch.Username
        esCfg.Password = cfg.Elasticsearch.Password
    }

    client, err := elasticsearch.NewClient(esCfg)
    if err != nil {
        return nil, err
    }

    // 测试连接
    _, err = client.Info()
    if err != nil {
        return nil, err
    }

    return &EsClient{
        client:      client,
        indexPrefix: cfg.Elasticsearch.IndexPrefix,
    }, nil
}

索引设计

产品索引映射

产品索引映射在 infrastructure/elasticsearch/product_index.go 中定义:

// ProductIndex 产品索引常量
const (
    ProductIndexName    = "products"
    ProductIndexMapping = `
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "analysis": {
      "analyzer": {
        "text_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "asciifolding"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "name": { 
        "type": "text",
        "analyzer": "text_analyzer",
        "fields": {
          "keyword": { "type": "keyword" }
        }
      },
      "description": { 
        "type": "text",
        "analyzer": "text_analyzer"
      },
      "price": { "type": "double" },
      "category": { 
        "type": "keyword",
        "fields": {
          "text": { "type": "text" }
        }
      },
      "tags": { 
        "type": "keyword",
        "fields": {
          "text": { "type": "text" }
        }
      },
      "created_at": { "type": "date" },
      "updated_at": { "type": "date" }
    }
  }
}`
)

此索引映射包含以下设计要点:

  1. 自定义分析器:使用 text_analyzer 自定义分析器进行文本分词
  2. 多字段类型:为某些字段同时设置 textkeyword 类型,以支持全文搜索和精确匹配
  3. 分片设置:使用单分片设计简化小型应用的部署

索引管理

ProductIndexer 提供了索引管理功能:

// Setup 设置产品索引
func (i *ProductIndexer) Setup(ctx context.Context) error {
    return i.esClient.CreateIndex(ctx, ProductIndexName, ProductIndexMapping)
}

该方法在应用启动时自动创建索引(如果不存在)。

文档索引与删除

文档索引

产品文档的索引操作在 infrastructure/elasticsearch/product_index.go 中实现:

// IndexProduct 索引产品
func (i *ProductIndexer) IndexProduct(ctx context.Context, product *entity.Product) error {
    return i.esClient.IndexDocument(ctx, ProductIndexName, product.ID, product.ToMap())
}

这里利用了产品实体的 ToMap() 方法将领域实体转换为适合 Elasticsearch 存储的文档。

文档删除

// DeleteProduct 从索引中删除产品
func (i *ProductIndexer) DeleteProduct(ctx context.Context, id string) error {
    return i.esClient.DeleteDocument(ctx, ProductIndexName, id)
}

搜索实现

搜索查询构建

产品搜索在 infrastructure/elasticsearch/product_index.go 中实现:

// SearchProducts 搜索产品
func (i *ProductIndexer) SearchProducts(ctx context.Context, keyword, category string, page, pageSize int) ([]map[string]interface{}, int64, error) {
    from := (page - 1) * pageSize

    // 构建查询
    query := map[string]interface{}{
        "from": from,
        "size": pageSize,
        "sort": []map[string]interface{}{
            {"_score": map[string]interface{}{"order": "desc"}},
            {"created_at": map[string]interface{}{"order": "desc"}},
        },
    }

    // 构建搜索条件
    if keyword != "" || category != "" {
        boolQuery := map[string]interface{}{}
        var musts []interface{}

        if keyword != "" {
            musts = append(musts, map[string]interface{}{
                "multi_match": map[string]interface{}{
                    "query":  keyword,
                    "fields": []string{"name^3", "description", "category", "tags"},
                    "type":   "best_fields",
                },
            })
        }

        if category != "" {
            musts = append(musts, map[string]interface{}{
                "term": map[string]interface{}{
                    "category": category,
                },
            })
        }

        if len(musts) > 0 {
            boolQuery["must"] = musts
            query["query"] = map[string]interface{}{
                "bool": boolQuery,
            }
        }
    } else {
        // 如果没有关键词和类别,则查询所有
        query["query"] = map[string]interface{}{
            "match_all": map[string]interface{}{},
        }
    }

    // 将查询转换为JSON
    jsonQuery, err := json.Marshal(query)
    if err != nil {
        return nil, 0, err
    }

    log.Printf("ES查询: %s", string(jsonQuery))
    return i.esClient.Search(ctx, ProductIndexName, string(jsonQuery))
}

搜索查询的主要特点包括:

  1. 多字段搜索:使用 multi_match 查询同时搜索多个字段
  2. 字段权重:为不同字段设置不同权重,例如 name^3 表示名称字段权重更高
  3. 分类过滤:支持按类别精确过滤
  4. 结果排序:按相关性分数(_score)和创建时间排序
  5. 分页支持:通过 from 和 size 参数实现分页

搜索执行

EsClientSearch 方法执行搜索请求并处理响应:

// Search 搜索文档
func (c *EsClient) Search(ctx context.Context, indexName, query string) ([]map[string]interface{}, int64, error) {
    fullIndexName := c.getFullIndexName(indexName)

    req := esapi.SearchRequest{
        Index: []string{fullIndexName},
        Body:  strings.NewReader(query),
    }

    res, err := req.Do(ctx, c.client)
    if err != nil {
        return nil, 0, err
    }
    defer res.Body.Close()

    if res.IsError() {
        return nil, 0, errors.New("搜索失败: " + res.String())
    }

    var response map[string]interface{}
    if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
        return nil, 0, err
    }

    // 提取命中的文档和总数
    hits, ok := response["hits"].(map[string]interface{})
    if !ok {
        return nil, 0, errors.New("解析搜索结果失败")
    }

    total, _ := hits["total"].(map[string]interface{})
    totalValue := int64(0)
    if value, ok := total["value"].(float64); ok {
        totalValue = int64(value)
    }

    var documents []map[string]interface{}
    if hitsArray, ok := hits["hits"].([]interface{}); ok {
        for _, hit := range hitsArray {
            hitMap, ok := hit.(map[string]interface{})
            if !ok {
                continue
            }

            source, ok := hitMap["_source"].(map[string]interface{})
            if !ok {
                continue
            }

            documents = append(documents, source)
        }
    }

    return documents, totalValue, nil
}

索引重建

为了支持数据同步和恢复,项目提供了索引重建功能:

// ReindexAll 重建所有产品索引
func (r *ProductRepositoryImpl) ReindexAll(ctx context.Context) error {
    // 分批处理,避免一次加载过多数据
    batchSize := 100
    page := 1

    for {
        var products []*entity.Product
        if err := r.db.GetDB().Offset((page - 1) * batchSize).Limit(batchSize).Find(&products).Error; err != nil {
            return fmt.Errorf("批量加载产品失败: %w", err)
        }

        if len(products) == 0 {
            break
        }

        // 索引这一批产品
        for _, product := range products {
            if err := r.productIndexer.IndexProduct(ctx, product); err != nil {
                log.Printf("索引产品 %s 失败: %v", product.ID, err)
            }
        }

        log.Printf("已重建 %d 个产品的索引", page*batchSize)
        page++

        // 如果这一批数量少于批次大小,说明已经处理完所有数据
        if len(products) < batchSize {
            break
        }
    }

    log.Println("所有产品索引重建完成")
    return nil
}

重建索引的特点:

  1. 批量处理:分批加载数据,避免内存占用过多
  2. 进度日志:记录重建进度,便于监控
  3. 容错处理:单个产品索引失败不会影响整体流程

可靠性与性能优化

为确保 Elasticsearch 集成的可靠性和性能,项目采取了以下措施:

1. 连接检测

在客户端初始化时验证 Elasticsearch 连接:

// 测试连接
_, err = client.Info()
if err != nil {
    return nil, err
}

2. 错误处理

所有 Elasticsearch 操作都有完善的错误处理,包括:

  • 文档不存在时的处理
  • 索引创建失败时的回滚
  • 查询解析错误的处理

3. 查询日志

输出查询语句,便于调试和优化:

log.Printf("ES查询: %s", string(jsonQuery))

4. 索引前缀

使用索引前缀隔离不同应用的索引:

// 获取完整索引名称
func (c *EsClient) getFullIndexName(indexName string) string {
    return c.indexPrefix + indexName
}

与领域层的整合

Elasticsearch 功能通过仓储接口与领域层整合:

// ProductRepository 产品仓储接口
type ProductRepository interface {
    // ...其他方法...

    // Elasticsearch 相关操作
    IndexProduct(ctx context.Context, product *entity.Product) error
    DeleteFromIndex(ctx context.Context, id string) error
    ReindexAll(ctx context.Context) error
}

仓储实现确保数据库和 Elasticsearch 的一致性:

// Create 创建产品
func (r *ProductRepositoryImpl) Create(ctx context.Context, product *entity.Product) error {
    // 保存到数据库
    if err := r.db.GetDB().Create(product).Error; err != nil {
        return err
    }
    // 同步到 Elasticsearch
    return r.productIndexer.IndexProduct(ctx, product)
}

这种设计使得领域层不需要关心 Elasticsearch 的具体实现,同时保证了数据的一致性。