Home > Articles

This chapter is from the book

16.9 Parallel Streams

The Stream API makes it possible to execute a sequential stream in parallel without rewriting the code. The primary reason for using parallel streams is to improve performance, but at the same time ensuring that the results obtained are the same, or at least compatible, regardless of the mode of execution. Although the API goes a long way to achieve its aim, it is important to understand the pitfalls to avoid when executing stream pipelines in parallel.

Building Parallel Streams

The execution mode of an existing stream can be set to parallel by calling the parallel() method on the stream (p. 933). The parallelStream() method of the Collection interface can be used to create a parallel stream with a collection as the data source (p. 897). No other code is necessary for parallel execution, as the data partitioning and thread management for a parallel stream are handled by the API and the JVM. As with any stream, the stream is not executed until a terminal operation is invoked on it.

The isParallel() method of the stream interfaces can be used to determine whether the execution mode of a stream is parallel (p. 933).

Parallel Stream Execution

The Stream API allows a stream to be executed either sequentially or in parallel— meaning that all stream operations can execute either sequentially or in parallel. A sequential stream is executed in a single thread running on one CPU core. The elements in the stream are processed sequentially in a single pass by the stream operations that are executed in the same thread (p. 891).

A parallel stream is executed by different threads, running on multiple CPU cores in a computer. The stream elements are split into substreams that are processed by multiple instances of the stream pipeline being executed in multiple threads. The partial results from processing of each substream are merged (or combined) into a final result (p. 891).

Parallel streams utilize the Fork/Join Framework (§23.3, p. 1447) under the hood for executing parallel tasks. This framework provides support for the thread management necessary to execute the substreams in parallel. The number of threads employed during parallel stream execution is dependent on the CPU cores in the computer.

Figure 16.12, p. 963, illustrates parallel functional reduction using the three-argument reduce(identity, accumulator, combiner) terminal operation (p. 962).

Figure 16.14, p. 967, illustrates parallel mutable reduction using the three-argument collect(supplier, accumulator, combiner) terminal operation (p. 966).

Factors Affecting Performance

There are no guarantees that executing a stream in parallel will improve the performance. In this subsection we look at some factors that can affect performance.

Benchmarking

In general, increasing the number of CPU cores and thereby the number of threads that can execute in parallel only scales performance up to a threshold for a given size of data, as some threads might become idle if there is no data left for them to process. The number of CPU cores boosts performance to a certain extent, but it is not the only factor that should be considered when deciding to execute a stream in parallel.

Inherent in the total cost of parallel processing is the start-up cost of setting up the parallel execution. At the onset, if this cost is already comparable to the cost of sequential execution, not much can be gained by resorting to parallel execution.

A combination of the following three factors can be crucial in deciding whether a stream should be executed in parallel:

  • Sufficiently large data size

    The size of the stream must be large enough to warrant parallel processing; otherwise, sequential processing is preferable. The start-up cost can be too prohibitive for parallel execution if the stream size is too small.

  • Computation-intensive stream operations

    If the stream operations are small computations, then the stream size should be proportionately large as to warrant parallel execution. If the stream operations are computation-intensive, the stream size is less significant, and parallel execution can boost performance.

  • Easily splittable stream

    If the cost of splitting the stream into substreams is higher than processing the substreams, employing parallel execution can be futile. Collections like Array-Lists, HashMaps, and simple arrays are efficiently splittable, whereas LinkedLists and IO-based data sources are less efficient in this regard.

Benchmarking—that is, measuring performance—is strongly recommended to decide whether parallel execution will be beneficial. Example 16.14 illustrates a simple scheme where reading the system clock before and after a stream is executed can be used to get a sense of how well a stream performs.

The class StreamBenchmarks in Example 16.14 defines five methods, at (1) through (5), that compute the sum of values from 1 to n. These methods compute the sum in various ways. Each method is executed with four different values of n; that is, the stream size is the number of values for summation. The program prints the benchmarks for each method for the different values of n, which of course can vary, as many factors can influence the results—the most significant one being the number of CPU cores on the computer.

  • The methods seqSumRangeClosed() at (1) and parSumRangeClosed() at (2) perform the computation on a sequential and a parallel stream, respectively, that are created with the closeRange() method.

    return LongStream.rangeClosed(1L, n).sum();                // Sequential stream
    ...
    return LongStream.rangeClosed(1L, n).parallel().sum();     // Parallel stream

    Benchmarks from Example 16.14:

      Size   Sequential Parallel
       1000   0.05681   0.11031
      10000   0.06698   0.13979
     100000   0.71274   0.52627
    1000000   7.02237   4.37249

    The terminal operation sum() is not computation-intensive. The parallel stream starts to show better performance when the number of values approaches 100000. The stream size is then significantly large for the parallel stream to show better performance. Note that the range of values defined by the arguments of the rangeClosed() method can be efficiently split into substreams, as its start and end values are provided.

  • The methods seqSumIterate() at (3) and parSumIterate() at (4) return a sequential and a parallel sequential stream, respectively, that is created with the iterate() method.

    return LongStream.iterate(1L, i -> i + 1).limit(n).sum();            // Sequential
    ...
    return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum(); // Parallel

    Benchmarks from Example 16.14:

      Size   Sequential Parallel
       1000   0.08645   0.34696
      10000   0.35687   1.27861
     100000   3.24083  11.38709
    1000000  29.92285 117.87909

    The method iterate() creates an infinite stream, and the limit() intermediate operation truncates the stream according to the value of n. The performance of both streams degrades fast when the number of values increases. However, the parallel stream performs worse than the sequential stream in all cases. The values generated by the iterate() method are not known before the stream is executed, and the limit() operation is also stateful, making the process of splitting the values into substreams inefficient in the case of the parallel stream.

  • The method iterSumLoop() at (5) uses a for(;;) loop to compute the sum.

    Benchmarks from Example 16.14:

      Size   Iterative
       1000   0.00586
      10000   0.02330
     100000   0.22352
    1000000   2.49677

    Using a for(;;) loop to calculate the sum performs best for all values of n compared to the streams, showing that significant overhead is involved in using streams for summing a sequence of numerical values.

In Example 16.14, the methods measurePerf() at (6) and xqtFunctions() at (13) create the benchmarks for functions passed as parameters. In the measurePerf() method, the system clock is read at (8) and the function parameter func is applied at (9). The system clock is read again at (10) after the function application at (9) has completed. The execution time calculated at (10) reflects the time for executing the function. Applying the function func evaluates the lambda expression or the method reference implementing the LongFunction interface. In Example 16.14, the function parameter func is implemented by method references that call methods, at (1) through (5), in the StreamBenchmarks class whose execution time we want to measure.

public static <R> double measurePerf(LongFunction<R> func, long n) { // (6)
  // ...
  double start = System.nanoTime();                                // (8)
  result = func.apply(n);                                          // (9)
  double duration = (System.nanoTime() - start)/1_000_000;         // (10) ms.
  // ...
}
Example 16.14 Benchmarking
import java.util.function.LongFunction;
import java.util.stream.LongStream;
/*
 * Benchmark the execution time to sum numbers from 1 to n values
 * using streams.
 */
public final class StreamBenchmarks {

  public static long seqSumRangeClosed(long n) {                       // (1)
    return LongStream.rangeClosed(1L, n).sum();
  }

  public static long paraSumRangeClosed(long n) {                      // (2)
    return LongStream.rangeClosed(1L, n).parallel().sum();
  }

  public static long seqSumIterate(long n) {                           // (3)
    return LongStream.iterate(1L, i -> i + 1).limit(n).sum();
  }

  public static long paraSumIterate(long n) {                          // (5)
    return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum();
  }

  public static long iterSumLoop(long n) {                             // (5)
    long result = 0;
    for (long i = 1L; i <= n; i++) {
      result += i;
    }
    return result;
  }

  /*
   * Applies the function parameter func, passing n as parameter.
   * Returns the average time (ms.) to execute the function 100 times.
   */
  public static <R> double measurePerf(LongFunction<R> func, long n) { // (6)
    int numOfExecutions = 100;
    double totTime = 0.0;
    R result = null;
    for (int i = 0; i < numOfExecutions; i++) {                        // (7)
      double start = System.nanoTime();                                // (8)
      result = func.apply(n);                                          // (9)
      double duration = (System.nanoTime() - start)/1_000_000;         // (10)
      totTime += duration;                                             // (11)
    }
    double avgTime = totTime/numOfExecutions;                          // (12)
    return avgTime;
  }

