Apache Kafka API AdminClient Scram账户的操作(增删改查)
前言
很久沒有更新Kafka API相關(guān)的文檔了,因?yàn)楣P者工作變動(dòng)Kafka這部分內(nèi)容在工作中接觸的就相對(duì)于之前少了一些。但架不住kafka官方還是一如既往的勤奮,官方操作Scram賬戶的創(chuàng)建與刪除這部分已經(jīng)更新了好久了,這次也算是填坑吧,主要就是針對(duì)alterUserScramCredentials方法做一個(gè)功能說明和demo。給網(wǎng)上少之又少的Kafka API中文使用教程做個(gè)增補(bǔ),本次基于Kafka API 2.8.0,同時(shí)適用于2.7.0版本,但是該版本API并不是很完善,創(chuàng)建的賬戶可能會(huì)有無法讀寫Topic的問題。如果你使用的Kafka還沒有升級(jí)到這么高的版本,請(qǐng)參考【Apache Kafka API AdminClient Scram賬戶的創(chuàng)建與刪除】這篇博客,其是針對(duì)2.7.0及其以下的Scram賬戶操作。更多內(nèi)容請(qǐng)點(diǎn)擊【Apache Kafka API AdminClient 目錄】。
操作Scram賬戶的方法
首先我們先看官方文檔中對(duì)于操作Scram賬戶是怎么說的,查詢官方的文檔只提供了一個(gè)方法:
| default AlterUserScramCredentialsResult | alterUserScramCredentials(List< UserScramCredentialAlteration> alterations) | Alter SASL/SCRAM credentials for the given users. |
| AlterUserScramCredentialsResult | alterUserScramCredentials(List< UserScramCredentialAlteration> alterations, AlterUserScramCredentialsOptions options) | Alter SASL/SCRAM credentials. |
根據(jù)官方給的文檔描述,上表中第二個(gè)方法是對(duì)第一個(gè)方法的擴(kuò)展,所以這里不做討論。我們主要集中于alterUserScramCredentials()方法的使用上。根據(jù)描述,該方法只接受一個(gè)泛型為UserScramCredentialAlteration 的List參數(shù),接著看下UserScramCredentialAlteration是什么樣子。
Field Summary
| protected | String user |
Constructor Summary
| protected | UserScramCredentialAlteration(String user) |
Method Summary
| String | user() |
根據(jù)官方文檔描述里面只有一個(gè)String類型的user字段,顯然不可能只根據(jù)這一個(gè)字段構(gòu)造一個(gè)能夠使用的賬戶。其實(shí)UserScramCredentialAlteration是一個(gè)抽象的類,它還有兩個(gè)子類UserScramCredentialDeletion和UserScramCredentialUpsertion。從名字就可以看出來,創(chuàng)建大概是UserScramCredentialUpsertion,刪除則應(yīng)該是UserScramCredentialDeletion。所以我們傳入的List中的泛型,也就是這兩個(gè)類的具體實(shí)現(xiàn)了。
創(chuàng)建一個(gè)Scram賬戶
UserScramCredentialUpsertion
官方文檔對(duì)UserScramCredentialUpsertion類的解釋是:
A request to update/insert a SASL/SCRAM credential for a user.
看來這個(gè)類不僅僅能做創(chuàng)建,而且也能做更新,大方向是沒問題了。首先我們還是看下創(chuàng)建UserScramCredentialUpsertion這個(gè)類要怎么構(gòu)造:
| UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password) | Constructor that generates a random salt |
| UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password, byte[] salt) | Constructor that accepts an explicit salt |
| UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, String password) | Constructor that generates a random salt |
從上述表格中可以看到這個(gè)類有三個(gè)構(gòu)造方法,構(gòu)建這個(gè)類除了需要父類提供的user名字以外,還需要另外兩個(gè)參數(shù)ScramCredentialInfo和password??梢钥吹狡鋵?shí)最后一個(gè)構(gòu)造方法password是String,這個(gè)可以說是最方便的了。第二個(gè)中為什么還要有一個(gè)salt參數(shù),筆者深入代碼只發(fā)現(xiàn)了一句話new Field("salt", Type.COMPACT_BYTES, "A random salt generated by the client.")??赡苁墙o用戶自定義什么功能用的,但是kafka自己也會(huì)生成這個(gè)東西,由于官方文檔和源碼都沒有找到更多的解釋,我們暫且忽略,如果哪位筆者知道這點(diǎn),也行評(píng)論區(qū)不吝賜教。
ScramCredentialInfo
那么我們就轉(zhuǎn)向ScramCredentialInfo這個(gè)類的構(gòu)造:
| ScramCredentialInfo(ScramMechanism mechanism, int iterations) | – |
可以看到這個(gè)類的構(gòu)建就比較簡單明了,兩個(gè)參數(shù)一個(gè)是ScramMechanism也就是Scram認(rèn)證機(jī)制,這是一個(gè)enum類型內(nèi)置SCRAM_SHA_256、SCRAM_SHA_512、UNKNOWN三個(gè)類型,UNKNOWN類型會(huì)在創(chuàng)建時(shí)報(bào)異常。另外一個(gè)iterations是循環(huán)次數(shù)the number of iterations used when creating the credential,代表你要?jiǎng)?chuàng)建的賬戶個(gè)數(shù),也就是List.size()吧。
示例代碼
public void createAccount(String name, String pwd, String salt) throws ExecutionException, InterruptedException {//創(chuàng)建User列表List<UserScramCredentialAlteration> alterations = new ArrayList<>();//構(gòu)造Scram認(rèn)證機(jī)制信息,這里筆者選擇了SCRAM_SHA_512,大家也可以選擇 ScramMechanism.SCRAM_SHA_256//alterations.size()此時(shí)為0,或許會(huì)報(bào)錯(cuò),可以試下傳入數(shù)字構(gòu)造,比如下面添加了一個(gè)認(rèn)證信息,那么這里傳入數(shù)字1。// ScramCredentialInfo info=new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, alterations.size()); //這里時(shí)間久遠(yuǎn),忘記當(dāng)時(shí)寫例子的場(chǎng)景了ScramCredentialInfo info=new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 10000);//三個(gè)UserScramCredentialAlteration構(gòu)造方法,三選一筆者選了一個(gè)最簡單的//UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd.getBytes());//UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd.getBytes(),salt.getBytes());UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd);//添加認(rèn)證信息到列表alterations.add(userScramCredentialAdd);//執(zhí)行方法,并拿到返回結(jié)果AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);//阻塞等待結(jié)果完成result.all().get(); }通過官網(wǎng)給出的解釋,我們知道對(duì)賬戶的更新操作也是使用UserScramCredentialUpsertion類進(jìn)行,因此只要保證user name一致即可對(duì)相應(yīng)的user中的內(nèi)容進(jìn)行更新,這點(diǎn)大家自己驗(yàn)證吧。
刪除一個(gè)Scram賬戶
刪除賬戶用的則是子類UserScramCredentialDeletion,一樣我們先看它的構(gòu)造方法:
| UserScramCredentialDeletion(String user, ScramMechanism mechanism) |
果然刪除就不會(huì)像增加或者修改一樣墨跡,只有一個(gè)構(gòu)造方法可用,參數(shù)上面也都說過了,直接上示例代碼:
public void deleteAccount(String name) throws ExecutionException, InterruptedException {//創(chuàng)建刪除列表List<UserScramCredentialAlteration> alterations = new ArrayList<>();//構(gòu)建刪除用的UserScramCredentialAlterationUserScramCredentialAlteration userScramCredentialDel=new UserScramCredentialDeletion(name,ScramMechanism.SCRAM_SHA_512);//添加認(rèn)證信息到列表alterations.add(userScramCredentialDel);//執(zhí)行方法,并拿到返回結(jié)果AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);//阻塞等待結(jié)果完成result.all().get(); }查詢Scram賬戶信息
說完對(duì)Scram賬戶的增刪改以后,剩下的自然就剩下查詢了,官方也提供了一個(gè)方法用于查詢:
| default DescribeUserScramCredentialsResult | describeUserScramCredentials() | Describe all SASL/SCRAM credentials. |
| default DescribeUserScramCredentialsResult | describeUserScramCredentials(List< String> users) | Describe SASL/SCRAM credentials for the given users. |
| DescribeUserScramCredentialsResult | describeUserScramCredentials(List< String> users, DescribeUserScramCredentialsOptions options) | Describe SASL/SCRAM credentials. |
可以看到大體上也是只有一個(gè)方法describeUserScramCredentials(),只不過對(duì)這個(gè)方法做了一個(gè)重載,用于指定查詢某些用戶的信息,參數(shù)很簡單不用多說,直接上示例代碼:
public void describeAccount() throws ExecutionException, InterruptedException {//***************************************查詢所有用戶信息*****************************************************//查詢所有的賬戶,這也是默認(rèn)方法DescribeUserScramCredentialsResult result = adminClient.describeUserScramCredentials();//執(zhí)行方法,并拿到返回結(jié)果Map<String, UserScramCredentialsDescription> future = result.all().get();//輸出future.forEach((name,info)-> System.out.println("[ScramUserName:"+name+"]:[ScramUserInfo:"+info.toString()+"]"));//***************************************這里是分割線*****************************************************//***************************************查詢指定的用戶信息*****************************************************//構(gòu)造指定的用戶列表List<String> userScramList=new ArrayList<>();//添加兩個(gè)用戶userScramList.add("user1");userScramList.add("user2");//傳入特定用戶列表執(zhí)行方法,并拿到返回結(jié)果DescribeUserScramCredentialsResult targetResult = adminClient.describeUserScramCredentials(userScramList);//執(zhí)行方法,并拿到返回結(jié)果Map<String, UserScramCredentialsDescription> targetFuture = targetResult.all().get();//輸出targetFuture.forEach((name,info)-> System.out.println("[ScramUserName:"+name+"]:[ScramUserInfo:"+info.toString()+"]")); }結(jié)語
這一篇終于把之前的坑填上了,自從筆者不再需要對(duì)Kafka平臺(tái)進(jìn)行維護(hù)以后,Kafka的版本也發(fā)生了好多巨大的更新,一個(gè)最明顯的變化就是Kafka的去zookeeper操作。筆者之前操縱2.7.0的時(shí)候,Kafka官方還是警告2.8版本最好不要用于生產(chǎn)環(huán)境,而今天官方的態(tài)度已經(jīng)有了一個(gè)模糊的轉(zhuǎn)變,可見Kafka已經(jīng)大體上完成了對(duì)Zookeeper的解耦工作。由于筆者工作的變動(dòng)不再管理Kafka相關(guān)的平臺(tái)內(nèi)容,所以這部分內(nèi)容就會(huì)更新的比較慢,畢竟懶癌晚期的筆者能補(bǔ)上這個(gè)坑已經(jīng)算是祖墳冒煙了。不過好在筆者已經(jīng)夠把大多數(shù)關(guān)鍵的Kafka管理操作的API使用方法更新完畢,希望這些內(nèi)容對(duì)網(wǎng)上寥寥無幾的Kafka文檔有所補(bǔ)充,也希望這些博客能夠幫助到那些正在管理Kafka平臺(tái)的小伙伴們,希望以后還有機(jī)會(huì)對(duì)更新的版本進(jìn)行更深一步的探究吧。
總結(jié)
以上是生活随笔為你收集整理的Apache Kafka API AdminClient Scram账户的操作(增删改查)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 思岚科技机器人底盘价格揭秘
- 下一篇: 山东大学网络考试的计算机试题及答案,专科