Tags: mysql, sqoop, hdfs, hadoop
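I'm using the Sqoop 2 (1.99.x) Java client API to import a MySQL table into HDFS. The code below creates a JDBC source link, an HDFS destination link, and a job between the two, then starts the job and polls its progress until it finishes.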
package com.hadoop.recommend;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;
public class MysqlToHdfs {

    public static void main(String[] args) {
        sqoopTransfer();
    }
    public static void sqoopTransfer() {
        // Initialization
        String url = "http://master:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);
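        // The client talks to the Sqoop 2 server over its REST API;
        // 12000 is the Sqoop 2 server's default port.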
        // Create the source link (generic JDBC connector)
        long fromConnectorId = 2;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("JDBC connector");
        fromLink.setCreationUser("hadoop");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://master:3306/hive");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("root");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("");
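        // NOTE: connector IDs are installation-specific (here 2 is assumed to be the
        // generic-jdbc-connector and 1 the hdfs-connector); verify them with
        // "show connector" in the sqoop2 shell.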
        Status fromStatus = client.saveLink(fromLink);
        if (fromStatus.canProceed()) {
            System.out.println("JDBC link created, id: " + fromLink.getPersistenceId());
        } else {
            System.out.println("Failed to create JDBC link");
        }
        // Create the destination link (HDFS connector)
        long toConnectorId = 1;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("HDFS connector");
        toLink.setCreationUser("hadoop");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://master:9000/");
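        // The URI should match the cluster's fs.defaultFS from core-site.xml.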
        Status toStatus = client.saveLink(toLink);
        if (toStatus.canProceed()) {
            System.out.println("HDFS link created, id: " + toLink.getPersistenceId());
        } else {
            System.out.println("Failed to create HDFS link");
        }
        // Create a job from the source link to the destination link
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("MySQL to HDFS job");
        job.setCreationUser("hadoop");
        // Configure the FROM side of the job (source schema and table)
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
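        // The partition column is used to split the table across parallel extractors;
        // it should be an evenly distributed column, typically the primary key.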
        // Configure the TO side of the job (HDFS output directory) and the driver
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/user/hdfs/recommend");
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");
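        // numExtractors controls read-side parallelism, similar to "-m" in Sqoop 1.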
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Job created, id: " + job.getPersistenceId());
        } else {
            System.out.println("Failed to create job.");
        }
        // Start the job and poll until it finishes
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("Job submission status: " + submission.getStatus());
        while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
            // Report progress every three seconds
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Refresh the submission from the server, otherwise the loop never sees
            // an updated status (assumes the Sqoop 1.99.4+ client API)
            submission = client.getJobStatus(jobId);
        }
system.out.println("job执行结束... ...");
system.out.println("hadoop任务id为 :" + submission.getexternalid());
counters counters = submission.getcounters();
if(counters != null) {
system.out.println("计数器:");
for(countergroup group : counters) {
system.out.print("\t");
system.out.println(group.getname());
for(counter counter : group) {
system.out.print("\t\t");
system.out.print(counter.getname());
system.out.print(": ");
system.out.println(counter.getvalue());
}
}
}
if(submission.getexceptioninfo() != null) {
system.out.println("job执行异常,异常信息为 : " +submission.getexceptioninfo());
}
system.out.println("mysql通过sqoop传输数据到hdfs统计执行完毕");
}
}
It threw this error. What's going on??