  /*
   * Executes the functions in the varargs parameter funcs
   * for different stream sizes.
   */
  public static <R> void xqtFunctions(LongFunction<R>... funcs) {      // (13)
    long[] sizes = {1_000L, 10_000L, 100_000L, 1_000_000L};            // (14)

    // For each stream size ...
    for (int i = 0; i < sizes.length; ++i) {                           // (15)
      System.out.printf("%7d", sizes[i]);
      // ... execute the functions passed in the varargs parameter funcs.
      for (int j = 0; j < funcs.length; ++j) {                         // (16)
        System.out.printf("%10.5f", measurePerf(funcs[j], sizes[i]));
      }
      System.out.println();
    }
  }

  public static void main(String[] args) {                             // (17)

    System.out.println("Streams created with the rangeClosed() method:");// (18)
    System.out.println("  Size   Sequential Parallel");
    xqtFunctions(StreamBenchmarks::seqSumRangeClosed,
                 StreamBenchmarks::paraSumRangeClosed);

    System.out.println("Streams created with the iterate() method:");  // (19)
    System.out.println("  Size   Sequential Parallel");
    xqtFunctions(StreamBenchmarks::seqSumIterate,
                 StreamBenchmarks::paraSumIterate);

    System.out.println("Iterative solution with an explicit loop:");   // (20)
    System.out.println("  Size   Iterative");
    xqtFunctions(StreamBenchmarks::iterSumLoop);
  }
}

Possible output from the program:

Streams created with the rangeClosed() method:
  Size   Sequential Parallel
   1000   0.05681   0.11031
  10000   0.06698   0.13979
 100000   0.71274   0.52627
1000000   7.02237   4.37249
Streams created with the iterate() method:
  Size   Sequential Parallel
   1000   0.08645   0.34696
  10000   0.35687   1.27861
 100000   3.24083  11.38709
1000000  29.92285 117.87909
Iterative solution with an explicit loop:
  Size   Iterative
   1000   0.00586
  10000   0.02330
 100000   0.22352
1000000   2.49677

Side Effects

Efficient execution of parallel streams that produces the desired results requires the stream operations (and their behavioral parameters) to avoid certain side effects.

  • Non-interfering behaviors

    The behavioral parameters of stream operations should be non-interfering (p. 909)—both for sequential and parallel streams. Unless the stream data source is concurrent, the stream operations should not modify it during the execution of the stream. See building streams from collections (p. 897).

  • Stateless behaviors

    The behavioral parameters of stream operations should be stateless (p. 909)— both for sequential and parallel streams. A behavioral parameter implemented as a lambda expression should not depend on any state that might change during the execution of the stream pipeline. The results from a stateful behavioral parameter can be nondeterministic or even incorrect. For a stateless behavioral parameter, the results are always the same.

    Shared state that is accessed by the behavior parameters of stream operations in a pipeline is not a good idea. Executing the pipeline in parallel can lead to race conditions in accessing the global state, and using synchronization code to provide thread-safety may defeat the purpose of parallelization. Using the three-argument reduce() or collect() method can be a better solution to encapsulate shared state.

    The intermediate operations distinct(), skip(), limit(), and sorted() are stateful (p. 915, p. 915, p. 917, p. 929). See also Table 16.3, p. 938. They can carry extra performance overhead when executed in a parallel stream, as such an operation can entail multiple passes over the data and may require significant data buffering.

Ordering

An ordered stream (p. 891) processed by operations that preserve the encounter order will produce the same results, regardless of whether it is executed sequentially or in parallel. However, repeated execution of an unordered stream— sequential or parallel—can produce different results.

Preserving the encounter order of elements in an ordered parallel stream can incur a performance penalty. The performance of an ordered parallel stream can be improved if the ordering constraint is removed by calling the unordered() intermediate operation on the stream (p. 932).

The three stateful intermediate operations distinct(), skip(), and limit() can improve performance in a parallel stream that is unordered, as compared to one that is ordered (p. 915, p. 915, p. 917). The distinct() operation need only buffer any occurrence of a duplicate value in the case of an unordered parallel stream, rather than the first occurrence. The skip() operation can skip any n elements in the case of an unordered parallel stream, not necessarily the first n elements. The limit() operation can truncate the stream after any n elements in the case of an unordered parallel stream, and not necessarily after the first n elements.

