Skip to main content

Arrows, Monads and Kleisli — part 1

Marcin Rzeźnicki

Oct 28, 2015|15 min read
Arrows Monads and Kleisli part 1

Arrows are a type class used in programming to describe computations in a pure and declarative fashion. First proposed by computer scientist John Hughes as a generalization of monads, arrows provide a referentially transparent way of expressing relationships between logical steps in a computation. Unlike monads, arrows don’t limit steps to having one and only one input.

Arrow a b c represents a process that takes as input something of type b and outputs something of type c.

1case class ProductionLot(id: Option[Long],
2 status: ProductionLotStatus.Value,
3 productionStartDate: Option[Date] = None,
4 productionEndDate: Option[Date] = None,
5 workerId: Option[Long] = None)
1class ProductionLotsRepository {
2 def findExistingById(productionLotId: Long): ProductionLot =
3 findById(productionLotId).getOrElse(sys.error(s"ProductionLot $productionLotId not found"))
4
5 def findById(productionLotId: Long): Option[ProductionLot] = ???
6
7 def save(productionLot: ProductionLot): Long = ???
8}
1class ProductionLotsService(productionLotsRepository: ProductionLotsRepository) {
2 def startProductionOf(productionLotId: Long, productionStartDate: Date, workerId: Long): Unit = {
3 val productionLot = productionLotsRepository.findExistingById(productionLotId)
4
5 verifyWorkerCanBeAssignedToProductionLot(productionLot, workerId)
6
7 val productionLotWithProductionStartData = productionLot.copy(
8 productionStartDate = Some(productionStartDate),
9 workerId = Some(workerId),
10 status = ProductionLotStatus.InProduction
11 )
12
13 productionLotsRepository.save(productionLotWithProductionStartData)
14 }
15
16 def changeAssignedWorker(productionLotId: Long, newWorkerId: Long): Unit = {
17 val productionLot = productionLotsRepository.findExistingById(productionLotId)
18
19 verifyWorkerChange(productionLot, newWorkerId)
20
21 val updatedProductionLot = productionLot.copy(workerId = Some(newWorkerId))
22
23 productionLotsRepository.save(updatedProductionLot)
24 }
25
26 def revokeProductionLotTo(productionLotId: Long,
27 productionLotStatus: ProductionLotStatus.Value): Unit = {
28 val productionLot = productionLotsRepository.findExistingById(productionLotId)
29
30 val revokedProductionLot = productionLot.copy(
31 status = productionLotStatus,
32 workerId = None,
33 productionStartDate = None,
34 productionEndDate = None
35 )
36
37 productionLotsRepository.save(revokedProductionLot)
38 }
39
40 private def verifyWorkerChange(productionLot: ProductionLot, newWorkerId: Long): Unit = {
41 require(productionLot.workerId.isDefined && productionLot.workerId.get != newWorkerId,
42 s"Production lot worker expected to be defined and different than $newWorkerId")
43 }
44
45 private def verifyWorkerCanBeAssignedToProductionLot(productionLot: ProductionLot, workerId: Long): Unit = {
46 val productionLotId = productionLot.id.get
47 val productionLotHasNoWorkerAssigned = productionLot.workerId.isEmpty
48
49 require(productionLotHasNoWorkerAssigned, s"Production lot: $productionLotId has worker already assigned")
50 }
51}
1def startProductionOf(productionLotId: Long, productionStartDate: Date, workerId: Long): Unit = {
2 val verify: ProductionLot => ProductionLot = { productionLot =>
3 verifyWorkerCanBeAssignedToProductionLot(productionLot, workerId)
4 productionLot
5 }
6
7 val copy: ProductionLot => ProductionLot = _.copy(
8 productionStartDate = Some(productionStartDate),
9 workerId = Some(workerId),
10 status = ProductionLotStatus.InProduction
11 )
12
13 val startProductionOfF = productionLotsRepository.findExistingById _ andThen
14 verify andThen
15 copy andThen
16 productionLotsRepository.save
17
18 startProductionOfF(productionLotId)
19 }
20
21 def changeAssignedWorker(productionLotId: Long, newWorkerId: Long): Unit = {
22 val verify: ProductionLot => ProductionLot = { productionLot =>
23 verifyWorkerChange(productionLot, newWorkerId)
24 productionLot
25 }
26
27 val copy: ProductionLot => ProductionLot = _.copy(workerId = Some(newWorkerId))
28
29 val changedAssignedWorkerF = productionLotsRepository.findExistingById _ andThen
30 verify andThen
31 copy andThen
32 productionLotsRepository.save
33
34 changedAssignedWorkerF(productionLotId)
35 }

