Golang 实现超大文件读取的两种方法

时间:2021-12-09 14:44:01

Golang超大文件读取的两个方案

 

流处理方式

分片处理

去年的面试中我被问到超大文件你怎么处理,这个问题确实当时没多想,回来之后仔细研究和讨论了下这个问题,对大文件读取做了一个分析

比如我们有一个log文件,运行了几年,有100G之大。按照我们之前的操作可能代码会这样写:

?
1
2
3
4
5
6
7
func ReadFile(filePath string) []byte{
    content, err := ioutil.ReadFile(filePath)
    if err != nil {
        log.Println("Read error")
    }
    return content
}

上面的代码读取几兆的文件可以,但是如果大于你本身及其内存,那就直接翻车了。因为上面的代码,是把文件所有的内容全部都读取到内存之后返回,几兆的文件,你内存够大可以处理,但是一旦上几百兆的文件,就没那么好处理了。

那么,正确的方法有两种

第一个是使用流处理方式代码如下

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func ReadFile(filePath string, handle func(string)) error {
    f, err := os.Open(filePath)
    defer f.Close()
    if err != nil {
        return err
    }
    buf := bufio.NewReader(f)
 
    for {
        line, err := buf.ReadLine("\n")
        line = strings.TrimSpace(line)
        handle(line)
        if err != nil {
            if err == io.EOF{
                return nil
            }
            return err
        }
        return nil
    }
}

第二个方案就是分片处理

 

当读取的是二进制文件,没有换行符的时候,使用下面的方案一样处理大文件

?
1
2
3
4
5
6
7
8
9
10
11
12
func ReadBigFile(fileName string, handle func([]byte)) error {
    f, err := os.Open(fileName)
    if err != nil {
        fmt.Println("can't opened this file")
        return err
    }
    defer f.Close()
    s := make([]byte, 4096)
    for {
        switch nr, err := f.Read(s[:]); true {
        case nr < 0:
            fmt.Fprintf(os.Stderr, "cat: error reading: %s\n

补充:golang 读取大文件处理sync.pool + bufio.NewReader(f)

看代码吧~

 

文件大小

Golang 实现超大文件读取的两种方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package main
import (
    "bufio"
    "fmt"
    "io"
    //"math"
    "os"
    "strings"
    "sync"
    "time"
)
func main() {
    /*
    文件数据样例
    {"remark": "来电时间:  2021/04/15 13:52:07客户电话:13913xx39xx ", "no": "600020510132021101310210547639", "title": "b-ae0e-0242ac100907", "call_in_date": "2021-04-15 13:52:12", "name": "张三", "_date": "2021-06-15", "name": "张三", "meet": "1"}
    1、我们取出 call_in_date": "2021-04-15 13:52:1的数据写入另一个文件
    */
    var (
        s time.Time //当前时间
        file *os.File
        fileStat os.FileInfo
        err error
        lastLineSize int64
    )
    s = time.Now()
    if file, err = os.Open("/Users/zhangsan/Downloads/log.txt");err != nil{
        fmt.Println(err)
    }
    defer func() {
        err = file.Close() //close after checking err
    }()
    //queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
    //if err != nil {
    //  fmt.Println("Could not able to parse the start time", startTimeArg)
    //  return
    //}
    //
    //queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
    //if err != nil {
    //  fmt.Println("Could not able to parse the finish time", finishTimeArg)
    //  return
    //}
    /**
    * {name:"log.log", size:911100961, mode:0x1a4,
    modTime:time.Time{wall:0x656c25c, ext:63742660691,
    loc:(*time.Location)(0x1192c80)}, sys:syscall.Stat_t{Dev:16777220,
    Mode:0x81a4, Nlink:0x1, Ino:0x118cba7, Uid:0x1f5, Gid:0x14, Rdev:0,
    Pad_cgo_0:[4]uint8{0x0, 0x0, 0x0, 0x0}, Atimespec:syscall.Timespec{Sec:1607063899, Nsec:977970393},
    Mtimespec:syscall.Timespec{Sec:1607063891, Nsec:106349148}, Ctimespec:syscall.Timespec{Sec:1607063891,
    Nsec:258847043}, Birthtimespec:syscall.Timespec{Sec:1607063883, Nsec:425808150},
    Size:911100961, Blocks:1784104, Blksize:4096, Flags:0x0, Gen:0x0, Lspare:0, Qspare:[2]int64{0, 0}}
    *
    */
    if fileStat, err = file.Stat();err != nil {
        return
    }
    fileSize := fileStat.Size()//72849354767
    offset := fileSize - 1
    //检测是不是都是空行 只有\n
    for {
        var (
            b []byte
            n int
            char string
        )
        b = make([]byte, 1)
        //从指定位置读取
        if n, err = file.ReadAt(b, offset);err != nil {
            fmt.Println("Error reading file ", err)
            break
        }
        char = string(b[0])
        if char == "\n" {
            break
        }
        offset--
        //获取一行的大小
        lastLineSize += int64(n)
    }
    var (
        lastLine []byte
        logSlice []string
        logSlice1 []string
    )
    //初始化一行大小的空间
    lastLine = make([]byte, lastLineSize)
    _, err = file.ReadAt(lastLine, offset)
    if err != nil {
        fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
        return
    }
    //根据条件进行区分
    logSlice = strings.Split(strings.Trim(string(lastLine),"\n"),"next_pay_date")
    logSlice1  = strings.Split(logSlice[1],"\"")
    if logSlice1[2] == "2021-06-15"{
        Process(file)
    }
    fmt.Println("\nTime taken - ", time.Since(s))
        fmt.Println(err)
}
func Process(f *os.File) error {
    //读取数据的key,减小gc压力
    linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 250*1024)
        return lines
    }}
    //读取回来的数据池
    stringPool := sync.Pool{New: func() interface{} {
        lines := ""
        return lines
    }}
    //一个文件对象本身是实现了io.Reader的 使用bufio.NewReader去初始化一个Reader对象,存在buffer中的,读取一次就会被清空
    r := bufio.NewReader(f) //
    //设置读取缓冲池大小 默认16
    r = bufio.NewReaderSize(r,250 *1024)
    var wg sync.WaitGroup
    for {
        buf := linesPool.Get().([]byte)
        //读取Reader对象中的内容到[]byte类型的buf中
        n, err := r.Read(buf)
        buf = buf[:n]
        if n == 0 {
            if err != nil {
                fmt.Println(err)
                break
            }
            if err == io.EOF {
                break
            }
            return err
        }
        //补齐剩下没满足的剩余
        nextUntillNewline, err := r.ReadBytes('\n')
        //fmt.Println(string(nextUntillNewline))
        if err != io.EOF {
            buf = append(buf, nextUntillNewline...)
        }
        wg.Add(1)
        go func() {
            ProcessChunk(buf, &linesPool, &stringPool)
            wg.Done()
        }()
    }
    wg.Wait()
    return nil
}
func ProcessChunk(chunk []byte, linesPool *sync.Pool,stringPool *sync.Pool) {
//做相应的处理
}

执行

?
1
go run test2.go "2020-01-01T00:00:00.0000Z" "2020-02-02T00:00:00.0000Z" /Users/zhangsan/go/src/workspace/test/log.log
?
1
2
3
EOF
Time taken -  20.023517675s
<nil>

以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。如有错误或未考虑完全的地方,望不吝赐教。

原文链接:https://blog.csdn.net/cpongo2ppp1/article/details/89383147