使用 Go 语言并发处理 CSV 文件到数据库
数据迁移是常见的任务,尤其当我们需要将数据从一个系统迁移到另一个系统时。传统的逐条处理方法在面对大量数据时效率较低,而 Go 语言提供了强大的并发机制,可以帮助我们更高效地完成数据迁移工作。本文将介绍如何利用 Go 语言的并发特性,将 CSV 文件中的联系人信息迁移到数据库中。
问题背景
假设你拥有一个包含大量联系人信息的 CSV 文件,需要将这些信息迁移到数据库中。这些联系人信息可能包含姓名、电话号码、邮箱地址等。如果使用传统的单线程方式,逐条处理数据,迁移过程可能会非常缓慢,尤其是在数据量很大的情况下。
并发解决方案
Go 语言的并发机制通过 goroutine
和 channel
提供了高效的解决方案。我们可以利用这些特性来并发处理 CSV 文件中的数据,将其迁移到数据库,提升整个迁移过程的速度。
代码示例
package main
import (
"encoding/csv"
"fmt"
"log"
"os"
"regexp"
"sync"
)
// 联系人信息结构体
type Contact struct {
Name string
Mobile string
Email string
}
// 失败记录结构体
type FailedRow struct {
Name string
Mobile string
Email string
ErrorReason string
Succeed bool
}
// 最终响应结构体
type FinalResponse struct {
Status string `json:"status"`
FailedRow []FailedRow `json:"failed_row"`
}
func main() {
// 打开 CSV 文件
file, err := os.Open("contacts.csv")
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 创建 CSV 读取器
reader := csv.NewReader(file)
// 读取所有 CSV 记录
records, err := reader.ReadAll()
if err != nil {
log.Fatal(err)
}
// 存储所有联系人的切片
var contacts []Contact
// 遍历 CSV 记录
for i, record := range records {
// 跳过第一行(表头)
if i == 0 {
continue
}
// 将记录映射到 Contact 结构体
contact := Contact{
Name: record[0],
Mobile: record[1],
Email: record[2],
}
// 将联系人添加到切片
contacts = append(contacts, contact)
}
// 设置并发工作线程数量
maxWorkers := 10
// 创建输入通道
inputCh := make(chan Contact, len(contacts))
// 创建错误通道
errorCh := make(chan FailedRow, len(contacts))
// 创建 WaitGroup 用于同步 goroutine
var wg sync.WaitGroup
// 启动工作 goroutine
for i := 0; i < maxWorkers; i++ {
go HandleContactMigration(inputCh, errorCh, &wg)
}
// 将联系人数据发送到输入通道
for i := 0; i < len(contacts); i++ {
wg.Add(1)
inputCh <- contacts[i]
}
// 关闭输入通道
close(inputCh)
// 存储失败记录的切片
totalError := make([]FailedRow, 0)
// 启动 goroutine 处理错误信息
go func() {
for failedMigration := range errorCh {
if !failedMigration.Succeed {
totalError = append(totalError, failedMigration)
}
wg.Done()
}
}()
// 等待所有 goroutine 完成
wg.Wait()
// 关闭错误通道
close(errorCh)
// 生成最终响应
resp := FinalResponse{}
if len(totalError) == 0 {
resp.Status = "SUCCEED"
} else if len(totalError) == len(contacts) {
resp.Status = "FAILED"
resp.FailedRow = totalError
} else {
resp.FailedRow = totalError
resp.Status = "PARTIAL_SUCCEED"
}
// 打印最终响应
fmt.Println(resp)
}
// 处理单个联系人迁移的函数
func HandleContactMigration(rows <-chan Contact, errorCh chan FailedRow, wg *sync.WaitGroup) {
for row := range rows {
// 创建失败记录,默认 Succeed 为 false
errorResp := FailedRow{
Name: row.Name,
Mobile: row.Mobile,
Email: row.Email,
Succeed: false,
}
// 迁移单个联系人
err := MigrateSingleRow(row)
if err != nil {
fmt.Println(err)
errorResp.ErrorReason = err.Error()
errorCh <- errorResp
continue
}
// 迁移成功,设置 Succeed 为 true
errorResp.Succeed = true
errorCh <- errorResp
}
}
// 迁移单个联系人的函数,包括数据验证和保存
func MigrateSingleRow(row Contact) error {
// 验证邮箱地址
if !isValidEmail(row.Email) {
return fmt.Errorf("Invalid email")
}
// 验证手机号码
if !isValidMobile(row.Mobile) {
return fmt.Errorf("Invalid mobile")
}
// 保存到数据库
return SaveToDb(row)
}
// 保存联系人到数据库的函数(示例)
func SaveToDb(row Contact) error {
// 此处添加保存联系人到数据库的逻辑
return nil
}
// 验证手机号码的函数
func isValidMobile(phone string) bool {
if len(phone) != 11 || phone[0] != '1' {
return false
}
return true
}
// 验证邮箱地址的函数
func isValidEmail(email string) bool {
var emailRegex = regexp.MustCompile(`^[a-zA-Z0-9._%+-]+\@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`)
return emailRegex.MatchString(email)
}
代码解析
- 数据结构:定义了
Contact
、FailedRow
和FinalResponse
结构体,分别用于存储联系人信息、迁移失败信息和最终的迁移结果。 - 并发处理:通过
goroutine
并发处理每个联系人数据,使用sync.WaitGroup
来确保所有任务完成后才能继续执行。 - 错误处理:在迁移过程中,如果出现错误(如邮箱或手机号不合法),记录详细的错误原因。
- 结果响应:生成包含成功、失败或部分成功的最终结果,并打印输出。
扩展
- 可以根据需求调整
maxWorkers
的值,优化并发性能。 - 可以添加更详细的数据验证逻辑。
- 将 CSV 数据替换为其他格式(如 JSON)作为输入。
希望本文能帮助你更好地理解如何利用 Go 语言的并发特性来加速 CSV 文件的数据迁移过程!