Commit fe529b2c authored by yiming's avatar yiming

initial commit

parents
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hdp.piplus</groupId>
<artifactId>piplus-lts</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<lts.version>1.6.5</lts.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.3.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>com.lts</groupId>
<artifactId>lts-startup-tasktracker</artifactId>
<version>${lts.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.lts</groupId>
<artifactId>lts-startup-jobtracker</artifactId>
<version>${lts.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.lts</groupId>
<artifactId>lts-jobclient</artifactId>
<version>${lts.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.hdp.piplus.lts;
import com.google.common.base.Splitter;
import com.hdp.piplus.lts.jobclient.JobClientFactory;
import com.hdp.piplus.lts.jobtracker.JobTrackerLauncher;
import com.hdp.piplus.lts.tasktracker.TaskTrackerLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
@EnableConfigurationProperties
public class LtsApp implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(LtsApp.class);
@Autowired
private ApplicationContext context;
@Value("${piplus.lts.services}")
private String ltsServices;
@Autowired
private JobTrackerLauncher jobTrackerLauncher;
@Autowired
private TaskTrackerLauncher taskTrackerLauncher;
public static void main(String[] args) throws Exception {
SpringApplication.run(LtsApp.class, args);
}
@Override
public void run(String... args) throws Exception {
for (String service: Splitter.on(",").split(ltsServices)) {
if (service.equalsIgnoreCase("jobtracker")) {
logger.info("=====启动jobtracker=====");
jobTrackerLauncher.launch();
} else if (service.equalsIgnoreCase("tasktracker")) {
logger.info("=====启动tasktracker=====");
taskTrackerLauncher.launch();
} else if (service.equalsIgnoreCase("jobClient")) {
logger.info("=====启动jobClient=====");
JobClientFactory.init(context);
JobClientFactory.getJobClient();
} else {
logger.warn("未知服务:" + service);
}
}
}
}
package com.hdp.piplus.lts.jobclient;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.lts.core.commons.utils.StringUtils;
import com.lts.core.listener.MasterChangeListener;
import com.lts.jobclient.JobClient;
import com.lts.spring.JobClientFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Properties;
@Component
@Lazy
public class BootJobClientFactoryBean extends JobClientFactoryBean {
@Autowired
private Environment env;
/**
* 使用Spring Boot配置机制配置JobClientFactoryBean
* @see JobClientFactoryBean
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
setClusterName(env.getProperty("clusterName"));
setRegistryAddress(env.getProperty("registryAddress"));
setNodeGroup(env.getProperty("jobclient.nodeGroup"));
String values = env.getProperty("jobclient.masterChangeListeners");
if (StringUtils.isNotEmpty(values)) {
List<MasterChangeListener> listeners = Lists.newArrayList();
for (String clsName: Splitter.on(",").split(values)) {
listeners.add((MasterChangeListener) Class.forName(clsName).newInstance());
}
setMasterChangeListeners(listeners.toArray(new MasterChangeListener[listeners.size()]));
}
Properties configs = new Properties();
configs.setProperty("job.fail.store", env.getProperty("configs.job.fail.store"));
setConfigs(configs);
super.afterPropertiesSet();
}
}
package com.hdp.piplus.lts.jobclient;
import com.google.common.base.Preconditions;
import com.lts.jobclient.JobClient;
import org.springframework.context.ApplicationContext;
public class JobClientFactory {
private static ApplicationContext context;
private static JobClient jobClient;
public static synchronized void init(ApplicationContext ctx) {
context = ctx;
}
public static synchronized JobClient getJobClient() {
Preconditions.checkNotNull(context, "JobClientFactory未初始化");
if (jobClient == null) {
jobClient = context.getBean(JobClient.class);
jobClient.start();
}
return jobClient;
}
}
package com.hdp.piplus.lts.jobclient;
import com.lts.core.domain.Job;
import com.lts.jobclient.JobClient;
import com.lts.jobclient.domain.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("job")
public class TestJobClientController {
private static Logger logger = LoggerFactory.getLogger(TestJobClientController.class);
@RequestMapping(value = "submit", method = RequestMethod.POST)
public Response testJobSubmit(@RequestBody Job job) {
JobClient jobClient = JobClientFactory.getJobClient();
return jobClient.submitJob(job);
}
}
package com.hdp.piplus.lts.jobclient;
import com.lts.core.cluster.Node;
import com.lts.core.listener.MasterChangeListener;
public class TestMasterChangeListener implements MasterChangeListener {
@Override
public void change(Node master, boolean isMaster) {
}
}
package com.hdp.piplus.lts.jobtracker;
import com.lts.core.commons.utils.StringUtils;
import com.lts.jobtracker.JobTracker;
import com.lts.jobtracker.support.policy.OldDataDeletePolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Component
public class JobTrackerLauncher {
@Autowired
private Environment env;
public void launch() {
JobTracker jobTracker = new JobTracker();
jobTracker.setRegistryAddress(env.getProperty("registryAddress"));
jobTracker.setClusterName(env.getProperty("clusterName"));
jobTracker.setListenPort(Integer.parseInt(env.getProperty("jobtracker.listenPort")));
String value = env.getProperty("jobtracker.bindIp");
if (StringUtils.isNotEmpty(value)) {
jobTracker.setBindIp(value);
}
// 删除feedbackQueue中超过30天的数据
jobTracker.setOldDataHandler(new OldDataDeletePolicy());
// 不能枚举,悲剧
jobTracker.addConfig("job.logger", env.getProperty("configs.job.logger"));
jobTracker.addConfig("zk.client", env.getProperty("configs.zk.client"));
jobTracker.addConfig("job.queue", env.getProperty("configs.job.queue"));
jobTracker.addConfig("jdbc.url", env.getProperty("configs.jdbc.url"));
jobTracker.addConfig("jdbc.username", env.getProperty("configs.jdbc.username"));
jobTracker.addConfig("jdbc.password", env.getProperty("configs.jdbc.password"));
jobTracker.addConfig("mongo.addresses", env.getProperty("configs.mongo.addresses"));
jobTracker.addConfig("mongo.database", env.getProperty("configs.mongo.database"));
jobTracker.addConfig("mongo.username", env.getProperty("configs.mongo.username"));
jobTracker.addConfig("mongo.password", env.getProperty("configs.mongo.password"));
jobTracker.start();
Runtime.getRuntime().addShutdownHook(new Thread(jobTracker::stop));
}
}
package com.hdp.piplus.lts.tasktracker;
import com.lts.spring.TaskTrackerAnnotationFactoryBean;
import com.lts.startup.TaskTrackerFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
@Lazy
public class BootTaskTrackerFactoryBean extends TaskTrackerAnnotationFactoryBean {
@Autowired
private Environment env;
/**
* 使用Spring Boot配置机制配置TaskTrackerAnnotationFactoryBean
* @see TaskTrackerFactoryBean
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
setClusterName(env.getProperty("clusterName"));
setRegistryAddress(env.getProperty("registryAddress"));
setNodeGroup(env.getProperty("tasktracker.nodeGroup"));
setWorkThreads(Integer.parseInt(env.getProperty("tasktracker.workThreads")));
setJobRunnerClass(Class.forName(env.getProperty("tasktracker.jobRunnerClass")));
setBizLoggerLevel(env.getProperty("tasktracker.bizLoggerLevel"));
Properties props = new Properties();
props.put("job.fail.store", env.getProperty("configs.job.fail.store"));
setConfigs(props);
super.afterPropertiesSet();
}
}
package com.hdp.piplus.lts.tasktracker;
import com.lts.tasktracker.TaskTracker;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@Component
public class TaskTrackerLauncher {
@Autowired
private ApplicationContext context;
public void launch() {
TaskTracker taskTracker = context.getBean(TaskTracker.class);
taskTracker.start();
Runtime.getRuntime().addShutdownHook(new Thread(taskTracker::stop));
}
}
package com.hdp.piplus.lts.tasktracker;
import com.lts.core.domain.Action;
import com.lts.core.domain.Job;
import com.lts.spring.TaskTrackerAnnotationFactoryBean;
import com.lts.tasktracker.Result;
import com.lts.tasktracker.runner.JobRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
/**
* TaskTrackerAnnotationFactoryBean会用适合的方式在spring中注册该类,不需要再用@Component注册了。
* @see TaskTrackerAnnotationFactoryBean#registerRunnerBeanDefinition()
*/
public class TestJobRunner implements JobRunner {
private static final Logger logger = LoggerFactory.getLogger(TestJobRunner.class);
@Value("${spring.application.name}")
private String applicationName;
@Override
public Result run(Job job) throws Throwable {
logger.info(applicationName + " 正在运行任务" + job.getTaskId());
Thread.sleep(20000);
return new Result(Action.EXECUTE_SUCCESS, "执行成功啦~");
}
}
spring.application.name=piplus-lts
##################################### 共用配置 ###############################
# 注册中心地址,可以是zk,也可以是redis
registryAddress=zookeeper://127.0.0.1:2181
# 集群名称
clusterName=test_cluster
# LTS业务日志, 可选值 console, mysql, mongo
configs.job.logger=mysql
# zk客户端,可选值 zkclient, curator
configs.zk.client=zkclient
# ---------以下是任务队列配置-----------
# 任务队列,可选值 mysql, mongo
configs.job.queue=mysql
# ------ 1. 如果是mysql作为任务队列 ------
configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
configs.jdbc.username=root
configs.jdbc.password=123456
# ------ 2. 如果是mongo作为任务队列 ------
configs.mongo.addresses=127.0.0.1:27017
configs.mongo.database=lts
# configs.mongo.username=xxx #如果有的话
# configs.mongo.password=xxx #如果有的话
# FailStore 存储引擎
configs.job.fail.store=leveldb
########################################## jobtracker #######################################
# JobTracker的监听端口
jobtracker.listenPort=3502
######################################## tasktracker ########################################
# 节点组名称
tasktracker.nodeGroup=testTaskTrackerGroup
# JobRunner 任务执行类
tasktracker.jobRunnerClass=com.hdp.piplus.lts.tasktracker.TestJobRunner
# 运行线程数
tasktracker.workThreads=10
# 通过BizLogger 记录的日志
tasktracker.bizLoggerLevel=INFO
# 使用TaskTracker使用Spring,使用的话也要自己引入Spring相关jar
#useSpring=false
# 配置 spring配置文件路径,用逗号隔开,如: classpath*:spring/*.xml
#springXmlPaths=
##################################### jobclient ##############################################
# 节点组名词
jobclient.nodeGroup=test_jobClient
# master节点变化监听器
jobclient.masterChangeListeners=com.hdp.piplus.lts.jobclient.TestMasterChangeListener
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment