sqoop2 java api实现_Sqoop2 Java客户端API指南
原文連接:http://sqoop.apache.org/docs/1.99.6/ClientAPI.html
Sqoop Java客戶端API指南
這篇文章秒描述了額如何在外部應用中使用sqoop java 客戶端API。通過客戶端API可以實現sqoop命令的功能。使用Sqoop客戶端API需要Sqoop 客戶端Jar文件及其依賴。
提供了支持sqool操作的封裝方法的主要類是:
public classSqoopClient {
...
}
Java Client API isexplained using Generic JDBC Connector example. Before executing theapplication using the sqoop client API, check whether sqoop server is running.
JAVA客戶端API會被通用的 JDBC連接器實例解釋。在執行使用sqoop客戶端API的應用前要確認sqoop服務器已經啟動了。
流程
在sqoop服務器中執行一個sqoop任務的流程如下:
1.?通過使用connectorId來建立一個LINK對象-創建Link對象并發返回linkId(lid)
2.?使用 from linkId和to linkId來創建一個JOB對象 – 創建已給Job對象并返回jobId(jid)
3.?啟動指定jobId的任務。在服務器上啟動任務并生成一條提交記錄。
工程依賴
maven 依賴
org.apache.sqoop
sqoop-client
${requestedVersion}
初始化啊
First initializethe SqoopClient class with server URL as argument.
首先使用服務器URL來初始化SqoopClient類實例
String url ="http://localhost:12000/sqoop/";
SqoopClient client= new SqoopClient(url);
可以使用客戶端對象的setServerUrl(String)方法來修改服務器的URL
client.setServerUrl(newUrl);
Link
連接器提供了和多個數據源進行交互的功能,因此在sqoop中用來作為在不同數據源中傳輸數據的方法。注冊的注冊器需要提供從其所代表的數據源中讀取數據和寫入數據的邏輯實現。連接器可以被一個或多個連接關聯。Java客戶端API允許用戶為某個注冊的 連接器創建,更新和刪除一個連接。創建和更新連接要求取出針對特定連接器的連接配置。因此最開始的事情就是獲取注冊的連接器的列表并選擇用來創建連接的連接器。然后就可以通過使用Display Config and Input Names For Connector來獲取該連接器的所有配置和輸入列表
保存連接
首先通過調用createLink(cid)方法,傳入一個連接器ID創建一個新的 連接并返回一個MLink對象,該對象包含一個id和對應連接器未設定的 連接配置。接著使用相關輸入填充配置。最后傳遞配置好的MLink調用saveLink方法保存連接
// create aplaceholder for link
long connectorId =1;
MLink link =client.createLink(connectorId);
link.setName("Vampire");
link.setCreationUser("Buffy");
MLinkConfig linkConfig = link.getConnectorLinkConfig();
// fill in the linkconfig values
linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/my");
linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
linkConfig.getStringInput("linkConfig.username").setValue("root");
linkConfig.getStringInput("linkConfig.password").setValue("root");
// save the linkobject that was filled
Status status =client.saveLink(link);
if(status.canProceed()){
System.out.println("Created Link withLink Id : " + link.getPersistenceId());
} else {
System.out.println("Something went wrongcreating the link");
}
如果狀態是OK或WARNING,status.canProceed()方法返回true.在發布狀態之前,連接的配置會通過使用對應的驗證器進行驗證。
當savelink方法執行成功后,新的連接ID會分配到連接對象上,否則會拋出異常。
link.getPersistenceId()方法返回保存在sqoop倉庫中的link對象的唯一標識。
用戶可以通過以下方法來獲取一個連接
Method
Description
getLink(lid)
Returns a link by id
getLinks()
Returns list of links in the sqoop
Job
一個sqoop任務包含From和To部分,將數據從From數據源傳輸到To數據源。From和To部分都由其對應的連接器Ids來唯一標識。如創建一個任務我們必須指定FromLinkId和ToLinkId.因此創建一個任務的先決條件就是我們上面描述的創建連接。
當From和To的連接Id給定后,連接對象相關的連接器的任務配置需要配置。可以通過連接器的DisplayConfig and Input Names For Connector來回去所有的任務配置和輸入。一個連接器可以有一個或多個連接。我們使用MFromConfig和MToConfig對象分別對應于From和To連接。
除了配置From和To連接的任務配置外,我們還需要提供控制任務執行引擎環境的驅動配置。例如如果任務執行引擎是mapreduce,我們需要指定用來從From讀取數據的Map的數量。
保存任務
下面是創建和保存一個任務的代碼:
String url ="http://localhost:12000/sqoop/";
SqoopClient client= new SqoopClient(url);
//Creating dummyjob object
long fromLinkId =1;// for jdbc connector
long toLinkId = 2;// for HDFS connector
MJob job =client.createJob(fromLinkId, toLinkId);
job.setName("Vampire");
job.setCreationUser("Buffy");
// set the"FROM" link job config values
MFromConfig fromJobConfig = job.getFromJobConfig();
fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
// set the"TO" link job config values
MToConfig toJobConfig = job.getToJobConfig();
toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");
// set the driverconfig values
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");
Status status =client.saveJob(job);
if(status.canProceed()){
System.out.println("Created Job with JobId: "+ job.getPersistenceId());
} else {
System.out.println("Something went wrongcreating the job");
}
用戶可以通過下面的方法來獲取任務對象
Method
Description
getJob(jid)
Returns a job by id
getJobs()
Returns list of jobs in the sqoop
狀態碼列表
Function
Description
OK
沒有問題,沒有警告
WARNING
實體可以被正確的處理,沒有致命錯誤
ERROR
驗證實體有驗證的問題,這些問題不解決無法繼續處理
查看錯誤或警告驗證信息
當出現任何的警告和錯誤狀態時,用戶需要獲取驗證信息列表
printMessage(link.getConnectorLinkConfig().getConfigs());
private static voidprintMessage(List configs) {
for(MConfig config : configs) {
List> inputlist =config.getInputs();
if (config.getValidationMessages() != null){
// print every validation message
for(Message message :config.getValidationMessages()) {
System.out.println("Configvalidation message: " + message.getMessage());
}
}
for (MInput minput : inputlist) {
if (minput.getValidationStatus() ==Status.WARNING) {
for(Message message :config.getValidationMessages()) {
System.out.println("Config InputValidation Warning: " + message.getMessage());
}
}
else if (minput.getValidationStatus() ==Status.ERROR) {
for(Message message :config.getValidationMessages()) {
System.out.println("Config InputValidation Error: " + message.getMessage());
}
}
}
}
更新連接和任務
在倉庫中創建連接和任務之后,我們可以使用下面的方法來更新或刪除一個連接或任務。
Method
Description
updateLink(link)
通過傳遞link進行更新,會檢查任何的錯誤和警告
deleteLink(lid)
刪除連接,只有該連接當前沒有被任何任務使用時才可以刪除
updateJob(job)
新任務,會檢查任何的錯誤和警告
deleteJob(jid)
刪除任務
啟動任務
啟動任務需要指定任務Id,成功啟動之后,getStatus()方法會返回“BOOTING” or“RUNNING”.
//Job start
long jobId = 1;
MSubmission submission = client.startJob(jobId);
System.out.println("JobSubmission Status : " + submission.getStatus());
if(submission.getStatus().isRunning()&& submission.getProgress() != -1) {
System.out.println("Progress : " +String.format("%.2f %%", submission.getProgress() * 100));
}
System.out.println("Hadoopjob id :" + submission.getExternalId());
System.out.println("Joblink : " + submission.getExternalLink());
Counters counters =submission.getCounters();
if(counters !=null) {
System.out.println("Counters:");
for(CounterGroup group : counters) {
System.out.print("\t");
System.out.println(group.getName());
for(Counter counter : group) {
System.out.print("\t\t");
System.out.print(counter.getName());
System.out.print(": ");
System.out.println(counter.getValue());
}
}
}
if(submission.getExceptionInfo()!= null) {
System.out.println("Exception info :" +submission.getExceptionInfo());
}
//Check job statusfor a running job
MSubmissionsubmission = client.getJobStatus(jobId);
if(submission.getStatus().isRunning()&& submission.getProgress() != -1) {
System.out.println("Progress : " +String.format("%.2f %%", submission.getProgress() * 100));
}
//Stop a runningjob
submission.stopJob(jobId);
上面的代碼塊中任務的啟動時異步的。如果需要同步啟動任務,需要調用startJob(jid,callback,pollTime)方法。如果對獲取任務狀態不感興趣,可以指定callback參數為空,該方法將返回任務的最終狀態。pollTime用來指定從sqoop服務器獲取任務狀態的時間間隔,指定的值必須大于0.如果該值較小的話,我們會頻繁的連接sqoop服務器。如果同步任務中指定了非空的callback,會在成功啟動時首先調用callback的submitted(MSubmission)方法,然后每隔pollTime時間調用callback的updated(MSubmission)方法,最后在任務完成時調用callback.finished(MSubmission)方法。
總結
以上是生活随笔為你收集整理的sqoop2 java api实现_Sqoop2 Java客户端API指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java inputreader_Jav
- 下一篇: java美元兑换,(Java实现) 美元