YoshiHotta
3/18/2017 - 3:36 AM

learningConcurrentProgrammingInScala.scala

learningConcurrentProgrammingInScala.scala

# Prokopec, Learning Concurrent Programming in Scalaのお勉強メモ
# Ch2
// 便利な関数を定義しておく。
def thread(body: => Unit): Thread = {
  val t = new Thread {
    override def run(): Unit = body
  }
  t.start()
  t
}

def log(msg: Any): Unit = {
  println(s"${Thread.currentThread.getName}:${msg.toString}")
}

// java.lang.Thread
{
  val t = Thread.currentThread
  val name = t.getName
  println(name)
}

// Threadの基本的な使い方
// 1. runをoverride
// 2. threadをnew
// 3. threadをstart
// 4. threadをjoin
{
  class MyThread extends Thread {
    override def run(): Unit = { // 1
      println("My Thread")
    }
  }

  val t = new MyThread // 2
  println("start a new thread")
  t.start()  // 3
  t.join()   // 4
  println("the thread joined.")
}

// 自作したthread, log関数の使い方
{
  log(1)
  val t = thread{
    log(2)
    Thread.sleep(100)
    log(3)
  }
  t.join()
  log(4) // 1234と印刷される
}

// synchronizedの使い方
{
  val uidmax = 1000
  var uidCount = 0
  var uids = List[Int]()
  def getUID: Int = uids.synchronized{
    val freshId =  uidCount + 1
    uidCount = freshId
    freshId
  }

  val t1 = thread {
    (1 to uidmax).foreach(_ => uids = getUID :: uids)
  }
  val t2= thread {
    (1 to uidmax).foreach(_ => uids = getUID :: uids)
  }
  t1.join()
  t2.join()

  uids.toSet == (1 to uidmax * 2).toSet
}

// thread pool
{
  val tasks = scala.collection.mutable.Queue[() => Unit]()
  val worker = new Thread {
    def poll(): Option[() => Unit] = tasks.synchronized {
      if (tasks.nonEmpty) Some(tasks.dequeue())
      else None
    }

    // budy-waitする。これは駄目 ***************:
    override def run(): Unit = while (true) poll() match {
      case Some(task) => task()
      case None => ()
    }
  }

  worker.setName("Worker")
  worker.setDaemon(true)
  worker.start()

  def asynchronous(body: => Unit) = tasks.synchronized {
    tasks.enqueue(() => body)
  }
  asynchronous{ log("Hello") }
  asynchronous{ log("World") }
  // Thread.currentThreadが終わるとデーモンスレッドも終了する。
  // 全てが出力されるようにするためにsleepする。
  Thread.sleep(2000)
}

// busy-waitしないバージョン。こっちを使うべき。
{
  val lock = new AnyRef
  var message: Option[String] = None
  val greeter = thread {
    lock.synchronized {
      while (message.isEmpty) { // guarded block
        lock.wait()
      }
      log(message.get)
    }
  }
  lock.synchronized {
    message = Some("Hello")
    lock.notify()
  }
  greeter.join()
}

// volatile
// 変化がすぐに他のスレッドに見えるようになる
{
  case class Page(txt: String, var position: Int)

  val pages = for(i <- 1 to 5) yield Page("Na" * (100 - 20 * i) + " Bat man!", -1)
  var lock = new AnyRef
  @volatile var found = false
  for (p <- pages) yield thread {
    var i = 0
    while (i < p.txt.length && !found) {
      if (p.txt(i) == '!') {
        p.position = i
        found = true
        lock.synchronized(lock.notify())
      } else {
        i += 1
      }
    }
  }
  lock.synchronized{
    while (!found) lock.wait()
    log(pages.map(_.position))
  }
}

# Ch4
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.{Try, Success, Failure}
import scala.concurrent.duration._
import org.apache.commons.io.FileUtils._

// Future[Option[Try]]

// FutureにはFuture computationとFuture valueの二種類がある
// Future computation
{
  // future computationをFutureのコンパニオンオブジェクトで作成
  Future {log("the future is here")}
  log("the future is coming")
}

{
  val buildFile = Future {
    val f = Source.fromFile("/Users/yoshi/Dropbox/Dropbox_Documents/dropboxWorkspace/scalaProkopecBook/build.sbt")
    try
      f.getLines().mkString("\n")
    finally
    f.close()
  }
  log(buildFile.isCompleted)
  Thread.sleep(100)
  log(buildFile.isCompleted)
  log(buildFile.value)

}

// callback
// 成功した場合のcallback
// foreachでSuccessした場合のcallbackを登録できる
// foreachはonSuccessと同じだが、onSuccessはdeprecated

// 失敗した場合のcallback
// Future(Some(Failure(???)))をFuture(Some(Success(???)))に返還する。
// つまりfailedでFuture[Throwable]という「成功した」Futureに変換する。
// その上でforeachでcallbackを登録する

// 成功した場合と失敗した場合を同時に登録するにはonCompleteを使う。
{
  def getUrlSpec: Future[List[String]] = Future {
    // val url = "http://www.w3.org/Addressing/URL/url-spec.txt"
    val url = "http://www.w3.org/Invalid/URL/url-spec.txt"
    val f = Source.fromURL(url)
    try f.getLines().toList finally f.close()
  }

  def find(lines: List[String], keyword: String) = {
    lines.zipWithIndex collect {
      case (line, n) if line.contains(keyword) => (n, line)
    } mkString "\n"
  }

  val urlSpec: Future[List[String]] = getUrlSpec
  // 成功した場合にのみ実行される。
  urlSpec foreach { lines => log(find(lines, "telnet"))}
  // 複数個のcallbackを登録することも出来る
  urlSpec foreach { lines => log(find(lines, "password"))}
  // 失敗した場合は.failed.foreachで登録する。
  urlSpec.failed.foreach { t => log(s"exception occured - $t") }
  // 成功した場合と失敗した場合を両方登録する。
  urlSpec onComplete{
    case Success(x) => println(s"success: $x")
    case Failure(x) => println(s"fail: $x")
  }
}

// Tryのfor-comprehension
{
  val t1 = Try("Hello")
  val t2 = Try("World")
  val t3 = for(a <- t1; b <- t2) yield s"$a ${b}!"

  t3 match {
    case Success(x) => println(x)
    case Failure(x) => println("failed...")
  }
}

// fatal exception
// InterruptedExceptionのようなfatalエラーはFutureに包まれず、生のままのエラーになる。
// この場合、try, catchを使うしかない。
{
  try {
    Future { throw new InterruptedException }
  } catch {
    case _: InterruptedException => println("fatal exception occured")
  }
}

// functional composition (e.g. map)
// 基本的にFutureはfunc. comp.を使って処理を加えていく。callbackはあまり使わない。
{
  val x = Future{1}.map(_ * 2) // Future(Some(Success(2)))
  val y = Future{10}
  // for-comprehensionも使える。
  val z = for(a <- x; b <- y) yield a + b
  z foreach(println)
}

// recoverを用いた例外処理
// Successのときは何もしない。Failureの時は部分関数でSuccessを作る。
{
  for (f <- List(Future{1}, Future{1/0})) yield {
    val fr = f.recover{case _: Throwable => 10000}
    Await.result(fr, 15.seconds) // 結果が出るまでブロックする。
  } // List(1, 1000)
}

// Promise
// PromiseはFutureの前駆体である。
// .future.{foreach, failed.foreach, onComplete}で実際の実行前に事後処理を登録できる。
// .success, .failureでFutureオブジェクトを作る (約束を果たす)。すると事後処理が自動的に始まる。
{
  val p = Promise[Int]
  p.future.foreach { x => x * 10} // 事前登録
  p.success(2) // 値を割り付ける
  Await.result(p.future, 10.seconds)

  val q = Promise[Int]
  // Failureを作ってから処理を割り当てることも可能
  q.failure(new Exception)
  q.future.failed.foreach { x => log(s"failed with $x")}

  // 一度しかpromiseにfutureを割り当てることは出来ない。
  // p.success(20) --> Error
}

