Grails Async Framework
Authors: Graeme Rocher
Version: 6.0.0-M1
1 Introduction
With modern hardware featuring multiple cores, many programming languages have added asynchronous, parallel programming APIs, and Groovy is no exception.
Added in Grails 2.3 and maintained as an external project since 3.3, the Async features of Grails aim to simplify concurrent programming within the framework and include the concept of Promises and a unified event model.
2 Promises
A Promise is a concept being embraced by many concurrency frameworks. Promises are similar to java.util.concurrent.Future instances, but include a more user-friendly exception handling model, useful features like chaining, and the ability to attach listeners.
To use the Grails Promise abstraction you should add a dependency on the async plugin to your build.gradle file:
implementation "org.grails.plugins:async:{version}"
Promise Basics
In Grails the grails.async.Promises class provides the entry point to the Promise API:
import static grails.async.Promises.*
To create promises you can use the task method, which returns an instance of the grails.async.Promise interface:
def p1 = task { 2 * 2 }
def p2 = task { 4 * 4 }
def p3 = task { 8 * 8 }
assert [4,16,64] == waitAll(p1, p2, p3)
The waitAll method waits synchronously, blocking the current thread, for all of the concurrent tasks to complete and returns the results.
If you prefer not to block the current thread you can use the onComplete method:
onComplete([p1,p2,p3]) { List results ->
    assert [4,16,64] == results
}
The waitAll method throws the originating exception if an error occurs while executing one of the promises. The onComplete method, however, will simply not execute the passed closure if an exception occurs. You can register an onError listener if you wish to handle exceptions without blocking:
onError([p1,p2,p3]) { Throwable t ->
    println "An error occurred ${t.message}"
}
If you have just a single long-running promise then the grails.async.Promise interface provides a similar API on the promise itself. For example:
import static java.util.concurrent.TimeUnit.*
import static grails.async.Promises.*
import grails.async.Promise
Promise p = task {
    // Long running task
}
p.onError { Throwable err ->
    println "An error occurred ${err.message}"
}
p.onComplete { result ->
    println "Promise returned $result"
}
// block until the result is available
def result = p.get()
// or block for at most the specified time
def timedResult = p.get(1, MINUTES)
The PromiseFactory Interface
By default, the Promises static methods use an instance of PromiseFactory. This PromiseFactory interface has various implementations. The default implementation is CachedThreadPoolPromiseFactory, which uses a thread pool that creates threads as needed (the same as java.util.concurrent.Executors.newCachedThreadPool()).
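To make the comparison with Executors.newCachedThreadPool() concrete, the following plain-JDK sketch runs three small computations on a cached pool and collects the results, which is roughly what task and waitAll do on top of the default factory. It only illustrates the thread pool behaviour and is not the CachedThreadPoolPromiseFactory source; the variable names are made up for the example.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

// A cached pool starts a new thread when no idle one is available
// and reuses idle threads for later tasks.
ExecutorService pool = Executors.newCachedThreadPool()
List<Integer> results
try {
    // Submit three independent computations; each may run on its own thread.
    List<Future<Integer>> futures = [2, 4, 8].collect { int n ->
        pool.submit({ n * n } as Callable<Integer>)
    }
    // Future.get() blocks the calling thread until each result is ready.
    results = futures.collect { Future<Integer> f -> f.get() }
} finally {
    // Stop accepting new work so idle threads can be released.
    pool.shutdown()
}
assert results == [4, 16, 64]
```

Because a cached pool has no upper bound on the number of threads, it suits many short-lived tasks; for long blocking work a bounded pool may be the safer choice.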
However, the design of the Grails promises framework is such that you can swap out the underlying implementation for your own or one of the pre-supported implementations. For example, to use RxJava 1.x simply add the RxJava dependency to build.gradle:
runtimeOnly "org.grails:grails-async-rxjava:{version}"
With the above in place RxJava 1.x will be used to create Promise instances.
The following table summarizes the available implementations and the dependency that should be added to activate each of them:
Framework | Dependency | Implementation Class |
---|---|---|
GPars 1.2.x | | |
RxJava 1.2.x | grails-async-rxjava | |
RxJava 2.x | | |
You can also override the grails.async.PromiseFactory instance used by Promises by setting the promiseFactory static field.
One common use case for this is unit testing. Typically you do not want promises to execute asynchronously during unit tests, as this makes tests harder to write. For this purpose Grails ships with an org.grails.async.factory.SynchronousPromiseFactory class that makes it easier to test promises:
import org.grails.async.factory.*
import grails.async.*
Promises.promiseFactory = new SynchronousPromiseFactory()
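As a minimal sketch of how this might be used in a test, the snippet below swaps in the synchronous factory, checks that a task runs on the calling thread, and then restores the previous factory. Saving and restoring the factory is a suggestion of this example rather than something the framework requires, and the snippet assumes the async plugin is on the classpath.

```groovy
import grails.async.PromiseFactory
import grails.async.Promises
import org.grails.async.factory.SynchronousPromiseFactory

// Remember the active factory so it can be restored afterwards.
PromiseFactory previous = Promises.promiseFactory
Promises.promiseFactory = new SynchronousPromiseFactory()
try {
    String caller = Thread.currentThread().name
    // With the synchronous factory the closure runs on the calling thread.
    def promise = Promises.task { Thread.currentThread().name }
    assert promise.get() == caller
} finally {
    // Avoid leaking the synchronous factory into other tests.
    Promises.promiseFactory = previous
}
```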
Using the PromiseFactory mechanism it is theoretically possible to plug other concurrency libraries into the Grails framework. To do so you need to implement the two interfaces grails.async.Promise and grails.async.PromiseFactory.
Promise Chaining
It is possible to chain several promises and wait for the chain to complete using the then method:
final polish = { ... }
final transform = { ... }
final save = { ... }
final notify = { ... }
Promise promise = task {
    // long running task
}
promise.then polish then transform then save then {
    // notify end result
}
If an exception occurs at any point in the chain it will be propagated back to the caller and the next step in the chain will not be called.
Promise Lists and Maps
Grails' async API also features the concept of promise lists and maps. These are represented by the grails.async.PromiseList and grails.async.PromiseMap classes respectively.
The easiest way to create a promise list or map is via the tasks method of the Promises class:
import static grails.async.Promises.*
def promiseList = tasks([{ 2 * 2 }, { 4 * 4}, { 8 * 8 }])
assert [4,16,64] == promiseList.get()
The tasks method, when passed a list of closures, returns a PromiseList. You can also construct a PromiseList manually:
import grails.async.*
def list = new PromiseList()
list << { 2 * 2 }
list << { 4 * 4 }
list << { 8 * 8 }
list.onComplete { List results ->
    assert [4,16,64] == results
}
The PromiseList class does not implement the java.util.List interface, but instead returns a java.util.List from the get() method.
Working with PromiseMap instances is largely similar. Again you can either use the tasks method:
import static grails.async.Promises.*
def promiseMap = tasks one:{ 2 * 2 },
                       two:{ 4 * 4},
                       three:{ 8 * 8 }
assert [one:4,two:16,three:64] == promiseMap.get()
Or construct a PromiseMap manually:
import grails.async.*
def map = new PromiseMap()
map['one'] = { 2 * 2 }
map['two'] = { 4 * 4 }
map['three'] = { 8 * 8 }
map.onComplete { Map results ->
    assert [one:4,two:16,three:64] == results
}
DelegateAsync Transformation
It is quite common to require both synchronous and asynchronous versions of the same API. Developing both can result in a maintenance problem as typically the asynchronous API would simply delegate to the synchronous version.
The DelegateAsync transformation is designed to mitigate this problem by transforming any synchronous API into an asynchronous one.
For example, consider the following service:
class BookService {
    List<Book> findBooks(String title) {
        // implementation
    }
}
The findBooks method executes synchronously in the same thread as the caller. To make an asynchronous version of this API you can define another class as follows:
import grails.async.*
class AsyncBookService {
    @DelegateAsync BookService bookService
}
The DelegateAsync transformation will automatically add a new method that looks like the following to the AsyncBookService class:
Promise<List<Book>> findBooks(String title) {
    Promises.task {
        bookService.findBooks(title)
    }
}
As you can see, the transformation adds equivalent methods that return a Promise and execute asynchronously.
The AsyncBookService can then be injected into other controllers and services and used as follows:
AsyncBookService asyncBookService
def findBooks(String title) {
    asyncBookService.findBooks(title)
        .onComplete { List results ->
            println "Books = ${results}"
        }
}
3 Events
Grails 3.3 introduced a new Events API that replaces the previous implementation, which was based on Reactor 2.x (now deprecated and no longer maintained).
In Grails 3.3 and above a new EventBus abstraction has been introduced. Like the PromiseFactory notion, there are implementations of the EventBus interface for common asynchronous frameworks like GPars and RxJava.
To use the Grails Events abstraction you should add a dependency on the events plugin to your build.gradle file:
implementation "org.grails.plugins:events:{version}"
If no asynchronous framework is present on the classpath then by default Grails creates an EventBus based on the currently active PromiseFactory. The default implementation is CachedThreadPoolPromiseFactory, which uses a thread pool that creates threads as needed (the same as java.util.concurrent.Executors.newCachedThreadPool()).
If you wish to use a popular async framework such as RxJava as the EventBus implementation then you will need to add the appropriate dependency. For example, for RxJava 1.x:
runtimeOnly "org.grails:grails-events-rxjava:{version}"
The following table summarizes async framework support and the necessary dependency:
Framework | Dependency | Implementation Class |
---|---|---|
GPars 1.2.x | | |
RxJava 1.2.x | grails-events-rxjava | |
RxJava 2.x | | |
3.1 Event Publishing
The simplest way to trigger event notification is via the Publisher annotation.
For example:
import grails.events.annotation.*
...
// Listing not available: {sourcedir}/grails-events-transform/src/test/groovy/grails/events/annotation/PubSubSpec.groovy (tag: publisher)
What the above does is take the return value of the method and publish an event using an event id that is the same as the method name.
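As an illustration of the behaviour just described, a publisher could look like the following sketch. SumService is a hypothetical class written for this example and is not taken from the guide's source tree; it assumes the events plugin is on the classpath.

```groovy
import grails.events.annotation.Publisher

// SumService is a hypothetical class used only for illustration.
class SumService {

    // The returned value is published as an event. No id is passed to
    // @Publisher, so the event id defaults to the method name, "sum".
    @Publisher
    int sum(int a, int b) {
        a + b
    }
}
```

Callers still receive the return value as usual; publishing the event is a side effect of the call.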
If you wish to change the event id you can provide a value to @Publisher:
@Publisher('myEvent')
If you want more flexibility then you could simulate the behaviour of the annotation by writing the following code:
import grails.events.*
...
// Listing not available: {sourcedir}/grails-events-transform/src/test/groovy/grails/events/ManualPubSubSpec.groovy (tag: publisher)
Notice in the above example, the EventPublisher trait is explicitly implemented.
Although generally the annotation approach is recommended because it is simpler and more concise, the EventPublisher trait does provide more flexibility in terms of being able to publish multiple events in a single method and so on.
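A minimal sketch of the trait-based approach might look as follows. ManualSumService is a hypothetical name chosen for this example, and the snippet assumes the events plugin is on the classpath.

```groovy
import grails.events.EventPublisher

// ManualSumService is a hypothetical class used only for illustration.
class ManualSumService implements EventPublisher {

    int sum(int a, int b) {
        int result = a + b
        // Publish explicitly under the "sum" event id; further notify
        // calls could be added here to publish more than one event.
        notify("sum", result)
        return result
    }
}
```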
3.2 Subscribing to Events
There are several ways to consume an event. The recommended way is to use the Subscriber annotation. Note that the class using this annotation needs to be a Spring bean.
For example:
import grails.events.annotation.*
...
// Listing not available: {sourcedir}/grails-events-transform/src/test/groovy/grails/events/annotation/PubSubSpec.groovy (tag: subscriber)
In this example, every time a sum event occurs the subscriber will be invoked.
Once again the method name is used by default for the event id, although it can start with the word "on". In other words, either a method name of sum or onSum would work for the above example. Alternatively, you can provide the event id to subscribe to:
@Subscriber('myEvent')
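To illustrate the annotation-based approach, the sketch below keeps a running total of every value published under the sum event id. TotalListener is a hypothetical class written for this example; as noted above, it would need to be registered as a Spring bean for the subscription to take effect.

```groovy
import grails.events.annotation.Subscriber
import java.util.concurrent.atomic.AtomicInteger

// TotalListener is a hypothetical class used only for illustration.
class TotalListener {

    // Events may arrive on different threads, so use an atomic counter.
    final AtomicInteger total = new AtomicInteger(0)

    // Subscribes to the "sum" event: the method name minus the leading "on".
    @Subscriber
    void onSum(int num) {
        total.addAndGet(num)
    }
}
```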
If you wish to subscribe to events dynamically or need more flexibility, then another option is to interact with the EventBus directly. For example:
import grails.events.bus.EventBusAware
import jakarta.annotation.PostConstruct
...
// Listing not available: {sourcedir}/grails-events-transform/src/test/groovy/grails/events/ManualPubSubSpec.groovy (tag: subscriber)
In this example the TotalService calls subscribe and passes a closure within a method called init. The init method is annotated with @PostConstruct so that it is called after the EventBus has been injected by Spring, ensuring it is only called once and the events are correctly subscribed to.
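The arrangement described here can be sketched as follows. This is an illustrative reconstruction based on the description above, not the listing from the guide's source tree, and the total field is an assumption added so that the subscriber has something to update.

```groovy
import grails.events.bus.EventBusAware
import jakarta.annotation.PostConstruct
import java.util.concurrent.atomic.AtomicInteger

class TotalService implements EventBusAware {

    // Assumed field: a running total of all values received.
    final AtomicInteger total = new AtomicInteger(0)

    // Runs once, after Spring has injected the EventBus.
    @PostConstruct
    void init() {
        // Register a closure for the "sum" event id directly on the bus.
        eventBus.subscribe("sum") { int num ->
            total.addAndGet(num)
        }
    }
}
```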
3.3 Reactor Spring Annotations
In Grails versions earlier than 3.3, Grails used Reactor 2.x which provided @Consumer and @Selector annotations to listen for events. For example:
import reactor.spring.context.annotation.*
@Consumer
class MyService {
    @Selector('myEvent')
    void myEventListener(Object data) {
        println "GOT EVENT $data"
    }
}
Within the grails-events-compat dependency, Grails 3.3 and above ship versions of these annotations that bridge and provide compatibility for applications upgrading to Grails 3.3. However, these annotations are considered deprecated, and the Subscriber annotation, which provides equivalent functionality, should be used instead.
3.4 Events from GORM
GORM defines a number of useful events that you can listen for.
To subscribe to an event just define a Subscriber that receives the event type as an argument:
import grails.events.annotation.*
...
// Listing not available: {sourcedir}/examples/pubsub-demo/src/main/groovy/pubsub/demo/BookSubscriber.groovy (tag: gorm)
These events are triggered asynchronously, and so cannot cancel or manipulate the persistence operations.
If you wish to define a synchronous listener then you should instead use the Listener annotation:
import grails.events.annotation.gorm.*
...
@Listener
void tagFunnyBooks(PreInsertEvent event) {
    String title = event.getEntityAccess().getPropertyValue("title")
    if(title?.contains("funny")) {
        event.getEntityAccess().setProperty("title", "Humor - ${title}".toString())
    }
}
If you plan to modify properties of an entity being inserted, use the EntityAccess object as shown above. If you set properties directly on the entity it will be marked as dirty, which will cause another update to be issued.
3.5 Events from Spring
Spring also fires a number of useful events. All events in the org.springframework package use the spring namespace. However, these events are disabled by default as they add a performance overhead.
To enable Spring Event translation set grails.events.spring to true in application.yml prior to defining subscribers.
For example:
import grails.events.annotation.*
import org.springframework.web.context.support.*
import org.springframework.boot.context.event.*
...
@Events(namespace="spring")
class MyService {
    @Subscriber
    void applicationStarted(ApplicationStartedEvent event) {
        // fired when the application starts
    }
    @Subscriber
    void servletRequestHandled(RequestHandledEvent event) {
        // fired each time a request is handled
    }
}
3.6 Configure the Default Event Bus
If you include one of the aforementioned concrete EventBus implementations for RxJava or GPars then it is a matter of configuring the appropriate implementation.
With RxJava 1.x this is done with RxJavaHooks, with RxJava 2.x with RxJavaPlugins, and with GPars with GParsConfig.
If you use the default implementation then you can override the thread pool used by the EventBus by registering the appropriate bean in grails-app/conf/spring/resources.groovy:
import org.grails.events.bus.*
import java.util.concurrent.*
beans = {
    eventBus(ExecutorEventBus, Executors.newFixedThreadPool(5))
}
4 Asynchronous GORM
Since Grails 2.3, GORM features an asynchronous programming model that works across all supported datastores (Hibernate, MongoDB etc.).
Although GORM executes persistence operations asynchronously, these operations still block as the underlying database drivers are not asynchronous. Asynchronous GORM is designed to allow you to isolate these blocking operations onto a separate thread that you can scale and control, allowing your controller layer to remain non-blocking.
The AsyncEntity Trait
Since Grails 3.3, the asynchronous part of GORM is optional. To enable it you first need to add the grails-datastore-gorm-async dependency to build.gradle:
implementation "org.grails:grails-datastore-gorm-async"
Then, in each domain class that should support asynchronous processing, use the AsyncEntity trait:
import grails.gorm.async.*
class MyEntity implements AsyncEntity<MyEntity> {
    //...
}
Async Namespace
The AsyncEntity trait provides an async namespace that exposes all of the GORM methods in an asynchronous manner.
For example, the following code listing reads 3 objects from the database asynchronously:
import static grails.async.Promises.*
def p1 = Person.async.get(1L)
def p2 = Person.async.get(2L)
def p3 = Person.async.get(3L)
def results = waitAll(p1, p2, p3)
Using the async namespace, all the regular GORM methods are available (even dynamic finders), but instead of executing synchronously, the query is run in the background and a Promise instance is returned.
The following code listing shows a few common examples of GORM queries executed asynchronously:
import static grails.async.Promises.*
Person.async.list().onComplete { List results ->
    println "Got people = ${results}"
}
def p = Person.async.getAll(1L, 2L, 3L)
List results = p.get()
def p1 = Person.async.findByFirstName("Homer")
def p2 = Person.async.findByFirstName("Bart")
def p3 = Person.async.findByFirstName("Barney")
results = waitAll(p1, p2, p3)
Async and the Session
When using GORM async each promise is executed in a different thread. Since the Hibernate session is not concurrency safe, a new session is bound per thread.
This is an important consideration when using GORM async (particularly with Hibernate as the persistence engine). The objects returned from asynchronous queries will be detached entities.
This means you cannot save objects returned from asynchronous queries without first merging them back into the session. For example, the following will not work:
def promise = Person.async.findByFirstName("Homer")
def person = promise.get()
person.firstName = "Bart"
person.save()
Instead you need to merge the object with the session bound to the calling thread. The above code needs to be written as:
def promise = Person.async.findByFirstName("Homer")
def person = promise.get()
// merge() returns the instance attached to the current session
person = person.merge()
person.firstName = "Bart"
Note that merge() is called first because it may refresh the object from the cache or database, which would result in the change being lost. In general it is not recommended to read and write objects in different threads and you should avoid this technique unless absolutely necessary.
Finally, another issue with detached objects is that association lazy loading will not work and you will encounter LazyInitializationException errors if you attempt it. If you plan to access the associated objects of those returned from asynchronous queries you should use eager queries (which is recommended anyway to avoid N+1 problems).
Multiple Asynchronous GORM calls
As discussed in the previous section you should avoid reading and writing objects in different threads as merging tends to be inefficient.
However, if you wish to do more complex GORM work asynchronously then the GORM async namespace provides a task method that makes this possible. For example:
def promise = Person.async.task {
    withTransaction {
        def person = findByFirstName("Homer")
        person.firstName = "Bart"
        person.save(flush:true)
    }
}
Person updatedPerson = promise.get()
Note that the GORM task method differs from the static Promises.task method in that it deals with binding a new session to the asynchronous thread for you. If you do not use the GORM version and do asynchronous work with GORM then you need to do this manually. Example:
import static grails.async.Promises.*
def promise = task {
    Person.withNewSession {
        // your logic here
    }
}
Async DetachedCriteria
The DetachedCriteria class also supports the async namespace. For example you can do the following:
DetachedCriteria query = Person.where {
    lastName == "Simpson"
}
def promise = query.async.list()
5 RxJava Support
Since Grails 3.2, you can use RxJava to write reactive logic in your Grails controllers that leverages the underlying container's asynchronous processing capabilities.
To get started simply declare a dependency on the plugin in build.gradle:
dependencies {
    //...
    implementation 'org.grails.plugins:rxjava'
}
You can then return rx.Observable as a return value from any controller and Grails will automatically apply the following steps:
- Create a new asynchronous request
- Spawn a new thread that subscribes to the observable
- When the observable emits a result, process the result using the respond method.
For more detailed instructions on how to use the RxJava plugin see the user guide documentation for the plugin.
5.1 Server Sent Events
Server-sent events (SSE) is a technology where a browser receives automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5 by the W3C.
The RxJava plugin adds support for SSE to Grails making it simple to write controllers that maintain continuous non-blocking communication with a JavaScript client.
For example:
def index() {
    rx.stream { Observer observer ->
        for(i in (0..5)) {
            if(i % 2 == 0) {
                observer.onNext(
                    rx.render("Tick")
                )
            }
            else {
                observer.onNext(
                    rx.render("Tock")
                )
            }
            sleep 1000
        }
        observer.onCompleted()
    }
}
- Call the stream method, passing a closure that accepts an rx.Subscriber, to start sending events
- Emit one or more items using onNext
- Call sleep to simulate a slow request
- Call onCompleted to complete the request
For more detailed instructions on how to use SSE and the RxJava plugin see the user guide documentation for the plugin.
5.2 RxGORM
RxGORM is a new implementation of GORM that has the following goals:
- Reactive
- Non-blocking
- Stateless
- Simple
RxGORM, unlike the Asynchronous GORM implementation, aims to be truly non-blocking, down to the driver level.
The following is an example of RxGORM in action:
Book.get(id)
    .subscribe { Book it ->
        println "Title = ${it.title}"
    }
You can combine RxGORM with the RxJava plugin to implement reactive responses from Grails controllers. For example:
def show() {
    // returns an rx.Observable
    Book.get(params.id?.toString())
}
For more information on how to use RxGORM, see the RxGORM user guide.
6 Asynchronous Request Handling
If you are deploying to a Servlet 3.0 container such as Tomcat 7 and above then it is possible to deal with responses asynchronously.
In general for controller actions that execute quickly there is little benefit in handling requests asynchronously. However, for long running controller actions it is extremely beneficial.
The reason is that with an asynchronous / non-blocking response, the one thread == one request == one response relationship is broken. The container can keep a client response open and active, and at the same time return the thread back to the container to deal with another request, improving scalability.
For example, if you have 70 available container threads and an action takes a minute to complete, then unless the actions execute in a non-blocking fashion there is a high likelihood that all 70 threads will be occupied and the container will be unable to respond. In that situation you should consider asynchronous request processing.
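The arithmetic behind this example can be worked through in a few lines. The thread count and action duration come from the text above; the arrival rate of two requests per second is an assumption added purely for illustration.

```groovy
import java.math.RoundingMode

// Figures from the example above: 70 container threads, one minute per action.
int containerThreads = 70
int secondsPerAction = 60

// A blocked thread serves one request per 60 seconds, so the whole
// pool sustains at most 70 / 60 requests per second.
BigDecimal maxRequestsPerSecond = containerThreads / secondsPerAction

// Assumed arrival rate (not from the text): 2 requests per second.
int arrivalsPerSecond = 2
// No thread is freed during the first minute, so the pool is fully
// occupied after 70 / 2 seconds.
int secondsUntilExhausted = containerThreads.intdiv(arrivalsPerSecond)

assert maxRequestsPerSecond.setScale(2, RoundingMode.HALF_UP) == 1.17
assert secondsUntilExhausted == 35
```

In other words, under these assumptions a blocking container saturates in about half a minute, well before the first action has finished.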
Since Grails 2.3, Grails features a simplified API for creating asynchronous responses built on the Promise mechanism discussed previously.
The implementation is based on Servlet 3.0 async. So, to enable the async features you need to set your servlet target version to 3.0 in application.yml:
grails:
    servlet:
        version: 3.0
Async Models
A typical activity in a Grails controller is to produce a model (a map of key/value pairs) that can be rendered by a view.
If the model takes a while to produce then the server could arrive at a blocking state, impacting scalability. You tell Grails to build the model asynchronously by returning a grails.async.PromiseMap via the WebPromises.tasks method:
import static grails.async.web.WebPromises.*
...
def index() {
    tasks books: Book.async.list(),
          totalBooks: Book.async.count(),
          otherValue: {
              // do hard work
          }
}
Grails will handle the response asynchronously, waiting for the promises to complete before rendering the view. The equivalent synchronous action of the above is:
def index() {
    def otherValue = ...
    [ books: Book.list(),
      totalBooks: Book.count(),
      otherValue: otherValue ]
}
You can even render a different view by passing the PromiseMap to the model attribute of the render method:
import static grails.async.web.WebPromises.*
//...
def index() {
    render view:"myView", model: tasks( one:{ 2 * 2 },
                                        two:{ 3 * 3 } )
}
Async Response Rendering
You can also write to the response asynchronously using promises in Grails 2.3 and above:
import static grails.async.web.WebPromises.*
class StockController {
    def stock(String ticker) {
        task {
            ticker = ticker ?: 'GOOG'
            def url = new URL("http://download.finance.yahoo.com/d/quotes.csv?s=${ticker}&f=nsl1op&e=.csv")
            Double price = url.text.split(',')[-1] as Double
            render "ticker: $ticker, price: $price"
        }
    }
}
The above example uses Yahoo Finance to query stock prices, executing asynchronously and only rendering the response once the result has been obtained. This is done by returning a Promise instance from the controller action.
If the Yahoo URL is unresponsive the original request thread will not be blocked and the container will not become unresponsive.
7 Servlet 3.0 Async
In addition to the higher level async features discussed earlier in the section, you can access the raw Servlet 3.0 asynchronous API from a Grails application.
Servlet 3.0 Asynchronous Rendering
To do so you should first implement the grails.async.web.AsyncController trait in your controller:
import grails.async.web.*
...
class BookController implements AsyncController {
    ...
}
You can render content (templates, binary data etc.) in an asynchronous manner by calling the startAsync method which returns an instance of the Servlet 3.0 AsyncContext. Once you have a reference to the AsyncContext you can use Grails' regular render method to render content:
def index() {
    def ctx = startAsync()
    ctx.start {
        new Book(title:"The Stand").save()
        render template:"books", model:[books:Book.list()]
        ctx.complete()
    }
}
Note that you must call the complete() method to terminate the connection.
Resuming an Async Request
You resume processing of an async request (for example to delegate to view rendering) by using the dispatch method of the AsyncContext:
def index() {
    def ctx = startAsync()
    ctx.start {
        // do work
        ...
        // render view
        ctx.dispatch()
    }
}