learningConcurrentProgrammingInScala.scala
# Prokopec, Learning Concurrent Programming in Scalaのお勉強メモ
# Ch2
// 便利な関数を定義しておく。
def thread(body: => Unit): Thread = {
val t = new Thread {
override def run(): Unit = body
}
t.start()
t
}
def log(msg: Any): Unit = {
println(s"${Thread.currentThread.getName}:${msg.toString}")
}
// java.lang.Thread
{
val t = Thread.currentThread
val name = t.getName
println(name)
}
// Threadの基本的な使い方
// 1. runをoverride
// 2. threadをnew
// 3. threadをstart
// 4. threadをjoin
{
class MyThread extends Thread {
override def run(): Unit = { // 1
println("My Thread")
}
}
val t = new MyThread // 2
println("start a new thread")
t.start() // 3
t.join() // 4
println("the thread joined.")
}
// 自作したthread, log関数の使い方
{
log(1)
val t = thread{
log(2)
Thread.sleep(100)
log(3)
}
t.join()
log(4) // 1234と印刷される
}
// synchronizedの使い方
{
val uidmax = 1000
var uidCount = 0
var uids = List[Int]()
def getUID: Int = uids.synchronized{
val freshId = uidCount + 1
uidCount = freshId
freshId
}
val t1 = thread {
(1 to uidmax).foreach(_ => uids = getUID :: uids)
}
val t2= thread {
(1 to uidmax).foreach(_ => uids = getUID :: uids)
}
t1.join()
t2.join()
uids.toSet == (1 to uidmax * 2).toSet
}
// thread pool
{
val tasks = scala.collection.mutable.Queue[() => Unit]()
val worker = new Thread {
def poll(): Option[() => Unit] = tasks.synchronized {
if (tasks.nonEmpty) Some(tasks.dequeue())
else None
}
// budy-waitする。これは駄目 ***************:
override def run(): Unit = while (true) poll() match {
case Some(task) => task()
case None => ()
}
}
worker.setName("Worker")
worker.setDaemon(true)
worker.start()
def asynchronous(body: => Unit) = tasks.synchronized {
tasks.enqueue(() => body)
}
asynchronous{ log("Hello") }
asynchronous{ log("World") }
// Thread.currentThreadが終わるとデーモンスレッドも終了する。
// 全てが出力されるようにするためにsleepする。
Thread.sleep(2000)
}
// busy-waitしないバージョン。こっちを使うべき。
{
val lock = new AnyRef
var message: Option[String] = None
val greeter = thread {
lock.synchronized {
while (message.isEmpty) { // guarded block
lock.wait()
}
log(message.get)
}
}
lock.synchronized {
message = Some("Hello")
lock.notify()
}
greeter.join()
}
// volatile
// 変化がすぐに他のスレッドに見えるようになる
{
case class Page(txt: String, var position: Int)
val pages = for(i <- 1 to 5) yield Page("Na" * (100 - 20 * i) + " Bat man!", -1)
var lock = new AnyRef
@volatile var found = false
for (p <- pages) yield thread {
var i = 0
while (i < p.txt.length && !found) {
if (p.txt(i) == '!') {
p.position = i
found = true
lock.synchronized(lock.notify())
} else {
i += 1
}
}
}
lock.synchronized{
while (!found) lock.wait()
log(pages.map(_.position))
}
}
# Ch4
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.{Try, Success, Failure}
import scala.concurrent.duration._
import org.apache.commons.io.FileUtils._
// Future[Option[Try]]
// FutureにはFuture computationとFuture valueの二種類がある
// Future computation
{
// future computationをFutureのコンパニオンオブジェクトで作成
Future {log("the future is here")}
log("the future is coming")
}
{
val buildFile = Future {
val f = Source.fromFile("/Users/yoshi/Dropbox/Dropbox_Documents/dropboxWorkspace/scalaProkopecBook/build.sbt")
try
f.getLines().mkString("\n")
finally
f.close()
}
log(buildFile.isCompleted)
Thread.sleep(100)
log(buildFile.isCompleted)
log(buildFile.value)
}
// callback
// 成功した場合のcallback
// foreachでSuccessした場合のcallbackを登録できる
// foreachはonSuccessと同じだが、onSuccessはdeprecated
// 失敗した場合のcallback
// Future(Some(Failure(???)))をFuture(Some(Success(???)))に返還する。
// つまりfailedでFuture[Throwable]という「成功した」Futureに変換する。
// その上でforeachでcallbackを登録する
// 成功した場合と失敗した場合を同時に登録するにはonCompleteを使う。
{
def getUrlSpec: Future[List[String]] = Future {
// val url = "http://www.w3.org/Addressing/URL/url-spec.txt"
val url = "http://www.w3.org/Invalid/URL/url-spec.txt"
val f = Source.fromURL(url)
try f.getLines().toList finally f.close()
}
def find(lines: List[String], keyword: String) = {
lines.zipWithIndex collect {
case (line, n) if line.contains(keyword) => (n, line)
} mkString "\n"
}
val urlSpec: Future[List[String]] = getUrlSpec
// 成功した場合にのみ実行される。
urlSpec foreach { lines => log(find(lines, "telnet"))}
// 複数個のcallbackを登録することも出来る
urlSpec foreach { lines => log(find(lines, "password"))}
// 失敗した場合は.failed.foreachで登録する。
urlSpec.failed.foreach { t => log(s"exception occured - $t") }
// 成功した場合と失敗した場合を両方登録する。
urlSpec onComplete{
case Success(x) => println(s"success: $x")
case Failure(x) => println(s"fail: $x")
}
}
// Tryのfor-comprehension
{
val t1 = Try("Hello")
val t2 = Try("World")
val t3 = for(a <- t1; b <- t2) yield s"$a ${b}!"
t3 match {
case Success(x) => println(x)
case Failure(x) => println("failed...")
}
}
// fatal exception
// InterruptedExceptionのようなfatalエラーはFutureに包まれず、生のままのエラーになる。
// この場合、try, catchを使うしかない。
{
try {
Future { throw new InterruptedException }
} catch {
case _: InterruptedException => println("fatal exception occured")
}
}
// functional composition (e.g. map)
// 基本的にFutureはfunc. comp.を使って処理を加えていく。callbackはあまり使わない。
{
val x = Future{1}.map(_ * 2) // Future(Some(Success(2)))
val y = Future{10}
// for-comprehensionも使える。
val z = for(a <- x; b <- y) yield a + b
z foreach(println)
}
// recoverを用いた例外処理
// Successのときは何もしない。Failureの時は部分関数でSuccessを作る。
{
for (f <- List(Future{1}, Future{1/0})) yield {
val fr = f.recover{case _: Throwable => 10000}
Await.result(fr, 15.seconds) // 結果が出るまでブロックする。
} // List(1, 1000)
}
// Promise
// PromiseはFutureの前駆体である。
// .future.{foreach, failed.foreach, onComplete}で実際の実行前に事後処理を登録できる。
// .success, .failureでFutureオブジェクトを作る (約束を果たす)。すると事後処理が自動的に始まる。
{
val p = Promise[Int]
p.future.foreach { x => x * 10} // 事前登録
p.success(2) // 値を割り付ける
Await.result(p.future, 10.seconds)
val q = Promise[Int]
// Failureを作ってから処理を割り当てることも可能
q.failure(new Exception)
q.future.failed.foreach { x => log(s"failed with $x")}
// 一度しかpromiseにfutureを割り当てることは出来ない。
// p.success(20) --> Error
}
// 関数内でfutureを作りたいときはpromiseのメソッドで作る。
// Future.apply()で直接作らない。
// 例)future1 or future2をpimp my libraryを使って実装する。
{
implicit class FutureOps[T](val self: Future[T]) {
def or(that: Future[T]): Future[T] = {
val p = Promise[T]
self onComplete(x => p tryComplete x)
that onComplete(x => p tryComplete x)
p.future
}
}
val f1 = Future{"a" * 100000}
val f2 = Future{"b" * 100}
val ans = Await.result(f1 or f2, 10.seconds)
ans(0) // b
}
// blocking clause
// ブロッキングが起きている箇所をblocking {} で明示すると、自動的にCPU数以上のスレッドを立ててくれる
{
// 2コアしかないMacBook Proでも10秒で終わる。blockingを使わないと500秒かかる。
for (i <- 1 to 100) {
Future {
blocking {
Thread.sleep(10000)
log(i)
}
}
}
}
# Ch5
import scala.collection.GenSet
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.{Combiner, ForkJoinTaskSupport, ParSet, SeqSplitter}
// 基本的にはsequentialなコレクションにparをつけるだけで並列化される。
{
import scala.util.Random
val r = Random.shuffle(1 to 10000)
r.par.max
}
// parallelなコレクションにもsequentialなコレクションにも使えるようにする。
// GenSeq等のGen*を型にする。
{
import scala.collection.GenSeq
def findMin(xs: GenSeq[Int]) = -xs.map(x => -x).max
findMin(List(1,2,3,4,5)) == findMin(List(1,2,3,4,5).par) // true
}
// 並列コレクション毎にプロセッサ数を変えられる。
{
import java.util.concurrent.ForkJoinPool
val fjpool = new ForkJoinPool(3) // 3スレッドを使う
val customTaskSupport = new ForkJoinTaskSupport(fjpool)
val r = (1 to 10000).par
r.tasksupport = customTaskSupport
r.max
}
/* parでのparallel collectionへの変換速度
* vector等の定数時間でランダムアクセスできるデータ構造はO(log N)でparallel collectionへ
* 変換できる。Listのようにsequentialなデータ構造は変換にO(N)かかる。
*/
{
List.fill(10000)(0).par // ~ 10000 steps
Vector.fill(10000)(0).par // ~ log(10000) steps
}
/* 並列化可能な操作と並列化不可能な操作
* レゴブロックのアナロジーで様々な方法でくみ上げられるsignatureなら並列化可能。
* そうでなければ並列化不可能。ただし並列化可能であるとしても結合律が成立しないと
* 非決定論的になる。
*/
{
// foldLeftは(S, T) => Sなので並列化不可能. シングルスレッドと同じ時間がかかる
(1 to 10000).foldLeft(0)(_ - _) ==
(1 to 10000).par.foldLeft(0)(_ - _) // true
// foldは(S, S) => Sなので並列化可能
(1 to 10000).fold(0)(_ + _) ==
(1 to 10000).par.fold(0)(_ + _) // true
(1 to 10000).fold(0)(_ - _) ==
(1 to 10000).par.fold(0)(_ - _) // false. 結合律が不成立
}
// aggregate
// 並列化不可能な場合はaggregateを使えば並列化できる
{
(1 to 10000).aggregate(0)((acc: Int, x:Int) => acc + x, (acc1: Int, acc2: Int) => acc1 + acc2) ==
(1 to 10000).fold(0)(_ + _) // true
}
// 基本的にはmutableな変数は使わず、Par*のメソッドを使う。安全に並列化できる。
// どうしてもmutableな状態を使わないといけないときは、java.util.concurrent._の平行コレクションを使う。
{
import java.util.concurrent.ConcurrentSkipListSet
import scala.collection.JavaConverters._
// 並列コレクションのメソッドを使う。
def intersectionSets[T](s1: GenSet[T], s2: GenSet[T] ) = s1 & s2
// 平行コレクションを使う。
def intersectionSetsConcurrent[T](s1: GenSet[T], s2: GenSet[T]):
scala.collection.mutable.Set[T]= {
val skiplist = new ConcurrentSkipListSet[T]
val (sx, st) = if (s1.size < s2.size) (s1, s2) else (s2, s1)
for (x <- sx.par) {
if (sx.contains(x)) skiplist.add(x)
}
skiplist.asScala
}
val s1 = (1 to 1000).toSet
val s2 = Range(1, 1000, 4).toSet
intersectionSets(s1.par, s2.par) == intersectionSets(s1, s2) // true
intersectionSetsConcurrent(s1.par, s2.par) == intersectionSets(s1, s2) // true
}
// TrieMap (CTrie)を使うとループ開始時のスナップショットを使ってスイープできる。
{
val cache = new scala.collection.concurrent.TrieMap[Int, String]()
// 明示的に.snapshot()を使っても良い。
for(i <- 1 to 100) cache(i) = i.toString
for ((num, str) <- cache.par) cache(-num) = (-num).toString
cache.toSet.size == 200 // true
}
// parallel collectionのカスタム実装
/* Splitter
* Splitterはaccessor (foldLeft, find, exists, etc. 単一の値を返す関数)を実装するために
* 使う。
* Iteratorの定義するhasNext, nextに加えてsplit, dup, remaningをメソッドに持つ。
*/
/* Combiner
* transformer (map, filter, groupBy, etc. コレクションを生成する関数)を実装するために
* ScalaはBuilderを使う。Builder は+=, result, clear, newBuilderを定義する。
* trait Combiner extends Builderは+=, result, clear に加えてcombineとnewCombinerを定義する。
* combineは二つのCombinerを結合して新しいCombinerを返す。
* 並列のtransformerを定義するためにはSplitterとCombinerの両方が必要。
*/
{
class ParString(val str: String) extends scala.collection.parallel.immutable.ParSeq[Char] {
def apply(i: Int): Char = str.charAt(i)
def length = str.length
def splitter = new ParStringSplitter(str, 0, str.length) // accessorを使うためにオーバーライドする
def seq = new collection.immutable.WrappedString(str)
override def newCombiner = new ParStringCombiner // transformerを使うためにオーバーライドする
}
class ParStringSplitter(val s: String, private var i: Int, val limit: Int) extends SeqSplitter[Char] {
final def hasNext = i < limit
final def next = {
val r = s.charAt(i)
i += 1
r
}
def split = {
val rem = remaining
if (rem >= 2) psplit(rem / 2, rem - rem/2)
else Seq(this)
}
def dup = new ParStringSplitter(s, i, limit)
def remaining = limit - i
def psplit(sizes: Int*): Seq[ParStringSplitter] = {
val ss = for (sz <- sizes) yield {
val nlimit = (i + sz) min limit
val ps = new ParStringSplitter(s, i, nlimit)
i = nlimit
ps
}
if (i == limit) ss
else ss :+ new ParStringSplitter(s, i, limit)
}
}
// two-phase evolutionで実装する。
// 次の例ではchunksという中間的な状態を保持する場所を用意し、resultが呼ばれると
// chunksを結合するという二段階を踏む。
class ParStringCombiner extends Combiner[Char, ParString] {
private val chunks = new ArrayBuffer += new StringBuilder
private var lastc = chunks.last
var size = 0
def +=(elem: Char) = {
lastc += elem
size += 1
this
}
def result: ParString = {
val rsb = new StringBuilder
for (sb <- chunks) rsb.append(sb)
new ParString(rsb.toString)
}
def clear = {
chunks.clear
chunks += new StringBuilder
lastc = chunks.last
size = 0
}
override def combine[N <: Char, NewTo >: ParString](that: Combiner[N, NewTo]) = {
if (this eq that) this else that match {
case that: ParStringCombiner =>
size += that.size
chunks ++= that.chunks
lastc = chunks.last
this
}
}
}
// Splitterの使用例。accessorが使えるようになった!
val txt = "A custom text " * 250000
val partxt = new ParString(txt)
partxt.aggregate(0)((n: Int, c:Char) => if (Character.isUpperCase(c)) n + 1 else n, _ + _ )
// Combinerrの使用例。transformerが使えるようになった!
partxt.filter(_ != ' ').length
}