// 関数内でfutureを作りたいときはpromiseのメソッドで作る。
// Future.apply()で直接作らない。
// 例)future1 or future2をpimp my libraryを使って実装する。
{
  implicit class FutureOps[T](val self: Future[T]) {
    def or(that: Future[T]): Future[T] = {
      val p = Promise[T]
      self onComplete(x => p tryComplete x)
      that onComplete(x => p tryComplete x)
      p.future
    }
  }

  val f1 = Future{"a" * 100000}
  val f2 = Future{"b" * 100}

  val ans = Await.result(f1 or f2, 10.seconds)
  ans(0) // b
}

// blocking clause
// ブロッキングが起きている箇所をblocking {} で明示すると、自動的にCPU数以上のスレッドを立ててくれる
{
  // 2コアしかないMacBook Proでも10秒で終わる。blockingを使わないと500秒かかる。
  for (i <- 1 to 100) {
    Future {
      blocking {
        Thread.sleep(10000)
        log(i)
      }
    }
  }
}

# Ch5

import scala.collection.GenSet
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.{Combiner, ForkJoinTaskSupport, ParSet, SeqSplitter}
// 基本的にはsequentialなコレクションにparをつけるだけで並列化される。
{
  import scala.util.Random
  val r = Random.shuffle(1 to 10000)
  r.par.max
}

// parallelなコレクションにもsequentialなコレクションにも使えるようにする。
// GenSeq等のGen*を型にする。
{
  import scala.collection.GenSeq
  def findMin(xs: GenSeq[Int]) = -xs.map(x => -x).max

  findMin(List(1,2,3,4,5)) == findMin(List(1,2,3,4,5).par) // true
}

// 並列コレクション毎にプロセッサ数を変えられる。
{
  import java.util.concurrent.ForkJoinPool
  val fjpool = new ForkJoinPool(3) // 3スレッドを使う
  val customTaskSupport = new ForkJoinTaskSupport(fjpool)
  val r = (1 to 10000).par
  r.tasksupport = customTaskSupport
  r.max
}

/* parでのparallel collectionへの変換速度
*  vector等の定数時間でランダムアクセスできるデータ構造はO(log N)でparallel collectionへ
*  変換できる。Listのようにsequentialなデータ構造は変換にO(N)かかる。
*/
{
  List.fill(10000)(0).par // ~ 10000 steps
  Vector.fill(10000)(0).par // ~ log(10000) steps
}

/* 並列化可能な操作と並列化不可能な操作
* レゴブロックのアナロジーで様々な方法でくみ上げられるsignatureなら並列化可能。
* そうでなければ並列化不可能。ただし並列化可能であるとしても結合律が成立しないと
* 非決定論的になる。
 */
{
  //  foldLeftは(S, T) => Sなので並列化不可能. シングルスレッドと同じ時間がかかる
  (1 to 10000).foldLeft(0)(_ - _) ==
    (1 to 10000).par.foldLeft(0)(_ - _) // true
  // foldは(S, S) => Sなので並列化可能
  (1 to 10000).fold(0)(_ + _) ==
    (1 to 10000).par.fold(0)(_ + _) // true
  (1 to 10000).fold(0)(_ - _) ==
    (1 to 10000).par.fold(0)(_ - _) // false. 結合律が不成立
}

// aggregate
// 並列化不可能な場合はaggregateを使えば並列化できる
{
  (1 to 10000).aggregate(0)((acc: Int, x:Int) => acc + x, (acc1: Int, acc2: Int) => acc1 + acc2) ==
    (1 to 10000).fold(0)(_ + _) // true
}

// 基本的にはmutableな変数は使わず、Par*のメソッドを使う。安全に並列化できる。
// どうしてもmutableな状態を使わないといけないときは、java.util.concurrent._の平行コレクションを使う。
{
  import java.util.concurrent.ConcurrentSkipListSet
  import scala.collection.JavaConverters._
  // 並列コレクションのメソッドを使う。
  def intersectionSets[T](s1: GenSet[T], s2: GenSet[T] ) = s1 & s2
  // 平行コレクションを使う。
  def intersectionSetsConcurrent[T](s1: GenSet[T], s2: GenSet[T]):
  scala.collection.mutable.Set[T]= {
    val skiplist = new ConcurrentSkipListSet[T]
    val (sx, st) = if (s1.size < s2.size) (s1, s2) else (s2, s1)
    for (x <- sx.par) {
      if (sx.contains(x)) skiplist.add(x)
    }
    skiplist.asScala
  }

  val s1 = (1 to 1000).toSet
  val s2 = Range(1, 1000, 4).toSet
  intersectionSets(s1.par, s2.par) ==  intersectionSets(s1, s2) // true
  intersectionSetsConcurrent(s1.par, s2.par) == intersectionSets(s1, s2) // true
}

