Reactive Streams Support


Since version 3.0, ScalikeJDBC has introduced support for the Publisher interface of Reactive Streams. This allows you to subscribe to a stream of results from a database query, enhancing the library’s capabilities for handling data flows reactively.

scalikejdbc-streams is a Reactive Streams 1.0 compliant implementation, which passes the PublisherVerification tests within the Reactive Streams TCK.

Originally, scalikejdbc-streams is assumed to be particularly suitable for batch applications that require processing large datasets. Note that you need to keep borrowing a connection while someone is still subscribing a stream provided by scalikejdbc-streams. Also, be aware of the max size of connection pool and carefully monitor the state of the pool.


Setup


Add the following aditional dependency to your sbt project.

libraryDependencies += "org.scalikejdbc" %% "scalikejdbc-streams" % "4.3.2"

Usage


First Example

import scalikejdbc._
import scalikejdbc.streams._  // 1. import streams module. This provides SQL#iterator & DB#readOnlyStream.
import java.util.concurrent._

case class Company(id: Long, name: String, countryId: Option[Long], country: Option[Country] = None)

object Company extends SQLSyntaxSupport[Company] {

  // 2. Make a StreamReadySQL object for create publisher.
  //    The StreamReadySQL is immutable, so you can reuse it like SQL object.
  def streamBy(condition: SQLSyntax): StreamReadySQL[Company] = {
    withSQL {
      select.from(Company as m).where(condition)
    }.map(Company(m.resultName)).iterator()
  }
}

// ------------
// Prepare a connection pool in advance.
// https://scalikejdbc.org/documentation/configuration.html#scalikejdbc-config

// Prepare an ExecutionContext
implicit val publisherEC: ExecutionContext = ???

// 3. Get a publisher from DB (or NamedDB) object.
//    You need to give a StreamReadySQL object to readOnlyStream method.
val publisher: DatabasePublisher[Company] = DB readOnlyStream {
  Company.streamBy(sqls"id < 1000")
}

// 4. Give a Reactive Streams Subscriber to it.
publisher.subscribe(subscriber)

The DatabasePublisher capsulates an ExecutionContext to inside. We also recommend using ExecutionContext which is separate from the main thread pool.

The Subscriber example is here: Reactive Streams Example - AsyncSubscriber


Adjust DBSession attributes

At the current moment, scalikejdbc-streams natively supports MySQL and PostgreSQL. When using SQL#iterator factory method normally, ScalikeJDBC automatically enables required settings to use cursor feature. If you don’t prefer the behavior, you can customize adjusting DBSession attributes instead.

val publisher: DatabasePublisher[Int] = DB readOnlyStream {
  sql"select id from users".map(r => r.int("id"))
    .iterator
    .withDBSessionForceAdjuster(session => {
      session.conn.setAutoCommit(true)
    })
}

Integrate with Akka Streams

It is easy to integrate scalikejdbc-streams with another library that supports Reactive Streams. For example, Akka Streams, you can give a ScalikeJDBC’s publisher to Akka Streams Source as follows.

implicit val system = ActorSystem("streams")
import system.dispatcher
implicit val materializer = ActorMaterializer()

Source.fromPublisher(publisher).filter(_.id % 2 == 0).runForeach(println)

If this webpage has a typo or something wrong, Please report or fix it. How?