Leon Weemen
Leon Weemen

Reputation: 372

Scala R2DBC: How to test a repository class in Akka / Pekko

Heey all,

First of all I want to warn you that my way of thinking might be wrong in the first place so please bear with me:

OK, so I'm using R2DBC in a Scala event-sourcing project. I've written this lovely projection class:

/** Wires up the read-side projection that feeds `IRacingSession` events into the repository. */
object IRacingSessionProjection {

  /** How many slice ranges the event stream is split into; one projection instance runs per range. */
  private val NumberOfSliceRanges = 1

  /**
   * Starts one `ProjectionBehavior` per slice range through `ShardedDaemonProcess`.
   *
   * @param system     actor system used by the source provider, the projection and the daemon process
   * @param repository repository handed to every `IRacingSessionProjector` handler instance
   */
  def init(system: ActorSystem[_], repository: IracingSessionRepositoryImpl): Unit = {
    // Event source restricted to the slices of a single range.
    def sourceProvider(sliceRange: Range): SourceProvider[Offset, EventEnvelope[IRacingSession.Event]] =
      EventSourcedProvider.eventsBySlices[IRacingSession.Event](
        system,
        readJournalPluginId = R2dbcReadJournal.Identifier,
        "IRacingSession",
        sliceRange.min,
        sliceRange.max
      )

    // Exactly-once projection for one slice range; the range is part of the projection id
    // so every instance keeps its own offset.
    def projection(sliceRange: Range): Projection[EventEnvelope[IRacingSession.Event]] = {
      val minSlice = sliceRange.min
      val maxSlice = sliceRange.max
      val projectionId =
        ProjectionId("IRacingSessionProjection", s"iracingsessions-$minSlice-$maxSlice")

      R2dbcProjection.exactlyOnce(
        projectionId,
        settings = None,
        sourceProvider(sliceRange),
        handler = () => new IRacingSessionProjector(system, repository))(system)
    }

    // Compute the ranges once, up front. The previous code derived the range count from the
    // daemon index and then always read element 1, which is out of bounds when only a single
    // range/instance exists.
    val sliceRanges =
      EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, NumberOfSliceRanges)

    ShardedDaemonProcess(system).init(
      name = "IRacingSessionProjection",
      // Keep the instance count and the number of ranges in lock-step.
      numberOfInstances = sliceRanges.size,
      behaviorFactory = { processIndex =>
        ProjectionBehavior(projection(sliceRanges(processIndex)))
      },
      ShardedDaemonProcessSettings(system),
      stopMessage = Some(ProjectionBehavior.Stop))
  }
}

This projection has a repository class like this:

/**
 * SQL access for the `iracing_session` and `iracing_laptime_data` tables.
 *
 * Every method runs its statements on the `R2dbcSession` supplied by the caller and returns a
 * `Future` that completes only after all statements it issued have completed.
 */
class IracingSessionRepositoryImpl (using ec: ExecutionContext) {

  // The lap-time table exposes the columns sector1 .. sector6 (see the SELECT in getLapTime).
  private val SectorCount = 6

  /**
   * Maps a sector number to its column name.
   *
   * A column name cannot be sent as a bind parameter, so it has to be spliced into the SQL text.
   * Restricting the number to the known range keeps that splice safe.
   *
   * @return `Some("sectorN")` for 1 <= N <= 6, otherwise `None`
   */
  private def sectorColumn(sectorNumber: Int): Option[String] =
    Option.when(sectorNumber >= 1 && sectorNumber <= SectorCount)(s"sector$sectorNumber")

  private def unknownSector[A](sectorNumber: Int): Future[A] =
    Future.failed(new IllegalArgumentException(
      s"Unsupported sector number $sectorNumber, expected 1..$SectorCount"))

