Reactive Streams Support


Since ScalikejDBC 3.0, we support the Publisher of Reactive Streams to subscribe a stream from a database query. scalikejdbc-streams is a Reactive Streams 1.0 compliant implementation. It passes the existing unit tests to satisfy PublisherVerification in Reactive Streams TCK.

This library is originally assumed to be suitable for batch applications. You need to keep borrowing a connection while someone is subscribing a stream provided by scalikejdbc-streams. Be aware of the max size of connection pool and carefully monitor the state of the pool.


Setup


Add the following aditional dependency to your sbt project.

libraryDependencies += "org.scalikejdbc" %% "scalikejdbc-streams" % "4.2.1"

Usage


First Example

import scalikejdbc._
import scalikejdbc.streams._  // 1. import streams module. This provides SQL#iterator & DB#readOnlyStream.
import java.util.concurrent._

case class Company(id: Long, name: String, countryId: Option[Long], country: Option[Country] = None)

object Company extends SQLSyntaxSupport[Company] {

  // 2. Make a StreamReadySQL object for create publisher.
  //    The StreamReadySQL is immutable, so you can reuse it like SQL object.
  def streamBy(condition: SQLSyntax): StreamReadySQL[Company] = {
    withSQL {
      select.from(Company as m).where(condition)
    }.map(Company(m.resultName)).iterator()
  }
}

// ------------
// Prepare a connection pool in advance.
// https://scalikejdbc.org/documentation/configuration.html#scalikejdbc-config

// Prepare an ExecutionContext
implicit val publisherEC: ExecutionContext = ???

// 3. Get a publisher from DB (or NamedDB) object.
//    You need to give a StreamReadySQL object to readOnlyStream method.
val publisher: DatabasePublisher[Company] = DB readOnlyStream {
  Company.streamBy(sqls"id < 1000")
}

// 4. Give a Reactive Streams Subscriber to it.
publisher.subscribe(subscriber)

The DatabasePublisher capsulates an ExecutionContext to inside. We also recommend using ExecutionContext which is separate from the main thread pool.

The Subscriber example is here: Reactive Streams Example - AsyncSubscriber


Adjust DBSession attributes

At the current moment, scalikejdbc-streams natively supports MySQL and PostgreSQL. When using SQL#iterator factory method normally, ScalikeJDBC automatically enables required settings to use cursor feature. If you don’t prefer the behavior, you can customize adjusting DBSession attributes instead.

val publisher: DatabasePublisher[Int] = DB readOnlyStream {
  sql"select id from users".map(r => r.int("id"))
    .iterator
    .withDBSessionForceAdjuster(session => {
      session.conn.setAutoCommit(true)
    })
}

Integrate with Akka Streams

It is easy to integrate scalikejdbc-streams with another library that supports Reactive Streams. For example, Akka Streams, you can give a ScalikeJDBC’s publisher to Akka Streams Source as follows.

implicit val system = ActorSystem("streams")
import system.dispatcher
implicit val materializer = ActorMaterializer()

Source.fromPublisher(publisher).filter(_.id % 2 == 0).runForeach(println)

If this webpage has a typo or something wrong, Please report or fix it. How?