A type constructor arr that takes functions from any type s to another t, and lifts those functions into an arrow A between the two types.

A piping method first that takes an arrow between two types and converts it into an arrow between tuples. The first elements in the tuples represent the portion of the input and output that is altered, while the second elements are a third type u describing an unaltered portion that bypasses the computation.

A composition operator >>> that can attach a second arrow to a first as long as the first function’s output and the second’s input have matching types.

A merging operator *** that can take two arrows, possibly with different input and output types, and fuse them into one arrow between two compound types.

Image Alt

1trait Arrow[=>:[_, _]] {
2 def id[A]: A =>: A
3 def arr[A, B](f: A => B): A =>: B
4
5 def compose[A, B, C](fbc: B =>: C, fab: A =>: B): A =>: C
6
7 def first[A, B, C](f: A =>: B): (A, C) =>: (B, C)
8 def second[A, B, C](f: A =>: B): (C, A) =>: (C, B)
9
10 def merge[A, B, C, D](f: A =>: B, g: C =>: D): (A, C) =>: (B, D) = compose(first(f), second(g))
11 def split[A, B, C](fab: A =>: B, fac: A =>: C): A =>: (B, C) = compose(merge(fab, fac), arr((x: A) => (x, x)))
12}
1final class ArrowOps[=>:[_, _], A, B](val self: A =>: B)(implicit val arr: Arrow[=>:]) {
2 def >>>[C](fbc: B =>: C): A =>: C = arr.compose(fbc, self)
3
4 def <<<[C](fca: C =>: A): C =>: B = arr.compose(self, fca)
5
6 def ***[C, D](g: C =>: D): (A, C) =>: (B, D) = arr.merge(self, g)
7
8 def &&&[C](fac: A =>: C): A =>: (B, C) = arr.split(self, fac)
9}
10
11object Arrow extends ArrowInstances {
12 implicit def ToArrowOps[F[_, _], A, B](v: F[A, B])(implicit arr: Arrow[F]): ArrowOps[F, A, B] = new ArrowOps(v)
13}
1trait ArrowInstances {
2 // function is arrow
3 implicit object FunctionArrow extends Arrow[Function1] {
4 override def id[A]: A => A = identity[A]
5
6 override def arr[A, B](f: (A) => B): A => B = f
7
8 override def compose[A, B, C](fbc: B => C, fab: A => B): A => C = fbc compose fab
9
10 override def first[A, B, C](f: A => B): ((A, C)) => (B, C) = prod => (f(prod._1), prod._2)
11
12 override def second[A, B, C](f: A => B): ((C, A)) => (C, B) = prod => (prod._1, f(prod._2))
13
14 override def merge[A, B, C, D](f: (A) => B, g: (C) => D): ((A, C)) => (B, D) = { case (x, y) => (f(x), g(y)) }
15 }
16}
Arrows Monads and Kleisli part 1 business logic flow

