メモ置き場

メモ置き場です.開発したものや調べたことについて書きます.

[tex: ]

Javaの勉強(5):スレッド

Javaのスレッドについてまとめておく

スレッドとは

Javaのプログラムはまず,mainメソッドの頭から実行されmainメソッドが終了したらそこでプログラムも終了する.途中で他のメソッドの呼び出しがあっても最終的にはmainメソッドに戻ってきて終了する.このような一連の処理の流れをスレッドという.
スレッドを実行しているのはThreadオブジェクト(Threadクラスのインスタンス)である.

Javaのプログラムは以下のような流れで実行される.

  1. プログラムが起動される
  2. mainメソッドを実行するためのスレッドオブジェクトがJava Virtual Machine(JVM)で生成される
  3. スレッドオブジェクトがmainメソッドの処理をする.このようなプログラムの起点となるメソッドをエントリポイントという.
  4. mainメソッドの処理が終わる
  5. スレッドオブジェクトが破棄される

mainメソッドを実行するスレッドオブジェクトはJVMで自動的に生成されるが,プログラム内で新たに生成することもできる.スレッドを複数生成すれば,それぞれのスレッドオブジェクトに別々の処理をさせることができ,複数の処理を並行して実行することができる.これをマルチスレッドプログラミングという.

スレッドの状態

スレッドを複数生成して複数の処理を並列に行うといっても,本当に複数の処理が「厳密に同時に」行われているわけではない.プログラミングを実行し処理を行うのはCPUである.CPU(の1つのコア)で実行可能な処理は1個である.マルチスレッドでは,CPUで行う処理を高速で切り替えながら実行している.我々人間にとって,処理が切り替わるタイミングが早すぎる(1msecとか)ため,あたかも同時に実行されているように見えるだけである.つまりスレッドの立場からすると,CPUで実行されている時とCPUで実行されるのを待っている,といった状態があるわけである.

スレッドには主に3つの状態がある.

  • 実行可能状態

スレッドの処理が実行可能な状態になり,実行の割り当てを待っている状態

  • 実行状態

スレッドが実行されている状態

  • 待機状態

割り当て以外の状況で待機している状態.例えばキーボードからの入力を待っている状態などがある.後述するが,sleepやwaitといったメソッドで待機している状態もこれに含まれる.

sleepメソッド

Threadクラスにはsleepというメソッドが用意されている.実行を開始したスレッドを一定時間一時停止させたい場合はsleepメソッドを使う.
https://docs.oracle.com/javase/jp/8/docs/api/java/lang/Thread.html#sleep-long-
sleepメソッドはstaticメソッドで,引数にはスレッドを停止したい時間をミリ秒で与える.またsleepメソッドはInterruptExceptionというチェック例外を投げうるので,例外処理をしておく.

public class ThreadMain {
    public static void main(String[] args) {
        for(int i=0;i<3;i++){
            try{
                Thread.sleep(1000);
                System.out.println((i+1) + " sec");
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }

        System.out.println("Finish main");
    }
}

実行結果は以下のようになる.

1 sec
2 sec
3 sec
Finish main

スレッドの作成

スレッドを生成して処理を並列に行うには以下の2つの方法がある.

  • Threadクラスのサブクラスを作成して,runメソッドをオーバーライドする.

runメソッドの中に実行させたい処理をオーバーライドしておく.

  • Runnableインターフェースを実装する

Runnableインターフェースを実装したクラスを作成し,runメソッドを実行したい処理でオーバーライドしておく.
Threadクラスを継承すると他のクラスの継承ができなくなるため,Runnableインターフェースを実装することが多いらしい.

スレッドを作成して実行するには次のようにする

