JEP 428: Structured Concurrency (Incubator)

Authors: Alan Bateman, Ron Pressler
Owner: Alan Bateman
Type: Feature
Scope: JDK
Status: Candidate
Component: core-libs
Discussion: loom dash dev at openjdk dot java dot net
Reviewed by: Alex Buckley, Brian Goetz
Created: 2021/11/15 15:01
Updated: 2022/05/18 03:53
Issue: 8277129

Summary

Simplify multithreaded programming by introducing a library for structured concurrency. Structured concurrency treats multiple tasks running in different threads as a single unit of work, thereby streamlining error handling and cancellation, improving reliability, and enhancing observability. This is an incubating API.

Goals

Non-Goals

Motivation

Java developers manage complexity by breaking down a task into multiple subtasks. In ordinary single-threaded code, the subtasks execute sequentially. However, if the subtasks are sufficiently independent of each other, and if there are sufficient hardware resources, then the task can be made to run faster (lower latency) by executing the subtasks concurrently. For example, a task that composes the results of multiple I/O operations will run faster if each I/O operation executes in its own thread. The availability of virtual threads makes it cost-effective to dedicate a thread to each I/O operation.

Unstructured concurrency with ExecutorService

Developers have traditionally used java.util.concurrent.ExecutorService, introduced in Java 5, to execute subtasks concurrently.

Here is a method, handle, that represents a task in a server application. It handles an incoming request by submitting two subtasks to an ExecutorService, es. One subtask executes the method findUser and the other subtask executes the method fetchOrder. The ExecutorService immediately returns a Future for each subtask, and executes each subtask in its own thread. handle awaits their results via blocking calls to Future.get – the task is said to join its subtasks.

Response handle() throws ExecutionException, InterruptedException {
    Future<String>  user  = es.submit(() -> findUser());
    Future<Integer> order = es.submit(() -> fetchOrder());
    String theUser  = user.get();   // Join findUser 
    int    theOrder = order.get();  // Join fetchOrder
    return new Response(theUser, theOrder);
}

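Because the subtasks execute concurrently, each can succeed or fail independently. (Failure means throwing an exception.) Often, a task like handle should fail if any of its subtasks fail. Understanding the lifetimes of the threads can be surprisingly complicated when failure occurs:

  1. If findUser throws an exception, then handle will throw an exception when calling user.get(), but fetchOrder will continue to run in its own thread. This is a thread leak which, at best, wastes resources; at worst, the fetchOrder thread may interfere with other tasks.

  2. If the thread executing handle is interrupted, the interruption does not propagate to the subtasks. Both the findUser and fetchOrder threads leak, continuing to run even after handle has failed.

  3. If findUser takes a long time to execute, but fetchOrder fails in the meantime, then handle will wait unnecessarily for findUser by blocking on user.get() rather than cancelling it. Only after findUser completes and user.get() returns will order.get() throw an exception, causing handle to fail.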

In each case, the problem is that our program is logically structured with a task-subtask relationship, but this relationship exists only in the programmer's mind. This not only creates more room for error, but it makes diagnosing and troubleshooting such errors more difficult because observability tools, such as thread dumps, will show handle, findUser, and fetchOrder on the call stacks of unrelated threads, with no hint of the task-subtask relationship.
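To make the leak concrete: when findUser fails, handle fails, but nothing cancels fetchOrder. The runnable sketch below uses only java.util.concurrent; the class LeakDemo, the flag fetchOrderFinished, and the bodies of findUser and fetchOrder are hypothetical stand-ins (findUser fails immediately, fetchOrder simulates slow I/O).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LeakDemo {
    // Set when fetchOrder runs to completion.
    static final AtomicBoolean fetchOrderFinished = new AtomicBoolean(false);

    // Hypothetical stand-in: fails immediately.
    static String findUser() throws Exception {
        throw new IllegalStateException("user service down");
    }

    // Hypothetical stand-in: simulates slow I/O, then records that it finished.
    static Integer fetchOrder() throws Exception {
        Thread.sleep(300);
        fetchOrderFinished.set(true);
        return 42;
    }

    static String handle(ExecutorService es) throws ExecutionException, InterruptedException {
        Future<String>  user  = es.submit(() -> findUser());
        Future<Integer> order = es.submit(() -> fetchOrder());
        String theUser  = user.get();   // throws ExecutionException wrapping findUser's failure
        int    theOrder = order.get();  // never reached, and nothing cancels `order`
        return theUser + ":" + theOrder;
    }

    // Returns handle()'s result, or the message of the failure it observed.
    static String runDemo() throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            return handle(es);
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } finally {
            es.shutdown();                            // does not interrupt running tasks,
            es.awaitTermination(5, TimeUnit.SECONDS); // so the leaked fetchOrder runs on
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handle failed with: " + runDemo());
        System.out.println("fetchOrder ran to completion anyway: " + fetchOrderFinished.get());
    }
}
```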

We might attempt to do better by explicitly cancelling the other subtasks when an error is detected, such as by wrapping each subtask in try-catch and, in the catch block of the failing subtask, invoking Future.cancel on the other subtasks' futures. We would also need to use the ExecutorService in a try-with-resources statement (as shown in the second example of JEP 425), because Future does not offer a way to wait for a task that has been cancelled. But this work can be very tricky to get right, and it often makes the logical intent of the code harder to discern. Keeping track of the inter-task relationships, and manually adding back the required inter-task cancellation edges, is asking a lot of developers.
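Here is one variant of that manual approach, sketched with the same kind of hypothetical stand-ins (ManualCancel and its subtask bodies are illustrative, and the catch block lives in handle rather than in each subtask). It cancels both futures when joining fails, and it waits for the executor to terminate because Future.cancel does not wait for the cancelled task to stop. Even so, it still blocks in user.get() if fetchOrder fails first, which is exactly the kind of subtlety that makes this approach hard to get right.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ManualCancel {
    static final AtomicBoolean fetchOrderFinished = new AtomicBoolean(false);

    // Hypothetical stand-ins: findUser fails fast, fetchOrder is slow but interruptible.
    static String findUser() throws Exception {
        throw new IllegalStateException("user service down");
    }

    static Integer fetchOrder() throws Exception {
        Thread.sleep(2_000);          // throws InterruptedException if cancelled with cancel(true)
        fetchOrderFinished.set(true);
        return 42;
    }

    static String handle(ExecutorService es) throws ExecutionException, InterruptedException {
        Future<String>  user  = es.submit(() -> findUser());
        Future<Integer> order = es.submit(() -> fetchOrder());
        // Caveat: if fetchOrder fails first, user.get() still blocks until findUser ends.
        try {
            String theUser  = user.get();
            int    theOrder = order.get();
            return theUser + ":" + theOrder;
        } catch (ExecutionException | InterruptedException e) {
            // Manually add the cancellation edges that the code structure does not express.
            user.cancel(true);
            order.cancel(true);
            throw e;
        }
    }

    static String runDemo() throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            return handle(es);
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } finally {
            // Future.cancel does not wait for the task to stop, so wait for the executor.
            es.shutdown();
            es.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
        System.out.println("fetchOrder completed: " + fetchOrderFinished.get());
    }
}
```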

The need to manually coordinate lifetimes comes from the fact that ExecutorService and Future allow unrestricted patterns of concurrency, without any constraints or ordering on the threads involved. One thread can create an ExecutorService, a second thread can submit work to it, and the threads which execute the work have no relationship to either the first or second thread. Moreover, after a thread has submitted work, a completely different thread can await the results of execution. Any code with a reference to a Future can join it (i.e., await its result by calling get()), even code in a thread other than the one which obtained the Future. In effect, a subtask started by one task does not have to "return" to the task that submitted it; it could "return" to any number of tasks, or even none.
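A tiny sketch of that freedom (UnstructuredJoin and the value "order-42" are illustrative): the Future is obtained by one thread, but an unrelated thread named stranger joins it, and nothing in the API objects.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class UnstructuredJoin {
    // Returns {name of the thread that obtained the Future, value seen by another thread}.
    static String[] demo() throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            String submitter = Thread.currentThread().getName();
            Future<String> order = es.submit(() -> "order-42"); // obtained by this thread...

            AtomicReference<String> seen = new AtomicReference<>();
            Thread stranger = new Thread(() -> {
                try {
                    seen.set(order.get());                       // ...but joined by another one
                } catch (Exception e) {
                    seen.set("failed: " + e);
                }
            }, "stranger");
            stranger.start();
            stranger.join();
            return new String[] { submitter, seen.get() };
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String[] r = demo();
        System.out.println("obtained by " + r[0] + ", joined by stranger: " + r[1]);
    }
}
```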

Because ExecutorService and Future allow for such "unstructured" use, they do not enforce or even understand any relationships among tasks and subtasks, even though such relationships are quite common and useful. Accordingly, even when subtasks are submitted and joined in the same task, the failure of one subtask cannot automatically cause cancellation of another; in handle, the failure of fetchOrder cannot automatically cause cancellation of findUser. The Future for fetchOrder is unaware of the sibling Future for findUser, and neither knows which thread will join it via Future.get. We want to ensure that such cancellation can be reliably automated, rather than asking developers to manage it manually.

Task structure should reflect code structure

In contrast to the freewheeling assortment of threads under ExecutorService, the execution of single-threaded code always enforces a hierarchy of tasks and subtasks. The {...} block of a method corresponds to a task, and the methods invoked within the block correspond to subtasks. An invoked method must return (or throw) to the method that invoked it; it cannot outlive the method that invoked it, nor return or throw to a different method. Thus, all subtasks finish before the task, and the lifetime of each subtask relative to each other and to the task is governed by the syntactic block structure of the code.

For example, in the single-threaded version of handle below, the task-subtask relationship is apparent from the syntactic structure:

Response handle() throws IOException {
    String theUser  = findUser();
    int    theOrder = fetchOrder();
    return new Response(theUser, theOrder);
}

We don't start the fetchOrder subtask until the findUser subtask has completed successfully. If findUser fails, we don't start fetchOrder at all, and the handle task fails implicitly. The fact that a subtask can only return to its parent is extremely significant: it means that the parent task can implicitly treat the failure of one subtask as a trigger to "cancel" all remaining subtasks as well as abort the parent task.

When the task-subtask hierarchy is defined by the call stack, we get the parent-child relationship, which flows into error-propagation, for free. Moreover, this relationship is reified in the call stack at runtime. When observing a single thread, the hierarchical relationship is obvious: findUser (and later fetchOrder) appear subordinate to handle.

Multithreaded programming would be considerably more reliable and observable if the parent-child relationship between a task and a subtask were reified. This would allow a child to report a result or exception only to its parent — the unique task that owns all the subtasks — which, then, could implicitly cancel the remaining subtasks.

The following properties, which we have in sequential code, would give us similar benefits for concurrent code:

  1. In source code, a syntactic block structure that delineates and enforces the lifetimes of operations.
  2. At run time, a representation of the inter-thread hierarchy, imposed by that block structure and analogous to the intra-thread call stack, that supports error propagation and cancellation for reliability and allows the concurrent program to be observed in a meaningful way.

(The JDK already has mechanisms that impose structure on concurrent tasks, such as java.util.concurrent.ForkJoinPool, the execution engine behind parallel streams. However, that mechanism is designed for compute-intensive tasks rather than tasks which involve I/O.)
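For comparison, here is the kind of structure ForkJoinPool already supports for compute-bound work: a RecursiveTask forks a subtask and joins it before returning, so every subtask "returns" to the task that forked it. SumTask and THRESHOLD are illustrative names; the example sums an array rather than performing I/O, which is the kind of workload this mechanism is designed for.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a slice of an array by recursive fork/join; each fork is joined by the task that forked it.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;  // below this size, sum sequentially
    private final long[] data;
    private final int from, to;                  // half-open range [from, to)

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        left.fork();                                           // run the left half asynchronously
        long rightSum = new SumTask(data, mid, to).compute();  // right half in this thread
        return left.join() + rightSum;                         // the fork "returns" to its parent
    }

    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(sum(data)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```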

Structured Concurrency for multithreaded code

Structured concurrency is an approach to multithreaded programming that preserves the readability and maintainability enjoyed by developers of single-threaded code. It is the principle that if a task splits into concurrent subtasks, they all return to the same place: the task's code block.

The term "structured concurrency" was coined by Martin Sústrik and popularized by Nathaniel J. Smith. Ideas from other languages, such as Erlang's hierarchical supervisors, inform the design of error handling in structured concurrency.

By "returning" to the same code block, the lifetime of a concurrent subtask is confined to a syntactic block. Because the lifetimes of all sibling subtasks are confined to the same block, they can be reasoned about and managed as a unit; because that block is nested in that of the parent task, it induces a hierarchy that can be reified in a manner similar to the call stack. Subtasks work on behalf of a task (the code in the enclosing block) that awaits their results and monitors them for failures. As with structured programming techniques for code in a single thread, the power of structured concurrency for multiple threads comes from two ideas: (1) well-defined entry and exit points for the flow of execution through a block of code, and (2) a strict nesting of the lifetime of operations in a way that mirrors their nesting in the code.

At run time, structured concurrency builds a tree-shaped hierarchy of tasks, with sibling subtasks being owned by the same parent task; the tree is the concurrent counterpart to the call stack of a single thread, and observability tools use it to present subtasks as subordinate to their parent tasks.

Structured concurrency is a great match for virtual threads. Virtual threads are a lightweight implementation of threads provided by the JDK. Many virtual threads share the same OS thread, allowing for very large numbers of virtual threads. In addition to being plentiful, virtual threads are cheap enough to represent any concurrent unit of behavior, even behavior that involves I/O. This means that a server application could use structured concurrency to process thousands or millions of incoming requests at once: it would dedicate a new virtual thread to the task of handling a request, and when the task "fans out" by submitting subtasks for concurrent execution, it would dedicate a new virtual thread to each subtask. Behind the scenes, the task-subtask relationship would be reified by outfitting each virtual thread with its unique owner, so it knows its place, similar to how a frame in the call stack knows its unique caller.

In summary, virtual threads deliver an abundance of threads, and structured concurrency ensures they are correctly and robustly coordinated. Observability tools will see threads organized in the logical manner intended by the developer. Having a library for structured concurrency in the JDK offers maintainability and reliability to all developers of server applications.

Description

The class StructuredTaskScope allows developers to structure a task as a family of concurrent subtasks, and to coordinate them as a unit. Subtasks are created in their own threads by forking them individually, but are then joined as a unit and possibly cancelled as a unit; their exceptions or successful results are aggregated and handled by the parent task. StructuredTaskScope confines the lifetimes of the subtasks, or forks, to a clear lexical scope in which all of a task's interactions with its subtasks (forking, joining, cancelling, handling errors, and composing results) take place.

Here is the handle example from earlier, written to use StructuredTaskScope (the meaning of ShutdownOnFailure is explained below):

Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String>  user  = scope.fork(() -> findUser()); 
        Future<Integer> order = scope.fork(() -> fetchOrder());

        scope.join();          // Join both forks
        scope.throwIfFailed(); // ... and propagate errors

        // Here, both forks have succeeded, so compose their results
        return new Response(user.resultNow(), order.resultNow());
    }
}

In contrast to the original example, understanding the lifetimes of the threads involved is easy: under all conditions, their lifetimes are confined to a lexical scope, namely the body of the try-with-resources statement. Furthermore, this code gets a number of desirable properties for free:

  1. Error handling with short-circuiting: If either findUser or fetchOrder fails, the other is cancelled if it hasn't yet completed. (This is managed by the cancellation policy implemented by ShutdownOnFailure; other policies are possible too.)

  2. Cancellation Propagation: If the thread running handle is interrupted before or during the call to join, both forks will be automatically cancelled when the scope is exited.

  3. Clarity: The above code has a clear structure: set up the child subtasks, wait for them (either to complete or to be cancelled), and then decide whether to succeed (and process the results of the child tasks, which are already finished) or fail (and the subtasks are already finished, so there's nothing more to clean up).

  4. Observability: A thread dump, as described below, will clearly demonstrate the task hierarchy, with the threads running findUser and fetchOrder shown as children of the scope.

Like ExecutorService.submit, StructuredTaskScope.fork takes a Callable and returns a Future. Unlike with ExecutorService, however, the returned Future is not joined via Future.get. The use of StructuredTaskScope encourages joining or cancelling all forks as a single unit, obviating the need for Future.get or Future.cancel. The new Future methods resultNow and exceptionNow are designed to be used once it is known that the task is complete, such as after a call to StructuredTaskScope.join.

Using StructuredTaskScope

The general workflow of code using StructuredTaskScope to structure a task is as follows:

  1. Create a scope. The thread that creates the scope is its owner.

  2. Fork concurrent subtasks in the scope.

  3. Any of the forks in the scope, or the scope's owner, may call shutdown to request cancellation of all remaining subtasks.

  4. The scope's owner joins the scope — i.e. all of its forks — as a unit. join is a blocking call; it will return when all forks have either completed (successfully or not) or been cancelled with shutdown (see below). As an alternative, joinUntil accepts a deadline.

  5. After joining, handle any errors in the forks and process their results (more examples follow).

  6. Close the scope, usually implicitly thanks to try-with-resources. This shuts down the scope and waits for any straggler forks to complete.

If the owner is already a member of an existing scope (i.e. created as a fork in one), then that scope effectively becomes the parent of the new scope; tasks form a tree, with scopes as the intermediate nodes and threads as the leaves.

Every fork runs in its own newly created thread, which by default is a virtual thread. The forks' threads are owned by the scope, which in turn is owned by its creating thread, thus forming a hierarchy. Any fork can create its own nested StructuredTaskScope to fork its own subtasks, thus extending the hierarchy. That hierarchy is reflected in the code's block structure, which confines the lifetimes of the forks: all of the forks' threads are guaranteed to have terminated once the scope has closed, and no thread is left behind when the block exits.

Any fork in a scope, any of its own transitive forks, or the scope's owner can call StructuredTaskScope.shutdown at any time to signify that the task is complete, even while other forks are still running. shutdown causes the threads of all forks that are still active in the scope to be interrupted; all forks should, therefore, be written in a way that is responsive to interruption. In effect, shutdown is the concurrent analog of the break statement in sequential code.
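StructuredTaskScope is not used in the sketch below; ExecutorService.shutdownNow stands in for shutdown because it, too, interrupts the threads running submitted tasks. It shows two ways a fork can be responsive to interruption: polling Thread.isInterrupted in a compute loop, and calling a blocking method that throws InterruptedException. InterruptResponsive, countUntilInterrupted, and slowIo are illustrative names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class InterruptResponsive {
    // Compute-bound fork: polls the interrupt status so that it stops when interrupted.
    static long countUntilInterrupted() {
        long n = 0;
        while (!Thread.currentThread().isInterrupted()) {
            n++;
        }
        return n;
    }

    // Blocking fork: Thread.sleep throws InterruptedException when the thread is interrupted.
    static String slowIo() throws InterruptedException {
        Thread.sleep(60_000);
        return "done";
    }

    // Returns true if both tasks stopped within 5 seconds of being interrupted.
    static boolean demo() throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(2);
        es.submit(() -> countUntilInterrupted());
        es.submit(() -> slowIo());
        Thread.sleep(100);                                // let both tasks start
        es.shutdownNow();                                 // interrupts every running task
        return es.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all tasks stopped: " + demo());
    }
}
```

A task that swallowed the interrupt (for example, by catching InterruptedException and looping) would keep its thread alive, and closing a scope waits for such stragglers.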

When join returns, all forks are known to have either completed (successfully or not) or been cancelled. Their results or exceptions can be obtained, without any additional blocking, using the new Future.resultNow and Future.exceptionNow methods. (These methods throw an IllegalStateException if called before the Future completes.)
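On JDK 19 and later, resultNow and exceptionNow are available directly on Future. To pin down their contract, here is a hypothetical helper (FutureNow is an illustrative name, not a JDK class) that approximates it with older Future methods: it never blocks, and it throws IllegalStateException if the task has not completed, was cancelled, or completed the other way (with an exception for resultNow, with a result for exceptionNow).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class FutureNow {
    private FutureNow() {}

    // Hypothetical approximation of Future.resultNow(): never blocks.
    static <T> T resultNow(Future<T> f) {
        if (!f.isDone()) throw new IllegalStateException("Task has not completed");
        if (f.isCancelled()) throw new IllegalStateException("Task was cancelled");
        try {
            return f.get();                     // the task is done, so this returns immediately
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task completed with exception", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical approximation of Future.exceptionNow(): never blocks.
    static Throwable exceptionNow(Future<?> f) {
        if (!f.isDone()) throw new IllegalStateException("Task has not completed");
        if (f.isCancelled()) throw new IllegalStateException("Task was cancelled");
        try {
            f.get();
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        throw new IllegalStateException("Task completed with a result");
    }
}
```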

Calling join or joinUntil within a scope is mandatory. If the block exits before the call to join, then the implicit closing of the scope will wait for all forks to terminate, and then throw an exception.

It is possible for the owner thread to be interrupted before or while joining — for example, if it is a fork of an enclosing StructuredTaskScope that's been shut down. If this occurs, join or joinUntil will throw an exception because there is no point in continuing – any results obtained by forks so far are insufficient, or the scope would have already been shut down. The try-with-resources statement will then shut down the scope, cancelling all forks (and waiting for them to terminate). This has the effect of automatically propagating the cancellation of the task to its subtasks. If the joinUntil deadline expires before the forks terminate or shutdown is called, then joinUntil will throw an exception; again, the scope will be shut down and the forks automatically cancelled.
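The deadline behavior can be seen in isolation with a rough analog built only on standard APIs: ExecutorService.invokeAll(tasks, timeout, unit) cancels every task that has not completed when the timeout elapses. The sketch below (DeadlineDemo and runUntil are hypothetical names) converts an Instant deadline into a timeout. Unlike ShutdownOnFailure, invokeAll does not short-circuit when a task fails, and it reports the timeout by cancelling tasks rather than by throwing.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class DeadlineDemo {
    // Runs all tasks, but cancels (interrupts) any task still running at the deadline.
    static <T> List<Future<T>> runUntil(ExecutorService es, List<Callable<T>> tasks, Instant deadline)
            throws InterruptedException {
        long nanos = Math.max(0, Duration.between(Instant.now(), deadline).toNanos());
        return es.invokeAll(tasks, nanos, TimeUnit.NANOSECONDS);
    }

    // Returns "done" or "cancelled" for each task, in task order.
    static List<String> demo() throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            Callable<String> fast = () -> "fast";
            Callable<String> slow = () -> { Thread.sleep(10_000); return "slow"; };
            List<Future<String>> futures =
                    runUntil(es, List.of(fast, slow), Instant.now().plusMillis(500));
            List<String> outcomes = new ArrayList<>();
            for (Future<String> f : futures) {
                outcomes.add(f.isCancelled() ? "cancelled" : "done");
            }
            return outcomes;
        } finally {
            es.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```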

The structured use of StructuredTaskScope is enforced at runtime. For example, attempts to call fork from a thread that is not in the tree hierarchy of the scope — i.e., the owner, the forks, and their own forks in nested StructuredTaskScopes — will fail with an exception. Code that uses StructuredTaskScope outside a try-with-resources block and returns without calling close() or does not maintain proper nesting of close() calls may experience StructureViolationExceptions thrown from StructuredTaskScope methods.

Because StructuredTaskScope enforces a proper structure and order on operations, it does not implement the ExecutorService or Executor interfaces, as they are commonly used in a non-structured way (see the Alternatives section). However, it is straightforward to migrate code that uses ExecutorService, but would benefit from structure, to use StructuredTaskScope.

StructuredTaskScope resides in an incubator module, excluded by default

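The examples above use the StructuredTaskScope API, so to run them on JDK XX you must add the jdk.incubator.concurrent module, and also enable preview features in order to use virtual threads, as follows (Main.java stands for the program's source file):

  1. Compile the program with javac --release XX --enable-preview --add-modules jdk.incubator.concurrent Main.java and run it with java --enable-preview --add-modules jdk.incubator.concurrent Main; or,

  2. When using the source-code launcher, run the program with java --source XX --enable-preview --add-modules jdk.incubator.concurrent Main.java; or,

  3. When using jshell, start it with jshell --enable-preview --add-modules jdk.incubator.concurrent.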

Shutdown policies

Certain "short-circuiting" patterns are common when dealing with concurrent subtasks, such as cancelling all subtasks if one of them fails (we call that pattern "invoke all"), or, alternatively, when one of them succeeds (we call that "invoke any"). To support these patterns, two subclasses of StructuredTaskScope, ShutdownOnFailure and ShutdownOnSuccess, implement basic policies that shut down the scope upon the first fork failure or success, respectively. They also provide methods for handling exceptions and/or successful results.

Here is a StructuredTaskScope with a shutdown-on-failure policy (seen in the handle example above) that runs a collection of tasks concurrently, and fails if any of them fails:

<T> List<T> runAll(List<Callable<T>> tasks) throws Throwable {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        List<Future<T>> futures = tasks.stream().map(scope::fork).toList();
        scope.join();
        scope.throwIfFailed(e -> e);  // Propagate exception as-is if any fork fails
        // Here, all tasks have succeeded, so compose their results
        return futures.stream().map(Future::resultNow).toList();
    }
}
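On JDKs without the incubator module, a comparable shutdown-on-failure runAll can be approximated with ExecutorCompletionService, which hands back futures in completion order so the first failure is seen as soon as it happens. This is a sketch under assumptions (RunAll and its explicit ExecutorService parameter are hypothetical): it cancels the remaining tasks on failure or interruption, but unlike closing a StructuredTaskScope it does not wait for the cancelled tasks to terminate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RunAll {
    // Shutdown-on-failure without StructuredTaskScope: the first failure cancels the rest.
    static <T> List<T> runAll(ExecutorService es, List<Callable<T>> tasks)
            throws ExecutionException, InterruptedException {
        CompletionService<T> cs = new ExecutorCompletionService<>(es);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(cs.submit(task));
            }
            for (int i = 0; i < futures.size(); i++) {
                cs.take().get();          // in completion order; throws on the first failure
            }
            List<T> results = new ArrayList<>();
            for (Future<T> f : futures) {
                results.add(f.get());     // every task is done, so this does not block
            }
            return results;
        } finally {
            for (Future<T> f : futures) {
                f.cancel(true);           // no-op for finished tasks; interrupts stragglers
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            Callable<Integer> a = () -> 1;
            Callable<Integer> b = () -> 2;
            System.out.println(runAll(es, List.of(a, b)));
        } finally {
            es.shutdownNow();
        }
    }
}
```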

In contrast to the example above, it is sometimes desirable to finish a task early not if any fork fails but, rather, if any fork succeeds. For example, a server application may want to obtain a result from any one of a collection of redundant services. Here is a StructuredTaskScope with a shutdown-on-success policy that returns the result of the first successful subtask; it fails if all subtasks fail or a deadline elapses. The policy will automatically shut down the scope, cancelling active forks, as soon as one of them succeeds.

<T> T race(List<Callable<T>> tasks, Instant deadline)
        throws InterruptedException, ExecutionException, TimeoutException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
        for (var task : tasks) {
            scope.fork(task);
        }
        scope.joinUntil(deadline);  // Throws TimeoutException if the deadline expires first
        return scope.result();      // Throws if none of the forks completed successfully
    }
}
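Outside the incubator API, ExecutorService.invokeAny(tasks, timeout, unit) has closely matching semantics: it returns the result of a task that completed successfully, cancels the tasks that have not completed, throws ExecutionException if no task succeeds, and throws TimeoutException if the timeout elapses first. A minimal sketch, assuming an explicitly supplied ExecutorService (Race and the replica tasks are illustrative):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class Race {
    // Returns the first successful result; tasks still running are cancelled on return.
    // Throws ExecutionException if every task fails, TimeoutException at the deadline.
    static <T> T race(ExecutorService es, List<Callable<T>> tasks, Instant deadline)
            throws InterruptedException, ExecutionException, TimeoutException {
        long nanos = Math.max(0, Duration.between(Instant.now(), deadline).toNanos());
        return es.invokeAny(tasks, nanos, TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            Callable<String> failing = () -> { throw new IllegalStateException("replica down"); };
            Callable<String> slow = () -> { Thread.sleep(5_000); return "slow replica"; };
            Callable<String> fast = () -> { Thread.sleep(50); return "fast replica"; };
            System.out.println(race(es, List.of(failing, slow, fast), Instant.now().plusSeconds(3)));
        } finally {
            es.shutdownNow();
        }
    }
}
```

Here main prints fast replica: the failing task is ignored, and the slow task is cancelled once the fast one succeeds.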

While these two policies are provided out of the box, custom policies that abstract other common patterns can be created by extending StructuredTaskScope and overriding the handleComplete method.
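The handleComplete hook is easiest to see in a model. The sketch below is a hypothetical, greatly simplified scope built on an ExecutorService (MiniScope and FirstNSuccesses are illustrative names, not part of the incubator API): it follows the create, fork, join, close workflow, calls handleComplete in the fork's thread, and lets a subclass decide when to call shutdown. The custom policy shown shuts the scope down once n forks have succeeded, a quorum-style pattern that neither built-in policy covers.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical, greatly simplified model of a scope with a handleComplete hook.
// It is NOT the jdk.incubator.concurrent API; it only mimics its shape.
abstract class MiniScope<T> implements AutoCloseable {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final Thread owner = Thread.currentThread();
    private final List<Future<T>> forks = new ArrayList<>();
    private int running;          // forks that have not completed yet
    private boolean shutdown;

    // Policy hook: called in the fork's thread when a fork completes before shutdown.
    protected abstract void handleComplete(Future<T> future);

    public synchronized Future<T> fork(Callable<? extends T> task) {
        if (shutdown) throw new IllegalStateException("scope is shut down");
        FutureTask<T> ft = new FutureTask<T>(task::call) {
            @Override protected void done() { onDone(this); }
        };
        forks.add(ft);
        running++;
        executor.execute(ft);
        return ft;
    }

    private void onDone(Future<T> f) {
        boolean active;
        synchronized (this) { active = !shutdown; }
        if (active && !f.isCancelled()) handleComplete(f);
        synchronized (this) { running--; notifyAll(); }
    }

    public synchronized void shutdown() {
        if (shutdown) return;
        shutdown = true;
        for (Future<T> f : forks) f.cancel(true);   // interrupts forks that are still running
        notifyAll();
    }

    // Only the owner may join; returns once all forks are done or the scope is shut down.
    public synchronized void join() throws InterruptedException {
        if (Thread.currentThread() != owner) throw new IllegalStateException("not the owner");
        while (running > 0 && !shutdown) wait();
    }

    @Override
    public void close() throws InterruptedException {
        shutdown();                                 // cancel stragglers...
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // ...and wait for them
    }
}

// A hypothetical custom policy: shut down as soon as n forks have succeeded.
class FirstNSuccesses<T> extends MiniScope<T> {
    private final int n;
    private final List<T> results = new ArrayList<>();

    FirstNSuccesses(int n) { this.n = n; }

    @Override
    protected void handleComplete(Future<T> future) {
        T value;
        try {
            value = future.get();                   // already complete: does not block
        } catch (ExecutionException e) {
            return;                                 // this policy ignores failed forks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        boolean enough;
        synchronized (results) {
            if (results.size() < n) results.add(value);
            enough = results.size() >= n;
        }
        if (enough) shutdown();                     // cancel the forks that are still running
    }

    List<T> results() {
        synchronized (results) { return new ArrayList<>(results); }
    }

    // Usage: four forks, one fails and one is slow; the scope stops after two successes.
    static List<Integer> firstTwo() throws InterruptedException {
        try (var scope = new FirstNSuccesses<Integer>(2)) {
            scope.fork(() -> 1);
            scope.fork(() -> { throw new IllegalStateException("replica down"); });
            scope.fork(() -> 2);
            scope.fork(() -> { Thread.sleep(10_000); return 3; });   // cancelled by shutdown
            scope.join();
            return scope.results();
        }
    }
}
```

Unlike the real API, this model does not restrict fork to the scope's thread tree and never throws StructureViolationException; it only illustrates where a policy plugs in.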

Fan-in uses

While the examples above focused on fan-out, i.e., concurrently performing multiple outgoing I/O operations, StructuredTaskScope can be used in other ways, such as managing tasks that respond to multiple incoming I/O operations. Such uses will typically create an unknown number of forks in response to incoming requests. Here is an example of a server that forks subtasks to handle incoming connections inside a StructuredTaskScope:

void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
    try (var scope = new StructuredTaskScope<Void>()) {
        try {
            while (true) {
                var socket = serverSocket.accept();
                scope.fork(() -> { handle(socket); return null; }); // fork takes a Callable, so return a value
            }
        } finally {
            // if there's been an error or we're interrupted, we stop accepting
            // if we want to cancel all active connections, we also shut down:
            scope.shutdown();
            scope.join();
        }
    }
}

This will ensure all connections are closed before serve returns, and present all connection-handling subtasks as children of the scope's owner in a thread dump.

Observability improvements

The new thread dump added by JEP 425 is extended to support StructuredTaskScope's grouping of threads into a hierarchy when generating a thread dump in JSON format. The jcmd command can be used to generate such a thread dump with:

$ jcmd <pid> Thread.dump_to_file -format=json <file>

The thread dump will include a JSON object for each StructuredTaskScope. The JSON object contains an array of the threads forked in the scope and their stack traces. The owner of a StructuredTaskScope will typically be blocked in the join method waiting for subtasks to complete; a thread dump makes it easy to see what the subtasks' threads are doing by showing the tree hierarchy imposed by structured concurrency. The JSON object for a StructuredTaskScope also has a reference to its parent so that the structure of the program can be reconstituted from the thread dump.

The com.sun.management.HotSpotDiagnosticMXBean API can also be used to generate this thread dump. This API can also be invoked indirectly via the platform MBeanServer from a local or remote JMX tool. Future APIs may be introduced to improve diagnosability and debugging support.

Alternatives

Dependences

JEP 425: Virtual Threads (Preview)