AI-powered applications are all the rage now, and we too want to surf this wave of hype. What better way to do it than to build an AI app from top to bottom in Scala - a full product, not just the app itself but the whole thing, including the infrastructural blueprint necessary to deploy it to a live environment.
What would be a suitable objective then? Something useful and interesting - a self-hosted, AI-based, open-source documentation assistant maybe? It would also be nice if we could host it somewhere on the cheap.
The architecture of an AI app in Scala 3 and Besom
For the AI app itself we're going to use tapir and sttp (with its OpenAI extension) for HTTP, magnum for database access, and Loom to make things easier.
Why do we need database access, you might ask? Ah, that's because we're going to do machine learning the best way possible - via Postgres! PostgresML is a Postgres distribution that brings ML functionality down to the SQL level, where we can easily work with it via JDBC without dealing with the whole Python/C mess underneath.
The AI app will utilize the Retrieval-Augmented Generation (RAG) pattern and leverage PostgresML's built-in vector database functionality to store information from our docs as embeddings. This will allow us to find the things our users ask about in our database using cosine similarity, and then use the best semantically matching text as part of a crafted prompt to GPT-3.5 Turbo.
Since we're going to run containers, and since we want to emulate the best practices of the startup world (mostly the overengineering part!), we're going to deploy a Kubernetes cluster. To avoid running over our non-existent budget, we'll skip the pricey hyperscalers and go with the cheapest provider that also works nicely with Pulumi - Hetzner.
Hetzner does not offer managed Kubernetes, but that shouldn't stop us! We could utilize the awesome power of Besom and Pulumi to deploy a full-blown Kubernetes distribution like Typhoon, but we have to think like a startup and cut corners a bit, so we'll just go with Rancher's K3S.
Here's a chart of how the infrastructure of the whole system will look - a 3-node K3S cluster with the built-in Traefik ingress and all the necessary Kubernetes entities.
Let’s build the AI App in Scala 3
Our AI app itself will be quite simple - a plain HTTP server serving HTML from the index endpoint, with a single POST /inquire endpoint to handle queries from the user. To be able to answer our users' questions about Besom, we have to be able to find the relevant docs, and to do that we need to create and populate an embeddings table in PostgresML.
As this is a PoC (we're doing a startup, remember?), we will just put our markdown files into a directory that we include on the classpath using scala-cli's directive: //> using resourceDir content. Then a small migration script will create the tables and populate them. The interesting bit here is how easy it is to create embeddings using PostgresML:
scala
import com.augustnagro.magnum.*

object Migrations:
  // ... migrations skipped for brevity

  private def createEmbeddingsTable(using DbCon): Unit =
    sql"""CREATE TABLE IF NOT EXISTS docs_embeddings AS
          SELECT id, url, content, pgml.embed('intfloat/e5-small', 'passage: ' || content)::vector(384) AS embedding
          FROM docs""".update.run()
This single query uses the data stored in the docs table to create a parallel table enriched with embeddings, using PostgresML's built-in pgml.embed function with Microsoft's E5 embedding model. The query side looks like this:
scala
import com.augustnagro.magnum.*

class Db(private val ds: javax.sql.DataSource):

  def queryEmbeddings(query: String): Option[Db.QueryResult] =
    connect(ds) {
      sql"""WITH request AS (
              SELECT pgml.embed(
                'intfloat/e5-small',
                'query: ' || $query
              )::vector(384) AS query_embedding
            )
            SELECT
              id,
              url,
              content,
              1 - (
                embedding::vector <=> (SELECT query_embedding FROM request)
              ) AS cosine_similarity
            FROM docs_embeddings
            ORDER BY cosine_similarity DESC
            LIMIT 1""".query[Db.QueryResult].run().headOption
    }

object Db:
  case class QueryResult(
    id: Int,
    url: String,
    content: String,
    similarity: Double
  )
This time we use a common table expression (WITH) to create an embedding of the user's query; then, in the main query, we compare the resulting vector with the embeddings stored in our docs_embeddings table. After ordering by the result of that comparison in descending order, we fetch the doc that matches the query best.
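Here's a minimal usage sketch of the Db class (the data source wiring below is illustrative - the actual app builds it from its configuration):

scala
import org.postgresql.ds.PGSimpleDataSource

// hypothetical local wiring, assuming PostgresML listens on localhost:5432
val ds = PGSimpleDataSource()
ds.setURL("jdbc:postgresql://localhost:5432/postgresml")

val db = Db(ds)
// yields the best-matching doc (if any) together with its cosine similarity
val best = db.queryEmbeddings("How do I export stack outputs in Besom?")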
The next step is to pass the doc, along with its url and the user's original question, to a large language model that will compose a response. We do this using the sttp-openai module to call GPT-3.5 Turbo with our custom prompt via the OpenAI API. GPT-3.5 Turbo is both cheap and relatively fast, so it should be more than enough for our purposes for now:
scala
import scala.util.{Failure, Success, Try}

import sttp.openai.OpenAISyncClient
import sttp.openai.requests.completions.chat.ChatRequestResponseData.ChatResponse
import sttp.openai.requests.completions.chat.ChatRequestBody.{ChatBody, ChatCompletionModel}
import sttp.openai.requests.completions.chat.message.{Message, Content}

object AI:

  def askDocs(question: String)(using conf: Config, db: Db): String =
    val openAI = OpenAISyncClient(conf.openAIApiKey)

    val contentFromDb = db.queryEmbeddings(question)

    val prompt = contentFromDb match
      case None =>
        s"""You are a programming assistant. User has asked this question:
           | $question
           |We weren't able to find anything about that in our database.
           |Please respond politely and explain that you have no information about this subject.
           |""".stripMargin
      case Some(result) =>
        s"""You are a programming assistant. User has asked this question:
           | $question
           |We were able to find material regarding this topic in our database:
           |
           |${result.content}
           |
           |Please use the document above to formulate an answer for the user. You can use
           |markdown with code snippets in your response. In the end of your response inform
           |the user that more information can be found at this url:
           |
           |${result.url}
           |""".stripMargin

    val bodyMessages: Seq[Message] = Seq(
      Message.UserMessage(
        content = Content.TextContent(prompt)
      )
    )

    val chatRequestBody: ChatBody = ChatBody(
      model = ChatCompletionModel.GPT35Turbo,
      messages = bodyMessages
    )

    Try(openAI.createChatCompletion(chatRequestBody)) match
      case Failure(exception) =>
        scribe.error("Failed to ask OpenAI", exception)
        "Oops, something is not right!"
      case Success(response) =>
        response.choices.headOption match
          case None =>
            scribe.error("OpenAI response is empty")
            "Oops, something is not right!"
          case Some(chatResponse) =>
            chatResponse.message.content
The last thing to do is to convert the AI model's response to HTML, as it will almost surely respond in markdown (we don't even have to specify that in our prompt). We will use the Flexmark library to do this:
scala
import com.vladsch.flexmark.html.HtmlRenderer
import com.vladsch.flexmark.parser.Parser
import com.vladsch.flexmark.util.data.MutableDataSet

object MD:
  private val options  = MutableDataSet()
  private val parser   = Parser.builder(options).build()
  private val renderer = HtmlRenderer.builder(options).build()

  def render(markdown: String): String =
    val document = parser.parse(markdown)
    renderer.render(document)
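A quick sanity check of the renderer (illustrative):

scala
@main def mdDemo(): Unit =
  // should print something like: <h1>Hello <strong>Besom</strong></h1>
  println(MD.render("# Hello **Besom**"))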
Finally, the HTTP layer with tapir is just two endpoints and a simple, plain HTTP server that works well with Loom's virtual threads:
scala
import sttp.tapir.*
import sttp.tapir.files.*
import sttp.tapir.server.jdkhttp.*
import java.util.concurrent.Executors

object Http:

  private val index =
    endpoint.get
      .out(htmlBodyUtf8)
      .handle(_ => Right(Templates.index()))

  private def inquire(using Config, Db) =
    endpoint.post
      .in("inquire")
      .in(formBody[Map[String, String]])
      .out(htmlBodyUtf8)
      .handle { form =>
        form.get("q").flatMap { s => if s.isBlank() then None else Some(s) } match
          case Some(question) =>
            val response = AI.askDocs(question)
            val rendered = MD.render(response)
            Right(Templates.response(rendered))
          case None => Right(Templates.response("Have nothing to ask?"))
      }

  def startServer()(using cfg: Config, db: Db) =
    JdkHttpServer()
      .executor(Executors.newVirtualThreadPerTaskExecutor())
      .addEndpoint(staticResourcesGetServerEndpoint("static")(classOf[App].getClassLoader, "/"))
      .addEndpoint(inquire)
      .addEndpoint(index)
      .port(cfg.port)
      .start()
We can skip the Templates object as it only hosts two functions that return Strings of HTML. We also expose an endpoint to serve static resources from the classpath, as we have a nifty animated loading indicator to show our users while the AI is crafting responses to their queries.
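For completeness, here's a minimal sketch of what such a Templates object could look like (the markup below is hypothetical - the real thing lives in the repository):

scala
object Templates:
  // hypothetical, stripped-down markup - the real templates include styling
  // and the animated loading indicator mentioned above
  def index(): String =
    """<html>
      |  <body>
      |    <form action="/inquire" method="post">
      |      <input type="text" name="q" placeholder="Ask about Besom..."/>
      |      <button type="submit">Ask</button>
      |    </form>
      |  </body>
      |</html>""".stripMargin

  def response(renderedHtml: String): String =
    s"<html><body>$renderedHtml</body></html>"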
Publishing the AI app container
To get our AI app ready for deployment, we have to publish it as a Docker container available for download from a public container registry. Fortunately for us, GitHub offers a container registry for its users. We have to start by logging into ghcr.io with our local Docker to be able to push there:
bash
docker login -u lbialy -p $(gh auth token) ghcr.io
We can use GitHub's gh command-line tool to fetch a token for us to streamline the process, or just create a general token with write permissions to the GitHub Container Registry in the web UI.
The next step is to build our AI app with scala-cli:
bash
scala-cli package app -o app.main -f --assembly
Then we have to build an image, so let's start with a Dockerfile:
dockerfile
FROM ghcr.io/graalvm/jdk-community:21
COPY app.main /app/main
ENTRYPOINT java -jar /app/main
and then let's use it to build the image and push it to ghcr.io:
bash
docker buildx build . -f Dockerfile --platform linux/amd64 -t ghcr.io/lbialy/askme:0.1.0
docker push ghcr.io/lbialy/askme:0.1.0
Should you want to follow the same steps, remember to use your own GitHub username here.
Having built the AI app, we still need to run it somewhere, and that place also has to allow us to deploy a PostgresML instance. Let's create our own platform!
The AI app’s infrastructure with Besom
First things first - we need servers. To create them on Hetzner Cloud, we need to provide our public ssh key and a few additional configuration parameters to create three servers in three different locations:
scala
import besom.*
import besom.api.hcloud
import hcloud.inputs.*

@main def main = Pulumi.run {
  val locations = Vector("fsn1", "nbg1", "hel1")

  val sshPublicKey  = config.requireString("ssh_public_key_path").map(os.Path(_)).map(os.read(_))
  val sshPrivateKey = config.requireString("ssh_private_key_path").map(os.Path(_)).map(os.read(_))

  val hcloudProvider = hcloud.Provider(
    "hcloud",
    hcloud.ProviderArgs(
      token = config.requireString("hcloud_token")
    )
  )

  val sshKey = hcloud.SshKey(
    "ssh-key",
    hcloud.SshKeyArgs(
      name = "ssh-key",
      publicKey = sshPublicKey
    ),
    opts(provider = hcloudProvider)
  )

  val serverPool = (1 to 3).map { i =>
    hcloud
      .Server(
        s"k3s-server-$i",
        hcloud.ServerArgs(
          serverType = "cx21",
          name = s"k3s-server-$i",
          image = "ubuntu-22.04",
          location = locations(i % locations.size),
          sshKeys = List(sshKey.name),
          publicNets = List(
            ServerPublicNetArgs(
              ipv4Enabled = true,
              ipv6Enabled = false
            )
          )
        ),
        opts(provider = hcloudProvider)
      )
  }.toVector

  val spawnNodes = serverPool.parSequence

  val nodeIps = serverPool.map(_.ipv4Address).parSequence

  Stack(spawnNodes).exports(
    nodeIps = nodeIps
  )
}
We load the paths to our public and private ssh keys (we will need the private one later) from the Pulumi configuration and then create an SshKey resource in Hetzner. Afterwards we create 3 servers of the cx21 type, each with 2 vCPUs and 4 GB of RAM, running Ubuntu 22.04.
Finally, we make sure they get created in parallel by turning Vector[Output[hcloud.Server]] into Output[Vector[hcloud.Server]] using the parSequence extension and putting the resulting Output into the Stack's dependencies. We also export the IP addresses of all three nodes, just to be able to see them in the results of our pulumi up.
With this done, we only need to set the following three configuration properties:
- ssh_public_key_path
- ssh_private_key_path
- hcloud_token
using pulumi config set, remembering to use --secret for all secrets. Then we can apply the stack and connect to any of our nodes via ssh, because we've already set them up with our public ssh key.
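For example (paths and token value are placeholders):

bash
pulumi config set ssh_public_key_path ~/.ssh/id_ed25519.pub
pulumi config set ssh_private_key_path ~/.ssh/id_ed25519
pulumi config set --secret hcloud_token <your-hetzner-cloud-api-token>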
Deploying Kubernetes for our AI app
The next step is to deploy K3S onto our servers to get a unified orchestration engine across all our machines. To do this, we will leverage the Pulumi command package, which allows us to run arbitrary commands on both local and remote machines.
The procedure itself is quite simple, but we have to translate each step where we run remote commands into Pulumi's CRUD-like resource model. The actions we have to complete are:
- Initialize a cluster leader node, which will provide us with a join token that we then have to use on follower node startup to form a cluster
- Initialize follower nodes with the aforementioned join token
- Configure all the nodes with our GitHub Container Registry token, which will allow us to pull images of our AI app published to ghcr.io
- Fetch the kubeconfig file from the leader node and apply some small tweaks to it so that we can use it to execute kubectl commands remotely
The code to do that will look like this:
scala
// on top
import besom.api.command.*

// inside of Pulumi.run function!
val clusterName = "askme-dev"

val ghcrToken = config.requireString("github_docker_token")

val authFileContents =
  p"""configs:
     |  ghcr.io:
     |    auth:
     |      username: lbialy
     |      password: $ghcrToken""".stripMargin

val echoFileCommand =
  p"""mkdir -p /etc/rancher/k3s/ && cat << EOF > /etc/rancher/k3s/registries.yaml
     |$authFileContents
     |EOF""".stripMargin

case class K3S(kubeconfig: Output[String], token: Output[String], nodeIps: Output[Vector[String]])

val k3s = serverPool.parSequence.flatMap { servers =>
  // split servers into leader and followers group
  val leader    = servers.head
  val followers = servers.tail

  val leaderConn = remote.inputs.ConnectionArgs(
    host = leader.ipv4Address,
    user = "root",
    privateKey = sshPrivateKey
  )

  val k3sVersion = "v1.29.1+k3s2"

  val initializeK3sLeader = remote.Command(
    "start-k3s-leader",
    remote.CommandArgs(
      connection = leaderConn,
      create = s"curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION=$k3sVersion sh -s - --flannel-backend=wireguard-native",
      delete = "sh /usr/local/bin/k3s-uninstall.sh"
    )
  )

  val token =
    remote
      .Command(
        "get-leader-token",
        remote.CommandArgs(
          connection = leaderConn,
          create = "cat /var/lib/rancher/k3s/server/node-token"
        ),
        opts(dependsOn = initializeK3sLeader)
      )
      .stdout

  val insertGhcrToken =
    remote.Command(
      "insert-ghcr-token-leader",
      remote.CommandArgs(
        connection = leaderConn,
        create = echoFileCommand
      ),
      opts(dependsOn = initializeK3sLeader)
    )

  val restartK3sLeader =
    remote.Command(
      "restart-k3s-leader",
      remote.CommandArgs(
        connection = leaderConn,
        create = "sudo systemctl force-reload k3s"
      ),
      opts(dependsOn = insertGhcrToken)
    )

  val kubeconfig =
    remote
      .Command(
        "get-kubeconfig",
        remote.CommandArgs(
          connection = leaderConn,
          create = "cat /etc/rancher/k3s/k3s.yaml"
        ),
        opts(dependsOn = initializeK3sLeader)
      )
      .stdout

  val initializeFollowers = followers.zipWithIndex.map { case (followerServer, idx) =>
    val followerIdx = idx + 1

    val followerConnection = remote.inputs.ConnectionArgs(
      host = followerServer.ipv4Address,
      user = "root",
      privateKey = sshPrivateKey
    )

    val installOnFollower = remote.Command(
      s"start-k3s-follower-${followerIdx}",
      remote.CommandArgs(
        connection = followerConnection,
        create =
          p"curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION=$k3sVersion K3S_URL=https://${leader.ipv4Address}:6443 K3S_TOKEN=${token} sh -s -"
      ),
      opts(dependsOn = restartK3sLeader)
    )

    val insertGhcrToken = remote.Command(
      s"insert-ghcr-token-${followerIdx}",
      remote.CommandArgs(
        connection = followerConnection,
        create = echoFileCommand
      ),
      opts(dependsOn = installOnFollower)
    )

    val restartK3sFollower = remote.Command(
      s"restart-k3s-follower-${followerIdx}",
      remote.CommandArgs(
        connection = followerConnection,
        create = "sudo systemctl force-reload k3s-agent"
      ),
      opts(dependsOn = insertGhcrToken)
    )

    restartK3sFollower
  }

  val ipAddresses = servers.map(_.ipv4Address).parSequence

  val adjustedKubeconfig =
    for
      _        <- restartK3sLeader
      config   <- kubeconfig
      leaderIp <- serverPool.head.ipv4Address
    yield config.replace("default", clusterName).replace("127.0.0.1", leaderIp)

  initializeFollowers.parSequence.map(_ => K3S(adjustedKubeconfig, token, ipAddresses))
}
The interesting bit here is that our commands are often unrelated at the data level (the outputs of one are not the inputs of the next), so we have to establish the order of execution manually using the dependsOn resource option.
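In miniature, the pattern looks like this (conn standing in for any ConnectionArgs):

scala
// illustrative: `second` has no data dependency on `first`, so without
// dependsOn Pulumi would be free to run them concurrently
val first = remote.Command(
  "first",
  remote.CommandArgs(connection = conn, create = "echo first")
)
val second = remote.Command(
  "second",
  remote.CommandArgs(connection = conn, create = "echo second"),
  opts(dependsOn = first)
)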
Making of a Component
Moreover, this bit of code looks reusable, and we have already defined a K3S case class to hold the interesting outputs of this block, so maybe it's worth turning it into a reusable component. To do that, we need to provide some boilerplate data structures that form the API we know from other provider packages:
scala
import besom.api.command.*

case class AuthArgs(
  registry: Input[NonEmptyString],
  username: Input[NonEmptyString],
  password: Input[NonEmptyString]
)

case class K3SArgs private (
  clusterName: Output[NonEmptyString],
  servers: Vector[Output[String]],
  privateKey: Output[String],
  k3sVersion: Output[String],
  registryAuth: Output[List[AuthArgs]]
):
  def authFileContents(using Context): Output[String] =
    registryAuth.flatMap { registryCreds =>
      registryCreds.foldLeft(Output("configs:\n")) { case (acc, cred) =>
        acc.flatMap { str =>
          val block =
            p"""  ${cred.registry}:
               |    auth:
               |      username: ${cred.username}
               |      password: ${cred.password}""".stripMargin
          block.map(b => str + b)
        }
      }
    }

object K3SArgs:
  def apply(
    clusterName: Input[NonEmptyString],
    serverIps: Vector[Input[String]],
    privateKey: Input[String],
    k3sVersion: Input[String],
    registryAuth: Input.OneOrList[AuthArgs] = List.empty
  )(using Context): K3SArgs =
    new K3SArgs(
      clusterName.asOutput(),
      serverIps.map(_.asOutput()),
      privateKey.asOutput(),
      k3sVersion.asOutput(),
      registryAuth.asManyOutput()
    )
This is the first time we need to work with Inputs to define a funnel-like API that lets callers pass both raw literals like String and dynamic values pulled from configuration or other resources as, for example, Output[String]. Inputs are in fact opaque types over union types: the most basic, Input[A], is equivalent to A | Output[A] and allows one to pass either a plain value of type A or its dynamic variant wrapped in Output. The family of extension functions seen here as .asOutput() or .asManyOutput() performs the safe conversion from the opaque type into its lifted form, e.g. from Input[String] (so, as we now know, String | Output[String]) into Output[String].
Taking care of the plans
One interesting outlier here is that we take the serverIps argument as Vector[Input[String]], which yields a Vector[Output[String]] in the K3SArgs case class. Why don't we just take an Input[List[String]] or Input.OneOrList[String] instead? Well, we know beforehand that there are just two ways one can obtain a list of IP addresses. The first is that the user provides it by hand - and since a Vector[Input[String]] also accepts a Vector[String], we're not really making that troublesome. The other is to take those values from the properties of other resources, as we actually do here by taking the ipv4Address of each hcloud.Server resource. We have to remember, however, that properties of resources are unknown during dry runs, so if we want to avoid spoiling the deployment plan shown in pulumi up, we have to avoid propagating that unknown status - which is exactly what would happen if we used an Output resulting from the computation serverPool.map(_.ipv4Address).parSequence.
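To illustrate, assuming we're inside the Pulumi.run block, both of these invocations typecheck against the K3SArgs.apply defined above (the literal IPs are, of course, made up):

scala
// literal IPs, fully known during the dry run
K3SArgs(
  clusterName = "askme-dev",
  serverIps = Vector("157.90.171.154", "65.109.163.32"),
  privateKey = sshPrivateKey,
  k3sVersion = "v1.29.2+k3s1"
)

// IPs derived from resources - each element is an Output[String],
// unknown during the dry run but resolved during the actual update
K3SArgs(
  clusterName = "askme-dev",
  serverIps = serverPool.map(_.ipv4Address),
  privateKey = sshPrivateKey,
  k3sVersion = "v1.29.2+k3s1"
)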
The final reformulation of this code into a Besom component looks like this:
scala
case class K3S(
  kubeconfig: Output[String],
  leaderIp: Output[String],
  followerIps: Output[Vector[String]]
)(using ComponentBase)
    extends ComponentResource
    derives RegistersOutputs

object K3S:
  def apply(name: NonEmptyString, args: K3SArgs, resourceOpts: ComponentResourceOptions)(using Context): Output[K3S] =
    component(name, "user:component:K3S", resourceOpts) {
      val echoFileCommand =
        p"""mkdir -p /etc/rancher/k3s/ && cat << EOF > /etc/rancher/k3s/registries.yaml
           |${args.authFileContents}
           |EOF""".stripMargin

      val k3sVersion = args.k3sVersion

      val leaderIp = args.servers.headOption match
        case Some(ip) => ip
        case None     => Output.fail(Exception("Can't deploy K3S without servers, silly."))

      val followers = if args.servers.isEmpty then Vector.empty else args.servers.tail

      val leaderConn = remote.inputs.ConnectionArgs(
        host = leaderIp,
        user = "root",
        privateKey = args.privateKey
      )

      val initializeK3sLeader = remote.Command(
        "start-k3s-leader",
        remote.CommandArgs(
          connection = leaderConn,
          create = p"curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION=$k3sVersion sh -s - --flannel-backend=wireguard-native",
          delete = "sh /usr/local/bin/k3s-uninstall.sh"
        )
      )

      val token =
        remote
          .Command(
            "get-leader-token",
            remote.CommandArgs(
              connection = leaderConn,
              create = "cat /var/lib/rancher/k3s/server/node-token"
            ),
            opts(dependsOn = initializeK3sLeader)
          )
          .stdout

      val insertGhcrToken =
        remote.Command(
          "insert-ghcr-token-leader",
          remote.CommandArgs(
            connection = leaderConn,
            create = echoFileCommand
          ),
          opts(dependsOn = initializeK3sLeader)
        )

      val restartK3sLeader =
        remote.Command(
          "restart-k3s-leader",
          remote.CommandArgs(
            connection = leaderConn,
            create = "sudo systemctl force-reload k3s"
          ),
          opts(dependsOn = insertGhcrToken)
        )

      val kubeconfig =
        remote
          .Command(
            "get-kubeconfig",
            remote.CommandArgs(
              connection = leaderConn,
              create = "cat /etc/rancher/k3s/k3s.yaml"
            ),
            opts(dependsOn = initializeK3sLeader)
          )
          .stdout

      val initializeFollowers = followers.zipWithIndex.map { case (followerIpOutput, idx) =>
        val followerIdx = idx + 1

        val followerConnection = remote.inputs.ConnectionArgs(
          host = followerIpOutput,
          user = "root",
          privateKey = args.privateKey
        )

        val installOnFollower = remote.Command(
          s"start-k3s-follower-$followerIdx",
          remote.CommandArgs(
            connection = followerConnection,
            create =
              p"curl -sfL https://get.k3s.io | INSTALL_K3S_VERSION=$k3sVersion K3S_URL=https://${leaderIp}:6443 K3S_TOKEN=${token} sh -s -"
          ),
          opts(dependsOn = restartK3sLeader)
        )

        val insertGhcrToken = remote.Command(
          s"insert-ghcr-token-$followerIdx",
          remote.CommandArgs(
            connection = followerConnection,
            create = echoFileCommand
          ),
          opts(dependsOn = installOnFollower)
        )

        val restartK3sFollower = remote.Command(
          s"restart-k3s-follower-$followerIdx",
          remote.CommandArgs(
            connection = followerConnection,
            create = "sudo systemctl force-reload k3s-agent"
          ),
          opts(dependsOn = insertGhcrToken)
        )

        restartK3sFollower
      }.parSequence

      val adjustedKubeconfig =
        for
          _           <- restartK3sLeader
          _           <- initializeFollowers
          config      <- kubeconfig
          clusterName <- args.clusterName
          ip          <- leaderIp
        yield config.replace("default", clusterName).replace("127.0.0.1", ip)

      new K3S(adjustedKubeconfig, leaderIp, Output.sequence(followers))
    }
Finally we can use our new component in the main Pulumi.run block:
scala
val clusterName = "askme-dev"
val ghcrToken = config.requireString("github_docker_token").flatMap(_.toNonEmptyOutput)
val k3s = K3S(
clusterName,
K3SArgs(
clusterName = clusterName,
servers = serverPool.map(_.ipv4Address),
privateKey = sshPrivateKey,
k3sVersion = "v1.29.2+k3s1",
registryAuth = AuthArgs("ghcr.io", "lbialy", ghcrToken)
),
ComponentResourceOptions(
deletedWith = serverPool.headOption.getOrElse(None)
)
)
This, in turn, looks pretty straightforward and allows us to hide a lot of the internal complexity of the deployment behind the familiar facade of a resource. We also use the deletedWith resource option here to let Pulumi know that the removal of this whole layer can be skipped if the leader node is being deleted too - that would mean we're destroying the whole stack, and there's no point in cleaning up a Kubernetes deployment if the underlying server is getting axed anyway.
Deploying the AI app
There are a few things we have to deploy to Kubernetes for our AI app to work: a PostgresML instance, the app itself, and some networking cruft like a Service and an Ingress to route external traffic to it. Having already grokked components, why don't we try to make this an encapsulated, reusable piece of infrastructure too? Let's start with the boilerplate of configuration classes:
scala
case class PostgresArgs private (
  port: Output[Int],
  dashboardPort: Output[Int]
)

object PostgresArgs:
  def apply(port: Input[Int], dashboardPort: Input[Int])(using Context): PostgresArgs =
    new PostgresArgs(port.asOutput(), dashboardPort.asOutput())

case class AppArgs private (
  name: Output[NonEmptyString],
  replicas: Output[Int],
  containerPort: Output[Int],
  servicePort: Output[Int],
  host: Output[NonEmptyString],
  openAIToken: Output[String],
  docsBaseUrl: Output[String]
)

object AppArgs:
  def apply(
    name: Input[NonEmptyString],
    replicas: Input[Int],
    containerPort: Input[Int],
    servicePort: Input[Int],
    host: Input[NonEmptyString],
    openAIToken: Input[String],
    docsBaseUrl: Input[String]
  )(using Context): AppArgs =
    new AppArgs(
      name.asOutput(),
      replicas.asOutput(),
      containerPort.asOutput(),
      servicePort.asOutput(),
      host.asOutput(),
      openAIToken.asOutput(),
      docsBaseUrl.asOutput()
    )

case class AppDeploymentArgs(
  postgresArgs: PostgresArgs,
  appArgs: AppArgs
)
We also need a beefy import block for all the stuff we're going to use:
scala
import scala.concurrent.duration.*
import besom.api.{kubernetes => k8s}
import besom.internal.CustomTimeouts
import k8s.core.v1.enums.*
import k8s.core.v1.inputs.*
import k8s.apps.v1.inputs.*
import k8s.meta.v1.inputs.*
import k8s.apps.v1.{Deployment, DeploymentArgs, StatefulSet, StatefulSetArgs}
import k8s.core.v1.{Namespace, Service, ServiceArgs}
import k8s.networking.v1.{Ingress, IngressArgs}
import k8s.networking.v1.inputs.{
IngressSpecArgs,
IngressRuleArgs,
HttpIngressRuleValueArgs,
HttpIngressPathArgs,
IngressBackendArgs,
IngressServiceBackendArgs,
ServiceBackendPortArgs
}
Now that we have this out of the way, we can declare our component:
scala
case class AppDeployment(
  jdbcUrl: Output[String],
  appUrl: Output[String]
)(using ComponentBase)
    extends ComponentResource
    derives RegistersOutputs

object AppDeployment:
  def apply(name: NonEmptyString, args: AppDeploymentArgs, resourceOpts: ComponentResourceOptions)(using Context): Output[AppDeployment] =
    component(name, "user:component:app-deployment", resourceOpts) {
      AppDeployment(???, ???)
    }
and start filling it with deployment logic. We are going to need some common constants; we also create a Kubernetes namespace and extract the values out of our component arguments:
scala
val labels = Map("app" -> name)
val dbLabels = Map("db" -> name)
val appNamespace = Namespace(name)
val openAIToken = args.appArgs.openAIToken
val postgresPort = args.postgresArgs.port
val dashboardPort = args.postgresArgs.dashboardPort
val containerPort = args.appArgs.containerPort
val servicePort = args.appArgs.servicePort
val ingressHost = args.appArgs.host
val docsBaseUrl = args.appArgs.docsBaseUrl
The first important thing to do is to deploy the PostgresML instance, and the way to do it nicely - with persistent volumes taken care of for us - is to use a StatefulSet:
scala
val postgresmlStatefulSet = k8s.apps.v1.StatefulSet(
  "postgresml",
  k8s.apps.v1.StatefulSetArgs(
    metadata = ObjectMetaArgs(
      name = "postgresml",
      namespace = appNamespace.metadata.name,
      labels = dbLabels
    ),
    spec = StatefulSetSpecArgs(
      serviceName = "postgresml",
      replicas = 1,
      selector = LabelSelectorArgs(matchLabels = dbLabels),
      template = PodTemplateSpecArgs(
        metadata = ObjectMetaArgs(
          labels = dbLabels
        ),
        spec = PodSpecArgs(
          containers = ContainerArgs(
            name = "postgresml",
            image = "ghcr.io/postgresml/postgresml:2.8.2",
            args = List("tail", "-f", "/dev/null"),
            readinessProbe = ProbeArgs(
              exec = ExecActionArgs(
                command = List("psql", "-d", "postgresml", "-c", "SELECT 1")
              ),
              initialDelaySeconds = 15,
              timeoutSeconds = 2
            ),
            livenessProbe = ProbeArgs(
              exec = ExecActionArgs(
                command = List("psql", "-d", "postgresml", "-c", "SELECT 1")
              ),
              initialDelaySeconds = 45,
              timeoutSeconds = 2
            ),
            ports = List(
              ContainerPortArgs(name = "postgres", containerPort = postgresPort),
              ContainerPortArgs(name = "dashboard", containerPort = dashboardPort)
            )
          ) :: Nil
        )
      )
    )
  ),
  opts(customTimeouts = CustomTimeouts(create = 10.minutes))
)
We configure our database container with proper readiness and liveness probes, expose its ports, and set a custom timeout of ten minutes for creation, because the PostgresML image is huge. We also deal with the fact that, without overridden args, the container quits immediately - hence the good old tail -f /dev/null. With the StatefulSet done, we have to create a Service to expose the database to our pods:
scala
val postgresMlService = Service(
  "postgresml-svc",
  ServiceArgs(
    spec = ServiceSpecArgs(
      selector = dbLabels,
      ports = List(
        ServicePortArgs(name = "postgres", port = postgresPort, targetPort = postgresPort),
        ServicePortArgs(name = "dashboard", port = dashboardPort, targetPort = dashboardPort)
      )
    ),
    metadata = ObjectMetaArgs(
      namespace = appNamespace.metadata.name,
      labels = labels
    )
  ),
  opts(
    dependsOn = postgresmlStatefulSet
  )
)
We pass all the necessary properties and mark the Service as dependent on the creation of the StatefulSet via the dependsOn resource option - they are otherwise unrelated entities, and the Service could fail with a timeout error while waiting for the pods of the StatefulSet to become available. We then extract the name of the Service to get the internal Kubernetes domain name via which we can reach the database, and create the first output value of our component:
scala
val postgresmlHost = postgresMlService.metadata.name
  .getOrFail(Exception("postgresml service name not found!"))

val jdbcUrl = p"jdbc:postgresql://${postgresmlHost}:${postgresPort}/postgresml"

// below
AppDeployment(jdbcUrl, ???)
The next step is to declare a Deployment of our AI app:
scala
val appDeployment =
  Deployment(
    name,
    DeploymentArgs(
      spec = DeploymentSpecArgs(
        selector = LabelSelectorArgs(matchLabels = labels),
        replicas = 1,
        template = PodTemplateSpecArgs(
          metadata = ObjectMetaArgs(
            name = p"$name-deployment",
            labels = labels,
            namespace = appNamespace.metadata.name
          ),
          spec = PodSpecArgs(
            containers = ContainerArgs(
              name = "app",
              image = "ghcr.io/lbialy/askme:0.1.0",
              ports = List(
                ContainerPortArgs(name = "http", containerPort = containerPort)
              ),
              env = List(
                EnvVarArgs(
                  name = "OPENAI_API_KEY",
                  value = openAIToken
                ),
                EnvVarArgs(
                  name = "JDBC_URL",
                  value = jdbcUrl
                ),
                EnvVarArgs(
                  name = "DOCS_BASE_URL",
                  value = docsBaseUrl
                )
              ),
              readinessProbe = ProbeArgs(
                httpGet = HttpGetActionArgs(
                  path = "/",
                  port = containerPort
                ),
                initialDelaySeconds = 10,
                periodSeconds = 5
              ),
              livenessProbe = ProbeArgs(
                httpGet = HttpGetActionArgs(
                  path = "/",
                  port = containerPort
                ),
                initialDelaySeconds = 10,
                periodSeconds = 5
              )
            ) :: Nil
          )
        )
      ),
      metadata = ObjectMetaArgs(
        namespace = appNamespace.metadata.name
      )
    )
  )
There's nothing really fancy happening here - it's the usual specification of a Deployment, with the configuration of the AI app passed via environment variables, readiness and liveness probes so that it's a good neighbour in the k8s environment, and exposed ports. Our Deployment will need a companion Service and an Ingress based on the Traefik instance built into K3S:
scala
val appService =
  Service(
    s"$name-svc",
    ServiceArgs(
      spec = ServiceSpecArgs(
        selector = labels,
        ports = List(
          ServicePortArgs(name = "http", port = servicePort, targetPort = containerPort)
        ),
        `type` = ServiceSpecType.ClusterIP
      ),
      metadata = ObjectMetaArgs(
        namespace = appNamespace.metadata.name,
        labels = labels
      )
    ),
    opts(deleteBeforeReplace = true)
  )

val appIngress =
  Ingress(
    s"$name-ingress",
    IngressArgs(
      spec = IngressSpecArgs(
        rules = List(
          IngressRuleArgs(
            host = ingressHost,
            http = HttpIngressRuleValueArgs(
              paths = List(
                HttpIngressPathArgs(
                  path = "/",
                  pathType = "Prefix",
                  backend = IngressBackendArgs(
                    service = IngressServiceBackendArgs(
                      name = appService.metadata.name.getOrElse(name),
                      port = ServiceBackendPortArgs(
                        number = servicePort
                      )
                    )
                  )
                )
              )
            )
          )
        )
      ),
      metadata = ObjectMetaArgs(
        namespace = appNamespace.metadata.name,
        labels = labels,
        annotations = Map(
          "kubernetes.io/ingress.class" -> "traefik"
        )
      )
    )
  )
With this done, the only thing left is to compute the second output property of our AppDeployment component:
scala
// use all of the above and return the final url
val appUrl =
  for
    _   <- appNamespace
    _   <- postgresmlStatefulSet
    _   <- postgresMlService
    _   <- appDeployment
    _   <- appService
    _   <- appIngress
    url <- p"http://$ingressHost/"
  yield url

AppDeployment(jdbcUrl, appUrl)
It's time to use the component in our main Pulumi.run block. We also need to create a Kubernetes Provider instance first, to let Pulumi know onto which cluster we want to deploy all of our resources:
scala
val k3sProvider = k8s.Provider(
  "k8s",
  k8s.ProviderArgs(
    kubeconfig = k3s.flatMap(_.kubeconfig)
  )
)

val app = AppDeployment(
  "askme",
  AppDeploymentArgs(
    PostgresArgs(
      port = 5432,
      dashboardPort = 8000
    ),
    AppArgs(
      name = "askme",
      replicas = 1,
      containerPort = 8080,
      servicePort = 8080,
      host = "machinespir.it",
      openAIToken = config.requireString("openai_token"),
      docsBaseUrl = "https://virtuslab.github.io/besom/docs/"
    )
  ),
  ComponentResourceOptions(
    providers = k3sProvider,
    deletedWith = k3s
  )
)
The domain used here is privately owned and managed by Cloudflare - a fact that will come in handy in a moment. We set up some configuration arguments and also inform Pulumi that, again, it's not worth dealing with the deletion of Kubernetes entities if the whole Kubernetes cluster is going down.
We also do something quite significant here: we pass the k3sProvider as one of the ComponentResourceOptions providers. You might have noticed above that we didn't pass opts(provider = k3sProvider) to all of the Kubernetes resources we created - that's because components have a very useful property: all resources declared inside a component inherit its providers, if there are any that match their package name.
This means that all the Kubernetes resources declared in the AppDeployment component will automatically use the correct provider, which is nice and definitely helps to keep bugs away.
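Without that inheritance, every resource inside the component would have to point at the provider explicitly, along these lines (a sketch):

scala
// hypothetical: what each resource would need without component-level providers
val appNamespace = Namespace(
  name,
  k8s.core.v1.NamespaceArgs(),
  opts(provider = k3sProvider)
)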
Going public
The final step of the whole endeavour is to tinker a bit with the domain managed by Cloudflare to make it point towards our cluster and its ingress ports. This bit is small enough not to warrant a component, so we're just going to add it to the main Pulumi.run block as is:
scala
// on top
import besom.api.{cloudflare => cf}

// inside of Pulumi.run function!
val cfProvider = cf.Provider(
  "cloudflare-provider",
  cf.ProviderArgs(
    apiToken = config.requireString("cloudflare_token")
  )
)

val aRecords = serverPool.zipWithIndex.map { case (server, idx) =>
  val recordIdx = idx + 1

  cf.Record(
    s"askme-a-record-$recordIdx",
    cf.RecordArgs(
      name = "machinespir.it",
      `type` = "A",
      value = server.ipv4Address,
      zoneId = config.requireString("cloudflare_zone_id"),
      ttl = 1,
      proxied = true
    ),
    opts(provider = cfProvider)
  )
}.parSequence
We create a Provider instance for Cloudflare and then use it to create an A DNS record for each of the servers. We also configure proxying, so that we can benefit from Cloudflare's automatic TLS setup (provided the zone is set to flexible mode!) without deploying our own cert-manager to set up Let's Encrypt certificates. That's obviously something we should do in the future, but for now this is good enough.
There's also a small helper that makes it easier to connect to the newly deployed cluster - it simply writes the adjusted kubeconfig.conf file into our working directory:
scala
// use in bash: KUBECONFIG=~/.kube/config:$(pwd)/kubeconfig.conf
val writeKubeconfig = k3s.flatMap { k3s =>
  k3s.kubeconfig.map { kubeconfig =>
    os.write.over(os.pwd / "kubeconfig.conf", kubeconfig)
  }
}
We modify the final Stack definition:
scala
Stack(spawnNodes, writeKubeconfig, k3s, app, aRecords).exports(
  nodes = nodeIps,
  kubeconfigPath = (os.pwd / "kubeconfig.conf").toString,
  url = app.flatMap(_.appUrl)
)
and our full deployment is ready. Now we can trigger pulumi up and see that everything deploys just fine - besides our AI app, which ends up in a crash loop. We fetch the logs from the pod and see that we forgot to pass the PORT env variable. Sigh, such a dumb error. If only we used a language with superpowers...
Using Scala 3 superpowers for safety and profit
The piece of code that caused us this small headache is this part of the deployment of our AI app:
scala
env = List(
  EnvVarArgs(
    name = "OPENAI_API_KEY",
    value = openAIToken
  ),
  EnvVarArgs(
    name = "JDBC_URL",
    value = jdbcUrl
  ),
  EnvVarArgs(
    name = "DOCS_BASE_URL",
    value = docsBaseUrl
  )
),
One of the EnvVarArgs is missing, and it's very easy to miss because there's an impassable border between our AI app, which is now nicely wrapped in a Docker image, and the runtime of our infrastructure code. But is it really impassable? It turns out that if you use Scala, there aren't many things that are impossible! Enter the besom-cfg package, which allows us to safely connect the configuration of an application with the declarations of the infrastructure layer that provides it. Let's see how this works, starting with the assumption that we're using a naive, hand-written configuration resolution:
scala
case class Config(
  port: Int,
  openAIApiKey: String,
  jdbcUrl: String,
  docsBaseUrl: String
)

object Config:
  def fromEnv(key: String): String =
    try sys.env(key)
    catch
      case _: NoSuchElementException =>
        throw Exception(s"Required configuration key $key not present among environment variables")

  def fromEnv[A](key: String, f: String => A): A =
    val strVal = fromEnv(key)
    try f(strVal)
    catch
      case t: Exception =>
        throw Exception(s"Failed to convert value $strVal for key $key", t)

  def apply(): Config =
    new Config(
      fromEnv("PORT", _.toInt),
      fromEnv("OPENAI_API_KEY"),
      fromEnv("JDBC_URL"),
      fromEnv("DOCS_BASE_URL")
    )
It's worth noting that this could be done with pureconfig, ciris or zio-config - it doesn't matter, as these libraries all do exactly the same thing: resolve configuration from configuration files and/or environment variables, without exposing any information about which configuration variables are necessary for the AI app to start successfully.
That's not the case with besom-cfg, as the resolution of configuration case classes known from the aforementioned libraries is just a part of what it does. The more important bit is that it allows the infrastructural Besom program to introspect the AI app at compile time, exposing information about what is really necessary for the app to start successfully. Let's see how this looks in the application code:
scala
import besom.cfg.*

case class Config(
  port: Int,
  openAIApiKey: String,
  jdbcUrl: String,
  docsBaseUrl: String
) derives Configured

@main def main() =
  val config: Config = resolveConfiguration[Config]
That's all - just a derives Configured clause on the case class holding our configuration and a call to the resolveConfiguration function to load it. Nothing else has to be done. Now let's see what has to change in the infrastructural code. We need to include some new imports:
scala
import besom.cfg.k8s.ConfiguredContainerArgs
import besom.cfg.Struct
and then replace things in the deployment of our AI app:
scala
val appDeployment =
  Deployment(
    name,
    DeploymentArgs(
      spec = DeploymentSpecArgs(
        selector = LabelSelectorArgs(matchLabels = labels),
        replicas = 1,
        template = PodTemplateSpecArgs(
          metadata = ObjectMetaArgs(
            name = p"$name-deployment",
            labels = labels,
            namespace = appNamespace.metadata.name
          ),
          spec = PodSpecArgs(
            containers = ConfiguredContainerArgs( // here
              name = "app",
              image = "ghcr.io/lbialy/askme:0.1.0",
              configuration = Struct( // here
                openAIApiKey = openAIToken,
                jdbcUrl = jdbcUrl,
                docsBaseUrl = docsBaseUrl
              ),
              // env removed
              ports = List(
                ContainerPortArgs(name = "http", containerPort = containerPort)
              ),
              readinessProbe = ProbeArgs(
                httpGet = HttpGetActionArgs(
                  path = "/",
                  port = containerPort
                ),
                initialDelaySeconds = 10,
                periodSeconds = 5
              ),
              livenessProbe = ProbeArgs(
                httpGet = HttpGetActionArgs(
                  path = "/",
                  port = containerPort
                ),
                initialDelaySeconds = 10,
                periodSeconds = 5
              )
            ) :: Nil
          )
        )
      ),
      metadata = ObjectMetaArgs(
        namespace = appNamespace.metadata.name
      )
    )
  )
Notice that we're still missing the port property, which is part of the Config class on the application side - but now this is a compile error in the infrastructure program, and it successfully prevents such a mistake:
bash
λ scala-cli compile .
Compiling project (Scala 3.3.1, JVM (17))
[error] ./app.scala:178:21
[error] Configuration provided for container app (ghcr.io/lbialy/askme:0.1.0) is invalid:
[error]
[error] {
[error] port: Int // missing
[error] openAIApiKey: String
[error] jdbcUrl: String
[error] docsBaseUrl: String
[error] }
Error compiling project (Scala 3.3.1, JVM (17))
Compilation failed
besom-cfg performs type checking of the configuration and will also be capable of passing along information about type refinements from libraries like Iron:
scala
configuration = Struct(
  port = "8080",
  openAIApiKey = openAIToken,
  jdbcUrl = jdbcUrl,
  docsBaseUrl = docsBaseUrl
),
bash
λ scala-cli compile .
Compiling project (Scala 3.3.1, JVM (17))
[error] ./app.scala:178:21
[error] Configuration provided for container app (ghcr.io/lbialy/askme:0.1.0) is invalid:
[error]
[error] {
[error] port: got String, expected Int
[error] openAIApiKey: String
[error] jdbcUrl: String
[error] docsBaseUrl: String
[error] }
Error compiling project (Scala 3.3.1, JVM (17))
Compilation failed
The library currently supports Kubernetes containers only but other forms of deployment and integrations will be available in the future.
Deployment time!
After setting up besom-cfg and correcting the configuration of the AI app, it's finally time to deploy the whole stack. To do this, we have to provide the rest of the configuration properties using pulumi config set:
- cloudflare_token
- cloudflare_zone_id
- github_docker_token
- openai_token
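For example (all values below are placeholders):

bash
pulumi config set --secret cloudflare_token <your-cloudflare-api-token>
pulumi config set cloudflare_zone_id <your-cloudflare-zone-id>
pulumi config set --secret github_docker_token <your-ghcr-token>
pulumi config set --secret openai_token <your-openai-api-key>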
After this is done we can execute the deployment:
bash
λ pulumi up
Previewing update (dev):
Type Name Plan
+ pulumi:pulumi:Stack k3s-on-hetzner-dev create
+ ├─ hcloud:index:SshKey ssh-key create
+ ├─ hcloud:index:Server k3s-server-1 create
+ ├─ hcloud:index:Server k3s-server-3 create
+ ├─ hcloud:index:Server k3s-server-2 create
+ ├─ user:component:K3S askme-dev create
+ │ ├─ command:remote:Command start-k3s-leader create
+ │ ├─ command:remote:Command insert-ghcr-token-leader create
+ │ ├─ command:remote:Command restart-k3s-leader create
+ │ ├─ command:remote:Command start-k3s-follower-2 create
+ │ ├─ command:remote:Command start-k3s-follower-1 create
+ │ ├─ command:remote:Command insert-ghcr-token-1 create
+ │ ├─ command:remote:Command insert-ghcr-token-2 create
+ │ ├─ command:remote:Command restart-k3s-follower-2 create
+ │ ├─ command:remote:Command restart-k3s-follower-1 create
+ │ └─ command:remote:Command get-kubeconfig create
+ ├─ pulumi:providers:kubernetes k8s create
+ ├─ user:component:app-deployment askme create
+ │ ├─ kubernetes:core/v1:Namespace askme create
+ │ ├─ kubernetes:apps/v1:StatefulSet postgresml create
+ │ ├─ kubernetes:core/v1:Service postgresml-svc create
+ │ ├─ kubernetes:apps/v1:Deployment askme create
+ │ ├─ kubernetes:core/v1:Service askme-svc create
+ │ └─ kubernetes:networking.k8s.io/v1:Ingress askme-ingress create
+ ├─ pulumi:providers:cloudflare cloudflare-provider create
+ ├─ cloudflare:index:Record askme-a-record-3 create
+ ├─ cloudflare:index:Record askme-a-record-1 create
+ └─ cloudflare:index:Record askme-a-record-2 create
Outputs:
kubeconfigPath: "/Users/lbialy/Projects/foss/pulumi/askme/infra/kubeconfig.conf"
nodes : output<string>
Resources:
+ 28 to create
Do you want to perform this update? yes
Updating (dev):
Type Name Status
+ pulumi:pulumi:Stack k3s-on-hetzner-dev created (440s)
+ ├─ hcloud:index:SshKey ssh-key created (0.45s)
+ ├─ hcloud:index:Server k3s-server-3 created (13s)
+ ├─ hcloud:index:Server k3s-server-1 created (13s)
+ ├─ hcloud:index:Server k3s-server-2 created (11s)
+ ├─ user:component:K3S askme-dev created (46s)
+ │ ├─ command:remote:Command start-k3s-leader created (24s)
+ │ ├─ command:remote:Command insert-ghcr-token-leader created (0.54s)
+ │ ├─ command:remote:Command restart-k3s-leader created (4s)
+ │ ├─ command:remote:Command get-leader-token created (0.55s)
+ │ ├─ command:remote:Command start-k3s-follower-1 created (15s)
+ │ ├─ command:remote:Command start-k3s-follower-2 created (12s)
+ │ ├─ command:remote:Command insert-ghcr-token-2 created (0.55s)
+ │ ├─ command:remote:Command restart-k3s-follower-2 created (3s)
+ │ ├─ command:remote:Command insert-ghcr-token-1 created (0.50s)
+ │ ├─ command:remote:Command restart-k3s-follower-1 created (5s)
+ │ └─ command:remote:Command get-kubeconfig created (0.56s)
+ ├─ pulumi:providers:kubernetes k8s created (0.00s)
+ ├─ user:component:app-deployment askme created (306s)
+ │ ├─ kubernetes:core/v1:Namespace askme created (0.20s)
+ │ ├─ kubernetes:apps/v1:StatefulSet postgresml created (270s)
+ │ ├─ kubernetes:core/v1:Service postgresml-svc created (10s)
+ │ ├─ kubernetes:core/v1:Service askme-svc created (23s)
+ │ ├─ kubernetes:apps/v1:Deployment askme created (95s)
+ │ └─ kubernetes:networking.k8s.io/v1:Ingress askme-ingress created (0.14s)
+ ├─ pulumi:providers:cloudflare cloudflare-provider created (0.00s)
+ ├─ cloudflare:index:Record askme-a-record-1 created (1s)
+ ├─ cloudflare:index:Record askme-a-record-3 created (1s)
+ └─ cloudflare:index:Record askme-a-record-2 created (1s)
Outputs:
kubeconfigPath: "/Users/lbialy/Projects/foss/pulumi/askme/infra/kubeconfig.conf"
nodes : [
[0]: "157.90.171.154"
[1]: "65.109.163.32"
[2]: "167.235.230.34"
]
url : "https://machinespir.it"
Resources:
+ 29 created
Duration: 7m22s
Now we can visit the deployed site at the provided url and ask some questions about Besom.
Conclusion
The AI app itself is obviously a very limited PoC built to impress readers, and it would require a lot of further iteration to make it really useful (along with a widget to embed it in an actual docs site), but it proves the point - full-stack Scala is really here. It also, hopefully, conveys that it's possible to build experiments, proofs of concept and small apps in Scala 3 with code that's easy to write, easy to grasp, and that still offers much more scalability and type safety than the alternatives. Moreover, with the new developments around Besom, Scala offers a productive environment that integrates the application layer with the infrastructure layer and removes whole classes of errors in the deployment phase of the application lifecycle.
The full code for this blog post is available here: https://github.com/VirtusLab/besom-ask-me
Our deployment will be available for some time, but feel free to deploy your own!