编程 Elasticsearch写入、读取、更新、删除以及批量操作(Golang)

2024-11-18 17:43:54 +0800 CST views 1413

Elasticsearch写入、读取、更新、删除以及批量操作(Golang)

1. 背景

加入自动驾驶事业部数仓团队后,工作围绕数据和检索展开,熟练掌握 Elasticsearch 的基本操作成为提高工作效率的关键。在项目中使用官方提供的包 github.com/elastic/go-elasticsearch,但其底层设计复杂,开发成本较高,因此使用了三方包 github.com/olivere/elastic(版本 7.0.32),以实现更简便的开发。


2. Elasticsearch基础操作

2.1 创建 Elasticsearch 客户端

定义一个 ES 实例结构:

type EsInstance struct {
  Client *elastic.Client    // es客户端
  Index  map[string]Indexes // 所有索引
}
 
type Indexes struct {
  Name    string `json:"name"`    // 索引名称
  Mapping string `json:"mapping"` // 索引结构
}

创建实例:

func NewEsInstance() (*EsInstance, error) {
  client, err := elastic.NewClient(
    elastic.SetURL("http://127.0.0.1:9200"), 
    elastic.SetBasicAuth("user_name", "user_password"),
    elastic.SetSniff(false), // 跳过ip检查
  )
  if err != nil {
    return &EsInstance{}, err
  }
  return &EsInstance{
    Client: client,
    Index:  map[string]Indexes{},
  }, nil
}

2.2 创建、删除索引

创建索引:

func (es *EsInstance) CreateIndex(index string, mapping string) error {
  exists, err := es.Client.IndexExists(index).Do(context.Background())
  if err != nil {
    return err
  }
  if exists {
    return nil
  }
  createIndex, err := es.Client.CreateIndex(index).BodyString(mapping).Do(context.Background())
  if err != nil {
    return err
  }
  if !createIndex.Acknowledged {
    return errors.New("create index failed")
  }
  es.Index[index] = Indexes{Name: index, Mapping: mapping}
  return nil
}

删除索引:

func (es *EsInstance) DeleteIndex(index string) error {
  exists, err := es.Client.IndexExists(index).Do(context.Background())
  if err != nil {
    return err
  }
  if !exists {
    return nil
  }
  deleteIndex, err := es.Client.DeleteIndex(index).Do(context.Background())
  if err != nil {
    return err
  }
  if !deleteIndex.Acknowledged {
    return errors.New("delete index failed")
  }
  return nil
}

2.3 插入文档

定义文档结构:

type Record struct {
  Index  string `json:"_index"`
  Type   string `json:"_type"`
  Id     string `json:"_id"`
  Source Source `json:"source"`
}

type Source struct {
  Id   string `json:"id"`
  Name string `json:"name"`
  Age  int    `json:"age"`
  Sex  string `json:"sex"`
}

插入文档:

func (es *EsInstance) InsertOneRecord(indexName string, record Record) error {
  _, err := es.Client.Index().Index(indexName).Id(record.Source.Id).BodyJson(record).Do(context.Background())
  return err
}

2.4 查询文档

根据文档 id 获取文档:

func (es *EsInstance) GetOneRecord(indexName, id string) (*Record, error) {
  record := &Record{}
  result, err := es.Client.Get().Index(indexName).Id(id).Do(context.Background())
  if err != nil {
    return record, err
  }
  if result.Found {
    err := json.Unmarshal(result.Source, &record.Source)
  }
  record.Id = result.Id
  record.Index = result.Index
  record.Type = result.Type
  return record, nil
}

2.5 删除文档

根据文档 id 删除文档:

func (es *EsInstance) DeleteOneRecord(indexName, id string) error {
  _, err := es.Client.Delete().Index(indexName).Id(id).Do(context.Background())
  return err
}

2.6 更新文档

根据文档 id 更新文档:

func (es *EsInstance) UpdateOneRecord(indexName, id string, record Source) error {
  _, err := es.Client.Update().Index(indexName).Id(id).Doc(record).Do(context.Background())
  return err
}

2.7 逻辑查询

自定义 DSL 查询:

