Microservices in Go: Searching with Elasticsearch
May 24, 2021

Disclaimer: This post includes Amazon affiliate links. If you click on one of them and make a purchase, I’ll earn a commission. Please note that your final price is not affected at all by using those links.

What is Elasticsearch?

Source (emphasis mine):

… is a distributed, free and open search and analytics engine for all types of data, including textual, numerical, geospatial, structured, and unstructured.

Elasticsearch is the central component of the Elastic Stack, commonly known as ELK, which consists of the following services:

  • E: Elasticsearch,
  • L: Logstash, and
  • K: Kibana.

Elasticsearch is a well-known tool, already supported by all three big cloud providers:

  • Amazon Web Services,
  • Google Cloud, and
  • Microsoft Azure.

Using Elasticsearch in Go

There are two popular packages for interacting with Elasticsearch in Go:

  • elastic/go-elasticsearch: the official client, and
  • olivere/elastic: a popular community client written by Oliver Eilhard.

Which one should we choose?

Both of those packages are well supported and production ready; however, between the two of them I’m leaning towards elastic/go-elasticsearch instead of olivere/elastic. This is because of a comment made by Oliver Eilhard (the author of olivere/elastic) on GitHub, where he mentions that a v8 of that package will perhaps not be available, which could be an issue if you’re planning to upgrade to Elasticsearch Version 8 when that version is released.

To be fair, to date the work done by Oliver Eilhard simplifies a lot of what is needed when interacting with the Elasticsearch API, things that would otherwise have to be indicated explicitly when using the official package. This is because olivere/elastic/v7 is implemented to literally cover all the options available in the official API, and because it uses a fluent-like API its usage and default values are predefined from the beginning.
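
To see what that fluent-like API looks like in practice, here’s an illustrative sketch using olivere/elastic/v7; the index name, field name, and search term below are made up for this example, not taken from the microservice:

package main

import (
	"context"
	"log"

	elastic "github.com/olivere/elastic/v7"
)

func main() {
	ctx := context.Background()

	// The client is configured by chaining functional options.
	client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
	if err != nil {
		log.Fatal(err)
	}

	// Every parameter of the Search API is a chained call with predefined defaults.
	res, err := client.Search().
		Index("tasks").
		Query(elastic.NewMatchQuery("description", "groceries")).
		From(0).Size(10).
		Do(ctx)
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("found %d matching documents", res.TotalHits())
}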



The code used for this post is available on GitHub.

Using elastic/go-elasticsearch

For implementing the Elasticsearch datastore we are going to follow the Repository Pattern to update our “To Do Microservice”. We will define a new elasticsearch package with a new type that includes the corresponding logic for indexing, searching, and deleting records. This type will be called from the already existing service.Task, and finally we will expose this feature to our clients via a new HTTP API for searching tasks.
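
Before diving into those methods, this is more or less what the new package could define; the import alias, field types, and JSON tags below are assumptions meant to match the snippets that follow, not the exact code from the repository:

package elasticsearch

import (
	esv7 "github.com/elastic/go-elasticsearch/v7"
)

// Task is the repository in charge of indexing, searching, and deleting records.
type Task struct {
	client *esv7.Client
	index  string
}

// NewTask instantiates a Task repository using an already configured client.
func NewTask(client *esv7.Client, index string) *Task {
	return &Task{
		client: client,
		index:  index,
	}
}

// indexedTask is the document actually stored in Elasticsearch; dates are kept
// as Unix nanoseconds to keep the mapping simple.
type indexedTask struct {
	ID          string `json:"id"`
	Description string `json:"description"`
	Priority    int8   `json:"priority"`
	IsDone      bool   `json:"is_done"`
	DateStart   int64  `json:"date_start"`
	DateDue     int64  `json:"date_due"`
}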

The code in practice, for indexing, looks more or less like this:

func (t *Task) Index(ctx context.Context, task internal.Task) error {
	// XXX: Excluding OpenTelemetry and error checking for simplicity

	body := indexedTask{
		ID:          task.ID,
		Description: task.Description,
		Priority:    task.Priority,
		IsDone:      task.IsDone,
		DateStart:   task.Dates.Start.UnixNano(),
		DateDue:     task.Dates.Due.UnixNano(),
	}

	var buf bytes.Buffer

	_ = json.NewEncoder(&buf).Encode(body) // XXX: error omitted

	req := esv7api.IndexRequest{
		Index:      t.index,
		Body:       &buf,
		DocumentID: task.ID,
		Refresh:    "true",
	}

	resp, _ := req.Do(ctx, t.client) // XXX: error omitted
	defer resp.Body.Close()

	_, _ = io.Copy(ioutil.Discard, resp.Body) // XXX: drain the body so the connection can be reused

	return nil
}

And for searching it would look similar to:

func (t *Task) Search(ctx context.Context, description *string, priority *internal.Priority, isDone *bool) ([]internal.Task, error) {
	// XXX: Excluding OpenTelemetry and error checking for simplicity

	if description == nil && priority == nil && isDone == nil {
		return nil, nil
	}

	should := make([]interface{}, 0, 3)

	if description != nil {
		should = append(should, map[string]interface{}{
			"match": map[string]interface{}{
				"description": *description,
			},
		})
	}

	if priority != nil {
		should = append(should, map[string]interface{}{
			"match": map[string]interface{}{
				"priority": *priority,
			},
		})
	}

	if isDone != nil {
		should = append(should, map[string]interface{}{
			"match": map[string]interface{}{
				"is_done": *isDone,
			},
		})
	}

	var query map[string]interface{}

	if len(should) > 1 {
		query = map[string]interface{}{
			"query": map[string]interface{}{
				"bool": map[string]interface{}{
					"should": should,
				},
			},
		}
	} else {
		query = map[string]interface{}{
			"query": should[0],
		}
	}

	var buf bytes.Buffer

	_ = json.NewEncoder(&buf).Encode(query)

	req := esv7api.SearchRequest{
		Index: []string{t.index},
		Body:  &buf,
	}

	resp, _ := req.Do(ctx, t.client) // XXX: error omitted
	defer resp.Body.Close()

	var hits struct {
		Hits struct {
			Hits []struct {
				Source indexedTask `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
	}

	_ = json.NewDecoder(resp.Body).Decode(&hits) // XXX: error omitted

	res := make([]internal.Task, len(hits.Hits.Hits))

	for i, hit := range hits.Hits.Hits {
		res[i].ID = hit.Source.ID
		res[i].Description = hit.Source.Description
		res[i].IsDone = hit.Source.IsDone
		res[i].Priority = internal.Priority(hit.Source.Priority)
		res[i].Dates.Due = time.Unix(0, hit.Source.DateDue).UTC()
		res[i].Dates.Start = time.Unix(0, hit.Source.DateStart).UTC()
	}

	return res, nil
}

In both cases please refer to the original implementation for concrete details. As I mentioned before, there’s also a Delete method meant to be called when deleting previously indexed Tasks.
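
In case it helps, a Delete implementation in the same style could look roughly like the following sketch; as with the snippets above, OpenTelemetry and error checking are excluded, and the real code may differ:

func (t *Task) Delete(ctx context.Context, id string) error {
	// XXX: Excluding OpenTelemetry and error checking for simplicity

	req := esv7api.DeleteRequest{
		Index:      t.index,
		DocumentID: id,
	}

	resp, _ := req.Do(ctx, t.client) // XXX: error omitted
	defer resp.Body.Close()

	_, _ = io.Copy(ioutil.Discard, resp.Body)

	return nil
}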

To connect those calls, we need to update our service type to explicitly call the Search store and execute the new actions we defined above; in future posts I will cover how to do this using events instead of calling the stores directly in the service implementation.

For example the service.Task type will be doing something like the following in Create:

// Create stores a new record.
func (t *Task) Create(ctx context.Context, description string, priority internal.Priority, dates internal.Dates) (internal.Task, error) {
	// XXX: Excluding OpenTelemetry and error checking for simplicity

	task, _ := t.repo.Create(ctx, description, priority, dates)

	_ = t.search.Index(ctx, task) // XXX: New Search call to index and store records

	return task, nil
}

Similar calls for indexing records are going to be added to the other methods, and a new method is defined for searching records:

func (t *Task) By(ctx context.Context, description *string, priority *internal.Priority, isDone *bool) ([]internal.Task, error) {
	// XXX: Excluding OpenTelemetry and error checking for simplicity

	res, _ := t.search.Search(ctx, description, priority, isDone) // XXX: error omitted

	return res, nil
}
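
And when deleting records the service could keep both stores in sync; a sketch assuming both the repository and the search store expose a matching Delete method:

// Delete removes an existing record from both datastores.
func (t *Task) Delete(ctx context.Context, id string) error {
	// XXX: Excluding OpenTelemetry and error checking for simplicity

	_ = t.repo.Delete(ctx, id)

	_ = t.search.Delete(ctx, id) // XXX: Remove the document from the index as well

	return nil
}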

Similar to the other datastore used by this service.Task, we also use Dependency Injection to refer to the concrete, initialized elasticsearch.Task store.
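
A rough sketch of that wiring is shown below; the interface names and the constructor signature are assumptions for illustration, not the exact code from the repository:

// TaskRepository is the existing persistent datastore (name assumed for this sketch).
type TaskRepository interface {
	Create(ctx context.Context, description string, priority internal.Priority, dates internal.Dates) (internal.Task, error)
	Delete(ctx context.Context, id string) error
}

// TaskSearchRepository is satisfied by the concrete elasticsearch.Task store.
type TaskSearchRepository interface {
	Index(ctx context.Context, task internal.Task) error
	Search(ctx context.Context, description *string, priority *internal.Priority, isDone *bool) ([]internal.Task, error)
	Delete(ctx context.Context, id string) error
}

// Task defines the service; both stores are injected when it is instantiated.
type Task struct {
	repo   TaskRepository
	search TaskSearchRepository
}

// NewTask instantiates the Task service.
func NewTask(repo TaskRepository, search TaskSearchRepository) *Task {
	return &Task{
		repo:   repo,
		search: search,
	}
}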

Conclusion

Elasticsearch is a powerful tool that allows searching records in multiple different ways. It scales horizontally and supports multiple ways to index and search values; for example, when using text-based fields it can transform numbers into words, allowing you to search for both the literal number and the humanized way of expressing that value.

Similarly, it scores fields to sort results depending on how well they match, giving you the flexibility to display the most relevant ones first, and it supports other popular features like fuzzy searching, which matches records related to the searched terms.
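
For instance, enabling fuzzy matching on the description clause used by the Search method above is only a matter of passing extra options to the match query; this is an illustrative sketch, where "AUTO" lets Elasticsearch pick the allowed edit distance:

// fuzzyDescriptionQuery builds a fuzzy "match" clause in the same map-based
// style used by Search; a misspelled term such as "grocceries" would still
// match documents containing "groceries".
func fuzzyDescriptionQuery(description string) map[string]interface{} {
	return map[string]interface{}{
		"match": map[string]interface{}{
			"description": map[string]interface{}{
				"query":     description,
				"fuzziness": "AUTO",
			},
		},
	}
}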

One thing to keep in mind when using Elasticsearch is that major version upgrades sometimes bring behavioral breaking changes, even if the API stays the same.

Elasticsearch includes more than what I mentioned above; it’s a really powerful tool that can also be used for analytics. It can certainly be overwhelming to use, but it’s a tool that should not be ignored.

If you’re looking to sink your teeth into more Elasticsearch-related topics I recommend the following books:

