kettle调用java程序_Kettle ETL调用 java代码来进行数据库的增删改查
发布日期:2021-06-24 11:21:33 浏览次数:3 分类:技术文章

本文共 10575 字,大约阅读时间需要 35 分钟。

1 packagecom.zxyp.kettle;2

3 importorg.pentaho.di.cluster.ClusterSchema;4 importorg.pentaho.di.cluster.SlaveServer;5 importorg.pentaho.di.core.KettleEnvironment;6 importorg.pentaho.di.core.database.DatabaseMeta;7 importorg.pentaho.di.core.exception.KettleException;8 importorg.pentaho.di.core.logging.LogLevel;9 importorg.pentaho.di.core.util.EnvUtil;10 importorg.pentaho.di.job.Job;11 importorg.pentaho.di.job.JobExecutionConfiguration;12 importorg.pentaho.di.job.JobMeta;13 importorg.pentaho.di.repository.RepositoryDirectoryInterface;14 importorg.pentaho.di.repository.kdr.KettleDatabaseRepository;15 importorg.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;16 importorg.pentaho.di.trans.Trans;17 importorg.pentaho.di.trans.TransExecutionConfiguration;18 importorg.pentaho.di.trans.TransMeta;19

20 public classKettleUtil {21

22 private String connetionName = "carte";23 private String databaseType = "MYSQL";24 private String connectionType = "Native(JDBC)";25 private String hostAddress = "192.168.10.147";26 private String databaseName = "kettle";27 private String databasePort = "3306";28 private String userName = "root";29 private String password = "root";30 private String repoName = "repo";31 private String repoUserName = "admin";32 private String repoPassword = "admin";33 private String repoJobDir = "/";34 private String repoTransDir = "/";35 private String slaveName = "master";36 private String slaveHostname = "192.168.10.147";37 private String slavePort = "8080";38 private String slaveUsername = "cluster";39 private String slavePassword = "cluster";40

41 publicString getConnetionName() {42 returnconnetionName;43 }44

45 public voidsetConnetionName(String connetionName) {46 this.connetionName =connetionName;47 }48

49 publicString getDatabaseType() {50 returndatabaseType;51 }52

53 public voidsetDatabaseType(String databaseType) {54 this.databaseType =databaseType;55 }56

57 publicString getConnectionType() {58 returnconnectionType;59 }60

61 public voidsetConnectionType(String connectionType) {62 this.connectionType =connectionType;63 }64

65 publicString getHostAddress() {66 returnhostAddress;67 }68

69 public voidsetHostAddress(String hostAddress) {70 this.hostAddress =hostAddress;71 }72

73 publicString getDatabaseName() {74 returndatabaseName;75 }76

77 public voidsetDatabaseName(String databaseName) {78 this.databaseName =databaseName;79 }80

81 publicString getDatabasePort() {82 returndatabasePort;83 }84

85 public voidsetDatabasePort(String databasePort) {86 this.databasePort =databasePort;87 }88

89 publicString getUserName() {90 returnuserName;91 }92

93 public voidsetUserName(String userName) {94 this.userName =userName;95 }96

97 publicString getPassword() {98 returnpassword;99 }100

101 public voidsetPassword(String password) {102 this.password =password;103 }104

105 publicString getRepoName() {106 returnrepoName;107 }108

109 public voidsetRepoName(String repoName) {110 this.repoName =repoName;111 }112

113 publicString getRepoUserName() {114 returnrepoUserName;115 }116

117 public voidsetRepoUserName(String repoUserName) {118 this.repoUserName =repoUserName;119 }120

121 publicString getRepoPassword() {122 returnrepoPassword;123 }124

125 public voidsetRepoPassword(String repoPassword) {126 this.repoPassword =repoPassword;127 }128

129 publicString getRepoJobDir() {130 returnrepoJobDir;131 }132

133 public voidsetRepoJobDir(String repoJobDir) {134 this.repoJobDir =repoJobDir;135 }136

137 publicString getRepoTransDir() {138 returnrepoTransDir;139 }140

141 public voidsetRepoTransDir(String repoTransDir) {142 this.repoTransDir =repoTransDir;143 }144

145 publicString getSlaveName() {146 returnslaveName;147 }148

149 public voidsetSlaveName(String slaveName) {150 this.slaveName =slaveName;151 }152

153 publicString getSlaveHostname() {154 returnslaveHostname;155 }156

157 public voidsetSlaveHostname(String slaveHostname) {158 this.slaveHostname =slaveHostname;159 }160

161 publicString getSlavePort() {162 returnslavePort;163 }164

165 public voidsetSlavePort(String slavePort) {166 this.slavePort =slavePort;167 }168

169 publicString getSlaveUsername() {170 returnslaveUsername;171 }172

173 public voidsetSlaveUsername(String slaveUsername) {174 this.slaveUsername =slaveUsername;175 }176

177 publicString getSlavePassword() {178 returnslavePassword;179 }180

181 public voidsetSlavePassword(String slavePassword) {182 this.slavePassword =slavePassword;183 }184

185 /**

186 * 调用trans文件187 *@paramtransFileName188 *@throwsException189 */

190 public static void callNativeTrans(String transFileName) throwsException{191 callNativeTransWithParams(null, transFileName);192 }193

194 /**

195 * 调用trans文件 带参数的196 *@paramparams197 *@paramtransFileName198 *@throwsException199 */

200 public static void callNativeTransWithParams(String[] params ,String transFileName) throwsException{201 //初始化

202 KettleEnvironment.init();203 EnvUtil.environmentInit();204 TransMeta transMeta = newTransMeta(transFileName);205 //转换

206 Trans trans = newTrans(transMeta);207 //执行

208 trans.execute(params);209 //等待结束

210 trans.waitUntilFinished();211 //抛出异常

212 if(trans.getErrors() > 0){213 throw new Exception("There are errors during transformation exception!(传输过程中发生异常)");214 }215 }216

217 /**

218 * 调用job文件219 *@paramjobName220 *@throwsException221 */

222 public static void callNativeJob(String jobName) throwsException{223 //初始化

224 KettleEnvironment.init();225

226 JobMeta jobMeta = new JobMeta(jobName,null);227 Job job = new Job(null, jobMeta);228 //向Job 脚本传递参数,脚本中获取参数值:${参数名}229 //job.setVariable(paraname, paravalue);

230 job.start();231 job.waitUntilFinished();232 if (job.getErrors() > 0) {233 throw new Exception("There are errors during job exception!(执行job发生异常)");234 }235 }236 /**

237 * 资源库连接238 *@return连接到的资源库239 *@throwsKettleException240 */

241 public static Object RepositoryCon() throwsKettleException {242 //初始化243 //EnvUtil.environmentInit();

244 KettleEnvironment.init();245 //数据库连接元对象

246 DatabaseMeta dataMeta = new DatabaseMeta(new KettleUtil().getConnetionName(), new KettleUtil().getDatabaseType(), new KettleUtil().getConnetionName(),new KettleUtil().getHostAddress(), new KettleUtil().getDatabaseName(), newKettleUtil().getDatabasePort(),247 new KettleUtil().getUserName(), newKettleUtil().getPassword());248 //数据库形式的资源库元对象

249 KettleDatabaseRepositoryMeta repInfo = newKettleDatabaseRepositoryMeta();250 //251 repInfo.setConnection(dataMeta);252 repInfo.setName(newKettleUtil().getRepoName());253 //数据库形式的资源库对象

254 KettleDatabaseRepository rep = newKettleDatabaseRepository();255 //用资源库元对象初始化资源库对象

256 rep.init(repInfo);257 //连接到资源库

258 rep.connect(new KettleUtil().getRepoUserName(), new KettleUtil().getRepoPassword());//默认的连接资源库的用户名和密码

259 if(rep.isConnected()) {260 System.out.println("连接成功");261 returnrep;262 } else{263 System.out.println("连接失败");264 return null;265 }266 }267 /**

268 * 以子服务方式执行资源库中的job269 *@paramrep270 *@paramjobName271 */

272 public static voidrunJob(KettleDatabaseRepository rep, String jobName) {273 try{274 RepositoryDirectoryInterface dir = rep.findDirectory(new KettleUtil().getRepoJobDir());//根据指定的字符串路径 找到目录275 //加载指定的job

276 JobMeta jobMeta = rep.loadJob(rep.getJobId(jobName, dir), null);277 Job job = newJob(rep, jobMeta);278

279 //设置参数280 //jobMeta.setParameterValue("method", "update");281 //jobMeta.setParameterValue("tsm5", "07bb40f7200448b3a544786dc5e28845");282 //jobMeta.setParameterValue("args"," {'fkid':'07bb40f7200448b3a544786dc5e28845','svctype':'Diffwkrlifehelp','content':'更新3','sysuuid':'01ee0e61f357476b8dbb4be49ddecc77','uid':'1033','role':'3999','posi':'2999'}");

283

284 job.setLogLevel(LogLevel.BASIC);285 //设置slaveserver信息

286 SlaveServer ssi = newSlaveServer();287 ssi.setHostname(newKettleUtil().getSlaveHostname());288 ssi.setPort(newKettleUtil().getSlavePort());289 ssi.setName(newKettleUtil().getSlaveName());290 ssi.setUsername(newKettleUtil().getSlaveUsername());291 ssi.setPassword(newKettleUtil().getSlavePassword());292 //为job设置slaveserve

293 job.setExecutingServer(newKettleUtil().getSlaveName());294 //ClusterSchema cluster = jobgetTransMeta().findFirstUsedClusterSchema();

295 JobExecutionConfiguration jobExecutionConfiguration = newJobExecutionConfiguration();296 jobExecutionConfiguration.setExecutingLocally(false);297 jobExecutionConfiguration.setExecutingRemotely(true);298 jobExecutionConfiguration.setRemoteServer(ssi);299 jobExecutionConfiguration.setRepository(rep);300

301 String carteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, rep, null);302 System.out.println(carteObjectId);303

304 /*普通执行305 job.run();306 job.waitUntilFinished();// 等待job执行完;307 job.setFinished(true);308 System.out.println(job.getResult());309 */

310 } catch(Exception e) {311 e.printStackTrace();312 }313 }314

