datasourcex-reactivestreams

Provides an API for binding Reactive Streams implementations such as Reactor, RxJava and Akka Streams to subjects and channels provided by Caplin DataSource.
This library is designed for use from both Java and Kotlin.

See also

Samples

Bind.using(dataSource) {
  to("my-service") {
    active {
      record {
        path(
            path = "/example/activeSubject",
            publisher =
                Flux.interval(Duration.ofSeconds(1)).map { aLong: Long ->
                  mapOf("Count" to aLong.toString())
                },
        ) {
          recordType = RecordType.TYPE1
        }
      }

      json {
        pattern(
            pattern = "/example/{username}/subjectPattern/{myKey}",
            configure = { objectMappings = mapOf("username" to "%u") },
            supplier = { _, parameters ->
              val username = parameters["username"]
              val myKey = parameters["myKey"]
              Flux.interval(Duration.ofSeconds(1)).map { count: Long ->
                JsonObject("$username $myKey $count")
              }
            },
        )
      }
    }
  }
}

Packages