Elasticsearch:Elasticsearch 开发入门 - Golang
在本文中,我將分享如何在 Golang 中如何使用?Elasticsearch 來開發的經驗。 順便說一句,以防萬一你從未聽說過 Elasticsearch:
Elasticsearch 是一個高度可擴展的開源全文本搜索和分析引擎。 它使你可以快速,近乎實時地存儲,搜索和分析大量數據。 它通常用作支持具有復雜搜索功能和要求的應用程序的基礎引擎/技術。
如果你想了解更多關于 Elasticsearch 的介紹,你可以參閱我之前的文章 “Elasticsearch 簡介”。
針對 Golang 的 Elasticsearch 支持,你可以訪問 Elastic 的官方 github?https://github.com/elastic/go-elasticsearch。
前提條件
創建一個 Golang?項目
我們在自己的電腦里創建一個如下的目錄:
mkdir go-elasticsearch cd go-elasticsearch接著我們在這個目錄里創建一個叫做 main.go 的文件。你可以使用你喜歡的編輯器,比如:
vi main.go在上面我們使用 vi 編輯器來創建 main.go 文件。
用于 Elasticsearch 的 Golang 驅動程序(go-elasticsearch)必須安裝在服務器的 $GOPATH 中。 使用 git 將庫的存儲庫克隆到 $GOPATH 中,如下例所示:
git clone --branch master https://github.com/elastic/go-elasticsearch.git $GOPATH/src/github.com/elastic/go-elasticsearch在編譯 Go 應用時,有時遇到庫不能從 github 上下載的錯誤信息。我們需要在 terminal 中打入如下的命令:
export GO111MODULE=on export GOPROXY=https://goproxy.io我們也可以使用如下的方法來達到安裝的 go-elasticsearch 的目的。我們需要在?go-elasticsearch 目錄下創建一個叫做 go.mod 的文件。它的內容如下:
go.mod
require github.com/elastic/go-elasticsearch/v7 master客戶端主要版本與兼容的 Elasticsearch 主要版本相對應:要連接到 Elasticsearch 7.x,請使用客戶端的 7.x 版本,要連接到Elasticsearch 6.x,請使用客戶端的 6.x 版本。
require github.com/elastic/go-elasticsearch/v7 7.x require github.com/elastic/go-elasticsearch/v7 7.0.0可以在一個項目中使用客戶端的多個版本:
// go.mod github.com/elastic/go-elasticsearch/v6 6.x github.com/elastic/go-elasticsearch/v7 7.x// main.go import (elasticsearch6 "github.com/elastic/go-elasticsearch/v6"elasticsearch7 "github.com/elastic/go-elasticsearch/v7" ) // ... es6, _ := elasticsearch6.NewDefaultClient() es7, _ := elasticsearch7.NewDefaultClient()安裝 Elasticsearch 及 Kibana
如果你之前從來沒有安裝過 Elasticsearch 或 Kibana。你可以閱讀我之前的文章 “Elastic:菜鳥上手指南” 來進行安裝。在本練習中,我們將使用 docker 來安裝 Elasticsearch 及 Kibana。我們首先來創建一個叫做 docker-compose.yml 的文件:
docker-compose.yml
--- version: "3" services:elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:7.10.0container_name: es01environment:- node.name=es01- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeulimits:memlock:soft: -1hard: -1volumes:- esdata:/usr/share/elasticsearch/dataports:- 9200:9200kibana:image: docker.elastic.co/kibana/kibana:7.10.0ports:- 5601:5601depends_on:- elasticsearchvolumes:esdata:driver: local在上面,我們使用了 Elastic Stack 7.10.0 發行版作為實驗的版本。在你實際的使用中,你可以根據自己的版本需求而進行修改。
我們必須先啟動 docker,然后在命令行中執行:
docker-compose up上面命令必須執行于 docker-compose.yml 文件所在的目錄中。
它將啟動 http://localhost:9200 中的 Elasticsearch 和 http://localhost:5601 中的 Kibana。 你可以通過在瀏覽器中打開鏈接來進行驗證。
測試 elasticsearch 包
elasticsearch 軟件包將兩個單獨的軟件包捆綁在一起,分別用于調用 Elasticsearch API 和通過 HTTP 傳輸數據:esapi 和 estransport。
使用 elasticsearch.NewDefaultClient() 函數創建具有默認設置的客戶端。
main.go
package mainimport ("log"// Import the Elasticsearch library packages"github.com/elastic/go-elasticsearch/v7" )func main() {es, err := elasticsearch.NewDefaultClient()if err != nil {log.Fatalf("Error creating the client: %s", err)}res, err := es.Info()if err != nil {log.Fatalf("Error getting response: %s", err)}defer res.Body.Close()log.Println(res) }我們使用如下的命令來運行:
go run main.go上面的命令顯示的結果為:
$ go run main.go go: finding github.com/elastic/go-elasticsearch latest 2020/12/24 10:56:23 [200 OK] {"name" : "es01","cluster_name" : "docker-cluster","cluster_uuid" : "ZYQ9cGOdS06uZvxOvjug8A","version" : {"number" : "7.10.0","build_flavor" : "default","build_type" : "docker","build_hash" : "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96","build_date" : "2020-11-09T21:30:33.964949Z","build_snapshot" : false,"lucene_version" : "8.7.0","minimum_wire_compatibility_version" : "6.8.0","minimum_index_compatibility_version" : "6.0.0-beta1"},"tagline" : "You Know, for Search" }注意:關閉并使用響應 body 至關重要,以便在默認的 HTTP 傳輸中重新使用持久性 TCP 連接。 如果你對響應正文不感興趣,請調用 io.Copy(ioutil.Discard,res.Body)。
當你 export ELASTICSEARCH_URL環境變量時,它將用于設置集群端點。 用逗號分隔多個地址。
要以編程方式設置集群端點,請將配置對象傳遞給 elasticsearch.NewClient() 函數。
cfg := elasticsearch.Config{Addresses: []string{"http://localhost:9200","http://localhost:9201",},// ... } es, err := elasticsearch.NewClient(cfg)要設置用戶名和密碼,請將其包括在端點 URL 中,或使用相應的配置選項。
cfg := elasticsearch.Config{// ...Username: "foo",Password: "bar", }若要設置用于對群集節點的證書進行簽名的自定義證書頒發機構,請使用 CACert 配置選項。
cert, _ := ioutil.ReadFile(*cacert)cfg := elasticsearch.Config{// ...CACert: cert, }插入文檔到索引
在這個章節中,我將一步一步地指導如何如何使用 go-elasticsearch 驅動來把文檔導入到 Elasticsearch 中。
創建一個 Go 腳本并導入包
現在,我們已經確保正確安裝和設置了我們需要的所有內容,我們可以開始使用 Go 腳本了。 編輯之前的 main.go 文件,然后將 main 包放在頂部。 請確保導入所有必需的程序包和庫,如以下示例所示:
package mainimport ( "context" "encoding/json" "fmt" "log" "reflect" "strconv" "strings"// Import the Elasticsearch library packages "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" )在上面,我們使用 v7 版本,它對應于 Elastic Stack 7.x 版本的發布。在之前的部署中,我們使用的版本是 7.10。
為 Elasticsearch 文檔的字段創建結構數據類型
我們將使用 Golang struct 數據類型為要編制索引的 Elasticsearch 文檔以及索引的相應字段創建框架:
// Declare a struct for Elasticsearch fields type ElasticDocs struct {SomeStr stringSomeInt intSomeBool bool }聲明一個將 Elasticsearch 結構數據轉換為 JSON 字符串的函數
接下來,讓我們看一個簡單的函數,它將 Elasticsearch struct 文檔實例轉換為 JSON 字符串。 下面顯示的代碼可能看起來有些復雜,但是實際上發生的事情很簡單–所有功能所做的就是將結構轉換為字符串文字,然后將該字符串傳遞給 Golang 的 json.Marshal() 方法以使其返回字符串的JSON編碼:
// A function for marshaling structs to JSON string func jsonStruct(doc ElasticDocs) string {// Create struct instance of the Elasticsearch fields struct objectdocStruct := &ElasticDocs{SomeStr: doc.SomeStr,SomeInt: doc.SomeInt,SomeBool: doc.SomeBool,}fmt.Println("\ndocStruct:", docStruct)fmt.Println("docStruct TYPE:", reflect.TypeOf(docStruct))// Marshal the struct to JSON and check for errorsb, err := json.Marshal(docStruct)if err != nil {fmt.Println("json.Marshal ERROR:", err)return string(err.Error())}return string(b) }聲明 main() 函數并創建一個新的 Elasticsearch Golang 客戶端實例
在我們的 Go 腳本中,所有 API 方法調用都必須位于 main() 函數內部或從另一個函數內部進行調用。 讓我們為 API 調用創建一個新的上下文對象,并為 Elasticsearch 文檔創建一個 map 對象:
func main() {// Allow for custom formatting of log outputlog.SetFlags(0)// Create a context object for the API callsctx := context.Background()// Create a mapping for the Elasticsearch documentsvar (docMap map[string]interface{})fmt.Println("docMap:", docMap)fmt.Println("docMap TYPE:", reflect.TypeOf(docMap))實例化 Elasticsearch 客戶端配置和 Golang 客戶端實例
在這一步中,我們將實例化一個新的 Elasticsearch 配置對象。 確保將正確的主機和端口信息以及任何用戶名或密碼傳遞給其 “Adressess” 屬性。
// Declare an Elasticsearch configurationcfg := elasticsearch.Config{Addresses: []string{"http://localhost:9200",},Username: "user",Password: "pass",}// Instantiate a new Elasticsearch client object instanceclient, err := elasticsearch.NewClient(cfg)if err != nil {fmt.Println("Elasticsearch connection error:", err)}檢查用于 Elasticsearch 的 Golang 客戶端在連接到集群時是否返回了任何錯誤
接下來,我們將檢查與 Elasticsearch 的連接是否成功或是否返回了任何錯誤:
// Have the client instance return a responseres, err := client.Info()// Deserialize the response into a map.if err != nil {log.Fatalf("client.Info() ERROR:", err)} else {log.Printf("client response:", res)}創建 Elasticsearch 結構文檔并將其放入數組
我們將聲明一個空字符串數組,以存儲當前以 JSON 字符串表示的 Elasticsearch 文檔。 以下代碼顯示了一些將用于索引的 Elasticsearch 文檔示例。 要設置其字段的值,你需要做的就是修改結構實例的屬性:
我們會將這些文檔實例傳遞給我們先前聲明的 jsonStruct() 函數,并使它們返回代表每個文檔的 JSON 字符串。 然后,我們將使用 Golang 的 append() 函數將 JSON 字符串添加到字符串數組中:
迭代 Elasticsearch 文檔數組并調用 Golang 客戶端的 IndexRequest() 方法
現在我們已經建立了一個文檔數組,我們將對其進行迭代,并在進行過程中向 Elasticsearch 集群發出 API 請求。 這些API調用將通過調用 Golang 驅動程序的 esapi.IndexRequest() 方法來索引文檔:
// Iterate the array of string documentsfor i, bod := range docs {fmt.Println("\nDOC _id:", i+1)fmt.Println(bod)// Instantiate a request objectreq := esapi.IndexRequest {Index: "some_index",DocumentID: strconv.Itoa(i + 1),Body: strings.NewReader(bod),Refresh: "true",}fmt.Println(reflect.TypeOf(req))在上面一定要注意的是:我們設置 Refresh 為 true。這在實際的使用中并不建議,原因是每次寫入的時候都會 refresh。當我們面對大量的數據時,這樣的操作會造成效率的底下。
檢查 IndexRequest() API 方法調用是否返回任何錯誤
在文檔數組上進行迭代的最后一步是從 API 調用中獲取響應,并檢查是否存在錯誤:
// Return an API response object from requestres, err := req.Do(ctx, client)if err != nil {log.Fatalf("IndexRequest ERROR: %s", err)}defer res.Body.Close()在下面顯示的代碼中,如果沒有錯誤返回,我們將解析 API 響應返回的結果對象:
if res.IsError() {log.Printf("%s ERROR indexing document ID=%d", res.Status(), i+1)} else {// Deserialize the response into a map.var resMap map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&resMap); err != nil {log.Printf("Error parsing the response body: %s", err)} else {log.Printf("\nIndexRequest() RESPONSE:")// Print the response status and indexed document version.fmt.Println("Status:", res.Status())fmt.Println("Result:", resMap["result"])fmt.Println("Version:", int(resMap["_version"].(float64)))fmt.Println("resMap:", resMap)fmt.Println("\n")}}} }每個文檔迭代都應打印出一個map[string]?interface{}?對象響應,如下所示:
resMap: map[_id:1 _index:some_index _primary_term:1 _seq_no:32 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:2 forced_refresh:true result:updated]
在上面,我們講了很多代碼。為了方便大家練習,我把整個 main.go 的代碼貼出來:
main.go
package mainimport ( "context" "encoding/json" "fmt" "log" "reflect" "strconv" "strings"// Import the Elasticsearch library packages "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" )// Declare a struct for Elasticsearch fields type ElasticDocs struct {SomeStr stringSomeInt intSomeBool bool }// A function for marshaling structs to JSON string func jsonStruct(doc ElasticDocs) string {// Create struct instance of the Elasticsearch fields struct objectdocStruct := &ElasticDocs{SomeStr: doc.SomeStr,SomeInt: doc.SomeInt,SomeBool: doc.SomeBool,}fmt.Println("\ndocStruct:", docStruct)fmt.Println("docStruct TYPE:", reflect.TypeOf(docStruct))// Marshal the struct to JSON and check for errorsb, err := json.Marshal(docStruct)if err != nil {fmt.Println("json.Marshal ERROR:", err)return string(err.Error())}return string(b) }func main() {// Allow for custom formatting of log outputlog.SetFlags(0)// Create a context object for the API callsctx := context.Background()// Create a mapping for the Elasticsearch documentsvar (docMap map[string]interface{})fmt.Println("docMap:", docMap)fmt.Println("docMap TYPE:", reflect.TypeOf(docMap))// Declare an Elasticsearch configurationcfg := elasticsearch.Config{Addresses: []string{"http://localhost:9200",},Username: "user",Password: "pass",}// Instantiate a new Elasticsearch client object instanceclient, err := elasticsearch.NewClient(cfg)if err != nil {fmt.Println("Elasticsearch connection error:", err)}// Have the client instance return a responseres, err := client.Info()// Deserialize the response into a map.if err != nil {log.Fatalf("client.Info() ERROR:", err)} else {log.Printf("client response:", res)}// Declare empty array for the document stringsvar docs []string// Declare documents to be indexed using structdoc1 := ElasticDocs{}doc1.SomeStr = "Some Value"doc1.SomeInt = 123456doc1.SomeBool = truedoc2 := ElasticDocs{}doc2.SomeStr = "Another Value"doc2.SomeInt = 42doc2.SomeBool = false // Marshal Elasticsearch document struct objects to JSON stringdocStr1 := jsonStruct(doc1)docStr2 := jsonStruct(doc2)// Append the doc strings to an arraydocs = append(docs, docStr1)docs = append(docs, docStr2)// Iterate the array of string documentsfor i, bod := range docs {fmt.Println("\nDOC _id:", i+1)fmt.Println(bod)// Instantiate a request objectreq := esapi.IndexRequest {Index: "some_index",DocumentID: strconv.Itoa(i + 1),Body: strings.NewReader(bod),Refresh: "true",}fmt.Println(reflect.TypeOf(req))// Return an API response object from requestres, err := req.Do(ctx, client)if err != nil {log.Fatalf("IndexRequest ERROR: %s", err)}defer res.Body.Close()if res.IsError() {log.Printf("%s ERROR indexing document ID=%d", res.Status(), i+1)} else {// Deserialize the response into a map.var resMap map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&resMap); err != nil {log.Printf("Error parsing the response body: %s", err)} else {log.Printf("\nIndexRequest() RESPONSE:")// Print the response status and indexed document version.fmt.Println("Status:", res.Status())fmt.Println("Result:", resMap["result"])fmt.Println("Version:", int(resMap["_version"].(float64)))fmt.Println("resMap:", resMap)fmt.Println("\n")}}} }運行上面的代碼,我們將看到如下的輸出:
$ go run main.go go: finding github.com/elastic/go-elasticsearch latest docMap: map[] docMap TYPE: map[string]interface {} client response:%!(EXTRA *esapi.Response=[200 OK] {"name" : "es01","cluster_name" : "docker-cluster","cluster_uuid" : "ZYQ9cGOdS06uZvxOvjug8A","version" : {"number" : "7.10.0","build_flavor" : "default","build_type" : "docker","build_hash" : "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96","build_date" : "2020-11-09T21:30:33.964949Z","build_snapshot" : false,"lucene_version" : "8.7.0","minimum_wire_compatibility_version" : "6.8.0","minimum_index_compatibility_version" : "6.0.0-beta1"},"tagline" : "You Know, for Search" } )docStruct: &{Some Value 123456 true} docStruct TYPE: *main.ElasticDocsdocStruct: &{Another Value 42 false} docStruct TYPE: *main.ElasticDocsDOC _id: 1 {"SomeStr":"Some Value","SomeInt":123456,"SomeBool":true} esapi.IndexRequestIndexRequest() RESPONSE: Status: 200 OK Result: updated Version: 4 resMap: map[_id:1 _index:some_index _primary_term:1 _seq_no:36 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:4 forced_refresh:true result:updated]DOC _id: 2 {"SomeStr":"Another Value","SomeInt":42,"SomeBool":false} esapi.IndexRequestIndexRequest() RESPONSE: Status: 200 OK Result: updated Version: 18 resMap: map[_id:2 _index:some_index _primary_term:1 _seq_no:37 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:18 forced_refresh:true result:updated]我們可以在 Kibana 中使用如下的命令來進行查看被導入的文檔:
GET some_index/_search搜索文檔
我們接下來搜索已經建立好的文檔。我們接下來搜索在 SomeStr? 這個字段含有 Another 的文檔。在 main.go 里添加如下的代碼:
// Search for the indexed document// Build the request bodyvar buf bytes.Bufferquery := map[string]interface{}{"query": map[string]interface{}{"match": map[string]interface{}{"SomeStr": "Another",},},}if err := json.NewEncoder(&buf).Encode(query); err != nil {log.Fatalf("Error encoding query: %s", err)}// Perform the search request.res, err = client.Search(client.Search.WithContext(context.Background()),client.Search.WithIndex("some_index"),client.Search.WithBody(&buf),client.Search.WithTrackTotalHits(true),client.Search.WithPretty(),)if err != nil {log.Fatalf("Error getting response: %s", err)}defer res.Body.Close()if res.IsError() {var e map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&e); err != nil {log.Fatalf("Error parsing the response body: %s", err)} else {// Print the response status and error information.log.Fatalf("[%s] %s: %s",res.Status(),e["error"].(map[string]interface{})["type"],e["error"].(map[string]interface{})["reason"],)}}var r map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&r); err != nil {log.Fatalf("Error parsing the response body: %s", err)}// Print the response status, number of results, and request duration.log.Printf("[%s] %d hits; took: %dms",res.Status(),int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),int(r["took"].(float64)),)// Print the ID and document source for each hit.for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {log.Printf(" * ID=%s, %s", hit.(map[string]interface{})["_id"], hit.(map[string]interface{})["_source"])}同時由于我們使用了 bytes 模塊,我們需要在文檔的開始部分添加:
import ("context""encoding/json""fmt""log""reflect""strconv""strings""bytes"// Import the Elasticsearch library packages"github.com/elastic/go-elasticsearch/v7""github.com/elastic/go-elasticsearch/v7/esapi" )運行上面的代碼。我們可以看到如下新添加的結果:
[200 OK] 1 hits; took: 1ms
* ID=2, map[SomeBool:%!s(bool=false) SomeInt:%!s(float64=42) SomeStr:Another Value]
刪除文檔
刪除一個文檔非常容易。在 main.go 文件中,我們添加如下的代碼來刪除文檔 id 為 1 的文檔:
// Set up the request object.req := esapi.DeleteRequest{Index: "some_index",DocumentID: strconv.Itoa(1),}res, err = req.Do(context.Background(), client)if err != nil {log.Fatalf("Error getting response: %s", err)}重新運行 main.go 應用。我們再到 Kibana 中去查詢一下:
這次查詢我們會發現只有一個文檔存在。那個 id 為 2 的文檔雖然也被導入,但是又被刪除了。
為了方便大家的學習,我把代碼放在 github 上:https://github.com/liu-xiao-guo/go-elasticsearch-demo
總結
以上是生活随笔為你收集整理的Elasticsearch:Elasticsearch 开发入门 - Golang的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenCV中的图像处理 —— 霍夫线
- 下一篇: Synopsys AXI VIP wst