- Introduction
- Setting up the baseline
- 1. First Pass Solution
- 2. Process strings differently
- 3. Use local maps instead of sync.Map
- 4. Custom float parser
- 5. Allocations and Syscall
- 6. Custom hash table and reduce allocations
- Conclusion
In this blog post, we will explore the "1 Billion Row Challenge" using the Go programming language.
The challenge involves reading a text file containing 1 billion rows of data, each holding a station name and a temperature separated by a semicolon. There can be at most 10,000 unique station names. The goal is to calculate the minimum, maximum and average temperature for each station. You can read more about the challenge here. This repository also contains generator code to create the input data file.
I would seriously urge you to try and solve this challenge on your own before reading further. It is a fun exercise and will help you learn a lot about Go.
By the way, you can look at my repository here: m3rashid/1brc. Almost every commit is a different iteration of the solution, so you can step through the commits to see how it evolved over time.
Before going into the solution, I wanted to set up a baseline to compare my solution against. A lot of factors can affect performance, and the baseline helps me judge whether my solution is actually better than a naive approach.
For the official submissions, the challenge runs on a Hetzner AX161 dedicated server (32 core AMD EPYC™ 7502P (Zen2), 128 GB RAM) with the data loaded from a RAM disk, so reading from disk is not a bottleneck there.
But to test things locally, I have a Linux machine with the following configuration:
- OS: Fedora 42 (Workstation) with Linux Kernel 6.16.5
- Language: Go version go1.25.0 linux/amd64
- CPU: AMD Ryzen 7 7435HS, 2100MHz, 8 cores, 16 threads
- RAM: 32G (2 slots of 16G each), 4800MHz DDR5
- SSD: 512G (17.0 GB/s)
I could create a 20G RAM disk with Linux tmpfs and load the data file into it, but I wanted to keep things simple and realistic.
Anyways, if you're interested in setting up a RAM Disk in Linux, you can do so by running the following command:
sudo mount -t tmpfs -o size=20G tmpfs /mnt/ramdisk
Make sure you have enough free RAM before creating a RAM Disk.
The first obvious step is to check the speed of your disk read operations. So, I generated the measurements.txt file (~13G) and copied it to /dev/null to measure the read speed.
dd if=measurements.txt of=/dev/null bs=1M iflag=direct
# which outputs
13795358366 bytes (14 GB, 13 GiB) copied, 0.828823 s, 16.6 GB/s
This means we can never read the data from disk faster than ~16.6 GB/s, so just reading the 13 GiB file takes about 0.8 seconds. That is the absolute floor: roughly 0.8 seconds for reading alone, before any processing at all.
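If you want to sanity-check this number from Go rather than dd, here is a minimal sketch (the file name is my choice for illustration); it streams the file into io.Discard and reports the throughput. Unlike dd with iflag=direct, this goes through the page cache, so a warm cache will inflate the number.
package main

import (
	"fmt"
	"io"
	"os"
	"time"
)

func main() {
	f, err := os.Open("measurements.txt")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	start := time.Now()
	// stream the whole file into io.Discard and time it
	n, err := io.Copy(io.Discard, f)
	if err != nil {
		panic(err)
	}
	elapsed := time.Since(start)
	fmt.Printf("read %d bytes in %v (%.2f GB/s)\n", n, elapsed, float64(n)/elapsed.Seconds()/1e9)
}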
lscpu | grep 'Model name\|CPU MHz\|Core(s) per socket\|Thread(s) per core'
# which outputs
Model name: AMD Ryzen 7 7435HS
Thread(s) per core: 2
Core(s) per socket: 8
You can also run lscpu or cat /proc/cpuinfo for more details. This CPU has 8 cores and 16 threads.
free -h
# which outputs
total used free shared buff/cache available
Mem: 31Gi 8.3Gi 646Mi 128Mi 22Gi 22Gi
Swap: 8.0Gi 120Ki 8.0Gi
Although this is not a big factor for this challenge: the RAM is enough to hold the entire data set in memory. If it were less than 13G, we would have to read the data in chunks, which would complicate the solution and could lead to thrashing, a different challenge altogether.
Although not a big factor, I am using Go 1.25 on Fedora 42 with Linux Kernel 6.16.5.
I knew that to get anywhere close to the disk read speed, we have to read the file in parallel. To do that, I need to split the data into chunks and process each chunk concurrently.
- Divide the whole file into N chunks
- Read and process each chunk in a separate goroutine, in parallel
- Merge the results from each goroutine
- Print the final results
Here is the code for the first iteration:
package main
import (
"bufio"
"fmt"
"log"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
)
const fileName = "inputs/input1B.txt"
var workers = runtime.NumCPU()
type WhetherReport struct {
max float32
min float32
sum float32
occurrence float32
}
var weatherMap sync.Map
func process(line string) {
var station string
var tempStr string
flipped := false
for _, ch := range line {
char := string(ch)
if char == ";" {
flipped = true
continue
}
if char == "\n" {
continue
}
if flipped {
tempStr = tempStr + char
} else {
station = station + char
}
}
temp64, _ := strconv.ParseFloat(tempStr, 32)
temp := float32(temp64)
data, found := weatherMap.Load(station)
if found {
v := data.(WhetherReport)
weatherMap.Store(station, WhetherReport{
occurrence: v.occurrence + 1,
max: max(v.max, temp),
min: min(v.min, temp),
sum: v.sum + temp,
})
} else {
weatherMap.Store(station, WhetherReport{
max: temp,
min: temp,
sum: temp,
occurrence: 1,
})
}
}
func main() {
runtime.GOMAXPROCS(workers)
file, err := os.Open(fileName)
if err != nil {
panic(err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
panic(err)
}
size := info.Size()
chunkSize := size / int64(workers)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
start := int64(id) * chunkSize
end := start + chunkSize
// the last worker takes any remaining bytes at the end of the file
if id == workers-1 {
end = size
}
f, err := os.Open(fileName)
if err != nil {
panic(err)
}
defer f.Close()
f.Seek(start, os.SEEK_SET)
r := bufio.NewReader(f)
var pos = start
// if not the first worker, skip the partial line so we start at a line boundary;
// count the skipped bytes so pos tracks the real file offset and chunks do not overlap
if start > 0 {
skipped, err := r.ReadBytes('\n')
if err != nil {
return
}
pos += int64(len(skipped))
}
for {
if pos >= end {
break
}
line, err := r.ReadBytes('\n')
if len(line) > 0 {
process(string(line))
}
pos += int64(len(line))
if err != nil {
break
}
}
}(i)
}
wg.Wait()
outputFileName := strings.Replace(fileName, "input", "output", 2)
// best-effort cleanup: ignore the error if the file does not exist yet
_ = os.Remove(outputFileName)
err = os.MkdirAll(filepath.Dir(outputFileName), 0755)
if err != nil {
log.Println("Error creating output directory")
panic(err)
}
outfile, err := os.Create(outputFileName)
if err != nil {
log.Println("Error creating output file")
panic(err)
}
defer outfile.Close()
weatherMap.Range(func(key, value interface{}) bool {
v := value.(WhetherReport)
outfile.Write([]byte(fmt.Sprintf("%s, %f, %f, %f\n", key, v.min, v.max, v.sum/v.occurrence)))
return true
})
}
This uses sync.Map to store the intermediate results from each goroutine. Each goroutine reads its own chunk of data and processes it line by line. The process function updates the sync.Map with the min, max, sum and occurrence count for each station.
sync.Map is a concurrent map that is safe for concurrent use by multiple goroutines without additional locking or coordination. It is optimized for scenarios where keys are read frequently but written infrequently.
This solution took around 36.5 seconds.
The next slight improvement: instead of iterating over every character in the line to split the station and temperature, I used strings.IndexByte to find the index of the semicolon and then sliced the line to get the two parts.
func process(line string) {
idx := strings.IndexByte(line, ';')
station := line[:idx]
tempStr := line[idx+1:]
// ...
}
I also changed how we write to the output file:
fmt.Fprintf(outfile, "%s, %f, %f, %f\n", key, v.min, v.max, v.sum/v.occurrence)
This reduced the time to around 27.57 seconds. At the scale we're working with, even small optimizations can have a significant impact on performance.
I found that using sync.Map was not the most efficient way to store the intermediate results because:
- It uses a lot of locking
- All goroutines contend on the same map
- It is not cache friendly
So, I decided to use a local map for each goroutine and then merge the results at the end. This way, each goroutine has its own map and there is far less contention than with sync.Map.
func process(line []byte, m *map[string]WhetherReport) {
idx := bytes.IndexByte(line, ';')
station := string(line[:idx])
temp, _ := strconv.ParseFloat(string(line[idx+1:len(line)-1]), 32)
v, ok := (*m)[station]
t := float32(temp)
if ok {
if t > v.max {
v.max = t
}
if t < v.min {
v.min = t
}
v.sum += t
v.occurrence++
} else {
v = WhetherReport{max: t, min: t, sum: t, occurrence: 1}
}
(*m)[station] = v
}
and merged the results at the end:
final := make(map[string]WhetherReport)
for _, local := range results {
for station, v := range local {
if existing, ok := final[station]; ok {
if v.max > existing.max {
existing.max = v.max
}
if v.min < existing.min {
existing.min = v.min
}
existing.sum += v.sum
existing.occurrence += v.occurrence
final[station] = existing
} else {
final[station] = v
}
}
}
var finalStr strings.Builder
for k, v := range final {
finalStr.WriteString(fmt.Sprintf("%s, %f, %f, %f\n", k, v.min, v.max, v.sum/v.occurrence))
}
// ...
This was the most significant improvement and reduced the time to around 10.56 seconds.
Because the temperature is always in the format xx.x, I wrote a custom float parser for it. This reduced the time to around 9.91 seconds.
func parseTemp(b []byte) float32 {
sign := float32(1)
i := 0
if b[0] == '-' {
sign = -1
i++
}
val := float32(0)
for ; i < len(b) && b[i] != '.'; i++ {
val = val*10 + float32(b[i]-'0')
}
if i < len(b) && b[i] == '.' {
i++
frac, div := float32(0), float32(1)
for ; i < len(b); i++ {
frac = frac*10 + float32(b[i]-'0')
div *= 10
}
val += frac / div
}
return sign * val
}
Also, I switched to sorting the results and printing them to the console at the end (as the challenge expects) instead of writing to a file. This brought the time down to ~9.78 seconds.
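A minimal sketch of that change (assuming the merged results are in the map[string]WhetherReport named final from the merge snippet above; it uses slices.Sort and fmt from the standard library):
// collect the station names, sort them, and print to stdout instead of a file
names := make([]string, 0, len(final))
for name := range final {
	names = append(names, name)
}
slices.Sort(names) // the expected output lists stations in alphabetical order

for _, name := range names {
	v := final[name]
	fmt.Printf("%s, %f, %f, %f\n", name, v.min, v.max, v.sum/v.occurrence)
}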
As you can see, we are opening the file again in each goroutine. Instead, we can open it once and map the whole file into memory using syscall.Mmap; each goroutine then works directly on its own slice of the mapping. This way, we avoid the overhead of opening the file multiple times and of copying the data through read syscalls.
data, err := syscall.Mmap(
int(fd.Fd()),
0,
int(size),
syscall.PROT_READ,
syscall.MAP_SHARED,
)
if err != nil {
panic(err)
}
defer syscall.Munmap(data)
This reduced the time to ~7 seconds. I was expecting a bigger improvement here, so I gave it another shot using the github.com/edsrzf/mmap-go package, which brought it down to ~6.35 seconds.
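The call with mmap-go is only a couple of lines; the complete listing in the next section uses exactly this:
// map the whole file read-only; data behaves like a []byte over the file contents
data, err := mmap.Map(dataFile, mmap.RDONLY, 0)
if err != nil {
	panic(err)
}
defer data.Unmap()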
The final improvement I made was to use a custom hash table instead of the built-in map. This way, I can avoid the overhead of map lookups and also try to reduce the number of allocations. This is my current solution and it runs in ~2.7 seconds.
And here is the complete code:
package main
import (
"bytes"
"fmt"
"os"
"runtime"
"slices"
"github.com/edsrzf/mmap-go"
)
const fileName = "inputs/input1B.txt"
type WhetherReport struct {
Max int
Min int
Sum int64
Count int
}
type Item struct {
Key []byte
Stat *WhetherReport
}
const (
NumBuckets = 1 << 17 // power of two, so the hash can be masked instead of taking a modulus
Offset64 = 14695981039346656037 // FNV-1a 64-bit offset basis
Prime64 = 1099511628211 // FNV-1a 64-bit prime
)
type MemChunk struct {
start, end int
}
// splitMem divides the mapped file into n chunks, moving each boundary forward to the next newline
func splitMem(mem mmap.MMap, n int) []MemChunk {
total := len(mem)
chunkSize := total / n
chunks := make([]MemChunk, n)
chunks[0].start = 0
for i := 1; i < n; i++ {
// a full line is at most ~107 bytes (100-byte station name plus ";-99.9\n"), so a newline must appear within this window
for j := i * chunkSize; j < i*chunkSize+108; j++ {
if mem[j] == '\n' {
chunks[i-1].end = j
chunks[i].start = j + 1
break
}
}
}
chunks[n-1].end = total - 1
return chunks
}
// process scans data[start:end], parses each station;temperature line, and aggregates
// the results in a fixed-size open-addressing hash table keyed by the FNV-1a hash of the station
func process(ch chan map[string]*WhetherReport, data mmap.MMap, start int, end int) {
temperature := 0
prev := start
hash := uint64(Offset64)
items := make([]Item, NumBuckets)
size := 0
for i := start; i <= end; i++ {
hash ^= uint64(data[i])
hash *= Prime64
if data[i] == ';' {
stationBytes := data[prev:i]
temperature = 0
i += 1
negative := false
for data[i] != '\n' {
ch := data[i]
if ch == '.' {
i += 1
continue
}
if ch == '-' {
negative = true
i += 1
continue
}
ch -= '0'
if ch > 9 {
panic("invalid character")
}
temperature = temperature*10 + int(ch)
i += 1
}
if negative {
temperature = -temperature
}
hashIndex := int(hash & uint64(NumBuckets-1))
for {
if items[hashIndex].Key == nil {
items[hashIndex] = Item{
Key: stationBytes,
Stat: &WhetherReport{Min: temperature, Max: temperature, Sum: int64(temperature), Count: 1},
}
size++
if size > NumBuckets/2 {
panic("hash table more than half full: too many unique stations")
}
break
}
if bytes.Equal(items[hashIndex].Key, stationBytes) {
s := items[hashIndex].Stat
s.Min = min(s.Min, temperature)
s.Max = max(s.Max, temperature)
s.Sum += int64(temperature)
s.Count++
break
}
hashIndex++
if hashIndex >= NumBuckets {
hashIndex = 0
}
}
prev = i + 1
temperature = 0
hash = uint64(Offset64)
}
}
measurements := make(map[string]*WhetherReport)
for _, item := range items {
if item.Key == nil {
continue
}
measurements[string(item.Key)] = item.Stat
}
ch <- measurements
}
func main() {
var workers = runtime.NumCPU()
runtime.GOMAXPROCS(workers)
dataFile, err := os.Open(fileName)
if err != nil {
panic(err)
}
defer dataFile.Close()
data, err := mmap.Map(dataFile, mmap.RDONLY, 0)
if err != nil {
panic(err)
}
defer data.Unmap()
chunks := splitMem(data, workers)
totals := make(map[string]*WhetherReport, workers)
measurementsChan := make(chan map[string]*WhetherReport)
for i := 0; i < workers; i++ {
go process(measurementsChan, data, chunks[i].start, chunks[i].end)
}
for i := 0; i < workers; i++ {
measurements := <-measurementsChan
for station, measurement := range measurements {
total := totals[station]
if total == nil {
totals[station] = measurement
} else {
total.Count += measurement.Count
total.Min = min(total.Min, measurement.Min)
total.Max = max(total.Max, measurement.Max)
total.Sum += measurement.Sum
}
}
}
length := len(totals)
stationNames := make([]string, 0, length)
for k := range totals {
stationNames = append(stationNames, k)
}
slices.Sort(stationNames)
fmt.Printf("{")
for i, station := range stationNames {
v := totals[station]
fmt.Printf("%s=%.1f/%.1f/%.1f", station, float32(v.Min/10), float32(v.Max/10), float32(v.Sum/(10*int64(v.Count))))
if i < length-1 {
fmt.Printf(", ")
}
}
fmt.Printf("}\n")
}
One of the optimizations I made was to use int instead of float32 to store the temperature. Since the temperature is always in the format xx.x, we can multiply it by 10 and store it as an integer number of tenths of a degree. This way, we avoid floating-point arithmetic in the hot loop.
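As a tiny illustration of the idea (the helper name is mine, not from the final code): "12.3" becomes the integer 123, and we only go back to a float when formatting the output.
// parseTenths parses "xx.x" into tenths of a degree, e.g. "12.3" -> 123, "-0.5" -> -5
func parseTenths(b []byte) int {
	neg := false
	i := 0
	if b[0] == '-' {
		neg = true
		i++
	}
	v := 0
	for ; i < len(b); i++ {
		if b[i] == '.' {
			continue // skip the decimal point; the digits stay contiguous
		}
		v = v*10 + int(b[i]-'0')
	}
	if neg {
		return -v
	}
	return v
}
At print time, a single float32(sum)/(10*float32(count)) per station is all the floating-point work left.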
Challenges like this are a great way to learn and improve your skills. I learned a lot about Go, performance optimization, operating systems, memory allocation, and disk I/O, among many other things. I hope you found this blog post helpful and informative. Feel free to reach out to me on Twitter @m3_rashid or LinkedIn MD Rashid Hussain.