日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

sqoop2 java api实现_Sqoop2 Java客户端API指南

發布時間:2024/1/23 java 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 sqoop2 java api实现_Sqoop2 Java客户端API指南 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文連接:http://sqoop.apache.org/docs/1.99.6/ClientAPI.html

Sqoop Java客戶端API指南

這篇文章秒描述了額如何在外部應用中使用sqoop java 客戶端API。通過客戶端API可以實現sqoop命令的功能。使用Sqoop客戶端API需要Sqoop 客戶端Jar文件及其依賴。

提供了支持sqool操作的封裝方法的主要類是:

public classSqoopClient {

...

}

Java Client API isexplained using Generic JDBC Connector example. Before executing theapplication using the sqoop client API, check whether sqoop server is running.

JAVA客戶端API會被通用的 JDBC連接器實例解釋。在執行使用sqoop客戶端API的應用前要確認sqoop服務器已經啟動了。

流程

在sqoop服務器中執行一個sqoop任務的流程如下:

1.?通過使用connectorId來建立一個LINK對象-創建Link對象并發返回linkId(lid)

2.?使用 from linkId和to linkId來創建一個JOB對象 – 創建已給Job對象并返回jobId(jid)

3.?啟動指定jobId的任務。在服務器上啟動任務并生成一條提交記錄。

工程依賴

maven 依賴

org.apache.sqoop

sqoop-client

${requestedVersion}

初始化啊

First initializethe SqoopClient class with server URL as argument.

首先使用服務器URL來初始化SqoopClient類實例

String url ="http://localhost:12000/sqoop/";

SqoopClient client= new SqoopClient(url);

可以使用客戶端對象的setServerUrl(String)方法來修改服務器的URL

client.setServerUrl(newUrl);

Link

連接器提供了和多個數據源進行交互的功能,因此在sqoop中用來作為在不同數據源中傳輸數據的方法。注冊的注冊器需要提供從其所代表的數據源中讀取數據和寫入數據的邏輯實現。連接器可以被一個或多個連接關聯。Java客戶端API允許用戶為某個注冊的 連接器創建,更新和刪除一個連接。創建和更新連接要求取出針對特定連接器的連接配置。因此最開始的事情就是獲取注冊的連接器的列表并選擇用來創建連接的連接器。然后就可以通過使用Display Config and Input Names For Connector來獲取該連接器的所有配置和輸入列表

保存連接

首先通過調用createLink(cid)方法,傳入一個連接器ID創建一個新的 連接并返回一個MLink對象,該對象包含一個id和對應連接器未設定的 連接配置。接著使用相關輸入填充配置。最后傳遞配置好的MLink調用saveLink方法保存連接

// create aplaceholder for link

long connectorId =1;

MLink link =client.createLink(connectorId);

link.setName("Vampire");

link.setCreationUser("Buffy");

MLinkConfig linkConfig = link.getConnectorLinkConfig();

// fill in the linkconfig values

linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/my");

linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");

linkConfig.getStringInput("linkConfig.username").setValue("root");

linkConfig.getStringInput("linkConfig.password").setValue("root");

// save the linkobject that was filled

Status status =client.saveLink(link);

if(status.canProceed()){

System.out.println("Created Link withLink Id : " + link.getPersistenceId());

} else {

System.out.println("Something went wrongcreating the link");

}

如果狀態是OK或WARNING,status.canProceed()方法返回true.在發布狀態之前,連接的配置會通過使用對應的驗證器進行驗證。

當savelink方法執行成功后,新的連接ID會分配到連接對象上,否則會拋出異常。

link.getPersistenceId()方法返回保存在sqoop倉庫中的link對象的唯一標識。

用戶可以通過以下方法來獲取一個連接

Method

Description

getLink(lid)

Returns a link by id

getLinks()

Returns list of links in the sqoop

Job

一個sqoop任務包含From和To部分,將數據從From數據源傳輸到To數據源。From和To部分都由其對應的連接器Ids來唯一標識。如創建一個任務我們必須指定FromLinkId和ToLinkId.因此創建一個任務的先決條件就是我們上面描述的創建連接。

當From和To的連接Id給定后,連接對象相關的連接器的任務配置需要配置。可以通過連接器的DisplayConfig and Input Names For Connector來回去所有的任務配置和輸入。一個連接器可以有一個或多個連接。我們使用MFromConfig和MToConfig對象分別對應于From和To連接。

除了配置From和To連接的任務配置外,我們還需要提供控制任務執行引擎環境的驅動配置。例如如果任務執行引擎是mapreduce,我們需要指定用來從From讀取數據的Map的數量。

保存任務

下面是創建和保存一個任務的代碼:

String url ="http://localhost:12000/sqoop/";