// TrieMap (CTrie)を使うとループ開始時のスナップショットを使ってスイープできる。
{
  val cache = new scala.collection.concurrent.TrieMap[Int, String]()
  // 明示的に.snapshot()を使っても良い。
  for(i <- 1 to 100) cache(i) = i.toString
  for ((num, str) <- cache.par) cache(-num) = (-num).toString
  cache.toSet.size == 200 // true
}

// parallel collectionのカスタム実装
/* Splitter
* Splitterはaccessor (foldLeft, find, exists, etc. 単一の値を返す関数)を実装するために
* 使う。
* Iteratorの定義するhasNext, nextに加えてsplit, dup, remaningをメソッドに持つ。
 */
/* Combiner
* transformer (map, filter, groupBy, etc. コレクションを生成する関数)を実装するために
* ScalaはBuilderを使う。Builder は+=, result, clear, newBuilderを定義する。
* trait Combiner extends Builderは+=, result, clear に加えてcombineとnewCombinerを定義する。
* combineは二つのCombinerを結合して新しいCombinerを返す。
* 並列のtransformerを定義するためにはSplitterとCombinerの両方が必要。
*/
{
  class ParString(val str: String) extends scala.collection.parallel.immutable.ParSeq[Char] {
    def apply(i: Int): Char = str.charAt(i)
    def length = str.length
    def splitter = new ParStringSplitter(str, 0, str.length) // accessorを使うためにオーバーライドする
    def seq = new collection.immutable.WrappedString(str)
    override def newCombiner = new ParStringCombiner // transformerを使うためにオーバーライドする
  }

  class ParStringSplitter(val s: String, private var i: Int, val limit: Int) extends SeqSplitter[Char] {
    final def hasNext = i < limit
    final def next = {
      val r = s.charAt(i)
      i += 1
      r
    }
    def split = {
      val rem = remaining
      if (rem >= 2) psplit(rem / 2, rem - rem/2)
      else Seq(this)
    }
    def dup = new ParStringSplitter(s, i, limit)
    def remaining = limit - i
    def psplit(sizes: Int*): Seq[ParStringSplitter] = {
      val ss = for (sz <- sizes) yield {
        val nlimit = (i + sz) min limit
        val ps = new ParStringSplitter(s, i, nlimit)
        i = nlimit
        ps
      }
      if (i == limit) ss
      else ss :+ new ParStringSplitter(s, i, limit)
    }
  }

  // two-phase evolutionで実装する。
  // 次の例ではchunksという中間的な状態を保持する場所を用意し、resultが呼ばれると
  // chunksを結合するという二段階を踏む。
  class ParStringCombiner extends Combiner[Char, ParString] {
    private val chunks = new ArrayBuffer += new StringBuilder
    private var lastc = chunks.last
    var size = 0
    def +=(elem: Char) = {
      lastc += elem
      size += 1
      this
    }
    def result: ParString = {
      val rsb = new StringBuilder
      for (sb <- chunks) rsb.append(sb)
      new ParString(rsb.toString)
    }
    def clear = {
      chunks.clear
      chunks += new StringBuilder
      lastc = chunks.last
      size = 0
    }

    override def combine[N <: Char, NewTo >: ParString](that: Combiner[N, NewTo]) = {
      if (this eq that) this else that match {
        case that: ParStringCombiner =>
          size += that.size
          chunks ++= that.chunks
          lastc = chunks.last
          this
      }
    }
  }

  // Splitterの使用例。accessorが使えるようになった!
  val txt = "A custom text " * 250000
  val partxt = new ParString(txt)
  partxt.aggregate(0)((n: Int, c:Char) => if (Character.isUpperCase(c)) n + 1 else n, _ + _ )

  // Combinerrの使用例。transformerが使えるようになった!
  partxt.filter(_ != ' ').length
}