パッケージjava.util.stream
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
ここでは、Collection<Widget>であるwidgetsをストリー のソースとして使用した後、そのストリー に対してフィルタ-マップ-リデュースを実行し、赤いウィジェットの重量の合計を取得しています。 (合計はリダクション操作の一例です。)
このパッケージで導入された主な抽象は、ストリー です。 クラスStream、IntStream、LongStreamおよびDoubleStreamは、オブジェクトおよびプリミティブ型int、long,およびdoubleに対するストリー です。 ストリー はいくつかの点でコレクションと異なります。
- ストレージなし。 ストリー は要 を 納するデータ構 ではありません。代わりに、なんらかのソース(データ構 、配列、ジェネレータ関数、入出力チャネルなど)に含まれる要 を計算操作のパイプラインを介して運搬します。
- 本質的に関数型。 ストリー に対して操作を行うと結果が生成されますが、そのソースは変更されません。 たとえば、コレクションから得られた
Streamをフィルタリングすると、ソースであるコレクションの要 が削除されるのではなく、フィルタリングされた要 を含まない新しいStreamが生成されます。 - 遅延指向。 フィルタリング、マッピング、重複削除など、多くのストリー 操作は遅延的に実装可能なため、最適化の余地があります。 たとえば、連続する3つの母音を含ん 最初の
Stringを探す 合、すべての入力文字列を検査する必要はありません。 ストリー 操作は中間操作(Streamを生成する操作)と終端操作(値または副作用を生成する操作)に分けられます。 中間操作は常に遅延的です。 - 無限である可能性。 コレクションのサイズは有限ですが、ストリー のサイズは有限である必要はありません。
limit(n)やfindFirst()などの短絡操作を使えば、無限ストリー に対する計算を有限の時間で終わらせることができます。 - 消費可能。 ストリー の存続期間中、ストリー の要 が使用されるのは一度 けです。
Iteratorと同様、ソースの同じ要 を再度使用するには、新しいストリー を生成する必要があります。
Collectionから(stream()およびparallelStream()メソッド経由)。- 配列から(
Arrays.stream(Object[])経由)。 - ストリー ・クラスのstaticファクトリ・メソッド(
Stream.of(Object[])、IntStream.range(int, int)、Stream.iterate(Object, UnaryOperator)など)から。 BufferedReader.lines()からファイルの行を取得できます。Filesのメソッドからファイル・パスのストリー を取得できます。Random.ints()から乱数のストリー を取得できます。- JDKに含まれる他のさまざまなストリー 生成メソッド(
BitSet.stream()、Pattern.splitAsStream(java.lang.CharSequence)、JarFile.stream()など)。
サードパーティー・ライブラリは、これらのテクニックを使って追 のストリー ・ソースを提供できます。
ストリー 操作とパイプライン
ストリー 操作は中間操作と終端操作に分かれますが、これらを組み合わせてストリー ・パイプラインが形成されます。 ストリー ・パイプラインは、1つのソース(Collection、配列、ジェネレータ関数、入出力チャネルなど)、それに続く0個以上の中間操作(Stream.filter、Stream.mapなど)および1つの終端操作(Stream.forEach、Stream.reduceなど)から構成されます。
中間操作は新しいストリー を返します。 これらの操作は常に遅延されます。filter()などの中間操作を実行しても、実際のフィルタリングは一切実行されず、代わりに、トラバース時に最初のストリー の要 のうち指定された述語に一致するものが 納される新しいストリー が作成されます。 パイプラインの終端操作が実行されるまで、パイプライン・ソースのトラバーサルは開始されません。
Stream.forEachやIntStream.sumなどの終端操作は、ストリー をトラバースして結果や副作用を生成できます。 終端操作の実行が完了するとそのストリー ・パイプラインは消費済とみなされ、以降使用できなくなります。同じデータ・ソースを再度トラバースする必要が生じた 合は、データ・ソースに戻って新しいストリー を取得する必要があります。 終端操作はほとんどすべての 合に積極的であり、データ・ソースのトラバーサルやパイプラインの処理を完了させた後でリターンします。 そうでない終端操作はiterator()とspliterator() けです。これらは、既存の操作では不十分でタスクを実行できない 合に任意のクライアント制御パイプライン・トラバーサルを行えるようにするための「エスケープ・ハッチ」として提供されています。
ストリー の遅延処理は効率性を大幅に向上させます。上記のフィルタ-マップ-合計の例のようなパイプラインでは、フィルタリング、マッピングおよび合計をデータに対する単一パスに融合することができ、中間状態も最小限に抑えられます。 遅延処理では、全データの検査が不要であればそれを回避することも可能となります。「1000文字を超える長さの最初の文字列を見つける」といった操作では、必要な特性を備えた文字列を見つけるのに十分な数の文字列を検査する けで済み、ソースから取得可能なすべての文字列を検査する必要もなくなります。 (この動作は、入力ストリー がた 大きい けでなく無限である 合にさらに重要となります。)
中間操作はさらにステートレス操作とステートフル操作に分けられます。 ステートレス操作(filterやmapなど)は、新しい要 を処理する際に、以前に参照した要 の状態を保持しません。各要 の処理は、他の要 の操作とは無関係に実行できます。 ステートフル操作(distinctやsortedなど)は、新しい要 を処理する際に、以前に参照した要 の状態を組み込む可能性があります。
ステートフル操作は、入力の全体を処理しないと結果を生成できない可能性があります。 たとえばストリー をソートする 合、ストリー のすべての要 が処理されるまで、結果を生成することができません。 このため、並列計算下では、ステートフルな中間操作を含む一部のパイプラインで、データに対するパスが複数必要になったり、大量のデータをバッファーに 納する必要が生じたりする可能性があります。 ステートレスな中間操作のみを含むパイプラインは、 次、並列のいずれであっても、最小限のデータ・バッファリングで単一パスで処理できます。
さらに、一部の操作は短絡操作とみなされます。 中間操作が無限の入力が与えられたときに有限のストリー を結果として生成する可能性がある 合、その操作は短絡操作になります。 終端操作が無限の入力が与えられたときに有限の時間で終了する可能性がある 合、その操作は短絡操作になります。 短絡操作をパイプラインに含めることは、無限ストリー の処理が有限時間で正常終了するための必要条件ではありますが、十分条件ではありません。
並列性
明示的なfor-ループで要 を処理する操作は、本質的に 次的です。 ストリー では並列実行をしやすいように、計算が、個々の要 ごとの命令型操作としてではなく、集約操作のパイプラインとして再構成されます。 すべてのストリー 操作は 次、並列のどちらでも実行できます。 JDKのストリー 実装は、並列性が明示的に要求されない限り、 次ストリー を作成します。 たとえば、Collectionに含まれるメソッドCollection.stream()、Collection.parallelStream()はそれぞれ 次ストリー 、並列ストリー を生成します。IntStream.range(int, int)などのその他のストリー 生成メソッドでは 次ストリー が生成されますが、それらのストリー は、BaseStream.parallel()メソッドを呼び出すことで効率的に並列化することができます。 前述の「ウィジェットの重さの合計」クエリーを並列で実行するには、次のようにします。
int sumOfWeights = widgets.parallelStream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
この例の 次版と並列版の違いは、初期ストリー の作成時に「stream()」ではなく「parallelStream()」を使用する点 けです。 ストリー ・パイプラインは、端末操作が呼び出されるストリー のモードに応じて、 次またはパラレルに実行されます。 ストリー の 次モードまたはパラレル・モードは、BaseStream.isParallel()メソッドを使用して決定でき、ストリー のモードは、BaseStream.sequential()およびBaseStream.parallel()操作を使用して変更できます。 最新の 次モードまたはパラレル・モード設定は、ストリー ・パイプライン全体の実行に適用されます。
findAny()のような明示的に非決定論的として識別されている操作を除き、ストリー が 次、並列のどちらで実行されるかによって計算結果が変わるべきではありません。
ほとんどのストリー 操作はユーザー指定の動作を記述するパラメータを受け取りますが、このパラメータは通常、ラ ダ式になります。 正しい動作を維持するには、これらの動作パラメータが非干渉的であり、かつほとんどの 合ステートレスでなければいけません。 そのようなパラメータは常に関数型インタフェース(Functionなど)のインスタンスであり、通常はラ ダ式やメソッド参照になります。
非干渉
ストリー では、ArrayListなどのスレッドセーフでないコレクションも含め、さまざまなデータ・ソースに対して並列の可能性がある集約操作を実行できます。 これが可能になるのは、ストリー ・パイプラインの実行時にデータ・ソースへの干渉を防げる 合 けです。 エスケープハッチ操作のiterator()とspliterator()を除き、実行は終端操作が呼び出された時点で始まり、終端操作が完了した時点で終わります。 ほとんどのデータ・ソースの 合、干渉を防ぐとは、ストリー ・パイプラインの実行中にデータ・ソースが一切変更されないことを意味します。 これに対する顕著な例外は、同時変更を扱えるように特別に設計された並行コレクションをソースに持つストリー です。 並行ストリー ・ソースは、SpliteratorからCONCURRENT特性が 告されるソースです。
したがって、ソースが並行的でない可能性のあるストリー ・パイプラインに含まれる動作パラメータは、ストリー のデータ・ソースを決して変更すべきではありません。 動作パラメータが非並行データ・ソースに干渉すると言われるのは、動作パラメータがストリー のデータ・ソースを変更するか、そのような変更の引き金となる 合です。 非干渉の必要性は、並行パイプライン けでなく、すべてのパイプラインに当てはまります。 ストリー ・ソースが並行的でない限り、ストリー ・パイプラインの実行中にストリー のデータ・ソースを変更すると、例外、不正な回答、または不適切な動作が発生する可能性があります。 適切に動作するストリー ・ソースの 合、終端操作が開始される前であればソースを変更でき、そうした変更は対象となる要 に反 されます。 たとえば、次のコードを考えてみましょう。
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));
最初に、2つの文字列で構成されるリストを作成: "one"および"two"。 次に、そのリストからストリー が作成されます。 次に、リストを変更するために、3つ目の文字列threeが追 されます。 最後に、ストリー の要 が集められて連結されます。 リストが変更されたのは終端操作collectが開始される前 ったので、結果は文字列one two threeになります。 JDKコレクションや他の大部分のJDKクラスから返されるストリー はすべて、このように適切に動作します。その他のライブラリで生成されるストリー については、「低レベルのストリー 構築」を参照し、適切に動作するストリー を構築するための要件を確認してく さい。
ステートレス動作
ストリー ・パイプラインの結果が非決定論的または不正となる可能性があるのは、ストリー 操作の動作パラメータがステートフルの 合です。 ステートフルなラ ダ(または対応する関数型インタフェースを実装したその他のオブジェクト)とは、ストリー ・パイプラインの実行中に変化する可能性のある状態に結果が依存するようなラ ダのことです。 次のmap()に対するパラメータが、ステートフル・ラ ダの一例です。
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
ここで、マッピング操作が並列実行されると、スレッドのスケジューリングの違いにより、同じ入力に対する結果が実行のたびに変わる可能性があります。一方、ステートレスなラ ダ式では、結果は常に同じになります。
また、動作パラメータから可変状態へのアクセスを試みることは、安全性やパフォーマンスの点から良くない選択肢と言えます。その状態へのアクセスの同期を取らなかった 合、データ競合が発生するためにコードが中断してしまいますが、その状態へのアクセスの同期を取った 合、得ようとしている並列性のメリットが競合によって薄れてしまう危険性があります。 最良のアプローチは、ストリー 操作のステートフル動作パラメータを一切使用しないことです。通常は、ストリー ・パイプラインを再構成してステートフルになるのを避ける方法があります。
副作用
ストリー 操作の動作パラメータでの副作用は一般にお薦めできません。そのような副作用はしばしば、ステートレス要件への無意識の違反や、スレッドの安全性を脅かすその他の危険につながる可能性があるからです。行動パラメータに副作用がある 合、明示的に明記しない限り、以下の保証はありません:
- それらの副作用の他のスレッドへのvisibility。
- 同じストリー ・パイプライン内の"same"要 の異なる操作は同じスレッドで実行されます。そして
- ストリー 実装は、計算の結果に影響しないことを証明できる 合、ストリー ・パイプラインから操作(または全ステージ)を自由に除去できるため、動作パラメータは常に呼び出されます。
副作用の 序は驚くかもしれません。 ストリー ・ソースの検出 序と矛盾しない結果を生成するようにパイプラインが制約されている 合でも(たとえば、IntStream.range(0,5).parallel().map(x -> x*2).toArray()は[0, 2, 4, 6, 8]を生成する必要がある)、マッパー関数が個々の要 に適用される 序や、任意の動作パラメータがどのスレッド内で特定の要 に対して実行されるか、に関する保証は一切ありません。
副作用の解消も意外かもしれません。 端末操作forEachおよびforEachOrderedを除き、ストリー 実装で計算結果に影響を与えずに動作パラメータの実行を最適化できる 合、動作パラメータの副作用が常に実行されるとはかぎりません。 (具体的な例は、count操作に記載されているAPIノートを参照してく さい。)
副作用を使用するようにテンプレート設定されている多くの計算は、可変累計のかわりにreductionを使用するなど、副作用なしでより安全かつ効率的に表現できます。 た し、println()をデバッグ目的で使用するなどの副作用は、通常無害です。 副作用を介さないと動作できないストリー 操作は、forEach()やpeek()など、ごく少数です。これらは注意して使用すべきです。
副作用を不適切に使用しているストリー ・パイプラインを、副作用を使用しないパイプラインに変換する方法の一例として、指定された正規表現に一致する文字列を文字列ストリー 内で検索し、一致した文字列をリスト内に 納するコードを、次に示します。
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
このコードでは副作用が不必要に使用されています。 並列実行時には、ArrayListがスレッドセーフでないために不正な結果が生成されますし、必要な同期を追 すれば競合が発生し、並列性のメリットが薄れます。 さらに言えば、ここで副作用を使用する必要はまったくありません。forEach()は単純に、より効率的で安全な、並列化により適したリダクション操作で置き換えることができます。
List<String> results =
stream.filter(s -> pattern.matcher(s).matches())
.toList(); // No side-effects!
序付け
ストリー は、定義された検出 序を持つことも持たないこともあります。 ストリー が検索 序を持つかどうかは、そのソースと中間操作によって決まります。 ストリー ・ソースの中には、本質的に 序付けされたもの(Listや配列など)もあれば、そうでないもの(HashSetなど)もあります。 中間操作の中には、sorted()のように、もともと 序付けされていないストリー に検出 序を課すものもあれば、BaseStream.unordered()のように、 序付けされたストリー を 序付けなしに変更するものもあります。 さらに終端操作の中には、forEach()のように検出 序を無視するものもあります。
序付けされたストリー では、ほとんどの操作は要 を検出 に処理するよう制約されます。ストリー のソースが[1, 2, 3]を含むListである 合、map(x -> x*2)の実行結果は[2, 4, 6]でなければいけません。 一方、ソースに検出 序が定義されていない 合は、値[2, 4, 6]の要 を任意に入れ替えたものも、有効な結果になります。
次ストリー の 合、検出 序の有無はパフォーマンスには影響せず、決定論にのみ影響します。 ストリー が 序付けされている 合は、同一ソースで同一のストリー ・パイプラインを繰り返し実行しても同一の結果が生成されます。 序付けされていない 合は、繰り返し実行すると異なる結果が生成される可能性があります。
並列ストリー の 合、 序付けの制約を緩和すると、実行の効率性を改善できることがあります。 重複フィルタリング(distinct())やグループ化リダクション(Collectors.groupingBy())といった特定の集約操作は、要 の 序付けを無視できれば、より効率的に実装できます。 同様に、limit()のような、検出 序と本質的な関連性を持つ操作では、適切な 序付けのためにバッファリングの必要性が生じ、並列性のメリットが薄れてしまう可能性があります。 ストリー に検出 序が含まれているが、ユーザーはその検出 序のことは特に気にしていない、という 合、unordered()でストリー の 序付けを明示的に解除することで、一部のステートフル操作や終端操作で並列時のパフォーマンスが改善される可能性があります。 た し、前述の「ブロックの重さの合計」例など、大部分のストリー ・パイプラインは、 序付け制約の下でも効率的に並列化されます。
リダクション操作
リダクション操作(畳み込みとも呼ばれる)は、一連の入力要 を受け取り、結合操作を繰り返し適用することでそれらを結合し、単一のサマリー結果を出力します(一連の数値の合計または最大値の検索や、リストへの要 の蓄積など) ストリー ・クラスには、reduce()、collect()と呼ばれる複数の形式の汎用リダクション操作と、sum()、max()、count()など、複数の特殊化されたリダクション形式が含まれています。
もちろん、そのような操作は、次のように単純な 次処理ループとして簡単に実装できます。
int sum = 0;
for (int x : numbers) {
sum += x;
}
た し、上のような推移的蓄積よりもリデュース操作を好む理由があります。 リダクションは抽象度がより高い(個々の要 ではなくストリー 全体に作用する) けでなく、適切に構築されたリデュース操作は本質的に並列化可能となります(た し、要 の処理に使用される関数が結合的かつステートレスである必要がある)。 たとえば、与えられた数値ストリー について、合計を計算する 合には、次のように記述できます。
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
または
int sum = numbers.stream().reduce(0, Integer::sum);
これらのリダクション操作は、ほとんど何も変更しなくても、安全に並列実行できます。
int sum = numbers.parallelStream().reduce(0, Integer::sum);
リダクションの並列化が良好に行えるのは、実装が、データのサブセットを並列処理した後、中間結果を結合して最終的な正しい答えを得ることができるからです。 (言語に"パラレルfor-each"構成がある 合でも、可変累積アプローチでは、開発者が共有累積変数sumにスレッド・セーフな更新を提供し、必要な同期によって、パラレル化によるパフォーマンスの向上が排除されます。) 代わりにreduce()を使用すれば、リダクション操作の並列化に関するあらゆる重荷が消失し、ライブラリは同期を追 しなくても効率的な並列実装を提供できます。
前述の"ウィジェット"の例は、削減を他の操作と組み合せてfor-loopsを一括操作に置き換える方法を示しています。 widgetsが、getWeightメソッドを含むWidgetオブジェクトのコレクションである 合、もっとも重いウィジェットを見つけるには次のようにします。
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();
より汎用的な形式の、<T>型の要 に作用して<U>型の結果を生成するreduce操作では、次の3つのパラメータが必要になります。
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
ここで、identity要 は、リダクションの初期シード値であるとともに、入力要 が存在しない 合のデフォルト結果でもあります。 accumulator関数は、部分的な結果と次の要 を受け取り、新しい部分的な結果を生成します。 combiner関数は、2つの部分的な結果を結合し、新しい部分的な結果を生成します。 (コンバイナは並列リダクションで必要になります。並列リダクションでは入力が分割され、パーティションごとに部分的な蓄積値が計算された後、部分的な結果が結合されて最終結果が生成されます。)
より正式には、identity値はコンバイナ関数の単位元でなければいけません。 つまり、すべてのuについて、combiner.apply(identity, u)がuに等しくなります。 さらに、combiner関数は結合的である必要があり、かつaccumulator関数と互換性がある必要があります。すべてのuとtについて、combiner.apply(u, accumulator.apply(identity, t))とaccumulator.apply(u, t)がequals()で等しくなる必要があります。
3引数形式は2引数形式を一般化したものであり、マッピング・ステップが蓄積ステップに組み込まれています。 単純な重量合計の例を、より汎用的な形式を使って次のように書き直すことができます。
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight(),
Integer::sum);
た し、明示的なマップ-リデュース形式のほうが可読性が高いので、通常はそちらを使用することをお薦めします。 一般化された形式は、マッピングとリダクションを単一の関数にまとめることで、かなりの作業量を省いて最適化できる 合のために用意されています。
可変リダクション
可変リダクション操作は、ストリー 内の要 を処理する際に、可変結果コンテナ(CollectionやStringBuilderなど)に入力要 を蓄積します。
文字列のストリー を受け取り、文字列を連結して単一の長い文字列にする必要がある 合、通常のリダクションを使ってこれを実現できます。
String concatenated = strings.reduce("", String::concat)
これで必要な結果が得られますし、並列でも動作します。 た し、パフォーマンスに満足できない可能性があります。 そのような実装は大量の文字列コピーを行うため、実行時間はO(n^2)(nは文字数)になります。 より高パフォーマンスなアプローチは、文字列蓄積用の可変コンテナであるStringBuilder内に結果を蓄積することです。 可変リダクションを並列化する際には、通常のリダクションの 合と同じテクニックを使用できます。
可変リダクション操作にはcollect()という名前が付いていますが、これは、Collectionなどの結果コンテナ内に必要な結果を集めるからです。 collectは3つの関数を要求します。結果コンテナの新しいインスタンスを構築するサプライヤ関数、結果コンテナに入力要 を組み込むアキュ レータ関数、ある結果コンテナの内容を別のコンテナにマージする結合関数です。 この形式は、通常のリダクションの汎用形式に非常に似ています。
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
collectをこのような抽象的な方法で表現するメリットは、reduce()の 合と同じく、直接的な並列化が可能になる点にあります。いくつかの部分的な結果を並列して蓄積した後、それらを結合することができます(た し、蓄積関数と結合関数が該当する要件を満たす必要がある)。 たとえば、ストリー 内の要 のString表現をArrayList内に集めるには、次のような明らかな 次for-each形式を記述できます。
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
あるいは、並列化可能なcollect形式を使用することもできます。
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
あるいは、アキュ レータ関数からマッピング操作を抜き出し、次のようにより簡潔に表現することもできます。
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
ここで、サプライヤは単なるArrayListのコンストラクタであり、アキュ レータは文字列化された要 をArrayListに追 し、コンバイナは単純にaddAllを使って一方のコンテナの文字列を他方にコピーします。
collectの3つの側面であるサプライヤ、アキュ レータ、コンバイナは緊密に結合しています。 Collectorという抽象を使えば、3つすべての側面を捉えることができます。 List内に文字列を集める上の例は、標準のCollectorを使って次のように書き換えることができます。
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
可変リダクションをCollector内にパッケージ化することには、合成可能性というもう1つのメリットがあります。 クラスCollectorsに含まれるさまざまな定義済のコレクタ・ファクトリの中には、あるコレクタを別のコレクタに変換するコンビネータも含まれています。 たとえば、従業員ストリー の給料の合計を計算する次のようなコレクタがあるとします。
Collector<Employee, ?, Integer> summingSalaries
= Collectors.summingInt(Employee::getSalary);
(2番目の型パラメータの?は、このコレクタで使用される中間表現には興味がないことを示している けです。) 給料の部門別合計の表を作成するコレクタを作成する 合、groupingByを使えばsummingSalariesを再利用できます。
Map<Department, Integer> salariesByDept
= employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
summingSalaries));
通常のリダクション操作の 合と同じく、collect()操作を並列化できるのは、該当の条件が満たされた 合 けです。 部分的に蓄積された任意の結果について、その結果を空の結果コンテナと結合したときに元と同じ結果が生成される必要があります。 つまり、一連のアキュ レータ呼出しやコンバイナ呼出しの結果として得られた、部分的に蓄積された結果pについて、pはcombiner.apply(p, supplier.get())と等しくなる必要があります。
さらに、計算を分割しても同じ結果が得られる必要があります。 任意の入力要 t1とt2について、以下の計算の結果r1とr2が等しくなる必要があります。
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1); // result without splitting
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
ここで、等価性の判定は一般にObject.equals(Object)によりますが、 序の違いを考慮するために等価性の条件が緩和されることもあります。
Extensibility
ファクトリ・メソッドjava.util.stream.Collector.of(...)を使用してCollectorを実装するか、Collectorsの事前定義済コレクタを使用すると、ユーザー定義、再利用可能な「ターミナル」操作を実行できます。
Gathererを実装します。ファクトリ・メソッドjava.util.stream.Gatherer.of(...)およびjava.util.stream.Gatherer.ofSequential(...)を使用する。または、Gatherersで事前定義されたギャザラを使用すると、ユーザー定義で再利用可能な「中間」操作が可能になります。
リダクション、並行性、および 序付け
ある複雑なリダクション操作、たとえば次のようなMapを生成するcollect()を考えます。
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.collect(Collectors.groupingBy(Transaction::getBuyer));
この操作を並列実行すると、実際には逆効果となる可能性があります。 それは、結合ステップ(あるMapをキーに基づいて別のマップにマージするステップ)のコストが、一部のMap実装で高くなる可能性があるからです。
ところが、このリダクションで使用される結果コンテナが、ConcurrentHashMapのような、同時変更可能なコレクションであったとします。 その 合、アキュ レータの並列呼出しでは、実際には同じ共有結果コンテナに結果を同時に蓄積できるため、コンバイナで複数の結果コンテナをマージする必要がなくなります。 これにより、並列実行のパフォーマンスが向上する可能性があります。 これを並行リダクションと呼びます。
並行リダクションをサポートするCollectorには、Collector.Characteristics.CONCURRENT特性のマークが付けられています。 た し、並行コレクションにはデメリットもあります。 複数のスレッドから同時に共有コンテナに結果が蓄積されるので、結果が蓄積される 番が非決定論的になります。 したがって、並行リダクションが可能なのは、処理対象のストリー で 序付けが重要でない 合のみになります。 Stream.collect(Collector)実装が並行リダクションを実行するのは、次の 合 けです。
- ストリー が並列的であり、
- コレクタが
Collector.Characteristics.CONCURRENT特性を持っていて、 - ストリー が 序付けされていないか、コレクタが
Collector.Characteristics.UNORDERED特性を持つ。
BaseStream.unordered()メソッドを使用します。 たとえば、
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.unordered()
.collect(groupingByConcurrent(Transaction::getBuyer));
(ここで、Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>)はgroupingByの並行版です)。
ある特定のキーの各要 がソースと同じ 番で現れることが重要である 合には、並行リダクションは使用できません。 序付けは、並行挿入で 牲になるものの1つ からです。 その 合は、 次リダクションかマージベースの並列リダクションを実装するように制約されます。
結合性
演算子または関数opが結合的となるのは、次が成り立つ 合です。
(a op b) op c == a op (b op c)
ここで を4つに増やせば、並列評価でのこれの重要性が明確になります。
a op b op c op d == (a op b) op (c op d)
したがって、(a op b)と(c op d)を並列に評価した後、それらの結果に対してopを呼び出すことができます。
結合的な演算の例として、数値の 算、最小、最大、および文字列の連結が挙げられます。
低レベルのストリー 構築
ここまで、ストリー のすべての例で、Collection.stream()やArrays.stream(Object[])のようなメソッドを使ってストリー が取得されていました。 それらのストリー 生成メソッドはどのように実装されているのでしょうか。
クラスStreamSupportには、ストリー 作成用の低レベルのメソッドがいくつか含まれていますが、それらはすべて、なんらかの形式のSpliteratorを使用しています。 スプリッテレータはIteratorの並列版であり、(無限個の可能性もある)要 のコレクションを記述し、 次前進、一括トラバーサル、および入力内で並列処理可能な部分を抜き取って別のスプリッテレータ内に 納する操作をサポートします。 最低レベルでは、すべてのストリー がスプリッテレータによって駆動されます。
スプリッテレータを実装するうえでの実装上の選択肢は多数存在していますが、そのほとんどすべてが、実装の単純さと、そのスプリッテレータを使用するストリー の実行時パフォーマンスとのトレードオフの上に成り立っています。 もっとも単純 がもっとも低パフォーマンスなスプリッテレータの作成方法は、Spliterators.spliteratorUnknownSize(java.util.Iterator, int)を使ってイテレータから作成することです。 そのようなスプリッテレータは動作はするものの、並列パフォーマンスが低いことが予想されます。サイズ設定情 (基礎となるデータ・セットの大きさ)が失われているほか、単純すぎる分割アルゴリズ に制約されているからです。
より高品質のスプリッテレータは、バランスの取れた既知サイズのスプリット、正確なサイズ設定情 、およびスプリッテレータやデータのその他のいくつかのcharacteristicsを提供します(これらの情 は実装内で、実行を最適化するために使用できる)。
可変データ・ソースのスプリッテレータには、データへのバインドのタイミングという、もう1つの課題があります。スプリッテレータが作成されてからストリー ・パイプラインが実行されるまでの間に、データが変更される可能性があるからです。 ストリー のスプリッテレータは、IMMUTABLE、CONCURRENTのいずれかの特性を 告するのが理想ですが、そうでない 合は遅延バインディングであるべきです。 ソースが推奨のスプリッテレータを直接提供できない 合は、Supplierを使ってスプリッテレータを間接的に提供し、Supplierを受け取るバージョンのstream()経由でストリー を構築することができます。 ストリー ・パイプラインの終端操作が開始された後で初めて、サプライヤからスプリッテレータが取得されます。
これらの要件に従えば、ストリー ・ソースの変更とストリー ・パイプラインの実行の間の潜在的な干渉のスコープが大幅に減少します。 必要な特性を備えたスプリッテレータに基づくストリー 、またはSupplierベースのファクトリ形式を使用するストリー は、終端操作が開始される前にデータ・ソースが変更されても対応できます(た し、ストリー 操作の動作パラメータが、非干渉とステートレスに関する必要条件を満たしている必要があります)。 詳細については、「非干渉」を参照してく さい。
- 導入されたバージョン:
- 1.8
-
クラス説明BaseStream<T, S extends BaseStream<T,
S>> 次および並列の集約操作をサポートする要 シーケンスであるストリー の基底インタフェース。Collector<T,A, R> 可変結果コンテナに入力要 を蓄積し、オプションですべての入力要 が処理された後で蓄積された結果を最終的な表現に変換する可変リダクション操作。リダクション実装の最適化に使用可能な、Collectorのプロパティーを示す特性。要 をコレクションに蓄積したり、さまざまな条件に従って要 を要約するなど、有用な各種リダクション操作を実装したCollector実装。次および並列の集約操作をサポートするプリミティブdouble値要 のシーケンスです。DoubleStreamの可変ビルダーです。double値の引数およびDoubleConsumerを受け入れ、結果を返さない演算を表します。Gatherer<T,A, R> 入力要 のストリー を出力要 のストリー に変換する中間操作で、オプションでアップストリー の最後に達したときに最終アクションを適用します。ダウンストリー ・オブジェクトは、操作のパイプラインの次のステージで、要 を送信できます。Gatherer.Integrator<A,T, R> Integratorは要 を受信し、必要に応じて指定された状態を使用して要 を処理し、必要に応じて増分結果をダウンストリー に送信します。Gatherer.Integrator.Greedy<A,T, R> Greedyインテグレータは、すべての入力を消費し、ダウンストリー がより多くの要 を必要としないことをリレーすることができます。ウィンドウ関数、折りたたみ関数、要 の同時変換など、有用な中間操作を提供するGathererの実装。次および並列の集約操作をサポートするプリミティブint値要 のシーケンスです。IntStreamの可変ビルダーです。int値の引数およびIntConsumerを受け入れ、結果を返さない演算を表します。次および並列の集約操作をサポートするプリミティブlong値要 のシーケンスです。LongStreamの可変ビルダーです。long値の引数およびLongConsumerを受け入れ、結果を返さない演算を表します。Stream<T>次および並列の集約操作をサポートする要 のシーケンスです。Streamの可変ビルダーです。ストリー を作成および操作するための低レベルのユーティリティ・メソッドです。