编程 使用 Go 语言并发处理 CSV 文件到数据库

2024-11-18 12:08:55 +0800 CST views 558

使用 Go 语言并发处理 CSV 文件到数据库

数据迁移是常见的任务,尤其当我们需要将数据从一个系统迁移到另一个系统时。传统的逐条处理方法在面对大量数据时效率较低,而 Go 语言提供了强大的并发机制,可以帮助我们更高效地完成数据迁移工作。本文将介绍如何利用 Go 语言的并发特性,将 CSV 文件中的联系人信息迁移到数据库中。

问题背景

假设你拥有一个包含大量联系人信息的 CSV 文件,需要将这些信息迁移到数据库中。这些联系人信息可能包含姓名、电话号码、邮箱地址等。如果使用传统的单线程方式,逐条处理数据,迁移过程可能会非常缓慢,尤其是在数据量很大的情况下。

并发解决方案

Go 语言的并发机制通过 goroutinechannel 提供了高效的解决方案。我们可以利用这些特性来并发处理 CSV 文件中的数据,将其迁移到数据库,提升整个迁移过程的速度。

代码示例

package main

import (
    "encoding/csv"
    "fmt"
    "log"
    "os"
    "regexp"
    "sync"
)

// 联系人信息结构体
type Contact struct {
    Name   string
    Mobile string
    Email  string
}

// 失败记录结构体
type FailedRow struct {
    Name        string
    Mobile      string
    Email       string
    ErrorReason string
    Succeed     bool
}

// 最终响应结构体
type FinalResponse struct {
    Status    string      `json:"status"`
    FailedRow []FailedRow `json:"failed_row"`
}

func main() {
    // 打开 CSV 文件
    file, err := os.Open("contacts.csv")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    // 创建 CSV 读取器
    reader := csv.NewReader(file)

    // 读取所有 CSV 记录
    records, err := reader.ReadAll()
    if err != nil {
        log.Fatal(err)
    }

    // 存储所有联系人的切片
    var contacts []Contact

    // 遍历 CSV 记录
    for i, record := range records {
        // 跳过第一行(表头)
        if i == 0 {
            continue
        }

        // 将记录映射到 Contact 结构体
        contact := Contact{
            Name:   record[0],
            Mobile: record[1],
            Email:  record[2],
        }

        // 将联系人添加到切片
        contacts = append(contacts, contact)
    }

    // 设置并发工作线程数量
    maxWorkers := 10

    // 创建输入通道
    inputCh := make(chan Contact, len(contacts))

    // 创建错误通道
    errorCh := make(chan FailedRow, len(contacts))

    // 创建 WaitGroup 用于同步 goroutine
    var wg sync.WaitGroup

    // 启动工作 goroutine
    for i := 0; i < maxWorkers; i++ {
        go HandleContactMigration(inputCh, errorCh, &wg)
    }

    // 将联系人数据发送到输入通道
    for i := 0; i < len(contacts); i++ {
        wg.Add(1)
        inputCh <- contacts[i]
    }

    // 关闭输入通道
    close(inputCh)

    // 存储失败记录的切片
    totalError := make([]FailedRow, 0)

    // 启动 goroutine 处理错误信息
    go func() {
        for failedMigration := range errorCh {
            if !failedMigration.Succeed {
                totalError = append(totalError, failedMigration)
            }
            wg.Done()
        }
    }()

    // 等待所有 goroutine 完成
    wg.Wait()

    // 关闭错误通道
    close(errorCh)

    // 生成最终响应
    resp := FinalResponse{}
    if len(totalError) == 0 {
        resp.Status = "SUCCEED"
    } else if len(totalError) == len(contacts) {
        resp.Status = "FAILED"
        resp.FailedRow = totalError
    } else {
        resp.FailedRow = totalError
        resp.Status = "PARTIAL_SUCCEED"
    }

    // 打印最终响应
    fmt.Println(resp)
}

// 处理单个联系人迁移的函数
func HandleContactMigration(rows <-chan Contact, errorCh chan FailedRow, wg *sync.WaitGroup) {
    for row := range rows {
        // 创建失败记录,默认 Succeed 为 false
        errorResp := FailedRow{
            Name:    row.Name,
            Mobile:  row.Mobile,
            Email:   row.Email,
            Succeed: false,
        }

        // 迁移单个联系人
        err := MigrateSingleRow(row)
        if err != nil {
            fmt.Println(err)
            errorResp.ErrorReason = err.Error()
            errorCh <- errorResp
            continue
        }

        // 迁移成功,设置 Succeed 为 true
        errorResp.Succeed = true
        errorCh <- errorResp
    }
}

// 迁移单个联系人的函数,包括数据验证和保存
func MigrateSingleRow(row Contact) error {
    // 验证邮箱地址
    if !isValidEmail(row.Email) {
        return fmt.Errorf("Invalid email")
    }

    // 验证手机号码
    if !isValidMobile(row.Mobile) {
        return fmt.Errorf("Invalid mobile")
    }

    // 保存到数据库
    return SaveToDb(row)
}

// 保存联系人到数据库的函数(示例)
func SaveToDb(row Contact) error {
    // 此处添加保存联系人到数据库的逻辑
    return nil
}

// 验证手机号码的函数
func isValidMobile(phone string) bool {
    if len(phone) != 11 || phone[0] != '1' {
        return false
    }
    return true
}

// 验证邮箱地址的函数
func isValidEmail(email string) bool {
    var emailRegex = regexp.MustCompile(`^[a-zA-Z0-9._%+-]+\@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`)
    return emailRegex.MatchString(email)
}

代码解析

  • 数据结构:定义了 ContactFailedRowFinalResponse 结构体,分别用于存储联系人信息、迁移失败信息和最终的迁移结果。
  • 并发处理:通过 goroutine 并发处理每个联系人数据,使用 sync.WaitGroup 来确保所有任务完成后才能继续执行。
  • 错误处理:在迁移过程中,如果出现错误(如邮箱或手机号不合法),记录详细的错误原因。
  • 结果响应:生成包含成功、失败或部分成功的最终结果,并打印输出。

扩展

  • 可以根据需求调整 maxWorkers 的值,优化并发性能。
  • 可以添加更详细的数据验证逻辑。
  • 将 CSV 数据替换为其他格式(如 JSON)作为输入。

希望本文能帮助你更好地理解如何利用 Go 语言的并发特性来加速 CSV 文件的数据迁移过程!

推荐文章

Vue3中如何处理WebSocket通信?
2024-11-19 09:50:58 +0800 CST
一键压缩图片代码
2024-11-19 00:41:25 +0800 CST
Roop是一款免费开源的AI换脸工具
2024-11-19 08:31:01 +0800 CST
Vue3中的JSX有什么不同?
2024-11-18 16:18:49 +0800 CST
回到上次阅读位置技术实践
2025-04-19 09:47:31 +0800 CST
支付页面html收银台
2025-03-06 14:59:20 +0800 CST
为什么大厂也无法避免写出Bug?
2024-11-19 10:03:23 +0800 CST
Vue 3 是如何实现更好的性能的?
2024-11-19 09:06:25 +0800 CST
pip安装到指定目录上
2024-11-17 16:17:25 +0800 CST
JavaScript数组 splice
2024-11-18 20:46:19 +0800 CST
程序员茄子在线接单