您好,欢迎访问一九零五行业门户网

sqoop client java api将mysql的数据导到hdfs

mysqlsqoophdfshadoop
package com.hadoop.recommend; import org.apache.sqoop.client.sqoopclient;import org.apache.sqoop.model.mdriverconfig; import org.apache.sqoop.model.mfromconfig;import org.apache.sqoop.model.mjob; import org.apache.sqoop.model.mlink; import org.apache.sqoop.model.mlinkconfig; import org.apache.sqoop.model.msubmission; import org.apache.sqoop.model.mtoconfig; import org.apache.sqoop.submission.counter.counter; import org.apache.sqoop.submission.counter.countergroup; import org.apache.sqoop.submission.counter.counters; import org.apache.sqoop.validation.status; public class mysqltohdfs { public static void main(string[] args) { sqooptransfer(); } public static void sqooptransfer() { //初始化 string url = "http://master:12000/sqoop/"; sqoopclient client = new sqoopclient(url); //创建一个源链接 jdbc long fromconnectorid = 2; mlink fromlink = client.createlink(fromconnectorid); fromlink.setname("jdbc connector"); fromlink.setcreationuser("hadoop"); mlinkconfig fromlinkconfig = fromlink.getconnectorlinkconfig(); fromlinkconfig.getstringinput("linkconfig.connectionstring").setvalue("jdbc:mysql://master:3306/hive"); fromlinkconfig.getstringinput("linkconfig.jdbcdriver").setvalue("com.mysql.jdbc.driver"); fromlinkconfig.getstringinput("linkconfig.username").setvalue("root"); fromlinkconfig.getstringinput("linkconfig.password").setvalue(""); status fromstatus = client.savelink(fromlink); if(fromstatus.canproceed()) { system.out.println("创建jdbc link成功,id为: " + fromlink.getpersistenceid()); } else { system.out.println("创建jdbc link失败"); } //创建一个目的地链接hdfs long toconnectorid = 1; mlink tolink = client.createlink(toconnectorid); tolink.setname("hdfs connector"); tolink.setcreationuser("hadoop"); mlinkconfig tolinkconfig = tolink.getconnectorlinkconfig(); tolinkconfig.getstringinput("linkconfig.uri").setvalue("hdfs://master:9000/"); status tostatus = client.savelink(tolink); if(tostatus.canproceed()) { system.out.println("创建hdfs link成功,id为: " + tolink.getpersistenceid()); } else { system.out.println("创建hdfs link失败"); } //创建一个任务 long fromlinkid = fromlink.getpersistenceid(); long tolinkid = tolink.getpersistenceid(); mjob job = client.createjob(fromlinkid, tolinkid); job.setname("mysql to hdfs job"); job.setcreationuser("hadoop"); //设置源链接任务配置信息 mfromconfig fromjobconfig = job.getfromjobconfig(); fromjobconfig.getstringinput("fromjobconfig.schemaname").setvalue("sqoop"); fromjobconfig.getstringinput("fromjobconfig.tablename").setvalue("sqoop"); fromjobconfig.getstringinput("fromjobconfig.partitioncolumn").setvalue("id"); mtoconfig tojobconfig = job.gettojobconfig(); tojobconfig.getstringinput("tojobconfig.outputdirectory").setvalue("/user/hdfs/recommend"); mdriverconfig driverconfig = job.getdriverconfig(); driverconfig.getstringinput("throttlingconfig.numextractors").setvalue("3"); status status = client.savejob(job); if(status.canproceed()) { system.out.println("job创建成功,id为: "+ job.getpersistenceid()); } else { system.out.println("job创建失败。"); } //启动任务 long jobid = job.getpersistenceid(); msubmission submission = client.startjob(jobid); system.out.println("job提交状态为 : " + submission.getstatus()); while(submission.getstatus().isrunning() && submission.getprogress() != -1) { system.out.println("进度 : " + string.format("%.2f %%", submission.getprogress() * 100)); //三秒报告一次进度 try { thread.sleep(3000); } catch (interruptedexception e) { e.printstacktrace(); } } system.out.println("job执行结束... ..."); system.out.println("hadoop任务id为 :" + submission.getexternalid()); counters counters = submission.getcounters(); if(counters != null) { system.out.println("计数器:"); for(countergroup group : counters) { system.out.print("\t"); system.out.println(group.getname()); for(counter counter : group) { system.out.print("\t\t"); system.out.print(counter.getname()); system.out.print(": "); system.out.println(counter.getvalue()); } } } if(submission.getexceptioninfo() != null) { system.out.println("job执行异常,异常信息为 : " +submission.getexceptioninfo()); } system.out.println("mysql通过sqoop传输数据到hdfs统计执行完毕"); } }
报了这个错失咋回事??
其它类似信息

推荐信息