  /**
   * Inserts a session row and, when at least one lap has been completed, closes the previous lap.
   *
   * @param session        R2DBC session the statements are executed on
   * @param sessionId      identifier of the racing session
   * @param track          track whose name and config name are stored
   * @param driver         driver whose id and name are stored
   * @param lapNumber      current lap number
   * @param lapsCompleted  number of laps completed so far
   * @param lapTimeLastLap time of the last lap, treated as 0 when absent
   * @return a future that completes with `Done` once every statement has finished and fails if
   *         any of them fails
   */
  def insert(session: R2dbcSession,
             sessionId: String,
             track: Track,
             driver: Driver,
             lapNumber: Int,
             lapsCompleted: Int,
             lapTimeLastLap: Option[Float]): Future[Done] = {
    // Seven columns need seven placeholders; the statement previously had only six.
    val insertSession = session.updateOne(
      session.createStatement(
          "INSERT INTO iracing_session (" +
            "session_id, " +
            "track_name, " +
            "track_config_name, " +
            "driver_id, " +
            "driver_name, " +
            "lap_number, " +
            "laps_completed" +
            ") VALUES ($1, $2, $3, $4, $5, $6, $7)")
        .bind(0, sessionId)
        .bind(1, track.name)
        .bind(2, track.configName)
        .bind(3, driver.driverId)
        .bind(4, driver.driverName)
        .bind(5, lapNumber)
        .bind(6, lapsCompleted)
    )

    // Chain the follow-up work instead of dropping the futures, so that failures reach the
    // caller and the returned future really marks the end of all database work.
    insertSession.flatMap { _ =>
      if (lapsCompleted > 0)
        completePreviousLap(session, sessionId, track, driver, lapNumber, lapsCompleted, lapTimeLastLap)
      else
        Future.successful(Done)
    }
  }

  /**
   * Marks lap `lapNumber - 1` as completed and stores the time of its current sector.
   * Does nothing when no lap-time row exists for that lap.
   */
  private def completePreviousLap(session: R2dbcSession,
                                  sessionId: String,
                                  track: Track,
                                  driver: Driver,
                                  lapNumber: Int,
                                  lapsCompleted: Int,
                                  lapTimeLastLap: Option[Float]): Future[Done] =
    getLapTime(session, sessionId, driver, lapNumber - 1).flatMap {
      case Some(lapTime) =>
        val sectorTime = lapTimeLastLap.getOrElse(0f) - lapTime.currentLaptime
        // NOTE(review): assumes LapTime.currentSector() returns an Int, like the sectorNumber
        // parameter of insertSectorTime - confirm.
        val sectorNumber = lapTime.currentSector()
        sectorColumn(sectorNumber) match {
          case Some(column) =>
            session.updateOne(
              session.createStatement(
                  "UPDATE iracing_laptime_data " +
                    "SET lap_completed = TRUE, " +
                    column + " = $1 " +
                    "WHERE session_id = $2 " +
                    "AND driver_id = $3 " +
                    "AND track_name = $4 " +
                    "AND lap_number = $5 " +
                    // NOTE(review): lap_completed is assigned TRUE above but compared with an
                    // integer here - verify the column type / intended predicate.
                    "AND lap_completed < $6")
                .bind(0, sectorTime)
                .bind(1, sessionId)
                .bind(2, driver.driverId)
                .bind(3, track.name)
                .bind(4, lapNumber - 1)
                .bind(5, lapsCompleted)
            ).map(_ => Done)
          case None => unknownSector(sectorNumber)
        }
      case None => Future.successful(Done)
    }

  /**
   * Stores the time of one sector for the given lap.
   *
   * @param sectorNumber sector to update, 1 to 6
   * @param sectorTime   value written to the sector column
   * @return the update result reported by the session; a failed future when `sectorNumber`
   *         is outside 1..6
   */
  def insertSectorTime(session: R2dbcSession, sessionId: String, driver: Driver, lapNumber: Int, sectorNumber: Int, sectorTime: Any): Future[Int] =
    sectorColumn(sectorNumber) match {
      case Some(column) =>
        session.updateOne(
          session.createStatement(
            "UPDATE iracing_laptime_data " +
              "SET " + column + " = $1 " +
              "WHERE session_id = $2 " +
              "AND driver_id = $3 " +
              "AND lap_number = $4"
          )
          .bind(0, sectorTime)
          .bind(1, sessionId)
          .bind(2, driver.driverId)
          .bind(3, lapNumber)
        )
      case None => unknownSector(sectorNumber)
    }

  /**
   * Loads the sector times recorded for one lap of a driver in a session.
   *
   * @return `Some(LapTime)` when a row exists, otherwise `None`; each sector is `None` when the
   *         column value is NULL
   */
  def getLapTime(session: R2dbcSession,
                 sessionId: String,
                 driver: Driver,
                 lapNumber: Int): Future[Option[LapTime]] = session.selectOne(
      session.createStatement(
          "SELECT sector1, sector2, sector3, sector4, sector5, sector6 " +
            "FROM iracing_laptime_data " +
            "WHERE session_id = $1 " +
            "AND driver_id = $2 " +
            "AND lap_number = $3 ")
        .bind(0, sessionId)
        .bind(1, driver.driverId)
        // Bound as a number, like every other statement that touches lap_number.
        .bind(2, lapNumber)
    ) { row =>
      // The driver cannot decode into scala.Option; read the nullable boxed value and wrap it.
      def sector(column: String): Option[Float] =
        Option(row.get(column, classOf[java.lang.Float])).map(_.floatValue)

      LapTime(
        sector("sector1"),
        sector("sector2"),
        sector("sector3"),
        sector("sector4"),
        sector("sector5"),
        sector("sector6")
      )
    }
}

This just follows the docs. It works, so I'm happy.

However, I want to test my projections, and especially that what my repository does is right. I don't want to end up in a trial-and-error situation the whole time, so I started looking into testing.

My initial approach (which might be wrong) was that, instead of using Postgres as in dev, I would use an in-memory database, which should be fairly easy to set up. So I wrote a test something like this:

/**
 * Runs the `IRacingSessionProjector` through an exactly-once R2DBC projection fed from an
 * in-memory list of events, then checks what ended up in the repository.
 */
// Matchers is required for `shouldBe`; it is referenced by its full name so no extra import is needed.
class IRacingSessionProjectorSpec extends AnyWordSpecLike with org.scalatest.matchers.should.Matchers {

  import scala.concurrent.Await
  import scala.concurrent.duration.DurationInt

  // NOTE(review): this actor system is never terminated; shut it down after the suite.
  given system: ActorSystem[?] = ActorSystem(Behaviors.empty, "iracingsetups-test-svc")
  given ExecutionContext = system.executionContext

  /**
   * Builds the projection under test.
   *
   * @param events     envelopes emitted by the test source, in order
   * @param repository repository passed to the projector
   */
  def createProjection(events: List[EventEnvelope[IRacingSession.Event]],
                       repository: IracingSessionRepositoryImpl): Projection[EventEnvelope[IRacingSession.Event]]  = {
    // NOTE(review): intended to point the plugin at an in-memory database - confirm that the
    // configured R2DBC driver supports the "mem" protocol and that the settings are read from
    // this config path.
    val config =
      ConfigFactory
        .parseString( """
      pekko {
        loglevel = "WARNING"
        persistence {
          r2dbc {
            protocol = "mem"
            database = "iracingsetups"
          }
        }
      }
      """.stripMargin)

    R2dbcProjection.exactlyOnce(
      ProjectionId("IRacingSessionProjection", "iracingsessions-1-1"),
      settings = Some(R2dbcProjectionSettings(config)),
      TestSourceProvider[Offset, EventEnvelope[IRacingSession.Event]](
        Source(events),
        extractOffset = env => env.offset),
      handler = () =>
        new IRacingSessionProjector(system, repository))(system)
  }

  "The iRacingSetupsSession" should {
    "can create a new session in the database" in {
      val iracingEvents = List[EventEnvelope[IRacingSession.Event]](
        EventEnvelope(Offset.sequence(1), IRacingSession.NewRacingSessionHasBeenMade("iracing-session-01", "Spa", "Leon")),
      )
      val repository = IracingSessionRepositoryImpl()
      val projection = createProjection(iracingEvents, repository)
      val testKit = ProjectionTestKit(system)

      testKit.run(projection) {
        // Wait for the lookup inside the assert block. Assertions placed in a Future callback
        // run after the block has returned, so a failure there could never fail the test.
        // NOTE(review): `get` is not defined on the repository shown alongside this spec.
        val session = Await.result(repository.get("iracing-session-01"), 3.seconds)
        session.trackName shouldBe "Spa"
        session.driverName shouldBe "Leon"
      }
    }
  }
}

But there is now one tiny, annoying problem. The repository methods get the DB connection (the R2dbcSession) as their first argument. In the test I don't have that DB connection. Most likely I could construct it if I deep-dive into some Pekko internal classes, but this feels wrong from the start.

So that leaves me with this question: what is the best way to approach this problem at all? Does my approach make sense and did I just forget something, or does my problem require a different approach?

Many thanks in advance!

Upvotes: 0

Views: 33

Answers (0)

Related Questions