Reactive Streams Support
Since version 3.0, ScalikeJDBC supports the Publisher interface of Reactive Streams. This lets you subscribe to a stream of results from a database query and process them reactively.
scalikejdbc-streams is a Reactive Streams 1.0 compliant implementation that passes the PublisherVerification tests in the Reactive Streams TCK.
scalikejdbc-streams was originally designed with batch applications that process large datasets in mind. Note that a connection stays borrowed from the pool for as long as a subscriber is subscribed to a stream provided by scalikejdbc-streams. Also, be aware of the maximum size of the connection pool and carefully monitor the state of the pool.
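Because each active stream holds a connection, it can help to size the pool explicitly and check its counters from time to time. The following is a minimal sketch, not a recommended configuration: the JDBC URL, credentials, and sizes are hypothetical placeholders, a matching JDBC driver is assumed to be on the classpath, and the numActive / numIdle counters are assumed to be supported by the pool implementation in use (the default Commons DBCP based pool is expected to report them).

```scala
import scalikejdbc._

object PoolSetupSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical sizes: leave headroom for the number of concurrently running streams.
    val settings = ConnectionPoolSettings(
      initialSize = 2,
      maxSize = 16,
      connectionTimeoutMillis = 3000L
    )

    // Placeholder URL and credentials; replace them with your own.
    ConnectionPool.singleton("jdbc:h2:mem:example", "user", "pass", settings)

    // Inspect the default pool. Every stream that is still subscribed
    // counts as one active (borrowed) connection.
    val pool = ConnectionPool.get()
    println(s"active=${pool.numActive}, idle=${pool.numIdle}")

    ConnectionPool.closeAll()
  }
}
```

Logging these counters periodically is one simple way to notice streams that are never completed or cancelled.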
Setup
Add the following additional dependency to your sbt project.
libraryDependencies += "org.scalikejdbc" %% "scalikejdbc-streams" % "4.3.2"
Usage
First Example
import scalikejdbc._
import scalikejdbc.streams._ // 1. Import the streams module. This provides SQL#iterator & DB#readOnlyStream.
import scala.concurrent._ // ExecutionContext lives in scala.concurrent

// Country is assumed to be defined elsewhere in your application.
case class Company(id: Long, name: String, countryId: Option[Long], country: Option[Country] = None)
object Company extends SQLSyntaxSupport[Company] {
  // Table alias and row mapper used below. The mapping is an assumption:
  // it expects columns that match the case class fields.
  val m = syntax("m")
  def apply(rn: ResultName[Company])(rs: WrappedResultSet): Company =
    Company(rs.long(rn.id), rs.string(rn.name), rs.longOpt(rn.countryId))

  // 2. Make a StreamReadySQL object to create a publisher from.
  //    StreamReadySQL is immutable, so you can reuse it like an SQL object.
  def streamBy(condition: SQLSyntax): StreamReadySQL[Company] = {
    withSQL {
      select.from(Company as m).where(condition)
    }.map(Company(m.resultName)).iterator()
  }
}
// ------------
// Prepare a connection pool in advance.
// https://scalikejdbc.org/documentation/configuration.html#scalikejdbc-config
// Prepare an ExecutionContext
implicit val publisherEC: ExecutionContext = ???
// 3. Get a publisher from the DB (or NamedDB) object.
//    You need to give a StreamReadySQL object to the readOnlyStream method.
val publisher: DatabasePublisher[Company] = DB readOnlyStream {
  Company.streamBy(sqls"id < 1000")
}
// 4. Give a Reactive Streams Subscriber to it (`subscriber` is provided by you).
publisher.subscribe(subscriber)
The DatabasePublisher encapsulates an ExecutionContext internally. We recommend using an ExecutionContext that is separate from the main thread pool.
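One way to follow that recommendation is to back the publisher's ExecutionContext with its own fixed-size thread pool. This is only a sketch using the Scala and Java standard libraries: the object name PublisherExecution and the pool size of 4 are assumptions for illustration, not values suggested by scalikejdbc-streams.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

object PublisherExecution {
  // Dedicated pool for database streaming so that blocking JDBC calls
  // do not occupy threads of the application's main pool.
  // The size (4) is a hypothetical value; tune it for your workload.
  implicit val publisherEC: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  // Call this when the application stops, otherwise the
  // non-daemon pool threads keep the JVM alive.
  def shutdown(): Unit = publisherEC.shutdown()
}
```

Importing PublisherExecution.publisherEC where the publisher is created makes it the implicit ExecutionContext in scope.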
The Subscriber example is here: Reactive Streams Example - AsyncSubscriber
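For quick experiments, a much smaller subscriber than the linked example can be enough. The sketch below is not the AsyncSubscriber from that example and is not verified against the Reactive Streams TCK; CollectingSubscriber is a hypothetical helper that requests one element at a time and collects the results. It assumes the org.reactivestreams interfaces are on the classpath.

```scala
import java.util.concurrent.CountDownLatch
import org.reactivestreams.{Subscriber, Subscription}
import scala.collection.mutable.ListBuffer

// Hypothetical helper, not part of scalikejdbc-streams.
final class CollectingSubscriber[A] extends Subscriber[A] {
  private var subscription: Subscription = _
  private val buffer = ListBuffer.empty[A]
  private val done = new CountDownLatch(1)
  @volatile private var failure: Option[Throwable] = None

  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    s.request(1) // ask for the first element only (backpressure)
  }

  override def onNext(elem: A): Unit = {
    buffer.synchronized { buffer += elem }
    subscription.request(1) // ask for the next element once this one is stored
  }

  override def onError(t: Throwable): Unit = {
    failure = Some(t)
    done.countDown()
  }

  override def onComplete(): Unit = done.countDown()

  // Blocks until the stream completes or fails.
  def await(): Either[Throwable, List[A]] = {
    done.await()
    failure.toLeft(buffer.synchronized(buffer.toList))
  }
}
```

With the publisher from the first example, this could be used as `val s = new CollectingSubscriber[Company]; publisher.subscribe(s); s.await()`. Collecting everything in memory defeats the purpose of streaming for large result sets, so treat it as a debugging aid only.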
Adjust DBSession attributes
At the moment, scalikejdbc-streams natively supports MySQL and PostgreSQL. When you use the SQL#iterator factory method normally, ScalikeJDBC automatically enables the settings required to use the cursor feature. If you prefer different behavior, you can adjust the DBSession attributes yourself instead.
val publisher: DatabasePublisher[Int] = DB readOnlyStream {
  sql"select id from users".map(r => r.int("id"))
    .iterator
    .withDBSessionForceAdjuster(session => {
      session.conn.setAutoCommit(true)
    })
}
Integrate with Akka Streams
It is easy to integrate scalikejdbc-streams with other libraries that support Reactive Streams. For example, with Akka Streams you can pass a ScalikeJDBC publisher to an Akka Streams Source as follows.
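import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("streams")
import system.dispatcher
implicit val materializer = ActorMaterializer()
// `publisher` is the DatabasePublisher[Company] from the first example.
Source.fromPublisher(publisher).filter(_.id % 2 == 0).runForeach(println)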