SqoopClient client= new SqoopClient(url);

//Creating dummyjob object

long fromLinkId =1;// for jdbc connector

long toLinkId = 2;// for HDFS connector

MJob job =client.createJob(fromLinkId, toLinkId);

job.setName("Vampire");

job.setCreationUser("Buffy");

// set the"FROM" link job config values

MFromConfig fromJobConfig = job.getFromJobConfig();

fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");

fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");

fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");

// set the"TO" link job config values

MToConfig toJobConfig = job.getToJobConfig();

toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");

// set the driverconfig values

MDriverConfig driverConfig = job.getDriverConfig();

driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

Status status =client.saveJob(job);

if(status.canProceed()){

System.out.println("Created Job with JobId: "+ job.getPersistenceId());

} else {

System.out.println("Something went wrongcreating the job");

}

用戶可以通過下面的方法來獲取任務對象

Method

Description

getJob(jid)

Returns a job by id

getJobs()

Returns list of jobs in the sqoop

狀態碼列表

Function

Description

OK

沒有問題,沒有警告

WARNING

實體可以被正確的處理,沒有致命錯誤

ERROR

驗證實體有驗證的問題,這些問題不解決無法繼續處理

查看錯誤或警告驗證信息

當出現任何的警告和錯誤狀態時,用戶需要獲取驗證信息列表

printMessage(link.getConnectorLinkConfig().getConfigs());

private static voidprintMessage(List configs) {

for(MConfig config : configs) {

List> inputlist =config.getInputs();

if (config.getValidationMessages() != null){

// print every validation message

for(Message message :config.getValidationMessages()) {

System.out.println("Configvalidation message: " + message.getMessage());

}

}

for (MInput minput : inputlist) {

if (minput.getValidationStatus() ==Status.WARNING) {

for(Message message :config.getValidationMessages()) {

System.out.println("Config InputValidation Warning: " + message.getMessage());

}

}

else if (minput.getValidationStatus() ==Status.ERROR) {

for(Message message :config.getValidationMessages()) {

System.out.println("Config InputValidation Error: " + message.getMessage());

}

}

}

}

更新連接和任務

在倉庫中創建連接和任務之后,我們可以使用下面的方法來更新或刪除一個連接或任務。

Method

Description

updateLink(link)

通過傳遞link進行更新,會檢查任何的錯誤和警告

deleteLink(lid)

刪除連接,只有該連接當前沒有被任何任務使用時才可以刪除

updateJob(job)

新任務,會檢查任何的錯誤和警告

deleteJob(jid)

刪除任務

啟動任務

啟動任務需要指定任務Id,成功啟動之后,getStatus()方法會返回“BOOTING” or“RUNNING”.

//Job start

long jobId = 1;

MSubmission submission = client.startJob(jobId);

System.out.println("JobSubmission Status : " + submission.getStatus());

if(submission.getStatus().isRunning()&& submission.getProgress() != -1) {

System.out.println("Progress : " +String.format("%.2f %%", submission.getProgress() * 100));

}

System.out.println("Hadoopjob id :" + submission.getExternalId());

System.out.println("Joblink : " + submission.getExternalLink());

Counters counters =submission.getCounters();

if(counters !=null) {

System.out.println("Counters:");

for(CounterGroup group : counters) {

System.out.print("\t");

System.out.println(group.getName());

for(Counter counter : group) {

System.out.print("\t\t");

System.out.print(counter.getName());

System.out.print(": ");

System.out.println(counter.getValue());

}

}

}

if(submission.getExceptionInfo()!= null) {

System.out.println("Exception info :" +submission.getExceptionInfo());

}

//Check job statusfor a running job

MSubmissionsubmission = client.getJobStatus(jobId);

if(submission.getStatus().isRunning()&& submission.getProgress() != -1) {

System.out.println("Progress : " +String.format("%.2f %%", submission.getProgress() * 100));

}

//Stop a runningjob

submission.stopJob(jobId);

上面的代碼塊中任務的啟動時異步的。如果需要同步啟動任務,需要調用startJob(jid,callback,pollTime)方法。如果對獲取任務狀態不感興趣,可以指定callback參數為空,該方法將返回任務的最終狀態。pollTime用來指定從sqoop服務器獲取任務狀態的時間間隔,指定的值必須大于0.如果該值較小的話,我們會頻繁的連接sqoop服務器。如果同步任務中指定了非空的callback,會在成功啟動時首先調用callback的submitted(MSubmission)方法,然后每隔pollTime時間調用callback的updated(MSubmission)方法,最后在任務完成時調用callback.finished(MSubmission)方法。

總結

以上是生活随笔為你收集整理的sqoop2 java api实现_Sqoop2 Java客户端API指南的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。