Akka Http is a module of Akka that provides a full HTTP and Websocket server and client implementation, building on the power of Akka Streams. This means backpressure and resilience transparently out of the box. Great!
I recently needed to implement a bi-directional Websocket channel where each connected client is handled by an actor. The examples I could find were mostly about building a chat and therefore had a broader focus. Here is my simpler example. In it, the actor handling a client sends "Hello " + s + "!" down the Websocket when receiving a String message.
As described in the Server Side Websocket Support page of Akka Http, a Websocket connection is modelled as a Flow that ingests messages and returns messages.
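To make the "Flow in, Flow out" model concrete, here is a minimal sketch of a purely reactive handler: it can only answer messages it receives and has no way to push anything on its own, which is the limitation addressed next. The name GreeterFlow is made up for illustration, and the sketch assumes the akka-http and akka-stream modules are on the classpath.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.Flow

object GreeterFlow {
  // Answers every strict text frame with a greeting.
  // Streamed text frames and binary frames are dropped without being drained,
  // which is acceptable for a sketch but not for production code.
  val flow: Flow[Message, Message, NotUsed] =
    Flow[Message].collect {
      case TextMessage.Strict(name) => TextMessage(s"Hello $name!")
    }
}
```

A flow like this could be handed straight to the Websocket directive of Akka Http, but every outgoing message would have to be triggered by an incoming one.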
The key issue to solve was that I needed a reference to independently push messages down to the connected client. The trick that helped me came from this post by Bartek Kalinka. The idea is to pre-materialize a Source.actorRef which forwards messages to a publisher Sink:
val (down, publisher) = Source
  .actorRef[String](1000, OverflowStrategy.fail)
  .toMat(Sink.asPublisher(fanout = false))(Keep.both)
  .run()
At this point, each message sent to the down actorRef will end up in the publisher Sink. We can then create a Source out of this publisher, and use that source as the output for the Websocket Flow required by Akka Http.
Source.fromPublisher(publisher).map(TextMessage(_))
Every message sent to the down actorRef above will be published as a TextMessage from the source we just created.
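To see the pre-materialized pair in isolation, here is a small sketch that runs outside Akka Http. It assumes the same Akka Streams API used in this post (ActorMaterializer and the two-argument Source.actorRef); the object name PreMaterializedDemo and the message values are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PreMaterializedDemo {
  // Sends two messages to the actorRef and collects what the publisher emits.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Materialize once: `down` is the entry point, `publisher` the exit.
      val (down, publisher) = Source
        .actorRef[String](1000, OverflowStrategy.fail)
        .toMat(Sink.asPublisher(fanout = false))(Keep.both)
        .run()

      // Later, and independently, consume the publisher as a regular Source.
      // fanout = false means only this single subscriber is allowed.
      val received = Source.fromPublisher(publisher).take(2).runWith(Sink.seq)

      down ! "one"
      down ! "two"

      Await.result(received, 3.seconds)
    } finally system.terminate()
  }
}
```

The point of the split is that whoever holds `down` can push messages at any time, while the Websocket side only needs the publisher.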
The actor that brings this all together looks like this:
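import akka.actor.Actor
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

// request message; assumed here to be a plain case object
case object GetWebsocketFlow

class ClientHandlerActor extends Actor {
  implicit val as = context.system
  implicit val am = ActorMaterializer()
  // execution context needed by the Future.map below
  import context.dispatcher

  // pre-materialize the actorRef / publisher pair when the actor is created
  val (down, publisher) = Source
    .actorRef[String](1000, OverflowStrategy.fail)
    .toMat(Sink.asPublisher(fanout = false))(Keep.both)
    .run()

  override def receive = {
    case GetWebsocketFlow =>
      val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
        // brings the ~> operator into scope
        import GraphDSL.Implicits._

        // only works with TextMessage: extracts the body so it can be sent to self
        val textMsgFlow = b.add(Flow[Message]
          .mapAsync(1) {
            case tm: TextMessage => tm.toStrict(3.seconds).map(_.text)
            case bm: BinaryMessage =>
              // consume the stream
              bm.dataStream.runWith(Sink.ignore)
              Future.failed(new Exception("yuck"))
          })

        val pubSrc = b.add(Source.fromPublisher(publisher).map(TextMessage(_)))

        textMsgFlow ~> Sink.foreach[String](self ! _)

        FlowShape(textMsgFlow.in, pubSrc.out)
      })
      sender() ! flow

    // sends "Hello XXX!" down the websocket
    case s: String => down ! "Hello " + s + "!"

    // passes any int down the Websocket
    case n: Int => down ! n.toString
  }
}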
When the actor is created, it materializes the actorRef and the publisher Sink. When asked for the handling flow, it builds the flow using the GraphDSL syntax and sends it back. In your route, it goes like this:
path("connect") {
  val handler = as.actorOf(Props[ClientHandlerActor])
  val futureFlow = (handler ? GetWebsocketFlow)(3.seconds).mapTo[Flow[Message, Message, _]]

  onComplete(futureFlow) {
    case Success(flow) => handleWebSocketMessages(flow)
    case Failure(err) => complete(err.toString)
  }
}
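As a side note, the GraphDSL block in the actor connects an inbound sink to an unrelated outbound source. The same shape can probably be expressed more compactly with Flow.fromSinkAndSource, as in the sketch below. This is an alternative sketch, not the code from the repository; HandlerFlow and its parameters are hypothetical names, with `handler` and `publisher` standing in for the actor's `self` and `publisher`.

```scala
import akka.NotUsed
import akka.actor.ActorRef
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.reactivestreams.Publisher
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object HandlerFlow {
  def apply(handler: ActorRef, publisher: Publisher[String])(
      implicit mat: Materializer, ec: ExecutionContext): Flow[Message, Message, NotUsed] = {

    // Inbound side: extract the text of each message and forward it to the handler.
    val inbound: Sink[Message, NotUsed] =
      Flow[Message]
        .mapAsync(1) {
          case tm: TextMessage => tm.toStrict(3.seconds).map(_.text)
          case bm: BinaryMessage =>
            // drain the binary stream, then fail: only text is supported
            bm.dataStream.runWith(Sink.ignore)
            Future.failed(new IllegalArgumentException("binary messages are not supported"))
        }
        .to(Sink.foreach[String](handler ! _))

    // Outbound side: whatever reaches the publisher goes down the Websocket.
    val outbound: Source[Message, NotUsed] =
      Source.fromPublisher(publisher).map(TextMessage(_))

    // The two sides are independent, as in the GraphDSL version.
    Flow.fromSinkAndSource(inbound, outbound)
  }
}
```

Inside the actor, the GetWebsocketFlow case would then reduce to something like `sender() ! HandlerFlow(self, publisher)`. As with the GraphDSL version, completion of one side does not terminate the other.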
Now you can build your own custom logic and behaviour around the ClientHandlerActor. Full code is available on a GitHub repository. Note that this won’t work in a distributed setting. For that we’ll need a couple of adjustments, coming in the next post!