315 public static voidrunTrans(KettleDatabaseRepository rep,String transName){316 try{317 RepositoryDirectoryInterface dir = rep.findDirectory(new KettleUtil().getRepoTransDir());//根据指定的字符串路径 找到目录

318 TransMeta tmeta = rep.loadTransformation(rep.getTransformationID(transName, dir), null);319 //设置参数320 //tmeta.setParameterValue("", "");

321 Trans trans = newTrans(tmeta);322 ClusterSchema cluster =trans.getTransMeta().findFirstUsedClusterSchema();323 if (cluster != null) {324 TransExecutionConfiguration executionConfiguration = newTransExecutionConfiguration();325 executionConfiguration.setExecutingLocally(false);326 executionConfiguration.setExecutingRemotely(false);327 executionConfiguration.setExecutingClustered(true); //如果有就设置以集群方式运行

328 executionConfiguration.setClusterPosting(true);329 executionConfiguration.setClusterPreparing(true);330 executionConfiguration.setClusterStarting(true);331 executionConfiguration.setClusterShowingTransformation(false);332 executionConfiguration.setSafeModeEnabled(false);333 executionConfiguration.setRepository(rep);334 executionConfiguration.setLogLevel(LogLevel.BASIC);335 executionConfiguration.setVariables(trans.getTransMeta());336 TransMeta transMeta =trans.getTransMeta();337 try{338 Trans.executeClustered(transMeta, executionConfiguration);339 System.out.println("执行完毕");340 } catch(Exception e) {341 e.printStackTrace();342 }343 } else {/*扩展元数据注入的转换可以集群运行--结束代码*/

344 trans.startThreads();345 while (!trans.isFinished() && !trans.isStopped()) {346

347 }348 if(trans.getErrors()>0){349 System.out.println("有异常");350 }351 System.exit(0);352 }353

354 /*普通执行355 trans.execute(null);//执行trans356 trans.waitUntilFinished();357 if(trans.getErrors()>0){358 System.out.println("有异常");359 }360 */

361 }catch(Exception e){362 e.printStackTrace();363 }364 }365

366

367 }

转载地址:https://blog.csdn.net/weixin_32467749/article/details/114303897 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:mysql 取两个时间差 php_在php和MySql中计算时间差的方法详解
下一篇:java坐标代码_java实现计算地理坐标之间的距离

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2024年04月07日 05时49分14秒