使用 KubernetesClient 操作 kubernetes
使用 KubernetesClient 操作 kubernetes
Intro
我們的應(yīng)用都是部署在 Kubernetes 上的,我們有一個(gè)服務(wù)內(nèi)部有一層 MemoryCache,之前會(huì)依賴(lài) Redis 的 Pub/Sub 來(lái)做緩存的更新,而 Redis 的 Pub/Sub 是一種不可靠的更新機(jī)制,容易發(fā)生消息丟失從而導(dǎo)致數(shù)據(jù)不一致的情況,之后用Stream代替了PUB/SUB。
之前我們有一次問(wèn)題就是因?yàn)檫@個(gè)導(dǎo)致的,在 k8s 集群里有幾個(gè) Pod 的數(shù)據(jù)還是老數(shù)據(jù),導(dǎo)致接口拿到的數(shù)據(jù)有時(shí)候是正確的有時(shí)候是錯(cuò)誤的,這就很尷尬了,發(fā)現(xiàn)有問(wèn)題,每次都去一個(gè)一個(gè)Pod 去檢查就很煩,于是就想寫(xiě)一個(gè)腳本或者小工具來(lái)自動(dòng)檢查所有集群所有 Pod 的返回值,于是就有了這篇文章的探索。
實(shí)現(xiàn)原理
Kubernetes 集群通過(guò) API Server 對(duì)外提供了 REST API 以方便通過(guò) API 來(lái)操作 Kubernetes 集群
Components of KubernetesA diagram showing how the parts of a Kubernetes cluster relate to one another
kubectl 實(shí)際工作方式就是一個(gè)和 API Server 進(jìn)行交互的命令行工具,所以我們完全可以自己根據(jù) Kubernetes 提供的 API 來(lái)實(shí)現(xiàn)我們需要的功能,而 Kubernetes 官方也維護(hù)了一個(gè) dotnet 的客戶(hù)端 ?KubernetesClient,從而我們可以少寫(xiě)很多代碼,直接使用這個(gè) SDK 就可以比較方便的對(duì) Kubernetes 進(jìn)行操作了。
想一下,如果我們使用 kubectl 的話要如何檢查一個(gè)集群所有的 pod 的返回結(jié)果呢?
首先我們可以通過(guò) kubectl get pod 來(lái)獲取一個(gè) pod 列表,拿到 pod 列表之后就可以依次訪問(wèn)各個(gè) pod 的 API 拿返回結(jié)果了,這里我想到的有兩種方式,一種是在 pod 里執(zhí)行 curl 命令,訪問(wèn) API 拿到返回的數(shù)據(jù),另一種方式是針對(duì) pod 進(jìn)行 port-forward,然后訪問(wèn) localhost 就可以請(qǐng)求接口拿到返回?cái)?shù)據(jù)了。
最后選擇的是 port-forward 的方式,因?yàn)橛械娜萜骼锟赡懿](méi)有 curl,不夠通用,所以放棄了在容器里 curl 的方式。
InspectSample
首先我們來(lái)看一下 KubernetesClient 基本的使用吧,來(lái)看一個(gè)簡(jiǎn)單的示例,遍歷所有的 namespace,依次獲取每個(gè) namespace 下的 pod
首先我們需要構(gòu)建一個(gè) IKubernetes 實(shí)例以和 Kubenetes 進(jìn)行通信,在此之前我們需要先構(gòu)建 KubernetesClientConfiguration,基本使用如下:
//?使用默認(rèn)的配置,默認(rèn)的?kubernetes?配置文件,Windows?是?`%PROFILE%/.kube/config`,Linux?是?`%HOME%/.kube/config` var?config?=?KubernetesClientConfiguration.BuildDefaultConfig(); //?使用指定的配置文件 //var?config?=?KubernetesClientConfiguration.BuildConfigFromConfigFile(file); IKubernetes?kubernetes?=?new?Kubernetes(config);InspectSample:
var?namespaces?=?_kubernetes.ListNamespace(); foreach?(var?ns?in?namespaces.Items) {var?namespaceName?=?ns.Metadata.Name;Console.WriteLine($"Namespace:{namespaceName}");var?pods?=?_kubernetes.ListNamespacedPod(namespaceName);foreach?(var?pod?in?pods.Items){var?podName?=?pod.Metadata.Name;Console.WriteLine($"??Pod:?{podName},?Labels:?{pod.Metadata.Labels.ToJson()}");var?containers?=?pod.Spec.Containers;foreach?(var?container?in?containers){Console.WriteLine($"????Container:?{container.Name}");}} }輸出結(jié)果如下:
需要注意的是,如果用戶(hù)沒(méi)有權(quán)限訪問(wèn)所有的命名空間時(shí),遍歷命名空間的時(shí)候就會(huì)報(bào)錯(cuò)
CreatePodSample
上面是一個(gè)簡(jiǎn)單列出 pod 的使用,接著我們來(lái)看一個(gè)創(chuàng)建 Pod 的示例,我們執(zhí)行 kubectl delete po/reservation 先把之前的 pod 刪掉,然后再通過(guò)代碼創(chuàng)建一個(gè) pod,創(chuàng)建 pod 的代碼如下:
const?string?namespaceName?=?"default"; const?string?podName?=?"reservation"; const?string?containerName?=?"reservation"; const?string?image?=?"weihanli/activityreservation:standalone";//?//?try?delete?pod?if?exits //?try //?{ //?????await?_kubernetes.DeleteNamespacedPodAsync(podName,?namespaceName); //?????Console.WriteLine($"Pod:{podName}?deleted"); //?} //?catch //?{ //?????// //?} //?await?ListPods();var?pod?=?new?V1Pod {Metadata?=?new?V1ObjectMeta{Name?=?podName,?NamespaceProperty?=?namespaceName,Labels?=?new?Dictionary<string,?string>(){{?"app",?"reservation"?}}},Spec?=?new?V1PodSpec(new?List<V1Container>(){new?V1Container(containerName){Image?=?image,Ports?=?new?List<V1ContainerPort>?{new(80)}}}), }; await?_kubernetes.CreateNamespacedPodAsync(pod,?namespaceName);await?ListPods();async?Task?ListPods() {var?pods?=?await?_kubernetes.ListNamespacedPodAsync(namespaceName);foreach?(var?item?in?pods.Items){Console.WriteLine($"{item.Metadata.Name},?{item.Metadata.Labels.ToJson()}");} }輸出結(jié)果如下:
reservation,?{"app":"reservation"}Port-Forward Sample
最后來(lái)看一下我們的 Port-Forward 的示例,示例代碼如下:
首先定義一個(gè)通用一點(diǎn)的 Port-Forward 的方法,根據(jù)官方給出的示例做了一些改動(dòng),更好的支持了 CancellationToken:
//?Port-forward,?modified?from?https://github.com/kubernetes-client/csharp/blob/master/examples/portforward/PortForward.cs#L24 private?static?async?Task?PortForward(IKubernetes?client,?V1Pod?pod,?CancellationToken?cancellationToken,int?hostPort) {Console.WriteLine($"Port-forward?started?for?pod?{pod.Metadata.Name}");//?Note?this?is?single-threaded,?it?won't?handle?concurrent?requests?well...var?webSocket?=?await?client.WebSocketNamespacedPodPortForwardAsync(pod.Metadata.Name,?pod.Namespace(),new[]?{80},?"v4.channel.k8s.io",?cancellationToken:?cancellationToken);var?demux?=?new?StreamDemuxer(webSocket,?StreamType.PortForward);demux.Start();var?stream?=?demux.GetStream((byte?)?0,?(byte?)?0);var?ipAddress?=?IPAddress.Loopback;var?localEndPoint?=?new?IPEndPoint(ipAddress,?hostPort);var?listener?=?new?Socket(ipAddress.AddressFamily,?SocketType.Stream,?ProtocolType.Tcp);listener.Bind(localEndPoint);listener.Listen(100);var?handler?=?listener.Accept();cancellationToken.Register(()?=>{try{handler.Close();listener.Close();handler.Dispose();listener.Dispose();demux.Dispose();webSocket.Dispose();}catch{//}Console.WriteLine("Port-forward?closed");});cancellationToken.ThrowIfCancellationRequested();//?Note?this?will?only?accept?a?single?connectionvar?accept?=?Task.Run(()?=>{var?bytes?=?new?byte[4096];while?(!cancellationToken.IsCancellationRequested){var?bytesRec?=?handler.Receive(bytes);stream.Write(bytes,?0,?bytesRec);if?(bytesRec?==?0?||?Encoding.ASCII.GetString(bytes,?0,?bytesRec).IndexOf("<EOF>",?StringComparison.OrdinalIgnoreCase)?>?-1)?break;}},?cancellationToken);var?copy?=?Task.Run(()?=>{var?buff?=?new?byte[4096];while?(!cancellationToken.IsCancellationRequested){var?read?=?stream.Read(buff,?0,?4096);handler.Send(buff,?read,?0);}},?cancellationToken);await?Task.WhenAny(accept,?copy); }使用示例如下,使用上面創(chuàng)建的 pod 來(lái)演示 port-forward:
var?pod?=?(await?_kubernetes.ListNamespacedPodAsync("default")).Items.First(x?=>?x.Name().Equals("reservation",?StringComparison.Ordinal));using?var?cts?=?new?CancellationTokenSource(); var?portForwardTask?=?PortForward(_kubernetes,?pod,?cts.Token,?8000);try {using?var?httpClient?=?new?HttpClient?{Timeout?=?TimeSpan.FromSeconds(10)};while?(true){try{var?response?=?await?httpClient.GetAsync("http://localhost:8000/api/notice",?cts.Token);Console.WriteLine(response.StatusCode);if?(response.IsSuccessStatusCode){Console.WriteLine(await?response.Content.ReadAsStringAsync(cts.Token));break;}}catch{//}Console.WriteLine("Waiting?for?port-forward?ready...");await?Task.Delay(1000,?cts.Token);} } catch?(Exception?e) {Console.WriteLine(e); } finally {cts.Cancel(); }//?wait?for?portForward?exit try {await?portForwardTask; } catch {//?ignore?port-forward?exit?exception }輸出結(jié)果如下:
More
通過(guò)上面的代碼,我們已經(jīng)可以實(shí)現(xiàn)訪問(wèn) pod 里容器的接口了,只需要將找到 pod 的代碼和 port-forward 的代碼組合一下就可以達(dá)到我們的目標(biāo)了,對(duì)于多個(gè)集群可以使用多個(gè)配置文件,遍歷一下就可以了,如果是在一個(gè)配置文件中也可以先獲取所有的 cluster,然后在構(gòu)建 config 的時(shí)候指定一個(gè) currentContext 就可以了
有了這個(gè)工具下次想檢查每個(gè) Pod 返回結(jié)果就只需要跑一下就可以比較方便的拿到所有?Pod 的返回結(jié)果了
更多 KubernetesClient 使用示例可以參考官方給出的示例:https://github.com/kubernetes-client/csharp/tree/master/examples
上面的代碼也可以從我的 Github 上獲取:https://github.com/WeihanLi/SamplesInPractice/tree/master/KubernetesClientSample
希望對(duì)你有所幫助~
References
https://github.com/kubernetes-client/csharp
https://kubernetes.io/docs/reference/using-api/client-libraries/
https://kubernetes.io/docs/reference/using-api/
https://github.com/WeihanLi/SamplesInPractice/tree/master/KubernetesClientSample
總結(jié)
以上是生活随笔為你收集整理的使用 KubernetesClient 操作 kubernetes的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: EFCore之增删改查
- 下一篇: 重磅!微软发布新一代 Teams 开发工