1 private def productionLotArrow[Env](verify: (ProductionLot, Env) => Unit, copy: (ProductionLot, Env) => ProductionLot): (Long, Env) => Long =
2 Function.untupled(
3 (productionLotsRepository.findExistingById _ *** identity[Env])
4 >>> (verify.tupled
5 &&& (copy.tupled >>> productionLotsRepository.save))
6 >>> (_._2)
7 )
1 private val changeWorkerA = productionLotArrow[Long] (verifyWorkerChange, (pl, id) => pl.copy(workerId = Some(id)))
2
3 def changeAssignedWorker(productionLotId: Long, newWorkerId: Long): Unit =
4 changeWorkerA(productionLotId, newWorkerId)
5
6 private val revokeToA =
7 productionLotArrow[ProductionLotStatus.Value] (
8 (_, _) => (),
9 (pl, status) => pl.copy(
10 status = status,
11 workerId = None,
12 productionStartDate = None,
13 productionEndDate = None
14 )
15 )
16
17 def revokeProductionLotTo(productionLotId: Long,
18 productionLotStatus: ProductionLotStatus.Value): Unit =
19 revokeToA(productionLotId, productionLotStatus)
20
1 private case class StartProduction(productionStartDate: Date, workerId: Long)
2 private val startProductionA = productionLotArrow[StartProduction] (
3 (pl, env) => verifyWorkerCanBeAssignedToProductionLot(pl, env.workerId),
4 (pl, env) => pl.copy(
5 productionStartDate = Some(env.productionStartDate),
6 workerId = Some(env.workerId),
7 status = ProductionLotStatus.InProduction
8 )
9 )
10
11 def startProductionOf(productionLotId: Long, productionStartDate: Date, workerId: Long): Unit =
12 startProductionA(productionLotId, StartProduction(productionStartDate, workerId))
1 private def productionLotArrow[Env](verify: (ProductionLot, Env) => Unit,
2 copy: (ProductionLot, Env) => ProductionLot): (Long, Env) => Long = {
3 val verifyProductionLotNotDoneF: ((ProductionLot, Env)) => Unit = { case (pl, _) => verifyProductionLotNotDone(pl) }
4
5 Function.untupled(
6 (productionLotsRepository.findExistingById _ *** identity[Env])
7 >>> ((verify.tupled &&& verifyProductionLotNotDoneF)
8 &&& (copy.tupled >>> productionLotsRepository.save))
9 >>> (_._2)
10 )
11 }
12
13 private def verifyProductionLotNotDone(productionLot: ProductionLot): Unit =
14 require(productionLot.status != ProductionLotStatus.Done, "Attempt to operate on finished production lot")
15

“Do or do not, there is no try”.

1sealed abstract class Error(val message: String)
2
3case class ProductionLotNotFoundError(id: Long) extends Error(s"ProductionLot $id does not exist")
4case class ProductionLotClosedError(pl: ProductionLot) extends Error(s"Attempt to operate on finished ProductionLot $pl")
5//...
1class ProductionLotsService(productionLotsRepository: ProductionLotsRepository) {
2//...
3 private def verifyProductionLotNotDone(productionLot: ProductionLot): Either[Error, ProductionLot] =
4 Either.cond(productionLot.status != ProductionLotStatus.Done, productionLot, ProductionLotClosedError(productionLot))
5
6 private def verifyWorkerChange(productionLot: ProductionLot, newWorkerId: Long): Either[Error, ProductionLot] =
7 productionLot.workerId.fold[Either[Error, ProductionLot]](
8 Left(NoWorkerError(productionLot)))(
9 workerId => Either.cond(workerId != newWorkerId, productionLot, SameWorkerError(productionLot)))
10
11 private def verifyWorkerCanBeAssignedToProductionLot(productionLot: ProductionLot, workerId: Long): Either[Error, ProductionLot] =
12 Either.cond(productionLot.workerId.isEmpty, productionLot, WorkerAlreadyAssignedError(productionLot))
13}
14
15class ProductionLotsRepository {
16 def findExistingById(productionLotId: Long): Either[Error, ProductionLot] =
17 findById(productionLotId).toRight(ProductionLotNotFoundError(productionLotId))
18
19 def findById(productionLotId: Long): Option[ProductionLot] = ???
20
21 def save(productionLot: ProductionLot): Either[Error, Long] = ???
22}

The return operation takes a value from a plain type and puts it into a monadic container using the constructor, creating a monadic value. The bind operation takes as its arguments a monadic value and a function from a plain type to a monadic value, and returns a new monadic value.

