GlassyWing
4/14/2018 - 6:04 AM

Spring Boot 提交 MapReduce 任务（该任务用于将文件内容导入 HBase）

package org.manlier.srapp.config;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.manlier.srapp.index.RebuildIndexProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.data.hadoop.mapreduce.ToolRunner;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

@org.springframework.context.annotation.Configuration
public class HDPConfiguration {

    /** Classpath directory holding the deployable MapReduce artifacts. */
    private static final String DEP_LOCATION = "apprepo/";
    /** Self-contained job jar for the HBase indexer MapReduce job. */
    private static final String INDEXER_MP_JAR = "hbase-indexer-mr-1.6-SNAPSHOT-job.jar";
    /** Extra jars the index-rebuild job needs on its classpath. */
    private static final List<String> INDEXER_REBUILD_LIBS = Arrays.asList(
            "hbase-indexer-phoenix-mapper-1.0.0.jar"
            , "phoenix-core-4.13.1-HBase-1.2.jar"
    );

    private final RebuildIndexProperties properties;

    @Autowired
    public HDPConfiguration(RebuildIndexProperties properties) {
        this.properties = properties;
    }

    /**
     * HBase-aware Hadoop configuration (picks up hbase-site.xml from the
     * classpath via {@link HBaseConfiguration#create()}).
     */
    @Bean
    public Configuration hbaseConfiguration() {
        return HBaseConfiguration.create();
    }

    /**
     * Lazily builds a {@link ToolRunner} that can submit the index-rebuild
     * job. The job jar and its library dependencies are first copied from the
     * classpath into the JVM temp directory, because the runner needs real
     * files on disk rather than classpath entries.
     *
     * @return a runner configured with jar, libs, arguments and HBase config;
     *         it does not run at startup — callers trigger it explicitly
     * @throws IOException if a dependency cannot be copied to the temp dir
     */
    @Bean
    @Lazy
    public ToolRunner rebuildIndexJarRunner() throws IOException {
        ToolRunner toolRunner = new ToolRunner();

        String tmpDir = System.getProperty("java.io.tmpdir");
        Path indexerPath = Paths.get(tmpDir, INDEXER_MP_JAR);
        Resource indexerRes = new ClassPathResource(DEP_LOCATION + INDEXER_MP_JAR);
        deployDep(indexerRes, indexerPath);
        toolRunner.setJar(new FileSystemResource(indexerPath.toFile()));
        toolRunner.setArguments(properties.getParams());
        toolRunner.setRunAtStartup(false);
        toolRunner.setConfiguration(hbaseConfiguration());

        List<Resource> resources = new LinkedList<>();
        for (String localPath : INDEXER_REBUILD_LIBS) {
            Path target = Paths.get(tmpDir, localPath);
            Resource resource = new ClassPathResource(DEP_LOCATION + localPath);
            deployDep(resource, target);
            resources.add(new FileSystemResource(target.toFile()));
        }
        toolRunner.setLibs(resources.toArray(new Resource[0]));
        return toolRunner;
    }

    /**
     * Copies a classpath resource to the given temp-dir path, overwriting any
     * stale copy, and schedules the file for deletion on JVM exit.
     *
     * <p>Fix: the stream is now closed — {@code Files.copy(InputStream, ...)}
     * does not close its source, so the original leaked one stream per
     * deployed dependency.
     */
    private void deployDep(Resource resource, Path path) throws IOException {
        try (InputStream in = resource.getInputStream()) {
            Files.copy(in, path, StandardCopyOption.REPLACE_EXISTING);
        }
        path.toFile().deleteOnExit();
    }
}
package org.manlier.srapp.job;

import org.apache.hadoop.mapreduce.Job;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.data.hadoop.mapreduce.JobRunner;
import org.springframework.stereotype.Service;

import javax.inject.Inject;
import java.io.IOException;
import java.nio.file.*;
import java.util.List;

@Service
public class JobExecutionServiceImpl implements JobExecutionService, DisposableBean {

    /** File name of the application dependency jar staged into the temp dir. */
    private static final String APP_DEP_ARCHIVE_NAME = "application-dep.jar";

    private final List<Job> jobs;

    private final JobRunner compsImporter;

    @Inject
    public JobExecutionServiceImpl(List<Job> jobs
            , JobRunner compsImporter) {
        this.jobs = jobs;
        this.compsImporter = compsImporter;
    }