  1. Threadクラスか,そのサブクラスをインスタンス化する.ThreadクラスのコンストラクタにはRunnable型の引数を渡せる.引数なしでも良い.
  2. Thraedオブジェクトのstartメソッドを呼び出して処理を開始する.スレッドオブジェクトの生成時にRunnable型の引数が渡された場合,そのRunnableオブジェクトのrunメソッドが実行される.引数なしで生成された場合は,Threadオブジェクトでオーバーライドされたrunメソッドの処理が実行される.

以下に,2種類の方法で実装したスレッドをmainメソッドから実行するコードを示す.mainメソッドを含めて3つのスレッドが並列で動作する.

// MyThraed.java
public class MyThread extends Thread{
    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        // currentThread()は現在実行中のスレッドオブジェクトを返すstaticメソッド
        // getName()はこのスレッドの名前を返すメソッド
        System.out.println("Start: " + name);
        for(int i=0;i<3;i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "  " + (i + 1) + " sec");
        }
    }
}
// MyRunnable.java
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println("Start: " + name);
        for(int i=0;i<5;i++){
            try{
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "  " + (2 * i) + " sec");
        }
    }
}
// ThreadMain.java
public class ThreadMain {
    public static void main(String[] args) {
        String name = Thread.currentThread().getName();
        System.out.println("Start:  " + name);
        MyThread myThread = new MyThread();
	// Threadクラスを継承したMyThreadクラスを生成する

        Thread runnableThread = new Thread(new MyRunnable());
	// Runnableを実装したMyRunnableのオブジェクトを引数にしてThreadを作成

        myThread.start(); // MyThreadでオーバーライドしたrunの処理
        runnableThread.start(); // MyRunnableでオーバーライドしたrunの処理

        for(int i=0;i<6;i++){
            try{
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "  " + (i + 1) + " sec");
        }

        System.out.println("Finish main");
    }

実行結果は以下のようになる

Start:  main
Start: Thread-0
Start: Thread-1
main  1 sec
Thread-0  1 sec
Thread-1  0 sec
main  2 sec
Thread-0  2 sec
Thread-0  3 sec
main  3 sec
Finish Thread-0
Thread-1  2 sec
main  4 sec
main  5 sec
Thread-1  4 sec
main  6 sec
Finish main
Thread-1  6 sec
Thread-1  8 sec
Finish Thread-1

Process finished with exit code 0

出力される順番は,どのスレッドが実行状態になるかで変わるため,プログラムを実行するごとに変わる.またmainスレッドが終了した後にThread-1の結果が表示されていることからわかるように,mainメソッドの処理が終了しそのスレッドオブジェクトが破棄されたあとでも,mainスレッドから生成したスレッドオブジェクトの処理が実行されていたりする.全てのスレッドでの処理が完了してからプログラムは終了する.

Threadクラスでよく使うメソッド

以下に示したメソッドの一部は全て引数なし.

メソッド名 戻り値 説明
currentThread Thread 現在実行中のスレッドオブジェクトへの参照を返す.staticメソッド
getId long このスレッドの識別番号を返す.識別番号はスレッド生成時に自動的に割り振られ,変更はできない
getName String このスレッドの名前を返す.名前も自動的に割り振られる
start void このスレッドの処理を開始する
run void 実行すべき処理のためのメソッド.オーバーライドして使う.Threadクラスでは何もしない(抽象メソッドではないので注意)
interrupt void スレッドに割り込む.sleepなどで待機状態にあるスレッドにInterruptExceptionを発生させる
join void スレッドが終了するのを待ち合わせる

ExecutorServiceインターフェース

スレッドは複数の処理を並列して実行できるようになる反面,同じデータに複数のスレッドがアクセスしてデータの破壊が起きる.処理の順番に矛盾があって処理が停止してしまうといったことが起きる.複数スレッドが存在していても矛盾が生じずに正しく動作するようなプログラムを「スレッドセーフ」という.スレッドセーフなプログラムを作るよう細心の注意を払わなければならない.

ExecutorServiceは標準ライブラリに含まれるインターフェースである.ExecutorServiceでは,スレッドを安全に生成し実行するための機能が含まれており,スレッドの生成や開始といったスレッドに対する直接的な操作を一括して行ってくれる.また,あらかじめ同時実行されるスレッドの最大値を決め,実行待機中のスレッドをためておくことができる「スレッドプール」も簡単にできる.

スレッドを作成するのはコンピュータ的には負荷の高い処理になる.他の処理中に別のスレッドを作成すると負荷が予想よりも高くなってしまい不具合につながることもある.したがって,あらかじめ使いそうなスレッドを作成しておいて,必要に応じてスレッドに処理を渡して実行するという方針が考えられる.このようにあらかじめスレッドを生成して保持しておくことをスレッドプールという.

以下のように使う.

