Submitting a MapReduce job from Spring Boot (the job imports file contents into HBase)
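The setup below uses Spring for Apache Hadoop (spring-data-hadoop) and consists of three pieces: a configuration class that prepares a ToolRunner for the hbase-indexer job jar, a service that deploys the application's dependency jar and submits the jobs, and the main Hadoop/HBase configuration that defines the import job itself. The original listed both configuration classes as org.manlier.srapp.config.HDPConfiguration, which cannot compile in one project, so the index-rebuild configuration is shown here as RebuildIndexConfiguration.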
package org.manlier.srapp.config;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.manlier.srapp.index.RebuildIndexProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.data.hadoop.mapreduce.ToolRunner;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

/**
 * Configuration for the index-rebuild MapReduce job.
 */
@org.springframework.context.annotation.Configuration
public class RebuildIndexConfiguration {
    private static final String DEP_LOCATION = "apprepo/";
    private static final String INDEXER_MP_JAR = "hbase-indexer-mr-1.6-SNAPSHOT-job.jar";
    private static final List<String> INDEXER_REBUILD_LIBS = Arrays.asList(
            "hbase-indexer-phoenix-mapper-1.0.0.jar",
            "phoenix-core-4.13.1-HBase-1.2.jar"
    );

    private final RebuildIndexProperties properties;

    @Autowired
    public RebuildIndexConfiguration(RebuildIndexProperties properties) {
        this.properties = properties;
    }
    @Bean
    public Configuration hbaseConfiguration() {
        return HBaseConfiguration.create();
    }
    /**
     * Lazily builds a ToolRunner that submits the hbase-indexer job jar.
     * The jar and its extra libs ship inside the application's classpath,
     * so they are first copied into java.io.tmpdir as plain files.
     */
    @Bean
    @Lazy
    public ToolRunner rebuildIndexJarRunner() throws IOException {
        ToolRunner toolRunner = new ToolRunner();
        String tmpDir = System.getProperty("java.io.tmpdir");
        // Deploy the job jar to the local file system
        Path indexerPath = Paths.get(tmpDir, INDEXER_MP_JAR);
        Resource indexerRes = new ClassPathResource(DEP_LOCATION + INDEXER_MP_JAR);
        deployDep(indexerRes, indexerPath);
        toolRunner.setJar(new FileSystemResource(indexerPath.toFile()));
        toolRunner.setArguments(properties.getParams());
        toolRunner.setRunAtStartup(false);
        toolRunner.setConfiguration(hbaseConfiguration());
        // Deploy the libraries the indexer job depends on
        List<Resource> resources = new LinkedList<>();
        for (String lib : INDEXER_REBUILD_LIBS) {
            Path libPath = Paths.get(tmpDir, lib);
            Resource resource = new ClassPathResource(DEP_LOCATION + lib);
            deployDep(resource, libPath);
            resources.add(new FileSystemResource(libPath.toFile()));
        }
        toolRunner.setLibs(resources.toArray(new Resource[0]));
        return toolRunner;
    }
    /**
     * Copies a classpath resource to the given local path; the copy is
     * deleted when the JVM exits.
     */
    private void deployDep(Resource resource, Path path) throws IOException {
        Files.copy(resource.getInputStream(), path, StandardCopyOption.REPLACE_EXISTING);
        path.toFile().deleteOnExit();
    }
}
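The RebuildIndexProperties bean referenced above is not shown in the original. A minimal sketch of what it could look like, assuming a @ConfigurationProperties binding whose getParams() supplies the ToolRunner's command-line arguments (the prefix and field name are assumptions):

package org.manlier.srapp.index;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical reconstruction: only getParams() is required by the
// configuration class above; the prefix and field are assumptions.
@Component
@ConfigurationProperties(prefix = "srapp.rebuild-index")
public class RebuildIndexProperties {

    /** Arguments handed to the hbase-indexer ToolRunner. */
    private String[] params = new String[0];

    public String[] getParams() {
        return params;
    }

    public void setParams(String[] params) {
        this.params = params;
    }
}

Next comes the service that actually submits the jobs. On startup it deploys application-dep.jar (the archive holding the mapper classes) from the classpath into the temp directory and registers it on every configured Job: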
package org.manlier.srapp.job;

import org.apache.hadoop.mapreduce.Job;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.data.hadoop.mapreduce.JobRunner;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.io.IOException;
import java.nio.file.*;
import java.util.List;
@Service
public class JobExecutionServiceImpl implements JobExecutionService, DisposableBean {

    private static final String APP_DEP_ARCHIVE_NAME = "application-dep.jar";

    private final List<Job> jobs;
    private final JobRunner compsImporter;

    @Inject
    public JobExecutionServiceImpl(List<Job> jobs, JobRunner compsImporter) {
        this.jobs = jobs;
        this.compsImporter = compsImporter;
    }
    /**
     * Copies the dependency archive out of the classpath and registers it as
     * the job jar for every configured Job. The original does not show how
     * init() is invoked; @PostConstruct is assumed here.
     */
    @PostConstruct
    public void init() throws IOException {
        String tmpDir = System.getProperty("java.io.tmpdir");
        Resource resource = new ClassPathResource("apprepo/" + APP_DEP_ARCHIVE_NAME);
        Path path = Paths.get(tmpDir, APP_DEP_ARCHIVE_NAME);
        Files.copy(resource.getInputStream(), path, StandardCopyOption.REPLACE_EXISTING);
        jobs.parallelStream().forEach(job -> job.setJar(path.toUri().toString()));
    }
    @Override
    public void importComponents(org.apache.hadoop.fs.Path path) throws Exception {
        // The input path is already configured on the job bean; the runner
        // simply submits it.
        compsImporter.call();
    }

    @Override
    public void rebuildComponentsIndex(org.apache.hadoop.fs.Path path) throws Exception {
        // Not implemented yet.
    }
    @Override
    public void destroy() throws Exception {
        // Remove the deployed dependency archive on shutdown; deleteIfExists
        // avoids an exception if it was never deployed.
        String tmpDir = System.getProperty("java.io.tmpdir");
        Path path = Paths.get(tmpDir, APP_DEP_ARCHIVE_NAME);
        Files.deleteIfExists(path);
    }
}
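The JobExecutionService interface is implied by the @Override methods above; a minimal version derived from them:

package org.manlier.srapp.job;

import org.apache.hadoop.fs.Path;

// Derived from the overrides in JobExecutionServiceImpl.
public interface JobExecutionService {

    /** Submits the job that imports component files into HBase. */
    void importComponents(Path path) throws Exception;

    /** Submits the job that rebuilds the components index. */
    void rebuildComponentsIndex(Path path) throws Exception;
}

Finally, the main Hadoop/HBase configuration: a prototype-scoped Configuration, the FileSystem and HBase Connection beans, the import job, and the JobRunner that the service above injects: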
package org.manlier.srapp.config;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.manlier.srapp.mapreduce.HBaseCompsDirectImporter;
import org.manlier.srapp.mapreduce.WholeFileInputFormat;
import org.manlier.srapp.storage.StorageProperties;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.*;
import org.springframework.data.hadoop.mapreduce.JobRunner;

import java.io.IOException;

/**
 * Hadoop configuration.
 */
@Configuration
public class HDPConfiguration {

    private final StorageProperties storageProperties;

    public HDPConfiguration(StorageProperties storageProperties) {
        this.storageProperties = storageProperties;
    }
    /**
     * A fresh Hadoop configuration enriched with the HBase settings found on
     * the classpath (hbase-site.xml); prototype scope gives each caller its
     * own mutable copy.
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public org.apache.hadoop.conf.Configuration configuration() {
        org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
        return org.apache.hadoop.hbase.HBaseConfiguration.create(config);
    }

    @Bean
    public FileSystem fileSystem() throws IOException {
        return FileSystem.get(configuration());
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Connection connection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }
    /**
     * The import job: each input file under the components directory is read
     * as a single record, and the mapper writes it straight into HBase, so
     * the job needs neither reducers nor job output.
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Job compsImportJob() throws IOException {
        Job job = Job.getInstance(configuration());
        FileInputFormat.addInputPath(job, new Path(storageProperties.getDefaultComponentsDir()));
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setMapperClass(HBaseCompsDirectImporter.HBaseCompsMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job;
    }
    // @Bean methods are singleton-scoped by default, so no scope annotation
    // is needed (javax.inject's @Singleton is not honored on @Bean methods).
    @Bean
    public JobRunner jobRunner() throws IOException {
        JobRunner jobRunner = new JobRunner();
        jobRunner.setJob(compsImportJob());
        return jobRunner;
    }
}
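To trigger the import, something has to call JobExecutionService. A minimal sketch of a caller, assuming a REST endpoint (the controller, its mapping, and the HDFS path are all hypothetical; the original does not show the caller):

package org.manlier.srapp.web;

import org.apache.hadoop.fs.Path;
import org.manlier.srapp.job.JobExecutionService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical trigger for the import job.
@RestController
public class JobController {

    private final JobExecutionService jobExecutionService;

    public JobController(JobExecutionService jobExecutionService) {
        this.jobExecutionService = jobExecutionService;
    }

    // POST /jobs/import submits the preconfigured import job; the path
    // argument is currently unused by the service.
    @PostMapping("/jobs/import")
    public void importComponents() throws Exception {
        jobExecutionService.importComponents(new Path("/srapp/components"));
    }
}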