The terminal operation findAny() is intentionally nondeterministic, and can return any element in the stream (p. 952). It is specially suited for parallel streams.

The forEach() terminal operation ignores the encounter order, but the forEachOrdered() terminal operation preserves the order (p. 948). The sorted() stateful intermediate operation, on the other hand, enforces a specific encounter order, regardless of whether it executed in a parallel pipeline (p. 929).

Autoboxing and Unboxing of Numeric Values

As the Stream API allows both object and numeric streams, and provides support for conversion between them (p. 934), choosing a numeric stream when possible can offset the overhead of autoboxing and unboxing in object streams.

As we have seen, in order to take full advantage of parallel execution, composition of a stream pipeline must follow certain rules to facilitate parallelization. In summary, the benefits of using parallel streams are best achieved when:

  • The stream data source is of a sufficient size and the stream is easily splittable into substreams.

  • The stream operations have no adverse side effects and are computation-intensive enough to warrant parallelization.

InformIT Promotional Mailings & Special Offers

I would like to receive exclusive offers and hear about products from InformIT and its family of brands. I can unsubscribe at any time.

Overview


Pearson Education, Inc., 221 River Street, Hoboken, New Jersey 07030, (Pearson) presents this site to provide information about products and services that can be purchased through this site.

This privacy notice provides an overview of our commitment to privacy and describes how we collect, protect, use and share personal information collected through this site. Please note that other Pearson websites and online products and services have their own separate privacy policies.

Collection and Use of Information


To conduct business and deliver products and services, Pearson collects and uses personal information in several ways in connection with this site, including:

Questions and Inquiries

For inquiries and questions, we collect the inquiry or question, together with name, contact details (email address, phone number and mailing address) and any other additional information voluntarily submitted to us through a Contact Us form or an email. We use this information to address the inquiry and respond to the question.

Online Store

For orders and purchases placed through our online store on this site, we collect order details, name, institution name and address (if applicable), email address, phone number, shipping and billing addresses, credit/debit card information, shipping options and any instructions. We use this information to complete transactions, fulfill orders, communicate with individuals placing orders or visiting the online store, and for related purposes.

Surveys

Pearson may offer opportunities to provide feedback or participate in surveys, including surveys evaluating Pearson products, services or sites. Participation is voluntary. Pearson collects information requested in the survey questions and uses the information to evaluate, support, maintain and improve products, services or sites, develop new products and services, conduct educational research and for other purposes specified in the survey.

Contests and Drawings

Occasionally, we may sponsor a contest or drawing. Participation is optional. Pearson collects name, contact information and other information specified on the entry form for the contest or drawing to conduct the contest or drawing. Pearson may collect additional personal information from the winners of a contest or drawing in order to award the prize and for tax reporting purposes, as required by law.

Newsletters

If you have elected to receive email newsletters or promotional mailings and special offers but want to unsubscribe, simply email information@informit.com.

Service Announcements

On rare occasions it is necessary to send out a strictly service related announcement. For instance, if our service is temporarily suspended for maintenance we might send users an email. Generally, users may not opt-out of these communications, though they can deactivate their account information. However, these communications are not promotional in nature.

Customer Service

We communicate with users on a regular basis to provide requested services and in regard to issues relating to their account we reply via email or phone in accordance with the users' wishes when a user submits their information through our Contact Us form.

Other Collection and Use of Information


Application and System Logs

Pearson automatically collects log data to help ensure the delivery, availability and security of this site. Log data may include technical information about how a user or visitor connected to this site, such as browser type, type of computer/device, operating system, internet service provider and IP address. We use this information for support purposes and to monitor the health of the site, identify problems, improve service, detect unauthorized access and fraudulent activity, prevent and respond to security incidents and appropriately scale computing resources.

Web Analytics

Pearson may use third party web trend analytical services, including Google Analytics, to collect visitor information, such as IP addresses, browser types, referring pages, pages visited and time spent on a particular site. While these analytical services collect and report information on an anonymous basis, they may use cookies to gather web trend information. The information gathered may enable Pearson (but not the third party web trend services) to link information with application and system log data. Pearson uses this information for system administration and to identify problems, improve service, detect unauthorized access and fraudulent activity, prevent and respond to security incidents, appropriately scale computing resources and otherwise support and deliver this site and its services.