  1. Runnableインターフェースを実装したクラスを定義

runメソッドをオーバーライドしておく

  1. ExecutorServiceオブジェクトを生成する.
  2. submitメソッドで処理をスレッドに実行させる

引数にRunnableオブジェクトを渡すと,その中のrunメソッドを実行してくれる.

以下にコード例を示す.

// MyRunnable.java
public class MyRunnable implements Runnable {
    private int time;

    public MyRunnable(int time) {
        this.time = time;
    }

    public MyRunnable() {
        this(1);
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println("Start: " + name);
        for(int i=0;i<3;i++){
            try{
                Thread.sleep(1000 * time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "  " + (time * (i+1) + " sec"));
        }
        System.out.println("Finish " + name);
    }
}
// ThreadMain.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadMain {
    public static void main(String[] args) {
        String name = Thread.currentThread().getName();
        System.out.println("Start:  " + name);

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // 最大数3のスレッドプールを作成
        executorService.submit(new MyRunnable(2));
        executorService.submit(new MyRunnable(3));
        // MyRunnableオブジェクトを新規に生成して引数として渡す.
        executorService.shutdown();
        // shutdownでスレッドの受付を終了する.書いていないとスレッド受付状態のままになってしまい,プログラムが終了しなくなるので注意.

        try {
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e){
            e.printStackTrace();
        }
        // awaitTerminationでタイムアウトの時間を設定する.

        for(int i=0;i<10;i++){
            try{
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + "  " + (i + 1) + " sec");
        }

        System.out.println("Finish main");
    }
}

実行結果.

Start:  main
Start: pool-1-thread-1
Start: pool-1-thread-2
main  1 sec
pool-1-thread-1  2 sec
main  2 sec
pool-1-thread-2  3 sec
main  3 sec
pool-1-thread-1  4 sec
main  4 sec
main  5 sec
pool-1-thread-2  6 sec
pool-1-thread-1  6 sec
Finish pool-1-thread-1
main  6 sec
main  7 sec
main  8 sec
pool-1-thread-2  9 sec
Finish pool-1-thread-2
main  9 sec
main  10 sec
Finish main

ExecutorServiceでよく使うメソッド

ExecutorServiceインスタンスを返すExecutorクラスのメソッドを以下に示す.戻り値は全てExecutorServiceのインスタンスである.

メソッド名 引数 説明
newSingleThreadExecutor なし シングルスレッドに対するExecutorServiceオブジェクトを生成して返す.このオブジェクトではスレッドを同時に1つしか生成できない
newFixedThreadPool int nThread スレッドプールに対するExecutorServiceオブジェクトを生成して返す.このオブジェクトではスレッドを同時に最高nThread個まで生成できる

次にExecutorServiceインターフェースでよく使うメソッドをまとめる.

メソッド名 引数 戻り値 説明
submit Runnable task Future Runnableオブジェクトtaskのrunメソッドの処理を管理しているスレッドオブジェクトに実行させる.戻り値のFutureは非同期処理の結果を表し,処理が完了したかどうかのチェックなどができる.Futureオブジェクトは使ったことがないので詳細は不明
shutdown なし void 処理の受付を終了するメソッド.これを呼び出すより前にsubmitされたタスクは実行される.shutdownメソッド呼ばないとExecutorServiceオブジェクトが処理受付のままになってしまいプログラムが終了しなくなるので注意
awaitTermination long time, TimeUnit unit boolean タイムアウトの時間を設定する.timeはタイムアウトの時間で,unitは単位.(10, TimeUnit.SECONDS)としたら10秒でタイムアウト

排他制御

マルチスレッドプログラミングで気をつける点の一つに,メモリ上の同一領域を別々のスレッドからアクセスする場合がある.例えば,「ある変数の値を確認してそれをインクリメントする」という処理を2つのスレッドから行うことを考えてみる.

// MyNumber.java
public class MyNumber {
    private int number;