func (es *EsInstance) Search(indexName string, size int, query elastic.Query) ([]Record, error) {
  records := make([]Record, 0)
  searchResult, err := es.Client.Search().Index(indexName).Query(query).Size(size).Do(context.Background())
  if err != nil {
    return records, err
  }
  if searchResult.Hits.TotalHits.Value > 0 {
    for _, hit := range searchResult.Hits.Hits {
      record := &Record{}
      json.Unmarshal(hit.Source, &record.Source)
      record.Id = hit.Id
      records = append(records, *record)
    }
  }
  return records, nil
}

2.8 滚动查询

滚动查询数据:

func (es *EsInstance) SearchScroll(indexName string, size int, query elastic.Query) ([]Record, error) {
  records := make([]Record, 0)
  searchResult, err := es.Client.Scroll(indexName).Query(query).Size(size).Do(context.Background())
  scrollId := searchResult.ScrollId

  for {
    scrollResult, err := es.Client.Scroll().ScrollId(scrollId).Do(context.Background())
    if err != nil || scrollResult.Hits.TotalHits.Value == 0 {
      break
    }
    scrollId = scrollResult.ScrollId
    for _, hit := range scrollResult.Hits.Hits {
      record := &Record{}
      json.Unmarshal(hit.Source, &record.Source)
      record.Id = hit.Id
      records = append(records, *record)
    }
  }
  es.Client.ClearScroll(scrollId).Do(context.Background())
  return records, err
}

2.9 批量插入

批量插入数据:

func (es *EsInstance) BatchInsertRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkIndexRequest().Id(record.Id).Doc(&record.Source)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

2.10 批量更新

批量更新数据:

func (es *EsInstance) BatchUpdateRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkUpdateRequest().Id(record.Id).Doc(&record.Source)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

2.11 批量删除

批量删除数据:

func (es *EsInstance) BatchDeleteRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkDeleteRequest().Id(record.Id)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

3. 检索条件设置

示例:

boolQuery := elastic.NewBoolQuery()
boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))

4. 测试

检索某辆车、某个时间段的数据:

boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewMatchQuery("car_id", "京A0001"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))
boolQuery.Filter(elastic.NewRangeQuery("end_time").Lte(1723823999000))

SearchScroll("index_2024", 4000, boolQuery)

5. 总结

本文介绍了 Elasticsearch 的基础操作,包括创建、删除、更新、查询、批量操作等。使用三方包 github.com/olivere/elastic 能有效降低开发成本,提升工作效率。

复制全文 生成海报 Elasticsearch Golang 数据处理 开发工具

推荐文章

windows安装sphinx3.0.3(中文检索)
2024-11-17 05:23:31 +0800 CST
HTML5的 input:file上传类型控制
2024-11-19 07:29:28 +0800 CST
JavaScript设计模式:适配器模式
2024-11-18 17:51:43 +0800 CST
向满屏的 Import 语句说再见!
2024-11-18 12:20:51 +0800 CST
Vue3中的JSX有什么不同?
2024-11-18 16:18:49 +0800 CST
api接口怎么对接
2024-11-19 09:42:47 +0800 CST
markdowns滚动事件
2024-11-19 10:07:32 +0800 CST
Go语言中实现RSA加密与解密
2024-11-18 01:49:30 +0800 CST
Go 如何做好缓存
2024-11-18 13:33:37 +0800 CST
Plyr.js 播放器介绍
2024-11-18 12:39:35 +0800 CST
jQuery中向DOM添加元素的多种方法
2024-11-18 23:19:46 +0800 CST
mysql时间对比
2024-11-18 14:35:19 +0800 CST
Boost.Asio: 一个美轮美奂的C++库
2024-11-18 23:09:42 +0800 CST
从Go开发者的视角看Rust
2024-11-18 11:49:49 +0800 CST
利用图片实现网站的加载速度
2024-11-18 12:29:31 +0800 CST
JavaScript 上传文件的几种方式
2024-11-18 21:11:59 +0800 CST
Vue3 中提供了哪些新的指令
2024-11-19 01:48:20 +0800 CST
Rust async/await 异步运行时
2024-11-18 19:04:17 +0800 CST
程序员茄子在线接单