Reactive Streams Support

Since version 3.0, ScalikeJDBC has introduced support for the Publisher interface of Reactive Streams. This allows you to subscribe to a stream of results from a database query, enhancing the library’s capabilities for handling data flows reactively.

scalikejdbc-streams is a Reactive Streams 1.0 compliant implementation, which passes the PublisherVerification tests within the Reactive Streams TCK.

Originally, scalikejdbc-streams is assumed to be particularly suitable for batch applications that require processing large datasets. Note that you need to keep borrowing a connection while someone is still subscribing a stream provided by scalikejdbc-streams. Also, be aware of the max size of connection pool and carefully monitor the state of the pool.


Add the following aditional dependency to your sbt project.

libraryDependencies += "org.scalikejdbc" %% "scalikejdbc-streams" % "4.3.0"


First Example

import scalikejdbc._
import scalikejdbc.streams._  // 1. import streams module. This provides SQL#iterator & DB#readOnlyStream.
import java.util.concurrent._

case class Company(id: Long, name: String, countryId: Option[Long], country: Option[Country] = None)

object Company extends SQLSyntaxSupport[Company] {

  // 2. Make a StreamReadySQL object for create publisher.
  //    The StreamReadySQL is immutable, so you can reuse it like SQL object.
  def streamBy(condition: SQLSyntax): StreamReadySQL[Company] = {
    withSQL {
      select.from(Company as m).where(condition)

// ------------
// Prepare a connection pool in advance.

// Prepare an ExecutionContext
implicit val publisherEC: ExecutionContext = ???

// 3. Get a publisher from DB (or NamedDB) object.
//    You need to give a StreamReadySQL object to readOnlyStream method.
val publisher: DatabasePublisher[Company] = DB readOnlyStream {
  Company.streamBy(sqls"id < 1000")

// 4. Give a Reactive Streams Subscriber to it.

The DatabasePublisher capsulates an ExecutionContext to inside. We also recommend using ExecutionContext which is separate from the main thread pool.

The Subscriber example is here: Reactive Streams Example - AsyncSubscriber

Adjust DBSession attributes

At the current moment, scalikejdbc-streams natively supports MySQL and PostgreSQL. When using SQL#iterator factory method normally, ScalikeJDBC automatically enables required settings to use cursor feature. If you don’t prefer the behavior, you can customize adjusting DBSession attributes instead.

val publisher: DatabasePublisher[Int] = DB readOnlyStream {
  sql"select id from users".map(r =>"id"))
    .withDBSessionForceAdjuster(session => {

Integrate with Akka Streams

It is easy to integrate scalikejdbc-streams with another library that supports Reactive Streams. For example, Akka Streams, you can give a ScalikeJDBC’s publisher to Akka Streams Source as follows.

implicit val system = ActorSystem("streams")
import system.dispatcher
implicit val materializer = ActorMaterializer()

Source.fromPublisher(publisher).filter( % 2 == 0).runForeach(println)

If this webpage has a typo or something wrong, Please report or fix it. How?