Cookies and Related Technologies

This site uses cookies and similar technologies to personalize content, measure traffic patterns, control security, track use and access of information on this site, and provide interest-based messages and advertising. Users can manage and block the use of cookies through their browser. Disabling or blocking certain cookies may limit the functionality of this site.

Do Not Track

This site currently does not respond to Do Not Track signals.

Security


Pearson uses appropriate physical, administrative and technical security measures to protect personal information from unauthorized access, use and disclosure.

Children


This site is not directed to children under the age of 13.

Marketing


Pearson may send or direct marketing communications to users, provided that

  • Pearson will not use personal information collected or processed as a K-12 school service provider for the purpose of directed or targeted advertising.
  • Such marketing is consistent with applicable law and Pearson's legal obligations.
  • Pearson will not knowingly direct or send marketing communications to an individual who has expressed a preference not to receive marketing.
  • Where required by applicable law, express or implied consent to marketing exists and has not been withdrawn.

Pearson may provide personal information to a third party service provider on a restricted basis to provide marketing solely on behalf of Pearson or an affiliate or customer for whom Pearson is a service provider. Marketing preferences may be changed at any time.

Correcting/Updating Personal Information


If a user's personally identifiable information changes (such as your postal address or email address), we provide a way to correct or update that user's personal data provided to us. This can be done on the Account page. If a user no longer desires our service and desires to delete his or her account, please contact us at customer-service@informit.com and we will process the deletion of a user's account.

Choice/Opt-out


Users can always make an informed choice as to whether they should proceed with certain services offered by InformIT. If you choose to remove yourself from our mailing list(s) simply visit the following page and uncheck any communication you no longer want to receive: www.informit.com/u.aspx.

Sale of Personal Information


Pearson does not rent or sell personal information in exchange for any payment of money.

While Pearson does not sell personal information, as defined in Nevada law, Nevada residents may email a request for no sale of their personal information to NevadaDesignatedRequest@pearson.com.

Supplemental Privacy Statement for California Residents


California residents should read our Supplemental privacy statement for California residents in conjunction with this Privacy Notice. The Supplemental privacy statement for California residents explains Pearson's commitment to comply with California law and applies to personal information of California residents collected in connection with this site and the Services.

Sharing and Disclosure


Pearson may disclose personal information, as follows:

  • As required by law.
  • With the consent of the individual (or their parent, if the individual is a minor)
  • In response to a subpoena, court order or legal process, to the extent permitted or required by law
  • To protect the security and safety of individuals, data, assets and systems, consistent with applicable law
  • In connection the sale, joint venture or other transfer of some or all of its company or assets, subject to the provisions of this Privacy Notice
  • To investigate or address actual or suspected fraud or other illegal activities
  • To exercise its legal rights, including enforcement of the Terms of Use for this site or another contract
  • To affiliated Pearson companies and other companies and organizations who perform work for Pearson and are obligated to protect the privacy of personal information consistent with this Privacy Notice
  • To a school, organization, company or government agency, where Pearson collects or processes the personal information in a school setting or on behalf of such organization, company or government agency.

Links


This web site contains links to other sites. Please be aware that we are not responsible for the privacy practices of such other sites. We encourage our users to be aware when they leave our site and to read the privacy statements of each and every web site that collects Personal Information. This privacy statement applies solely to information collected by this web site.

Requests and Contact


Please contact us about this Privacy Notice or if you have any requests or questions relating to the privacy of your personal information.

Changes to this Privacy Notice


We may revise this Privacy Notice through an updated posting. We will identify the effective date of the revision in the posting. Often, updates are made to provide greater clarity or to comply with changes in regulatory requirements. If the updates involve material changes to the collection, protection, use or disclosure of Personal Information, Pearson will provide notice of the change through a conspicuous notice on this site or other appropriate way. Continued use of the site after the effective date of a posted revision evidences acceptance. Please contact us if you have questions or concerns about the Privacy Notice or any objection to any revisions.

Last Update: November 17, 2020