1trait Monad[M[_]] {
2 def point[A](a: => A): M[A]
3 def bind[A, B](ma: M[A])(f: A => M[B]): M[B]
4 def fmap[A, B](ma: M[A])(f: A => B): M[B]
5}
1final class MonadOps[M[_], A](val self: M[A])(implicit val monad: Monad[M]) {
2 def >>=[B](f: A => M[B]) = monad.bind(self)(f)
3}
4
5object Monad extends MonadInstances {
6 implicit def ToMonadOps[M[_], A](v: M[A])(implicit m: Monad[M]): MonadOps[M, A] = new MonadOps(v)
7}
1trait MonadInstances {
2 implicit def eitherMonad[L] = new Monad[({ type λ[β] = Either[L, β] })#λ] {
3 override def point[A](a: => A): Either[L, A] = Right(a)
4
5 override def bind[A, B](ma: Either[L, A])(f: (A) => Either[L, B]): Either[L, B] = ma.right.flatMap(f)
6
7 override def fmap[A, B](ma: Either[L, A])(f: (A) => B): Either[L, B] = ma.right.map(f)
8 }
9}
1final case class Kleisli[M[_], A, B](run: A => M[B]) {
2 import Monad._
3 import Kleisli._
4
5 def apply(a: A) = run(a)
6
7 def >=>[C](k: Kleisli[M, B, C])(implicit m: Monad[M]): Kleisli[M, A, C] = Kleisli((a: A) => this(a) >>= k.run)
8 def andThen[C](k: Kleisli[M, B, C])(implicit m: Monad[M]): Kleisli[M, A, C] = this >=> k
9
10 def >==>[C](k: B => M[C])(implicit m: Monad[M]): Kleisli[M, A, C] = this >=> Kleisli(k)
11 def andThenK[C](k: B => M[C])(implicit m: Monad[M]): Kleisli[M, A, C] = this >==> k
12
13 def <=<[C](k: Kleisli[M, C, A])(implicit m: Monad[M]): Kleisli[M, C, B] = k >=> this
14 def compose[C](k: Kleisli[M, C, A])(implicit m: Monad[M]): Kleisli[M, C, B] = k >=> this
15
16 def <==<[C](k: C => M[A])(implicit m: Monad[M]): Kleisli[M, C, B] = Kleisli(k) >=> this
17 def composeK[C](k: C => M[A])(implicit m: Monad[M]): Kleisli[M, C, B] = this <==< k
18
19 def map[C](f: B ⇒ C)(implicit m: Monad[M]): Kleisli[M, A, C] = Kleisli((a: A) => m.fmap(this(a))(f))
20}
21
22object Kleisli extends KleisliInstances {
23 implicit def kleisliFn[M[_], A, B](k: Kleisli[M, A, B]): (A) ⇒ M[B] = k.run
24}
1trait KleisliInstances {
2 //kleisli (a => m b) is arrow
3 abstract class KleisliArrow[M[_]] extends Arrow[({ type λ[α, β] = Kleisli[M, α, β] })#λ] {
4 import Kleisli._
5 import Monad._
6
7 implicit def M: Monad[M]
8
9 override def id[A]: Kleisli[M, A, A] = Kleisli(a => M.point(a))
10
11 override def arr[A, B](f: (A) => B): Kleisli[M, A, B] = Kleisli(a => M.point(f(a)))
12
13 override def first[A, B, C](f: Kleisli[M, A, B]): Kleisli[M, (A, C), (B, C)] = Kleisli {
14 case (a, c) => f(a) >>= ((b: B) => M.point((b, c)))
15 }
16
17 override def second[A, B, C](f: Kleisli[M, A, B]): Kleisli[M, (C, A), (C, B)] = Kleisli {
18 case (c, a) => f(a) >>= ((b: B) => M.point((c, b)))
19 }
20
21 override def compose[A, B, C](fbc: Kleisli[M, B, C], fab: Kleisli[M, A, B]): Kleisli[M, A, C] = fab >=> fbc
22 }
23
24 implicit def kleisliArrow[M[_]](implicit m: Monad[M]) = new KleisliArrow[M] {
25 override implicit def M: Monad[M] = m
26 }
27}
1 implicit def ToArrowOpsFromKleisliLike[G[_], F[G[_], _, _], A, B](v: F[G, A, B])(implicit arr: Arrow[({ type λ[α, β] = F[G, α, β] })#λ]) =
2 new ArrowOps[({ type λ[α, β] = F[G, α, β] })#λ, A, B](v)
Arrows Monads and Kleisli part 1 error handling

1 type E[R] = Either[Error, R]
2
3 private def productionLotArrow[Env](verify: (ProductionLot, Env) => E[ProductionLot],
4 copy: (ProductionLot, Env) => ProductionLot): Env => Long => E[Long] = {
5 val verifyProductionLotNotDoneF: (ProductionLot) => E[ProductionLot] = verifyProductionLotNotDone
6
7 (env: Env) => (
8 Kleisli[E, Long, ProductionLot]{ productionLotsRepository.findExistingById }
9 >>> Kleisli { verify(_, env) }
10 >>> Kleisli { verifyProductionLotNotDoneF }
11 ).map(copy(_, env)) >==> productionLotsRepository.save _
12 }
1no type parameters for method apply: (run: A => M[B])org.virtuslab.blog.kleisli.Kleisli[M,A,B] exist so that it can be applied to arguments (org.virtuslab.blog.kleisli.ProductionLot =>
2Either[org.virtuslab.blog.kleisli.Error,org.virtuslab.blog.kleisli.ProductionLot])
3[error] — because —
4[error] argument expression’s type is not compatible with formal parameter type;
5[error] found : org.virtuslab.blog.kleisli.ProductionLot =>
6Either[org.virtuslab.blog.kleisli.Error,org.virtuslab.blog.kleisli.ProductionLot]
7[error] required: ?A => ?M[?B]
8[error] Kleisli { verify(_, env) } >>>
1Kleisli[({ type E[R] = Either[Error, R] })#E, Long, ProductionLot] { productionLotsRepository.findExistingById }
2
1value >>> is not a member of
2org.virtuslab.blog.kleisli.Kleisli[[R]scala.util.Either[org.virtuslab.blog.kleisli.Error,R],Long,org.virtuslab.blog. kleisli.ProductionLot]
1ToArrowOpsFromKleisliLike is not a valid implicit value for (=>
2org.virtuslab.blog.kleisli.Kleisli[[R]scala.util.Either[org.virtuslab.blog.kleisli.Error,R],Long,org.virtuslab.blog. kleisli.ProductionLot]) => ?{def >>>: ?} because:
3[info] no type parameters for method ToArrowOpsFromKleisliLike: (v: F[G,A,B])(implicit arr: org.virtuslab.blog. kleisli.Arrow[[β, γ]F[G,β,γ]])org.virtuslab.blog.kleisli.ArrowOps[[β, γ]F[G,β,γ],A,B] exist so that it can be applied to arguments
4(org.virtuslab.blog.kleisli.Kleisli[[R]scala.util.Either[org.virtuslab.blog.kleisli.Error,R],Long,org.virtuslab.blog. kleisli.ProductionLot])
5[info] — because —
6[info] argument expression’s type is not compatible with formal parameter type;
7[info] found :
8org.virtuslab.blog.kleisli.Kleisli[[R]scala.util.Either[org.virtuslab.blog.kleisli.Error,R],Long,org.virtuslab.blog. kleisli.ProductionLot]
9[info] required: ?F[?G, ?A, ?B]

Type lambdas are cool and all, but not a single line of the compiler was ever written with them in mind. They’re just not going to work right: the relevant code is not robust.

1 private def productionLotArrow[Env](verify: (ProductionLot, Env) => Either[Error, ProductionLot],
2 copy: (ProductionLot, Env) => ProductionLot): Env => Long => Either[Error, Long] = {
3 type Track[T] = Either[Error, T]
4 def track[A, B](f: A => Track[B]) = Kleisli[Track, A, B](f)
5
6 val getFromDb = track { productionLotsRepository.findExistingById }
7 val validate = (env: Env) => track { verify(_: ProductionLot, env) } >>> track { verifyProductionLotNotDone }
8 val save = track { productionLotsRepository.save }
9
10 (env: Env) => (
11 getFromDb
12 >>> validate(env)
13 ).map(copy(_, env)) >>> save
14 }

Subscribe to our newsletter and never miss an article