AnyCable off Rails: connecting Twilio streams with Hanami
Processing audio streams in realtime with AnyCable and Go
Originally published on Martian Chronicles
WebSockets have been around for years, but their popularity isn’t showing any signs of fading. They still power most of the real-time web, primarily because they’re supported by 98% of browsers in use today (Opera Mini users, we miss you). However, browsers, mobile applications, and other clients aren’t the only ones using this technology. There are situations when communication between servers requires a high-throughput channel—and that’s where WebSockets come into play. In this post, we explore how to tame server-to-server WebSocket communication and put it under the control of a Ruby app with the help of AnyCable.
Today, building a WebSocket-driven web application is a piece of cake: major web frameworks provide everything you need out of the box. There are tons of client libraries and PaaS services—the surplus of choice is the only problem.
Dealing with server-to-server connections is not that simple: we need to come up with a protocol and implement both server and client solutions. It becomes even more complicated when we have to integrate with a third-party provider using a unique protocol: we’re likely to build everything from scratch.
What can a WebSocket-based integration between servers look like? One example is the so-called (okay, we made it up) WebSocketHooks: a concept similar to webhooks, but instead of performing an HTTP request to your server, a third party establishes a WebSocket connection and streams data. Thus, you need to somehow handle these connections in your, say, Rails application, but you can’t rely on Action Cable because this third party doesn’t speak the Action Cable protocol.
And that’s where AnyCable enters the story. The “Any” part of AnyCable is not a marketing gimmick; it actually reflects the true nature of the project and its core motto—connect everyone with anything. Today, you’ll learn how to build a WebSocket application on top of AnyCable-Go to support custom protocols and build sophisticated data processing workflows—all controlled by your Ruby application. We’ll cover the following topics:
AnyCable-Go as a library (and how to build custom protocol support on top of it).
Processing Twilio Media streams with AnyCable (a real-time phone call transcription service).
Connecting AnyCable to Hanami (going off the Rails with Phlex, Vite, Cable Ready, and Lite Cable).
AnyCable-Go as a library
AnyCable-Go is a WebSocket server that powers AnyCable applications. It’s the core component responsible for handling connections, dealing with broadcasts, and so on. Usually, AnyCable-Go is used as a standalone service launched via the official CLI, anycable-go, or a Docker image. However, if you take a look at the source code, you can see that the application has a modular architecture and can itself be seen as a framework for building WebSocket applications. There are over a dozen packages responsible for different aspects of the application lifecycle:
$ ls -l
cli/
common/
encoders/
metrics/
node/
pubsub/
rpc/
server/
...
The list above shows the most important components of AnyCable-Go and sheds light on its architecture. Unsurprisingly, the cli
package is responsible for the command-line interface: parsing options, configuring, and running the application. The rpc
and pubsub
components are responsible for RPC communication and receiving broadcast messages, respectively. If we need to build a new broadcasting implementation (like we did with NATS), we just need to build a new pubsub
implementation. There’s no need to touch any other components. The same holds for RPC controllers.
The encoders
package is of particular importance for us: it abstracts the concept of Encoder, a component responsible for transforming WebSocket messages to and from internal representation (which is described in the common
package, by the way). (In the OSS version of AnyCable, we only have a JSON encoder, but the Pro version comes with three more: two for binary protocols and one more for Apollo GraphQL support.)
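To give you a sense of what writing one involves, here is, roughly, the Encoder interface from the encoders package (paraphrased from the AnyCable-Go sources at the time of writing; check the package for the exact definition):

type Encoder interface {
	// A unique protocol identifier (e.g., "json")
	ID() string
	// Turn an internal message into an outgoing WebSocket frame
	Encode(msg EncodedMessage) (*ws.SentFrame, error)
	// Turn an already serialized transmission (e.g., coming from the RPC server) into a frame
	EncodeTransmission(msg string) (*ws.SentFrame, error)
	// Parse an incoming WebSocket payload into an internal message
	Decode(payload []byte) (*common.Message, error)
}

Implement these four methods for your protocol, and the rest of the machinery (sessions, RPC, pub/sub) keeps working as is.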
How can you add a custom encoder for AnyCable-Go? Well, in Go, we cannot simply write a new .go
file, put it somewhere, and load it at runtime. We need to rebuild the entire application. One option is to fork the anycable/anycable-go
project and maintain it till the end of time. The risks and maintainability overhead of this approach are huge. So, we need a better option—let’s use AnyCable-Go as a library!
Since AnyCable-Go was originally designed to be distributed as a service, and not a library, it can be tricky to get started with it in a blank Go project. Luckily, we have you covered—let us introduce the AnyCable-Go scaffold!
AnyCable-Go scaffold is a minimal Go application that demonstrates the most useful APIs and integration points. With its help, you can focus on implementing your extension right away without thinking about how to glue the pieces together. It also comes with CI workflows, preconfigured tests and linters, and a Makefile with handy commands.
Let’s explore the most important bits of the source code.
NOTE: Here and below we omit parts of the source code (e.g., error handling) for better readability.
In the cmd
package, we can see the initialization code for the binary:
func main() {
	conf := config.NewConfig()

	anyconf, err, ok := acli.NewConfigFromCLI(
		os.Args,
		acli.WithCLIName("mycable"),
		acli.WithCLIUsageHeader("MyCable, the custom AnyCable-Go build"),
		acli.WithCLIVersion(version.Version()),
		acli.WithCLICustomOptions(cli.CustomOptions(conf)),
	)
	// error handling omitted (err and ok are checked here in the full version)

	if err := cli.Run(conf, anyconf); err != nil {
		fmt.Fprintf(os.Stderr, "%v", err)
		os.Exit(1)
	}
}
Here you can extend the set of configuration parameters for your application (WithCLICustomOptions) and change the CLI metadata. So, if you build the app (make build) and run dist/mycable -h, you’ll see all the AnyCable settings listed along with yours.
The cli
package contains the application initialization code:
func initAnyCableRunner(appConf *config.Config, anyConf *aconfig.Config) (*acli.Runner, error) {
	opts := []acli.Option{
		acli.WithDefaultSubscriber(),
		acli.WithWebSocketEndpoint("/ws", myWebsocketHandler(appConf)),
	}

	if appConf.FakeRPC {
		opts = append(opts, acli.WithController(func(m *metrics.Metrics, c *aconfig.Config) (node.Controller, error) {
			return fake_rpc.NewController(), nil
		}))
	} else {
		opts = append(opts, acli.WithDefaultRPCController())
	}

	return acli.NewRunner(anyConf, opts)
}
This is where you inject your custom logic. The example code demonstrates how to define an additional WebSocket endpoint ("/ws") with a custom handler. The built-in "/cable" endpoint keeps working, too (we’ll make use of it in our demo application, see below). The handler code shows how to attach a custom encoder and an executor to a WebSocket session:
func myWebsocketHandler(config *config.Config) func(n *node.Node, c *aconfig.Config) (http.Handler, error) {
	return func(n *node.Node, c *aconfig.Config) (http.Handler, error) {
		extractor := ws.DefaultHeadersExtractor{Headers: c.Headers, Cookies: c.Cookies}
		executor := custom.NewExecutor(n)

		return ws.WebsocketHandler([]string{}, &extractor, &c.WS, func(wsc *websocket.Conn, info *ws.RequestInfo, callback func()) error {
			wrappedConn := ws.NewConnection(wsc)
			session := node.NewSession(n, wrappedConn, info.URL, info.Headers, info.UID)
			session.SetEncoder(custom.Encoder{})
			session.SetExecutor(executor)

			// Invokes Authenticate RPC method
			_, err := n.Authenticate(session)
			if err != nil {
				return err
			}

			return session.Serve(callback)
		}), nil
	}
}
What is an executor? Well, not every protocol can be directly translated into the Action Cable communication flow. An executor solves this problem by intercepting incoming messages and either translating them into RPC calls or handling them right inside the Go application.
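For illustration, here’s a toy executor (not the one from our demo) that accepts a hypothetical "data" command and forwards its payload to the Ruby side as a regular Action Cable "message"; the Twilio executor we build below follows the same pattern:

// A minimal executor sketch: turn a custom "data" command into an
// Action Cable "message" RPC call and ignore everything else.
type EchoExecutor struct {
	node node.AppNode
}

func (ex *EchoExecutor) HandleCommand(s *node.Session, msg *common.Message) error {
	if msg.Command == "data" {
		_, err := ex.node.Perform(s, &common.Message{
			Identifier: msg.Identifier,
			Command:    "message",
			Data:       msg.Data,
		})

		return err
	}

	// Anything we don't recognize is dropped
	return nil
}

// Depending on the AnyCable-Go version, the executor interface may also expect
// a disconnect hook; a no-op is enough for this sketch.
func (ex *EchoExecutor) Disconnect(s *node.Session) error {
	return nil
}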
The starter code also provides a fake RPC controller implementation. This can be used to stub RPC calls, so you can test the Go application in isolation.
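A fake controller can be as simple as “accept everyone and send a welcome message”. The sketch below shows only the Authenticate method and assumes the controller interface shape from the AnyCable-Go sources at the time of writing (session ID and env in, a connection result out); the real interface also includes Subscribe, Perform, Disconnect, and lifecycle methods:

// A sketch of a fake controller's Authenticate: accept every connection and
// transmit the standard Action Cable welcome message. Field and constant names
// come from the common package (verify against your AnyCable-Go version).
type FakeController struct{}

func (c *FakeController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) {
	return &common.ConnectResult{
		Identifier:    sid,
		Status:        common.SUCCESS,
		Transmissions: []string{`{"type":"welcome"}`},
	}, nil
}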
Now that we’ve learned the basics of AnyCable-Go, let’s build something cool!
Processing Twilio Media streams with AnyCable
It’s not easy to find an example of a WebSocketHook in the wild. Okay, we’ve had experience with just one—Twilio Media streams.
Twilio provides a collection of communication-related services, from old-school SMS messaging and phone calls to email marketing and real-time video tools. The Programmable Voice APIs allow you to make and receive phone calls. So, with Twilio, you can build voice assistants, auto-responders or other kinds of virtual assistants.
One of the distinguishing features of the Twilio Voice API is the ability to receive voice data in real-time via media streams. A media stream can be created for a phone call to send audio bytes and metadata via a WebSocket connection to your server. How can we leverage this feature?
Let’s build a phone call monitoring application. The app will provide a dashboard showing live calls (made via Twilio) and will also allow you to “peek” at a call—that is, to see a real-time transcript of what’s being said.
The stack we’ll use today includes the following components, besides AnyCable and Twilio:
Vosk server: it provides offline speech recognition capabilities.
Hanami: a Ruby web framework. Why should everyone just use Rails?
The complete data flow looks like this:
The Hanami application initiates a phone call (our robot asks the recipient an important question: “Why do you love Ruby?”).
Twilio initiates a media stream.
AnyCable accepts the media stream’s WebSocket connection and authenticates it (by calling Hanami via AnyCable RPC).
AnyCable consumes media packets, sends them to Vosk, and transmits the recognition results back to the Hanami app via RPC.
As the phone call ends, Twilio closes the media stream’s connection and AnyCable notifies Hanami about it (via RPC, as always).
As you can see, we try to keep as little logic as possible in the AnyCable-Go application; all the business-logic aspects are delegated to the Ruby app. This is an important motivation behind using AnyCable as a framework for handling Twilio media streams: you can easily integrate this service with your existing Ruby or Rails application without spreading the logic across applications.
Using AnyCable as a framework for building a real-time service simplifies the integration of this service with an existing Ruby/Rails application and helps keep business logic in one place.
The final source code can be found on GitHub. Let’s look at the most interesting parts, starting with the Go application.
Translating from Twilio to AnyCable
The first required component for our application is a Twilio encoder. Twilio has decent documentation; we can find the specification of the incoming WebSocket messages there and use it as a reference for our encoder.
We define a Go struct for each message type and then use a generic DecodedMessage
struct as a container for incoming messages:
type DecodedMessage struct {
	Event     string       `json:"event"`
	StreamSID string       `json:"streamSid"`
	Start     StartPayload `json:"start,omitempty"`
	Media     MediaPayload `json:"media,omitempty"`
	Stop      StopPayload  `json:"stop,omitempty"`
	Mark      MarkPayload  `json:"mark,omitempty"`
}
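The per-event payload structs aren’t shown here; below is a sketch of the two we actually rely on, with JSON field names taken from Twilio’s media stream documentation (the structs in the demo app may carry more fields, such as tracks or media format):

// Payloads of the "start" and "media" events (only the fields used below)
type StartPayload struct {
	AccountSID string `json:"accountSid"`
	CallSID    string `json:"callSid"`
}

type MediaPayload struct {
	// Base64-encoded audio/x-mulaw bytes
	Payload string `json:"payload"`
}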
Then, in the encoder’s Decode function, we transform the DecodedMessage into a common.Message—this is AnyCable’s internal representation of incoming messages:
func (Encoder) Decode(raw []byte) (*common.Message, error) {
	twMsg := &DecodedMessage{}

	if err := json.Unmarshal(raw, &twMsg); err != nil {
		return nil, err
	}

	var data interface{}

	switch twMsg.Event {
	case StartEvent:
		data = twMsg.Start
	case MediaEvent:
		data = twMsg.Media
	case MarkEvent:
		data = twMsg.Mark
	case StopEvent:
		data = twMsg.Stop
	}

	msg := common.Message{Command: twMsg.Event, Identifier: twMsg.StreamSID, Data: data}

	return &msg, nil
}
The Twilio commands (“start”, “media”, etc.) do not match the Action Cable commands (“subscribe”, “message”, etc.). Moreover, we do not have any subscriptions for Twilio streams. So, we need to treat the incoming message stream differently. For that, we need to define an executor:
// Handling Twilio events and transforming them into Action Cable commands
type Executor struct {
	node node.AppNode
	conf *config.Config
}

// HandleCommand is responsible for handling incoming messages; here msg has been decoded
// with the Twilio encoder
func (ex *Executor) HandleCommand(s *node.Session, msg *common.Message) error {
	// ...
	if msg.Command == StartEvent {
		// ...
	}

	if msg.Command == MediaEvent {
		// ...
	}

	// Ignore everything else
	return nil
}
We only care about two events: “start” and “media”. The first, “start”, carries metadata about the call, which we can use for authentication. “media” messages contain actual audio bytes. Let’s talk a bit about the speech recognition part of the service.
Vosk integration
Disclaimer: The implementation of this recognition service is meant for demonstration purposes only. Do not use this in production.
We created a separate component called Streamer
, which is responsible for consuming audio bytes and sending them to a Vosk server via gRPC.
There are two API functions: KickOff()
and Push(msg *Packet)
. The first one creates a gRPC client and starts a bi-directional gRPC stream (StreamingRecognize
):
func (s *Streamer) KickOff(ctx context.Context) error {
	// ...
	conn, _ := grpc.Dial(s.config.VoskRPC, dialOptions...)

	s.client = vosk.NewSttServiceClient(conn)

	stream, _ := s.client.StreamingRecognize(cancelCtx)

	stream.Send(&vosk.StreamingRecognitionRequest{
		StreamingRequest: &vosk.StreamingRecognitionRequest_Config{
			Config: &vosk.RecognitionConfig{
				Specification: &vosk.RecognitionSpec{
					SampleRateHertz: 8000,
					PartialResults:  s.config.PartialRecognize,
				},
			},
		},
	})

	s.stream = stream

	go s.readFromStream()

	return nil
}
Note that we also send the first message over the stream to configure the recognition service. We must provide the audio sample rate (Twilio uses 8kHz) and can also turn partial results on or off. When partial results are on, the recognition results arrive quickly, but they can be incomplete and less accurate; there are multiple results for the same phrase.
In this function, we also start a goroutine to read results from the stream:
func (s *Streamer) readFromStream() {
	for {
		resp, err := s.stream.Recv()

		if err == nil {
			chunk := resp.GetChunks()[0]
			alt := chunk.Alternatives[0]

			if alt.Text == "" && chunk.Final {
				s.log.Debugf("recognition completed")
				break
			}

			if alt.Text != "" {
				s.sendResultFunction(&Response{Message: alt.Text, Final: chunk.Final, Event: "transcript"})
			}
		} else {
			// error handling
			break
		}
	}

	s.conn.Close()
}
The sendResultFunction
function is a callback we defined in the executor (see below).
Finally, the Push()
function:
func (s *Streamer) Push(msg *Packet) error {
	s.buf.Write(msg.Audio)

	if s.buf.Len() > bytesPerFlush {
		s.stream.Send(&vosk.StreamingRecognitionRequest{
			StreamingRequest: &vosk.StreamingRecognitionRequest_AudioContent{
				AudioContent: s.buf.Bytes(),
			},
		})

		s.buf.Reset()
	}

	return nil
}
We buffer the audio before sending it to Vosk in order not to overload the recognition service with requests. For transcription purposes, splitting audio into hundred-millisecond chunks is totally sufficient.
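To put a number on it: after the mulaw-to-PCM conversion (shown below), we get 16-bit samples at 8 kHz, i.e., 16,000 bytes per second, so ~100 ms of audio is about 1,600 bytes. The constant below only illustrates that arithmetic; the exact value in the demo may differ:

// ~100 ms of 8 kHz, 16-bit PCM audio:
// 8000 samples/s * 2 bytes/sample / 10 = 1600 bytes
const bytesPerFlush = 8000 * 2 / 10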
Now, let’s show the “media” part of the executor:
if msg.Command == MediaEvent {
	twilioMsg := msg.Data.(MediaPayload)

	var t *streamer.Streamer

	if rawStreamer, ok := s.InternalState["streamer"]; ok {
		t = rawStreamer.(*streamer.Streamer)
	}

	audioBytes, _ := base64.StdEncoding.DecodeString(twilioMsg.Payload)

	return t.Push(&streamer.Packet{Audio: g711.DecodeUlaw(audioBytes)})
}
We must perform some manipulations with the audio bytes before sending them to the streamer. Twilio sends audio/x-mulaw
bytes as base64 encoded strings, so, first, we need to decode them. Then, we must convert mulaw to PCM (the only codec supported by Vosk out-of-the-box).
As you can see, we use the streamer struct stored in the session’s internal state. How does it get there?
Authentication and streamer initialization
Security matters. Our WebSocket server is open to the public (so that Twilio can reach it); thus, we need to protect it from unauthorized access.
As soon as Twilio sends a “start” message, we must authenticate the call. For that, we obtain the Twilio account SID (unique identifier) and send it over to AnyCable RPC (Hanami). In Ruby, we can compare it with the known Twilio account identifier, and approve or reject the connection:
if msg.Command == StartEvent {
	start, _ := msg.Data.(StartPayload)

	// We add the account SID as a header to the session.
	// So, we can access it via request.headers['x-twilio-account'] in Ruby.
	s.GetEnv().SetHeader("x-twilio-account", start.AccountSID)

	_, err := ex.node.Authenticate(s)
	if err != nil {
		return err
	}

	// We need to perform an additional RPC call to initialize the channel subscription
	// and notify about the call start.
	ex.node.Subscribe(s, &common.Message{Identifier: channelId(start.CallSID), Command: "subscribe"})

	ex.initStreamer(s, start.CallSID)

	return nil
}
We use the call SID as a part of the channel identifier, so in Ruby, we’ll be able to access it via params[:sid]
. This is how we distinguish calls from each other.
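The channelId helper isn’t shown above. Conceptually, it builds an Action Cable-style subscription identifier: a JSON string carrying the channel name and params. Here is a sketch (the "twilio" channel name is a placeholder; it must match whatever the Ruby side registers):

// Build an Action Cable-like identifier with the call SID as a param
// (requires the encoding/json import).
func channelId(callSID string) string {
	id, _ := json.Marshal(map[string]string{
		"channel": "twilio",
		"sid":     callSID,
	})

	return string(id)
}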
The initStreamer
function does exactly what it says it does. The part worth showing here is the transcription result callback:
func (ex *Executor) initStreamer(s *node.Session, sid string) error {
	identifier := channelId(sid)

	st := streamer.NewStreamer(ex.conf)

	st.OnResponse(func(response *streamer.Response) {
		// NOTE: error handling omitted
		ex.node.Perform(s, &common.Message{
			Identifier: identifier,
			Command:    "message",
			Data: string(
				utils.ToJSON(map[string]interface{}{
					"action": "handle_message",
					"result": response,
				})),
		})
	})

	st.KickOff(context.Background())

	s.InternalState["streamer"] = st

	return nil
}
We use the node.Perform() function, which performs the respective RPC call under the hood: the #handle_message method will be called on a channel object in Ruby.
That’s pretty much it for the Go application. But before jumping into the Ruby part, let’s talk about how we can test it in isolation.
Using wsdirector to emulate Twilio media streams
Developing a feature that heavily relies on a third-party service is always painful. How can you test an application that transcribes a phone call in real time? You’ll be constantly calling yourself and speaking gibberish on the phone (don’t do this in a public workspace 🙃). It’s not only inconvenient, but also time-consuming. We Rubyists always look for ways to improve our productivity, and this situation is no exception.
AnyCable is more than a service or a library; it’s an ecosystem. There are plenty of useful tools for all kinds of real-time occasions. The one that helped us with testing Twilio media streams is wsdirector. We spent a few minutes and recorded a handful of fixtures: media stream dumps in YAML format. Now, instead of making real phone calls, we can just run the wsdirector CLI to emulate them:
wsdirector -f etc/fixtures/wsdirector/ruby.yml -u ws://localhost:8080/streams
This was especially helpful to test and tune our Vosk integration. As a side effect, you don’t need a Twilio account to play with the demo application—just make the calls via wsdirector!
Faking Vosk
Speaking of DX, we’ve also made it possible to avoid running a Vosk server (which eats a lot of RAM and is painfully slow within Docker on M1 Macs) by creating a fake Vosk server with Ruby and grpc_kit. Thanks to gRPC, all we need to implement is a Ruby class with the #streaming_recognize method; most of the code was autogenerated using gRPC tools.
Our fake Vosk uses ffaker to generate random phrases in response to incoming requests. So, combining wsdirector, a fake RPC controller, and a fake Vosk server, we can test the Go app completely in isolation.
Connecting AnyCable to Hanami
Usually, we integrate AnyCable into Rails applications. (That also was the case for our original work on Twilio media streams for a client.) However, we decided to add an additional twist to this article and go off the Rails, so to speak.
At Evil Martians, we’ve been familiar with Hanami since its early days (and we even used it in production back when it was still called Lotus), but the recent 2.0 release was new to us. So, we decided to give it a try.
“Kaisen” comes from the Japanese word for “line” or “circuit”.
The Hanami documentation is a good starting point. We followed the guide closely to bootstrap an application:
$ hanami new kaisen
...
We did skip the part related to persistence—we don’t need it for this demo.
We then faced our first challenge—Hanami, as of v2.0, doesn’t provide a view layer at all (it’s planned for 2.1). We wanted to keep things simple and avoid adding yet another (frontend) application; we were looking for a full-stack way of building Hanami web applications. The search wasn’t fruitful, so we decided to build it from scratch.
Using Phlex for views
Phlex is a Ruby framework for building views. It’s relatively new, but it recently hit its first major release, so it’s ready to be a part of your application.
Read also: ViewComponent in the Wild I: building modern Rails frontends
In Phlex, each view is represented as a Ruby class. Here we can see some similarities to View Component. Unlike View Component though, Phlex also allows you to declare HTML via Ruby by using methods and blocks. Thus, you don’t need to switch between multiple files, a template and a Ruby class, to modify a component.
Here is, for example, our main application view class:
class Show < View
  option :call_sid, optional: true
  option :phone, optional: true

  def template
    div(class: "min-w-full flex flex-row") do
      div(class: "w-1/3 border-r border-red-100 mr-4") do
        a(href: path_for(:calls)) { h2(class: "font-bold text-2xl mb-5") { "Calls" } }

        render Form.new(phone:)

        hr(class: "border-red-100 mt-2")

        div(id: "calls", class: "pr-2") do
          stream_from("calls")
        end
      end

      render Events.new(call_sid:)
    end
  end
end
If you’ve ever worked with React, you’re probably experiencing flashbacks right now.
We can organize views into components (see Form
and Events
above), use OOP features, and so on. For example, our base View
class contains some helpers and adds dry-initializer
for more convenient constructor parameter declaration:
module Kaisen
  class View < Phlex::HTML
    extend Dry::Initializer

    private

    def path_for(...) = ::Hanami.app["routes"].path(...)
  end
end
Finally, we added a helper to our base action class to infer a view from the action. Here’s how we use it:
module Calls
  class Show < Kaisen::Action
    def handle(request, response)
      call_sid = request.params[:id]
      response.body = phlex(locals: {call_sid:})
    end
  end
end
The interface is obviously inspired by Rails (e.g., locals
). We believe that it’s better to stick to well-known patterns to reduce mental overhead for developers switching between frameworks.
(Let’s not dilute the post with unnecessary code snippets. The helper implementation can be found in the source code.)
Using Vite for assets
Read also: Vite-lizing Rails: get live reload and hot replacement with Vite Ruby
A view layer consists not only of HTML, but also of assets (JavaScript, CSS, etc.). This challenge was the simplest one to overcome—Vite is the way. Luckily, Vite Ruby is a framework-agnostic project, so we knew we’d be able to make it work with Hanami.
We found the existing vite-hanami gem, which was built for Hanami 1.3, and cherry-picked some bits from it to make Vite Ruby work with Hanami 2.0. Since the Vite helpers need to render HTML tags, they’re built on top of Phlex. For example:
def vite_client
  return unless src = vite_manifest.vite_client_src

  script(src: src, type: "module")
end

def vite_javascript(name, **options)
  entries = vite_manifest.resolve_entries(*name, type: :javascript)
  return unless entries

  entries.first.last.each do |src|
    script(src:, **options)
  end
end
The full source code can be found here.
We also found that Hanami comes with a very restrictive Content Security Policy by default, so we had to adjust it to allow serving Vite assets:
environment :development do
  # Allow @vite/client to hot reload changes in development
  config.actions.content_security_policy[:script_src] += " 'unsafe-eval' 'unsafe-inline'"
  config.actions.content_security_policy[:connect_src] += " ws://#{ ViteRuby.config.host_with_port }"
  config.actions.content_security_policy[:style_src] += " 'unsafe-eval'"
end
Finally, to serve assets, we added a couple of Rack middlewares:
environment :development do
  config.middleware.use(ViteRuby::DevServerProxy) if ViteRuby.run_proxy?
  config.middleware.use Rack::Static, { urls: ["/vite-dev/"], root: "public" }
end
The Rack::Static
middleware is used to serve precompiled assets if there is no Vite dev server running.
Getting reactivity ready with Cable Ready
Although we configured Vite and made it possible to use JS in our Hanami application, we decided to rely on as little JS code as possible. Instead, we wanted to follow the HTML-over-the-wire approach.
Ruby is also not a requirement for Hotwire. There are server-side libraries available for other languages, too; for example, Turbo Laravel for PHP.
Hotwire was our first candidate. The JavaScript part of Hotwire (the most important part) is not coupled with Rails; thus, it’s possible to use Hotwire with a web application built with any framework, not only Rails. Nevertheless, we decided to go all in on Rails alternatives. So, we chose Cable Ready.
Cable Ready 5.0 comes with a #cable_ready_stream_from helper, which has an interface similar to Turbo Streams. That means we can subscribe to server updates by dropping an HTML element on the page, no JavaScript required.
Then, we can send Cable Ready operations from the server to perform DOM modifications. Cable Ready comes with dozens of operations out of the box, so you can do pretty much anything. (And if something is missing, you can always define a custom operation.)
Setting up Cable Ready on the client side didn’t require a lot of effort:
import CableReady from 'cable_ready';
import { createConsumer } from "@anycable/web";
const consumer = createConsumer();
CableReady.initialize({ consumer });
Note that we also added the AnyCable client library as a WebSocket client implementation.
Unfortunately, we couldn’t use the cable_ready gem: it depends on Rails components, such as Action Pack and Active Support. (It could’ve worked out, but we decided to stay Rails-free.)
To use the #cable_ready_stream_from
feature, we had to re-implement the stream signing functionality. Cable Ready uses the MessageVerifier class from Active Support for that. Our signer needs only Base64 and OpenSSL:
class StreamName
  def signed(name)
    data = ::Base64.strict_encode64(name.to_json)
    digest = generate_digest(data)

    "#{data}--#{digest}"
  end

  private

  def generate_digest(data)
    require "openssl" unless defined?(OpenSSL)

    OpenSSL::HMAC.hexdigest(OpenSSL::Digest::SHA256.new, ::Hanami.app["settings"][:cable_ready_sign_key], data)
  end
end
The implementation is fully compatible with the one that comes with the cable_ready
gem.
Backporting operations (enough for our demo) resulted in some copy-pasting. You can find the result here.
To make it easy to access the #cable_ready
object anywhere in the codebase, we created a Hanami provider:
Hanami.app.register_provider(:cable_ready) do
  prepare do
    require "cable_ready/hanami"
  end

  start do
    broadcaster = Kaisen::CableReady::Hanami::Broadcaster.new
    stream_name = Kaisen::CableReady::Hanami::StreamName.new

    register "cable_ready", broadcaster
    register "cable_ready_stream_name", stream_name
  end
end
Providers are the way to configure application dependencies. They may look similar to Rails initializers, but they are more powerful and better designed (no global state, lifecycle events, etc.).
Now, you can add the #cable_ready
broadcaster to your class by injecting it as a dependency:
class MyClass
  include Deps["cable_ready"]

  def broadcast_something = cable_ready.action(...).broadcast_to("test")
end
Finally, we added the <cable-ready-stream-from>
HTML element support to Phlex:
class View < Phlex::HTML
  register_element :cable_ready_stream_from

  private

  def stream_from(name)
    cable_ready_stream_from(identifier: ::Hanami.app["cable_ready_stream_name"].signed(name))
  end
end
With this configuration, we can drop #stream_from(name)
in any view to subscribe to the stream updates. The WebSocket client will automatically be initiated and connected to our Go application (to the /cable
endpoint serving regular Action Cable clients). At this point, we don’t even need an RPC server: AnyCable supports signed Cable Ready streams out of the box.
That said, we still need a broadcasting component to send updates from Hanami to AnyCable.
Integrating AnyCable via LiteCable
Our Go application uses AnyCable RPC to authenticate streams and send transcription results. Thus, we need to define a channel to handle these requests.
For that, we can use Lite Cable—a lightweight implementation of Action Cable. It comes with the same Connection and Channel abstractions and supports the minimal viable subset of the Action Cable API.
Here is the definition of the Connection class in our Hanami application:
class Connection < LiteCable::Connection::Base
  def connect
    sid = request.env["HTTP_X_TWILIO_ACCOUNT"]
    return unless sid

    twilio_account_sid = Hanami.app["settings"].twilio_account_sid

    reject_unauthorized_connection unless sid == twilio_account_sid
  end
end
That’s where we check that the media stream connection is made from our Twilio account. We read the value of the x-twilio-account
HTTP header (via request.env
), and then, only if it’s present, we compare it with the application’s Twilio account ID.
The append_or_replace
Cable Ready operation is a custom one we define here.
We also define a Twilio channel to handle events from the stream and broadcast updates via Cable Ready:
class Twilio < Channel
  def subscribed
    cable_ready.append(
      selector: "#calls",
      html: render_call(call_sid:)
    ).broadcast_to("calls")

    cable_ready.append(
      selector: "#events",
      html: render_event(text: "Call started", event_type: "start")
    ).broadcast_to("call_#{call_sid}")
  end

  def unsubscribed
    # ...
  end

  def handle_message(data)
    data.fetch("result").values_at("id", "text", "event") => [id, text, event_type]

    cable_ready.append_or_replace(
      selector: "#events",
      target: "#event_#{id}",
      html: render_event(id:, text:, event_type:)
    ).broadcast_to("call_#{call_sid}")
  end

  private

  def call_sid = params["sid"]

  def render_event(**)
    Views::Calls::Show::Event.new(**).call
  end

  def render_call(**)
    Views::Calls::Show::Call.new(**).call
  end
end
That’s pretty much it!
The final piece of the puzzle is a publish/subscribe service: a component that helps distribute broadcasts across connected clients. With Action Cable, we usually use Redis for that. With AnyCable, we can avoid adding one more infrastructure dependency by using an embedded NATS server. We enable it in our Go application by setting a couple of environment variables:
# .env
ANYCABLE_EMBED_NATS=true
ANYCABLE_BROADCAST_ADAPTER=nats
In the Hanami application, we configured AnyCable to use NATS via the application settings:
# config/settings.rb
module Kaisen
  class Settings < Hanami::Settings
    # ...

    setting :anycable_broadcast_adapter, default: "nats", constructor: Types::String
  end
end
To make AnyCable understand Hanami settings, we added a custom loader for Anyway Config (which powers AnyCable configuration). You can find it here.
To verify that our setup works as expected, we can emulate a phone call using the ruby.yml
wsdirector scenario. You should see something like this in your browser:
Twilio calls monitor demo
Alright, that was a heck of a journey, but now it’s time to say goodbye!
In this post, we learned that AnyCable can truly live up to the “any” part of its name if you start using it as a library. Whenever you need to connect your Ruby or Rails application to a WebSocket stream, or when you have to support proprietary client applications speaking foreign protocols—AnyCable is there to help you.
We also hope that our experiments with Hanami will encourage you to pay attention to this Ruby framework and its ecosystem.
P.S. Don’t forget to star the demo application repo on GitHub!