    public MyNumber(int number) {
        this.number = number;
    }

    public void incrementNumber(){
        for(int i=0;i<100000;i++){
            number++;
        }
    }

    public int getNumber(){
        return number;
    }
}
// MyRunnable.java
public class MyRunnable implements Runnable {
    MyNumber myNumber;

    public MyRunnable(MyNumber myNumber){
        this.myNumber = myNumber;
    }

    @Override
    public void run() {
        myNumber.incrementNumber();
    }

}
// ThreadMain.java
public class ThreadMain {
    public static void main(String[] args) {
        String name = Thread.currentThread().getName();
        System.out.println("Start:  " + name);

        MyNumber myNumber = new MyNumber(0);

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(new MyRunnable(myNumber));
        executorService.submit(new MyRunnable(myNumber));
        executorService.shutdown();

        try {
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println("After thread :" + myNumber.getNumber());

        System.out.println("Finish main");
    }
}

MyNumberクラスはメンバ変数numberとメソッドincrementNumberを持つ.メソッドではメンバ変数を10000回インクリメントする.MyRunnableクラスはMyNumber型の変数を持ち,コンストラクタで渡されたMyNumberクラスの参照で初期化される.runメソッドではMyNumberフィールドのincrementNumberメソッドを呼び出す.

mainメソッドでは,まずMyNumberオブジェクトを引数0で生成している.myNumberがその参照となる.myNumberを引数としてMyRunnableオブジェクトを2つ作成しsubmitしている.MyRunnableのrunメソッドを呼び出すスレッドが2つ生成されることになるが,runで参照しているMyNumberオブジェクトはmyNumberであり,myNumberのincrementNumberメソッドが合計2回呼ばれることになる.
つまり,mainメソッドの最後でgetNumberを呼び出した際は20000が返ってくるのが正しい.
実行結果を以下に示す.

Start:  main
After thread :106842
Finish main

このように正しい結果を返していないことがわかる.これはmyNumberオブジェクトへ2つのスレッドが同時にアクセスするために起きるバグである.こういったバグを防ぐために排他制御が必要となる.

排他制御をすると,あるオブジェクトにアクセスしているスレッドが他のスレッドをブロックし,一定期間アクセスを独占できるようになる.他のスレッドがそのオブジェクトにアクセスしようとすると,スレッドによるブロックが解除されるまで待ち,その後実行される.1つのオブジェクトに同時にアクセスできるスレッドを1つに制限することで,上記のようなバグを含まないスレッドセーフなプログラムを実現できる.Java排他制御を行うには2つの方法がある