    /**
     * Stages the application dependency jar from the classpath into the JVM
     * temp directory and points every configured job's jar location at it.
     *
     * @throws IOException if the jar cannot be copied to the temp dir
     */
    public void init() throws IOException {
        String tmpDir = System.getProperty("java.io.tmpdir");
        Resource resource = new ClassPathResource("apprepo/" + APP_DEP_ARCHIVE_NAME);
        Path path = Paths.get(tmpDir, APP_DEP_ARCHIVE_NAME);
        // Fix: Files.copy(InputStream, ...) does not close its source; the
        // original leaked the classpath resource stream.
        try (java.io.InputStream in = resource.getInputStream()) {
            Files.copy(in, path, StandardCopyOption.REPLACE_EXISTING);
        }
        // Plain loop: the job list is small and setJar is a cheap setter, so
        // the original parallelStream() added overhead without benefit.
        for (Job job : jobs) {
            job.setJar(path.toUri().toString());
        }
    }

    @Override
    public void importComponents(org.apache.hadoop.fs.Path path) throws Exception {
        // NOTE(review): the path argument is ignored; the input path appears
        // to be baked into the injected job — confirm this is intended.
        compsImporter.call();
    }

    @Override
    public void rebuildComponentsIndex(org.apache.hadoop.fs.Path path) throws Exception {
        // Not implemented yet.
    }

    /**
     * Removes the staged dependency jar on shutdown.
     *
     * <p>Fix: use {@code deleteIfExists} — the original {@code Files.delete}
     * throws {@code NoSuchFileException} when {@link #init()} was never called
     * (or the file was already cleaned up), failing the whole shutdown.
     */
    @Override
    public void destroy() throws Exception {
        String tmpDir = System.getProperty("java.io.tmpdir");
        Path path = Paths.get(tmpDir, APP_DEP_ARCHIVE_NAME);
        Files.deleteIfExists(path);
    }
}
package org.manlier.srapp.config;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.manlier.srapp.mapreduce.HBaseCompsDirectImporter;
import org.manlier.srapp.mapreduce.WholeFileInputFormat;
import org.manlier.srapp.storage.StorageProperties;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.*;
import org.springframework.data.hadoop.mapreduce.JobRunner;

import javax.inject.Singleton;
import java.io.IOException;

/**
 * Hadoop / HBase configuration: prototype Hadoop configurations, HBase
 * connection, file system, and the component-import MapReduce job.
 */
@Configuration
public class HDPConfiguration {

    // Fix: injected dependency made final (immutable after construction).
    private final StorageProperties storageProperties;

    public HDPConfiguration(StorageProperties storageProperties) {
        this.storageProperties = storageProperties;
    }

    /**
     * Fresh HBase-aware Hadoop configuration per injection point (prototype
     * scope), so each consumer can mutate its own copy safely.
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public org.apache.hadoop.conf.Configuration configuration() {
        org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
        return org.apache.hadoop.hbase.HBaseConfiguration.create(config);
    }

    /** Shared Hadoop file system handle built from a fresh configuration. */
    @Bean
    public FileSystem fileSystem() throws IOException {
        return FileSystem.get(configuration());
    }

    /** New HBase connection per injection point (prototype scope). */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Connection connection() throws IOException {
        return ConnectionFactory.createConnection(configuration());
    }

    /**
     * Map-only job that imports whole component files from the default
     * components directory into HBase (no reducers, no file output).
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Job compsImportJob() throws IOException {
        Job job = Job.getInstance(configuration());
        FileInputFormat.addInputPath(job, new Path(storageProperties.getDefaultComponentsDir()));
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setMapperClass(HBaseCompsDirectImporter.HBaseCompsMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job;
    }

    /**
     * Runner used to submit the component-import job.
     *
     * <p>Fix: dropped the JSR-330 {@code @Singleton} — Spring does not process
     * it on {@code @Bean} methods, and bean methods are singleton-scoped by
     * default, so the annotation was a misleading no-op.
     *
     * <p>NOTE(review): this singleton captures ONE prototype {@link Job}
     * instance; a Hadoop Job cannot be submitted twice — confirm the runner is
     * invoked at most once per application run.
     */
    @Bean
    public JobRunner jobRunner() throws IOException {
        JobRunner jobRunner = new JobRunner();
        jobRunner.setJob(compsImportJob());
        return jobRunner;
    }

}