Data Pagination Using Elasticsearch in Golang
Elasticsearch, a powerful search and analytics engine, provides robust data-management capabilities for cases where a non-relational store is a good fit. This article explores how to use Elasticsearch from Golang to implement different data pagination strategies.
Setting up Elasticsearch and Golang Environment
Before diving into the code, let’s set up the necessary environment. Ensure you have Elasticsearch installed and configured. Additionally, have Golang installed on your machine along with the required Elasticsearch Golang client libraries.
You can find how to set up a dockerized ES instance on your local machine in my other article: https://satorsight.medium.com/setting-up-elasticsearch-with-localstack-in-docker-compose-5a48ebbdf7f1
For the Golang part I will use Go 1.21.1 with go-elasticsearch/v7 v7.11.0, plus some common utility libraries like prometheus and zap.
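If you are starting from scratch, the client and utility libraries can be pulled in with go get (standard module paths; the exact versions are the ones mentioned above):

go get github.com/elastic/go-elasticsearch/v7@v7.11.0
go get go.uber.org/zap
go get github.com/prometheus/client_golang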
Also, you can find the entire project used in this article on GitHub: https://github.com/SatorSight/go-elastic-w-pagination
Setting up the Golang part
Here is how I prepare the ES client using the go-elasticsearch library:
// main.go
func prepareESClient() *es.Client {
esHost := "http://localhost:4566/es/us-east-1/my-data"
esUsername := ""
esPassword := ""
esIndex := "my-index"
lg, err := logger.New(logger.Config{
Level: "debug",
Encoding: "json",
Color: true,
Outputs: []string{"stdout"},
Tags: []string{},
}, "Development", "my-app", "1")
if err != nil {
panic("logger init error")
}
esLogger := logger.NewLoggerForEs(lg)
var t http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
ForceAttemptHTTP2: false,
MaxIdleConns: 10,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
// DisableCompression is important here: in the dev env we run AWS Localstack without https,
// so we need to disable compression,
DisableCompression: true,
}
esCfg := elasticsearch.Config{
Addresses: []string{esHost},
Username: esUsername,
Password: esPassword,
CloudID: "",
APIKey: "",
Header: nil,
CACert: nil,
RetryOnStatus: nil, // List of status codes for retry. Default: 502, 503, 504.
DisableRetry: false,
EnableRetryOnTimeout: true,
MaxRetries: 3,
DiscoverNodesOnStart: false,
DiscoverNodesInterval: 0,
EnableMetrics: true,
EnableDebugLogger: true,
RetryBackoff: nil,
Transport: t,
Logger: esLogger,
Selector: nil,
ConnectionPoolFunc: nil,
}
maxTimeoutStr := "30s"
maxTimeout, _ := time.ParseDuration(maxTimeoutStr)
customStorageCfg := es.Config{
DefaultIndex: esIndex,
MaxSearchQueryTimeout: maxTimeout,
PathToMappingSchema: "",
IsTrackTotalHits: true, // always needed for cnt operations.
}
esClient, err := es.New(lg, esCfg, customStorageCfg)
if err != nil {
log.Fatalln("failed to init esClient:", err)
}
return esClient
}
...
// es.go
func New(log *logger.Logger, esCfg elasticsearch.Config, customCfg Config) (*Client, error) {
es, err := elasticsearch.NewClient(esCfg)
if err != nil {
log.Info("Could not create new ElasticSearch client due error")
return nil, err
}
c := &Client{
log: log,
esCfg: esCfg,
esClient: es,
defaultIndex: customCfg.DefaultIndex,
maxSearchQueryTimeout: customCfg.MaxSearchQueryTimeout,
isTrackTotalHits: true,
}
return c, nil
}
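The custom Config and Client types (and the SearchResult returned by the search code further down) are not shown in the article; a minimal sketch with field types inferred from how they are used here could look like this:

// es.go (sketch, types inferred from their usage in this article)
type Config struct {
	DefaultIndex          string
	MaxSearchQueryTimeout time.Duration
	PathToMappingSchema   string
	IsTrackTotalHits      bool
}

type Client struct {
	log                   *logger.Logger
	esCfg                 elasticsearch.Config
	esClient              *elasticsearch.Client
	defaultIndex          string
	maxSearchQueryTimeout time.Duration
	isTrackTotalHits      bool
}

// SearchResult is what the Load function (shown below) returns.
type SearchResult struct {
	Users      []User
	TotalCount int64
	LastSort   float64
}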
Let's first create a new index. For that we need to add a mapping.json file with the ES mapping to the app root:
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"created_at": {
"type": "date"
},
"username": {
"type": "keyword"
}
}
}
}
Our ES index will contain users with id, created_at and username fields.
// main.go
func createIndex(client *es.Client, ctx context.Context, index string) {
err := client.CreateIndex(ctx, index, "mapping.json")
if err != nil {
log.Fatalf("failed to create index: %v", err)
}
}
// es.go
func (c *Client) CreateIndex(ctx context.Context, index string, mapping string) error {
var file []byte
file, err := os.ReadFile(mapping)
if err != nil || file == nil {
c.log.Fatal("Could not read file with mapping defaultIndex schema",
zap.String("path_to_mapping_schema", mapping),
zap.Error(err))
}
indexMappingSchema := string(file)
req := esapi.IndicesCreateRequest{
Index: index,
Body: strings.NewReader(indexMappingSchema),
}
res, err := req.Do(ctx, c.esClient)
if err != nil {
return fmt.Errorf("err creating defaultIndex: %v", err)
}
defer func() {
err = res.Body.Close()
if err != nil {
c.log.Error("res.Body.Close() problem", zap.Error(err))
}
}()
if res.IsError() {
return fmt.Errorf("err creating defaultIndex. res: %s", res.String())
}
return nil
}
Then, in main.go:
func main() {
esClient := prepareESClient() // described above
ctx := prepareContext()
createIndex(esClient, ctx, "my-index")
}
func prepareContext() context.Context {
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) // SIGKILL cannot be caught, so we only handle SIGINT and SIGTERM
return ctx
}
After running it we will have the index created; next, let's load 100k users into it:
// main.go
func main() {
esClient := prepareESClient()
ctx := prepareContext()
create100kUsers(esClient, ctx, "my-index")
}
func create100kUsers(client *es.Client, ctx context.Context, index string) {
client.Create100kUsers(ctx, index)
}
// es.go
func (c *Client) Create100kUsers(ctx context.Context, index string) {
user := User{
ID: 0,
CreatedAt: time.Now(),
Username: "init",
}
for i := 0; i < 100000; i++ {
user.Username = fmt.Sprintf("%v %v", "user", i)
user.ID = i
err := c.Store(ctx, index, user)
if err != nil {
log.Fatal("failed to store", zap.Error(err))
}
}
}
This will run for a while, but afterwards we will have an index with 100k users to test pagination on.
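The Store method called inside Create100kUsers isn't shown in the article. A minimal sketch built on esapi.IndexRequest could look like the following (using the numeric ID as the document ID is my assumption, not necessarily what the repository does; the "bytes" and "strconv" imports are assumed):

// es.go (sketch)
// Store indexes a single user document into the given index.
func (c *Client) Store(ctx context.Context, index string, user User) error {
	body, err := jsoniter.Marshal(user)
	if err != nil {
		return fmt.Errorf("marshal user: %w", err)
	}
	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: strconv.Itoa(user.ID),
		Body:       bytes.NewReader(body),
	}
	res, err := req.Do(ctx, c.esClient)
	if err != nil {
		return fmt.Errorf("index request failed: %w", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("index error: %s", res.String())
	}
	return nil
}

Indexing documents one at a time like this is the slowest option; the Bulk API would load 100k documents much faster, but single requests keep the example simple.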
Fetching Data Without Pagination
Let’s start by looking at a simple data retrieval method from Elasticsearch in Golang without pagination. The following code demonstrates a basic retrieval mechanism. In the Load function I omit most of the boilerplate, like error handling, because it's pretty wordy:
// main.go
func main() {
esClient := prepareESClient()
ctx := prepareContext()
res := simpleLoad(esClient, ctx, "my-index")
pp(res)
}
func simpleLoad(client *es.Client, ctx context.Context, index string) es.SearchResult {
// params after index are "from", "size" and "cursor"; the cursor is ignored if it's zero
res, err := client.Load(ctx, index, 0, 10, 0)
if err != nil {
log.Fatalf("failed to fetch results: %v", err)
}
return res
}
// es.go
func (c *Client) Load(
ctx context.Context,
index string,
from int,
size int,
cursor float64,
) (SearchResult, error) {
query := map[string]interface{}{
"query": map[string]interface{}{
"match_all": map[string]interface{}{},
},
}
sortQuery := []map[string]map[string]interface{}{
{"ID": {"order": "asc"}},
}
query["sort"] = sortQuery
if cursor != 0 {
query["search_after"] = []float64{cursor}
}
var buf bytes.Buffer
if err := jsoniter.NewEncoder(&buf).Encode(query); err != nil {
return SearchResult{}, fmt.Errorf("es.client.Load(): error encoding query: %v. err: %w", query, err)
}
var res *esapi.Response
var err error
res, err = c.esClient.Search(
c.esClient.Search.WithContext(ctx),
c.esClient.Search.WithTimeout(c.maxSearchQueryTimeout),
c.esClient.Search.WithIndex(index),
c.esClient.Search.WithBody(&buf),
c.esClient.Search.WithFrom(from),
c.esClient.Search.WithSize(size),
c.esClient.Search.WithTrackTotalHits(c.isTrackTotalHits),
c.esClient.Search.WithPretty(), // todo remove in case of performance degradation.
)
if err != nil {
return SearchResult{}, fmt.Errorf("es.client.Load(): search request failed: %w", err)
}
defer res.Body.Close()
// decode the raw response into a generic map (further error handling omitted for brevity)
var r map[string]interface{}
if err = jsoniter.NewDecoder(res.Body).Decode(&r); err != nil {
return SearchResult{}, fmt.Errorf("es.client.Load(): error decoding response: %w", err)
}
result :=
func() SearchResult {
totalCnt := int64(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
if totalCnt == 0 {
return SearchResult{}
}
cntFind := len(r["hits"].(map[string]interface{})["hits"].([]interface{}))
docs := make([]User, 0, cntFind)
var lastSort float64
for _, v := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {
lastSort = v.(map[string]interface{})["sort"].([]interface{})[0].(float64)
doc := User{}
jsonBody, err := jsoniter.Marshal(v.(map[string]interface{})["_source"].(map[string]interface{}))
if err != nil {
c.log.Error("es.client.Load() err from jsoniter.Marshal",
zap.Any("v", v),
zap.Any("r['hits']", r["hits"]),
zap.Error(err),
)
return SearchResult{Users: docs, TotalCount: totalCnt}
}
if err = jsoniter.Unmarshal(jsonBody, &doc); err != nil {
c.log.Error("es.client.Load() err from jsoniter.Unmarshal",
zap.Any("v", v),
zap.Any("r['hits']", r["hits"]),
zap.Error(err),
)
return SearchResult{Users: docs, TotalCount: totalCnt}
}
docs = append(docs, doc)
}
return SearchResult{Users: docs, TotalCount: totalCnt, LastSort: lastSort}
}()
return result, nil
}
This function will fetch the first 10 users from ES. The request will contain _search?from=0&size=10. The same function will be reused further on.
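For reference, the body that Load builds for this call looks roughly like this (from, size and the other options go into the URL as query parameters rather than into the body):

GET /my-index/_search?from=0&size=10&track_total_hits=true&pretty
{
  "query": { "match_all": {} },
  "sort": [ { "ID": { "order": "asc" } } ]
}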
Next, let's move on to actual pagination.
Implementing Basic Pagination with from and size
Probably the first thing that comes to mind is to do it the same way as we would in SQL-based DBs and use offset and limit. ES also has this feature, with the from and size params. Let's implement a basic paginated output iterating with the from and size parameters:
func loadSimplePagination(client *es.Client, ctx context.Context, index string) []es.User {
from := 0
size := 10
var result []es.User
for i := from; i < 100; i += size {
res, err2 := client.Load(ctx, index, i, size, 0)
if err2 != nil {
log.Fatalf("failed to fetch results: %v", err2)
}
users := res.Users
result = append(result, users...)
}
return result
}
This will iterate through the first 10 pages and do 10 requests.
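In terms of raw requests, that loop amounts to ten calls along these lines:

GET /my-index/_search?from=0&size=10
GET /my-index/_search?from=10&size=10
...
GET /my-index/_search?from=90&size=10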
The problem
The problem begins when we have a lot of data and try to go deeper than 10,000 records. If we try to do that with a request like ?from=10000&size=10, we will get the following error:
Result window is too large, from + size must be less than or equal to: [10000]
but was [10010]. See the scroll api for a more efficient way to request
large data sets. This limit can be set by changing the
[index.max_result_window] index level setting.
This means that, by default, we can only use from & size pagination for the first 10k records of a given sort order. The reason is explained in the ES docs:
Search requests take heap memory and time proportional to from + size and this limits that memory.
In other words, to serve a request like from=10000&size=10, every shard has to collect and sort its top 10,010 hits before the coordinating node can pick the 10 we actually asked for.
In theory we can raise that limit to something like 100k,
PUT /my-index/_settings
{
"index.max_result_window": 100000
}
but if we have millions of records this won't solve the problem, and at some point performance will start to degrade. As alternatives ES offers the Scroll API and the Search After API; Scroll is no longer recommended for deep pagination, so let's use Search After.
Enhanced Pagination Using Cursors and the search_after API
To use the Search After API we first need to choose a sort field and direction. In this article I will use the simplest option, sorting by ID ASC:
sortQuery := []map[string]map[string]interface{}{
{"ID": {"order": "asc"}},
}
In real projects integer ids are not always present (for example when using UUIDs), and in that case I would consider using created_at + uuid or the internal "_id" field for pagination. For each field in the sort subquery, ES gives us a cursor value in the response, for example:
"search_after": [
"1O9tYowBuAaJdMU4BeRn",
1702465740836
],
"sort": {
"_id": {
"order": "asc"
},
"created_at": {
"order": "asc"
}
}
In the next query we take the sort values of the last hit of the current page and pass them as search_after to paginate to the next set, like this:
query["search_after"] = []float64{cursor}
// or array of cursors if we use multiple fields for sort
Let's implement it in code:
func cursorPaginate(client *es.Client, ctx context.Context, index string) []es.User {
from := 0
size := 10
var res []es.User
// make initial request
initRes, _ := client.Load(ctx, index, from, size, 0)
ls := initRes.LastSort
res = append(res, initRes.Users...)
// make subsequent requests applying cursors
for i := 1; i < 10; i++ {
// the third param is the offset and it's not used when paginating with cursors
res2, err2 := client.Load(ctx, index, 0, size, ls)
if err2 != nil {
log.Fatalf("failed to fetch results: %v", err2)
}
ls = res2.LastSort
log.Printf("current cursor: %v\n", ls)
users := res2.Users
res = append(res, users...)
}
return res
}
This will scroll through the results page by page using cursors.
One thing to be careful about is memory consumption: in this example all results are accumulated in memory, which can blow up if we load too much data. In real-world apps I would rather flush the data between requests to a file, or send it somewhere else, to keep memory bounded, as sketched below.
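Here is a minimal sketch of that idea, assuming a caller-supplied handle callback (hypothetical, not part of the repository) that processes each page, and stopping once a page comes back shorter than the requested size:

// cursorPaginateAll streams every page through handle instead of accumulating results in memory.
func cursorPaginateAll(client *es.Client, ctx context.Context, index string, handle func([]es.User) error) error {
	size := 10
	var cursor float64
	for {
		// cursor == 0 means "no cursor yet", so the first call is a plain from=0 request
		res, err := client.Load(ctx, index, 0, size, cursor)
		if err != nil {
			return fmt.Errorf("failed to fetch page: %w", err)
		}
		if len(res.Users) == 0 {
			return nil // nothing left to read
		}
		if err := handle(res.Users); err != nil {
			return err
		}
		cursor = res.LastSort
		if len(res.Users) < size {
			return nil // short page: we've reached the end
		}
	}
}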
Another thing to be careful about: when using multiple cursors, their order matters and must match the order of the fields in the sort. For example,
sortQuery := []map[string]map[string]interface{}{
{"created_at": {"order": "asc"}},
{"_id": {"order": "asc"}},
}
...
"search_after": [
"1O9tYowBuAaJdMU4BeRn", // cursor for _id
1702465740836 // cursor for created_at
],
// error!
This one will come back as a 400 error, because Elasticsearch will try to parse each cursor as the wrong type: the created_at cursor is a numeric timestamp, while the _id cursor (an internal, uuid-like value) is a string.
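For comparison, a corrected version keeps the cursor values in the same order as the sort fields (same example values as above, reused purely for illustration):

sortQuery := []map[string]map[string]interface{}{
{"created_at": {"order": "asc"}},
{"_id": {"order": "asc"}},
}
...
"search_after": [
1702465740836, // cursor for created_at (first sort field)
"1O9tYowBuAaJdMU4BeRn" // cursor for _id (second sort field)
]
// ok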
Conclusion
Elasticsearch offers powerful tools for handling data efficiently, especially when it comes to pagination in large datasets. While basic pagination methods like from and size can be suitable for small datasets, the cursor-based approach using the search_after API proves to be more scalable and efficient for extensive pagination needs.