A bit ago, researchers proposed an interesting algorithm which can be used to estimate number of distinct elements in a large stream of elements containing much more data than available buffer size. For example, let’s say there is a stream of 1 000 000 numbers, and 10 000 of them distinct. How can we calculate number of distinct elements if we have buffer capable of holding only 1 000 numbers while processing full stream? Well, this is where CVM algorithm comes to the rescue. For anyone interested in more details about CVM algorithm, full paper is available to read.

Processing stream element includes generating random priority number that will be attached to each stream element, so data will be saved in pair. That means a data structure capable of holding up pairs of data would fit best for this implementation. As it turns out, treap data structure is exactly what’s needed. Name *treap* comes from combination of words *tree* and *heap*, which indicates that this data structure is combination of binary tree and heap. Since nodes consists of pairs (value + priority), nodes are organized as a binary tree based on a value and simultaneously in a heap based on priority.

Full implementation of CVM Algorithm using Go language can be found in GitHub repo. Buffer utilizing treap data structure is implemented in treap_buffer.go file. Structure is implemented in a way to accept any data type, so *Comparator* function needs to be defined in order to create treap buffer. This function will define how to compare values of wanted data type. It can be seen that 3 Go data types are defined in that file:

```
// Comparator is a function used to compare elements while saving them to a treap buffer.
// Function should return:
// return 0 if x == y.
// return < 0 (negative int) if x < y.
// return > 0 (positive int) if x > y.
type Comparator[T any] func(x, y T) int
type treapBuffer[T any] struct {
root *node[T]
currentSize int
comparator Comparator[T]
}
type node[T any] struct {
value T
priority float64
left *node[T]
right *node[T]
}
```

As it can be seen, *Comparator* is a function that accepts 2 arguments of any type, and returns integer. Return value shows whether first passed argument is bigger, smaller or same as second argument passed.

Type *treapBuffer* is used to store data about buffer, like pointer to current root node of treap data structure, current size of buffer (number of nodes), and comparator function used to compare values in nodes.

Finally, there is *node* structure type, used to save all node data. Node contains value of any type, priority and pointers to left and right children nodes.

Treap buffer implemented in file mentioned above is used in CVM algorithm implementation which can be found in cvm.go file. This file contains CVM Algorithm runner:

```
// A CVM structure is used to run CVM algorithm to estimate number of distinct elements.
type CVM[T any] struct {
buffer *treapBuffer[T]
bufferSize int
total int
p float64
}
```

CVM structure uses treapBuffer structure to store data, it has defined maximum buffer size and saves total number of currently processed elements. Finally, it stores *p* variable used by CVM algorithm calculations.

Repo holding this code is Go module, which means it can be fetched with one of these 2 commands:

go get github.com/tentameneu/cvm-go

go install github.com/tentameneu/cvm-go

And then imported to your code with

import "github.com/tentameneu/cvm-go"

Readme offers simple example of usage:

```
package main
import (
"fmt"
"github.com/tentameneu/cvm-go"
)
func main() {
total, distinct := 1_000_000, 50_000
bufferSize := 10_000
stream := make([]int, total)
for i := 0; i < total; i++ {
stream[i] = i % distinct
}
cvmSim := cvm.NewCVM(bufferSize, func(x, y int) int { return x - y })
for _, element := range stream {
cvmSim.Process(element)
}
fmt.Printf("Estimated number of distinct elements is %d\n", cvmSim.N())
// Output:
// Estimated number of distinct elements is 50161
}
```

There is a stream of 1 000 000 integer numbers. Stream is generated in a way so it contains 50 000 distinct numbers, and buffer we have for processing stream can hold only 10 000 values at once. CVM Algorithm runner will process all elements of the stream, and return estimated number of distinct number of elements processed so far. When all elements are processed, result 50 161 is returned (note that your result will probably be somewhat different due to natural randomness in algorithm, but it should be close to 50 000. Precision of that result could be calculated like

`(1 - |50 000 - 50 161|/50 000) * 100 = 99.678%`

That’s not bad, precision of 99.678% while using buffer that contains only 1/5 of actual distinct number of elements.

A lot of more different streams processing can be simulated with ** cvm-sim** application. This is Go application that imports

*cvm-go*library and runs it against user’s input passed through CLI parameters while logging important actions and result. Binary linked above can be used, but source code is also available in cvm-go-sim Github Repo, so application can be built from source code by following instructions in README file. To see all supported CLI parameters, run application with

*argument:*

**–help**```
cvm-sim --help
This program runs CVM algorithm simulator.
Algorithm estimates number of distinct elements in stream of elements much bigger than available buffer size.
Usage:
cvm-sim [arguments]
Supported arguments:
-buffer-size int
number of elements that can be stored in buffer while processing stream (default 10000)
-distinct int
number of distinct elements in generated test stream (default 5000000)
-file-path string
path to file containing numbers separated by whitespace. used in stream generated from file
-log-level string
logging level. valid values are: [info, debug, deep] (default "info")
-random-max int
used in random stream generator - generates values in range [random-min, random-max] (default 10000000)
-random-min int
used in random stream generator - generates values in range [random-min, random-max]
-stream-type string
how to generate test stream of elements. valid values are: [incremental, random, file] (default "incremental")
-total int
total number of elements in generated test stream (default 100000000)
```

There are 3 types of streams that can be generated by CVM simulator app: *incremental*, *random* and *file*.

Incremental stream is generated simply by incrementing numbers and than repeating them. It’s configured using ** -distinct** and

*arguments. For example, incremental stream containing 10 total elements, of which 5 are distinct will look like this:*

**-total**`[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]`

.Random stream is generated by generating random numbers and then repeating them. Same as incremental stream, ** -distinct** and

*arguments are used for for configuring stream data, but additionally,*

**-total****and**

*-random-min**are used to configure borders used while generating random numbers. So one example of random stream containing 10 total elements, of which 5 are distinct and numbers are between [100, 999] could be:*

**-random-max**`[145, 532, 981, 411, 745, 145, 532, 981, 411, 745]`

.File stream is generated from file. File must contain only numbers separated by single whitespace. File stream ignores *-distinct*, *-total*, *-random-min* and *-random-max* arguments but requires ** -file-path** argument containing path to input file. Actual total and distinct numbers will be calculated while processing file.

Output when app is run with default parameters is looking like this:

```
./cvm-sim
17:00:32.447 || INFO || Starting CVM Algorithm...
17:00:48.568 || INFO || Done estimating number of distinct elements. N=4974029 precision=99.48%
```

Have fun tweaking CLI arguments and running your own simulations!