shun91
1/19/2015 - 3:28 AM

Stormで,テキストファイルから読み込むSpoutのサンプル.

Stormで,テキストファイルから読み込むSpoutのサンプル.

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.List;
import java.util.Map;

import java.util.concurrent.LinkedBlockingQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * テキストファイルから読み込むSpoutのサンプル.
 * 
 * 【注意】動作テストしてません.多分動くと思いますが...
 * 利用する時は27行目の INPUT_FILEPATH を指定するようにしてください.
 * 
 * @author shun91
 * 
 */
public class FileReadSampleSpout extends BaseRichSpout {
	
	// 読み込むテキストファイルの絶対パス
	private static final String INPUT_FILEPATH = "";
	
	private static final long serialVersionUID = 1L; // 警告回避用
	SpoutOutputCollector _collector;
	LinkedBlockingQueue<List<Object>> queue = null; // 読み込んだものを一時的に格納するキュー
	private Thread readThread;// テキストファイルを読み込むスレッド

	/**
	 * Spout起動時に1度だけ実行される初期化メソッド
	 */
	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		_collector = collector;
		queue = new LinkedBlockingQueue<List<Object>>(); // キューのインスタンス生成

		// スレッドの起動
		startThread();
	}

	/**
	 * タプルを送出するメソッド.繰り返し実行し続けている.
	 * キューにオブジェクトが格納されるとそれを送出する.
	 */
	@Override
	public void nextTuple() {
		try {
			if(queue.size() != 0){ // 環境によってはチェックしないとエラーになる?
				_collector.emit(queue.take());
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 送出タプルのフィールドを定義するメソッド.
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("text"));
	}

	/**
	 * Threadを起動するメソッド
	 */
	private void startThread() {
		readThread = new Thread(new Reader()); // スレッド生成
		try {
			readThread.start();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * スレッドで実行されるクラス本体.run()メソッドが実行される.
	 * テキストファイルを1行ずつ読み込み,タプルに変換した後キューに格納する.
	 */
	class Reader implements Runnable {
		@Override
		public final void run() {
			BufferedReader br;
			String line;
			try {
				br = new BufferedReader(new FileReader(INPUT_FILEPATH));
				while ((line = br.readLine()) != null) { // ファイルを1行ずつ読み込む
					queue.offer(new Values(line)); // タプルに変換してキューに格納
				}
				br.close();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				
			}
		}
	}
}