  • synchronizedキーワードをつける
synchronized 戻り値 method(arguments) {
    do something;
  • メソッド中の排他制御を行いたいオブジェクトにsynchronizedをつける
MyClass myClass; // 排他制御を行いたいオブジェクト
synchronized(myClass){
    do something;
}

ブロックで囲まれた部分の処理が排他制御となる.上記プログラムに排他制御を施しスレッドセーフにするには,MyNumberクラスのincrementNumberメソッドにsynchronizedを付ければ良い.

// MyNumber.javaを変更
    public synchronized void incrementNumber(){
        for(int i=0;i<100000;i++){
            number++;
        }
    }

スレッドの制御

割り込みや同期制御といったスレッドを制御する方法もある

interrupt

割り込みのことである.Threadクラスのinterruptメソッドを呼び出すことで,スレッドに割り込みをかけることができる.以下にコード例を示す.

//MyInterrupt.java
public class MyInterrupt implements Runnable {
    private String name;

    public MyInterrupt() {
    }

    @Override
    public void run(){
        name = Thread.currentThread().getName();
        System.out.println(name + " starts!");
        try{
            Thread.sleep(4000);
            System.out.println(name + " finished");
        } catch (InterruptedException e){
            System.out.println(name + " interrupted!");
        }
    }
}
// ThreadMain.java
public class ThreadMain {
    public static void main(String[] args) {
        String name = Thread.currentThread().getName();
        System.out.println("Start:  " + name);

        Thread thread0 = new Thread(new MyInterrupt());
        Thread thread1 = new Thread(new MyInterrupt());

        thread0.start();
        thread1.start();

        try{
            Thread.sleep(1000);
        } catch (InterruptedException e){
            e.printStackTrace();
        }

        thread0.interrupt();

    }
}
実行結果
Start:  main
Thread-0 starts!
Thread-1 starts!
Thread-0 interrupted!
Thread-1 finished

thread0の方にはmainメソッドからinterruptをかける.runメソッド中のInterruptException例外の処理が実行されていることがわかる.

同期制御

同期制御とは複数のスレッド間でタイミングを取り合うことである.
例えば2つのスレッドA,Bが一つのインスタンスを共有する場合,Aの動作完了を待ってからスレッドBが処理を開始しなくてはいけない場合がある.その際スレッドBはスレッドAの動作完了のタイミングを知る必要がある.その際に使うメソッドがwaitnotifyメソッドである.
https://docs.oracle.com/javase/jp/6/api/java/lang/Object.html#notify()

waitメソッドでは,notifyメソッドかnotifyAllメソッドでスレッドが解放されるまで待機するメソッドになる.これらのメソッドを呼び出すメソッドにはsynchronizedをつけておく必要がある.
waitメソッドやnotifyメソッドはObjectクラスで定義されたfinalメソッドであり,オーバーライドして使うことはできない.さらにObjectクラスのメソッドであることから,どのクラスからでも呼び出すことができる.したがってこれらのメソッドは.を使わずにメソッド名のみで呼び出して使うことができる.

以下にnotify/waiyメソッドを使ったコード例を示す.

public class MyStack {
    int[] list;
    int next;

    public MyStack(){
        list = new int[3];
        next = 0;
    }

    public synchronized void push(int number){
        if(next >= list.length){
            try{
                System.out.println(Thread.currentThread().getName() + " wait starts");
                wait();
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        list[next] = number;
        next++;
        System.out.print("push complete. Current data: ");
        for(int i=0;i<next;i++){
            System.out.print(list[i] + " ");
        }
        System.out.println();
        notify();
    }

    public synchronized int pop() {
        if (next <= 0) {
            try{
                System.out.println(Thread.currentThread().getName() + " wait starts");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        next--;
        int number = list[next];
        System.out.println("pop complete. poped data is " + number);

        notify();
        return number;
    }
}
public class PushRunnable implements Runnable {
    MyStack myStack;

    public PushRunnable(MyStack myStack) {
        this.myStack = myStack;
    }

    @Override
    public void run() {
        for(int i=0; i<5;i++){
            try{
                myStack.push(i*10);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Push complete");
    }
}
public class PopRunnable implements Runnable {
    MyStack myStack;

    public PopRunnable(MyStack myStack){
        this.myStack = myStack;
    }

    @Override
    public void run() {
        for(int i=0;i<5;i++) {
            try{
                Thread.sleep(1000);
                myStack.pop();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Pop complete");
    }
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadMain {
    public static void main(String[] args) {
        String name = Thread.currentThread().getName();
        System.out.println("Start:  " + name);
        MyStack myStack = new MyStack();

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new PopRunnable(myStack));
        executorService.submit(new PushRunnable(myStack));
        executorService.shutdown();

        myStack.push(500);

        try{
            Thread.sleep(1000);
        } catch (InterruptedException e){
            e.printStackTrace();
        }

    }
}