编程 Elasticsearch写入、读取、更新、删除以及批量操作(Golang)

2024-11-18 17:43:54 +0800 CST views 995

Elasticsearch写入、读取、更新、删除以及批量操作(Golang)

1. 背景

加入自动驾驶事业部数仓团队后,工作围绕数据和检索展开,熟练掌握 Elasticsearch 的基本操作成为提高工作效率的关键。在项目中使用官方提供的包 github.com/elastic/go-elasticsearch,但其底层设计复杂,开发成本较高,因此使用了三方包 github.com/olivere/elastic(版本 7.0.32),以实现更简便的开发。


2. Elasticsearch基础操作

2.1 创建 Elasticsearch 客户端

定义一个 ES 实例结构:

type EsInstance struct {
  Client *elastic.Client    // es客户端
  Index  map[string]Indexes // 所有索引
}
 
type Indexes struct {
  Name    string `json:"name"`    // 索引名称
  Mapping string `json:"mapping"` // 索引结构
}

创建实例:

func NewEsInstance() (*EsInstance, error) {
  client, err := elastic.NewClient(
    elastic.SetURL("http://127.0.0.1:9200"), 
    elastic.SetBasicAuth("user_name", "user_password"),
    elastic.SetSniff(false), // 跳过ip检查
  )
  if err != nil {
    return &EsInstance{}, err
  }
  return &EsInstance{
    Client: client,
    Index:  map[string]Indexes{},
  }, nil
}

2.2 创建、删除索引

创建索引:

func (es *EsInstance) CreateIndex(index string, mapping string) error {
  exists, err := es.Client.IndexExists(index).Do(context.Background())
  if err != nil {
    return err
  }
  if exists {
    return nil
  }
  createIndex, err := es.Client.CreateIndex(index).BodyString(mapping).Do(context.Background())
  if err != nil {
    return err
  }
  if !createIndex.Acknowledged {
    return errors.New("create index failed")
  }
  es.Index[index] = Indexes{Name: index, Mapping: mapping}
  return nil
}

删除索引:

func (es *EsInstance) DeleteIndex(index string) error {
  exists, err := es.Client.IndexExists(index).Do(context.Background())
  if err != nil {
    return err
  }
  if !exists {
    return nil
  }
  deleteIndex, err := es.Client.DeleteIndex(index).Do(context.Background())
  if err != nil {
    return err
  }
  if !deleteIndex.Acknowledged {
    return errors.New("delete index failed")
  }
  return nil
}

2.3 插入文档

定义文档结构:

type Record struct {
  Index  string `json:"_index"`
  Type   string `json:"_type"`
  Id     string `json:"_id"`
  Source Source `json:"source"`
}

type Source struct {
  Id   string `json:"id"`
  Name string `json:"name"`
  Age  int    `json:"age"`
  Sex  string `json:"sex"`
}

插入文档:

func (es *EsInstance) InsertOneRecord(indexName string, record Record) error {
  _, err := es.Client.Index().Index(indexName).Id(record.Source.Id).BodyJson(record).Do(context.Background())
  return err
}

2.4 查询文档

根据文档 id 获取文档:

func (es *EsInstance) GetOneRecord(indexName, id string) (*Record, error) {
  record := &Record{}
  result, err := es.Client.Get().Index(indexName).Id(id).Do(context.Background())
  if err != nil {
    return record, err
  }
  if result.Found {
    err := json.Unmarshal(result.Source, &record.Source)
  }
  record.Id = result.Id
  record.Index = result.Index
  record.Type = result.Type
  return record, nil
}

2.5 删除文档

根据文档 id 删除文档:

func (es *EsInstance) DeleteOneRecord(indexName, id string) error {
  _, err := es.Client.Delete().Index(indexName).Id(id).Do(context.Background())
  return err
}

2.6 更新文档

根据文档 id 更新文档:

func (es *EsInstance) UpdateOneRecord(indexName, id string, record Source) error {
  _, err := es.Client.Update().Index(indexName).Id(id).Doc(record).Do(context.Background())
  return err
}

2.7 逻辑查询

自定义 DSL 查询:

func (es *EsInstance) Search(indexName string, size int, query elastic.Query) ([]Record, error) {
  records := make([]Record, 0)
  searchResult, err := es.Client.Search().Index(indexName).Query(query).Size(size).Do(context.Background())
  if err != nil {
    return records, err
  }
  if searchResult.Hits.TotalHits.Value > 0 {
    for _, hit := range searchResult.Hits.Hits {
      record := &Record{}
      json.Unmarshal(hit.Source, &record.Source)
      record.Id = hit.Id
      records = append(records, *record)
    }
  }
  return records, nil
}

2.8 滚动查询

滚动查询数据:

func (es *EsInstance) SearchScroll(indexName string, size int, query elastic.Query) ([]Record, error) {
  records := make([]Record, 0)
  searchResult, err := es.Client.Scroll(indexName).Query(query).Size(size).Do(context.Background())
  scrollId := searchResult.ScrollId

  for {
    scrollResult, err := es.Client.Scroll().ScrollId(scrollId).Do(context.Background())
    if err != nil || scrollResult.Hits.TotalHits.Value == 0 {
      break
    }
    scrollId = scrollResult.ScrollId
    for _, hit := range scrollResult.Hits.Hits {
      record := &Record{}
      json.Unmarshal(hit.Source, &record.Source)
      record.Id = hit.Id
      records = append(records, *record)
    }
  }
  es.Client.ClearScroll(scrollId).Do(context.Background())
  return records, err
}

2.9 批量插入

批量插入数据:

func (es *EsInstance) BatchInsertRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkIndexRequest().Id(record.Id).Doc(&record.Source)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

2.10 批量更新

批量更新数据:

func (es *EsInstance) BatchUpdateRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkUpdateRequest().Id(record.Id).Doc(&record.Source)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

2.11 批量删除

批量删除数据:

func (es *EsInstance) BatchDeleteRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkDeleteRequest().Id(record.Id)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

3. 检索条件设置

示例:

boolQuery := elastic.NewBoolQuery()
boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))

4. 测试

检索某辆车、某个时间段的数据:

boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewMatchQuery("car_id", "京A0001"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))
boolQuery.Filter(elastic.NewRangeQuery("end_time").Lte(1723823999000))

SearchScroll("index_2024", 4000, boolQuery)

5. 总结

本文介绍了 Elasticsearch 的基础操作,包括创建、删除、更新、查询、批量操作等。使用三方包 github.com/olivere/elastic 能有效降低开发成本,提升工作效率。

复制全文 生成海报 Elasticsearch Golang 数据处理 开发工具

推荐文章

如何在Vue3中处理全局状态管理?
2024-11-18 19:25:59 +0800 CST
Go语言中实现RSA加密与解密
2024-11-18 01:49:30 +0800 CST
CSS 特效与资源推荐
2024-11-19 00:43:31 +0800 CST
JavaScript 的模板字符串
2024-11-18 22:44:09 +0800 CST
纯CSS实现3D云动画效果
2024-11-18 18:48:05 +0800 CST
Node.js中接入微信支付
2024-11-19 06:28:31 +0800 CST
服务器购买推荐
2024-11-18 23:48:02 +0800 CST
淘宝npm镜像使用方法
2024-11-18 23:50:48 +0800 CST
mysql 计算附近的人
2024-11-18 13:51:11 +0800 CST
阿里云免sdk发送短信代码
2025-01-01 12:22:14 +0800 CST
Nginx负载均衡详解
2024-11-17 07:43:48 +0800 CST
PHP来做一个短网址(短链接)服务
2024-11-17 22:18:37 +0800 CST
使用 node-ssh 实现自动化部署
2024-11-18 20:06:21 +0800 CST
在 Rust 生产项目中存储数据
2024-11-19 02:35:11 +0800 CST
mendeley2 一个Python管理文献的库
2024-11-19 02:56:20 +0800 CST
总结出30个代码前端代码规范
2024-11-19 07:59:43 +0800 CST
前端如何一次性渲染十万条数据?
2024-11-19 05:08:27 +0800 CST
JavaScript 异步编程入门
2024-11-19 07:07:43 +0800 CST
程序员茄子在线接单