
Phoenix/Elixir - concurrency actor model with 'let it crash' philosophy

Bipin Paul Bedi
January 4th, 2019 · 5 min read

A concurrent program has multiple logical threads of control. These threads may or may not run in parallel. A parallel program potentially runs more quickly than a sequential program by executing different parts of the computation simultaneously (in parallel). It may or may not have more than one logical thread of control. An alternative way of thinking about this is that concurrency is an aspect of the problem domain—your program needs to handle multiple simultaneous (or near-simultaneous) events. Parallelism, by contrast, is an aspect of the solution domain—you want to make your program faster by processing different portions of the problem in parallel.

Functional programming avoids the problems associated with a shared mutable state by avoiding mutable state. Actor programming, by contrast, retains mutable state but avoids sharing it. An actor is like an object in an object-oriented (OO) program—it encapsulates state and communicates with other actors by exchanging messages. The difference is that actors run concurrently with each other and, unlike OO-style message passing (which is really just calling a method), actors really communicate by sending messages to each other.

Certainly, there are some concurrent programs that will always be non-deterministic. This is unavoidable: some problems require solutions that are intrinsically dependent on the details of timing. But not all parallel programs are necessarily non-deterministic. The sum of the numbers between 0 and 10,000 won’t change just because we add those numbers together in parallel instead of sequentially.
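A minimal sketch of that last point, assuming the range is split into chunks of 1,000 numbers (the chunk size and variable names are illustrative choices, not part of the argument above). Each chunk is summed in its own task via Task.async_stream, and the total should match the sequential sum on every run.

```elixir
# Split 0..10_000 into chunks and sum each chunk in a separate task.
# The chunk size (1_000) is an arbitrary, illustrative choice.
parallel_sum =
  0..10_000
  |> Enum.chunk_every(1_000)
  |> Task.async_stream(&Enum.sum/1)
  |> Enum.reduce(0, fn {:ok, partial}, acc -> acc + partial end)

# The same total computed sequentially, for comparison.
sequential_sum = Enum.sum(0..10_000)

IO.puts("parallel:   #{parallel_sum}")
IO.puts("sequential: #{sequential_sum}")
```

The order in which the tasks finish may vary, but addition does not care about that order, so the result stays the same.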


In Erlang, and therefore Elixir, an actor is called a process. In most environments, a process is a heavyweight entity that consumes lots of resources and is expensive to create. An Elixir process, by contrast, is very lightweight: lighter even than most systems’ threads, both in terms of resource consumption and startup cost. Elixir programs typically create thousands of processes without problems and don’t normally need to resort to the equivalent of thread pools.
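To get a feel for how cheap processes are, here is a small sketch that spawns 10,000 of them and waits for each to report back. The count and the {:done, i} message shape are assumptions made for illustration.

```elixir
parent = self()

# Spawn 10_000 processes; each one sends its number back and then exits.
for i <- 1..10_000 do
  spawn(fn -> send(parent, {:done, i}) end)
end

# Collect exactly one reply per spawned process.
replies =
  for _ <- 1..10_000 do
    receive do
      {:done, i} -> i
    end
  end

IO.puts("received #{length(replies)} replies")
```

No pool is involved: every unit of work simply gets its own process.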

Elixir actor by example

Elixir actors communicate via message passing using mailboxes, which are queues. For our example, we will create an MD5 hash generator that, given a string, returns its MD5 hash. If the hash for a value already exists, the actor will not recompute it; to save CPU it returns the hash from an in-memory cache.
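The queue behaviour of a mailbox can be seen with a tiny sketch in which the current process sends itself three messages and reads them back. The {:msg, n} shape is a made-up example format.

```elixir
# Send three messages to the current process's own mailbox.
for n <- [1, 2, 3], do: send(self(), {:msg, n})

# Receive them one at a time; each receive takes the oldest matching message.
received =
  for _ <- 1..3 do
    receive do
      {:msg, n} -> n
    end
  end

IO.inspect(received)
```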

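defmodule HashIt do
  def loop do
    # Block until a message arrives, then pattern match on its shape.
    receive do
      {:compute, value} -> IO.puts("#{value}")
    end
    loop()
  end
end

pid = spawn(&HashIt.loop/0)
send(pid, {:compute, "bipin"})
# Give the actor time to print before the script ends.
Process.sleep(1000)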

This function implements an infinite loop by calling itself recursively. The receive block waits for a message and then uses pattern matching to work out how to handle it. Elixir implements tail-call elimination which, as its name suggests, replaces a recursive call with a simple jump if the last thing the function does is call itself. The infinitely recursive call to the loop function therefore does not result in a stack overflow.
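As a small illustration of tail-call elimination outside of an actor, the sketch below recurses one million times. The Countdown module is a hypothetical helper written only for this example.

```elixir
defmodule Countdown do
  # The recursive call is the last thing the function does, so the
  # runtime can jump back to the start instead of pushing a new stack frame.
  def run(0), do: :done
  def run(n) when n > 0, do: run(n - 1)
end

# One million iterations complete without the stack growing.
IO.inspect(Countdown.run(1_000_000))
```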

defmodule HashIt do
  def loop do
    receive do
      # Print the MD5 digest of the value as an upper-case hex string.
      {:compute, value} -> IO.puts(:crypto.hash(:md5, value) |> Base.encode16())
      {:shutdown} -> exit(:normal)
    end
    loop()
  end
end

# Trap exits so the linked actor's termination arrives as an {:EXIT, pid, reason} message.
Process.flag(:trap_exit, true)
pid = spawn_link(&HashIt.loop/0)
send(pid, {:compute, "bipin"})
send(pid, {:shutdown})

receive do
  {:EXIT, ^pid, reason} -> IO.puts("HashIt has exited (#{reason})")
end

Adding state to the actor

We will add a variable that stores every value sent to the actor together with its computed hash.

defmodule HashIt do
  def loop(strg) do
    receive do
      {:compute, value} ->
        hashValue = :crypto.hash(:md5, value) |> Base.encode16()
        # Map.put/3 returns a new map, which becomes the state of the next iteration.
        updatedStrg = Map.put(strg, value, hashValue)
        IO.puts(hashValue)
        loop(updatedStrg)
    end
  end
end

Here strg is an Elixir Map, and we can start the process with an empty map by using

pid = spawn(HashIt, :loop, [%{}])

Alternatively, we can define functions to start the actor and to compute a hash, and register the process under a name so that callers do not need the pid.

defmodule HashIt do
  def start(name, strg, cryptoType) do
    pid = spawn(__MODULE__, :loop, [strg, cryptoType])
    Process.register(pid, name)
    pid
  end

  def compute(name, value) do
    # A unique reference lets us recognise the reply to this particular request.
    ref = make_ref()
    send(name, {:compute, value, self(), ref})

    receive do
      {:ok, ^ref, reply} -> reply
    end
  end

  def loop(strg, cryptoType) do
    receive do
      {:compute, value, sender, ref} ->
        hashValue = :crypto.hash(cryptoType, value) |> Base.encode16()
        updatedStrg = Map.put_new(strg, value, hashValue)
        send(sender, {:ok, ref, hashValue})
        loop(updatedStrg, cryptoType)
    end
  end
end

The program is started via the start function, which uses the pseudo-variable __MODULE__; it evaluates to the name of the current module. Process.register registers the pid under the given name (:hashItMD5 in the call below). Moreover, instead of printing the hash value, the actor now returns it to the sender, which gives us bi-directional communication. The caret (^) in {:ok, ^ref, reply} denotes that we want to match against the existing value of ref rather than rebind it. Pattern matching in Elixir can match inside a data structure. Effectively, we can now use the HashIt module via

:hashItMD5 |> HashIt.start(%{}, :md5)
:hashItMD5 |> HashIt.compute("bipin")
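The effect of the caret can be shown in isolation. In this sketch the current process puts two replies in its own mailbox, one carrying an unrelated reference and one carrying ours; the message contents are made up for illustration.

```elixir
ref = make_ref()
other_ref = make_ref()

# Two replies land in our mailbox: one tagged with a different reference, one with ours.
send(self(), {:ok, other_ref, "stale reply"})
send(self(), {:ok, ref, "expected reply"})

# ^ref only matches messages carrying the value already bound to ref.
# Without the caret, ref would be rebound and the first message would match.
reply =
  receive do
    {:ok, ^ref, value} -> value
  end

IO.puts(reply)
```

This is why compute can safely be called from a process whose mailbox already contains other messages.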

Adding check and compute logic

To check the cache before recomputing, and to return the cached value on a hit, the module above can be refactored as

defmodule HashIt do
  def start(name, strg, cryptoType) do
    pid = spawn(__MODULE__, :loop, [strg, cryptoType])
    Process.register(pid, name)
    pid
  end

  def compute(name, value) do
    ref = make_ref()
    send(name, {:compute, value, self(), ref})

    receive do
      {:ok, ^ref, reply} -> reply
    end
  end

  def loop(strg, cryptoType) do
    receive do
      {:compute, value, sender, ref} ->
        # Map.fetch/2 returns {:ok, value} on a cache hit and :error on a miss.
        case Map.fetch(strg, value) do
          {:ok, val} ->
            send(sender, {:ok, ref, val})
            loop(strg, cryptoType)

          :error ->
            hashValue = :crypto.hash(cryptoType, value) |> Base.encode16()
            updatedStrg = Map.put_new(strg, value, hashValue)
            send(sender, {:ok, ref, hashValue})
            loop(updatedStrg, cryptoType)
        end
    end
  end
end
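The cache lookup hinges on the two shapes that Map.fetch can return, so it may help to see them side by side. The cached string below is a placeholder, not a real digest.

```elixir
# A stand-in cache with one entry; the value is a placeholder, not a real hash.
cache = %{"bipin" => "cached-hash"}

hit = Map.fetch(cache, "bipin")
miss = Map.fetch(cache, "unknown")

IO.inspect(hit)
IO.inspect(miss)
```

A hit is a tagged tuple, while a miss is the bare atom :error, so a case expression over the result needs one clause for each.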

Making it fault tolerant

Various processes can thus be started in parallel for different hash algorithms, for example MD5, SHA-1 and SHA-256. Using the process mechanism above, we can create multiple processes for the same or different tasks, giving outputs that are concurrent and parallel yet deterministic. But this architecture does not provide fault tolerance. What if a process hits an error and is aborted abruptly? Elixir provides a mechanism to link processes to each other, and such links are bi-directional.

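md5 = HashIt.start(:hashItMD5, %{}, :md5)
sha256 = HashIt.start(:hashItSHA256, %{}, :sha256)
# Process.link/1 links the caller to a pid, so a helper process (an illustrative
# addition) links itself to both workers, tying their exits together.
spawn(fn -> Enum.each([md5, sha256], &Process.link/1); Process.sleep(:infinity) end)
Process.sleep(100) # let the helper set up its links
Process.exit(md5, :forced_kill)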

Since HashIt.start uses spawn, we could also use spawn_link to create the link at spawn time instead of calling Process.link afterwards. Please note that links are bi-directional: an abnormal exit of :hashItMD5 also terminates :hashItSHA256, and a check on either name returns nil:

Process.whereis(:hashItMD5)

nil

Process.whereis(:hashItSHA256)

nil

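A normal exit, by contrast, keeps the linked process active. Process.exit(pid, :normal) sent from another process is ignored unless the target traps exits, so the actor has to call exit(:normal) itself, for example from a {:shutdown} clause like the one in the earlier version of HashIt.loop (assumed here), viz:

send(:hashItMD5, {:shutdown})

Process.whereis(:hashItMD5)

nil

:hashItSHA256 |> Process.whereis() |> Process.info(:status)

{:status, :waiting}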
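The difference between abnormal and normal exits can also be reproduced with two throwaway processes. In this sketch the LinkDemo module, its {:exit_with, reason} message and the 100 ms pauses are all hypothetical choices made for the demonstration.

```elixir
defmodule LinkDemo do
  # Waits for a message and exits with whatever reason it is told to use.
  def worker do
    receive do
      {:exit_with, reason} -> exit(reason)
    end
  end

  # Starts process a, which spawns and links process b; returns both pids.
  def linked_pair do
    parent = self()

    a =
      spawn(fn ->
        b = spawn_link(&LinkDemo.worker/0)
        send(parent, {:pair, self(), b})
        LinkDemo.worker()
      end)

    receive do
      {:pair, ^a, b} -> {a, b}
    end
  end
end

# Abnormal exit: the exit signal travels over the link and takes b down too.
{a, b} = LinkDemo.linked_pair()
send(a, {:exit_with, :boom})
Process.sleep(100)
IO.inspect({Process.alive?(a), Process.alive?(b)})

# Normal exit: c stops, but its linked partner d keeps running.
{c, d} = LinkDemo.linked_pair()
send(c, {:exit_with, :normal})
Process.sleep(100)
IO.inspect({Process.alive?(c), Process.alive?(d)})
```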

This implies that a process can trap exits to capture the termination of other processes, which can be used to build a supervisor that restarts a process if it crashes. We can set Process.flag(:trap_exit, true) to capture the exit of a linked process and take appropriate action. In our HashIt example, a supervisor can be created as:

defmodule HashItSupervisor do
  def start do
    spawn(__MODULE__, :loop_system, [])
  end

  def loop_system do
    # Turn exit signals from linked processes into {:EXIT, pid, reason} messages.
    Process.flag(:trap_exit, true)
    loop()
  end

  def loop do
    # HashIt.start must use spawn_link instead of spawn for the exit to reach us.
    pid = HashIt.start(:hashItMD5, %{}, :md5)

    receive do
      {:EXIT, ^pid, :normal} ->
        IO.puts("Hash It exited normally")
        :ok

      {:EXIT, ^pid, reason} ->
        IO.puts("Hash It failed with reason #{inspect(reason)}...restarting")
        loop()
    end
  end
end

HashItSupervisor.start()

If the HashIt process now crashes, the exit is captured by HashItSupervisor and the process is restarted. If two processes depend on each other, a crash of the sender can leave the receiver deadlocked or waiting forever. The receiver can be guarded with a timeout by adding an after clause to the receive block. Example:

receive do
  {:ok, ^ref, value} -> IO.puts(value)
after
  1000 -> nil
end
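A self-contained variant of the timeout guard, assuming nobody ever answers: the reference is fresh, so no message in the mailbox can match it and the after clause fires. The :timeout atom and the {:reply, value} tuple are illustrative return values.

```elixir
ref = make_ref()

# No process knows about ref, so no matching reply can arrive
# and the after clause runs once the timeout has elapsed.
result =
  receive do
    {:ok, ^ref, value} -> {:reply, value}
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```

Returning a distinct value on timeout lets the caller decide whether to retry, give up, or crash.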

Scaling to multiple nodes/computers

Microsoft Orleans is a .NET implementation of the actor model, but we focus here on a programming language that was built with concurrent execution in mind: Elixir/Erlang. Erlang is a programming language used to build massively scalable soft real-time systems with requirements on high availability. Some of its uses are in telecom, banking, e-commerce, computer telephony and instant messaging. Erlang’s runtime system has built-in support for concurrency, distribution and fault tolerance. OTP is a set of Erlang libraries and design principles providing middleware to develop these systems. It includes its own distributed database, applications to interface with other languages, and debugging and release handling tools. Erlang runs on a VM called BEAM, which is essentially a process virtual machine.

Actor programming naturally supports an approach to writing fault-tolerant code that leverages this observation: the error-kernel pattern. In an Elixir system, the kernel is the root supervisor, which can start other supervisors or workers. When we start an Elixir virtual machine we create a node. We can create multiple nodes on the same machine or across a network of computers by naming them with the --name or --sname option. To make multiple nodes part of the same cluster, they must use the same --cookie argument. This lets a system run across multiple machines. To connect nodes, we can use the Node.connect function:

iex([email protected])1> Node.self

:"[email protected]"

iex([email protected])2> Node.list

[]

iex([email protected])3> Node.connect(:"[email protected]")

true

iex([email protected])4> Node.list

[:"[email protected]"]

Now use Node.spawn to start workers or supervisors on a connected node, and use :global.register_name() instead of Process.register() to make names global across the cluster.
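A minimal sketch of global registration follows. It runs on a single node, since Node.spawn needs a second connected node to be meaningful; the :hash_it_worker name and the ping/pong messages are assumptions made for this example.

```elixir
# A trivial worker that answers one :ping message (illustrative only).
pid =
  spawn(fn ->
    receive do
      {:ping, from} -> send(from, :pong)
    end
  end)

# :global.register_name/2 returns :yes on success and :no if the name is taken.
:yes = :global.register_name(:hash_it_worker, pid)

# Look the name up and send to it by name rather than by pid.
found = :global.whereis_name(:hash_it_worker)
:global.send(:hash_it_worker, {:ping, self()})

reply =
  receive do
    :pong -> :pong
  after
    1_000 -> :timeout
  end

IO.inspect({found == pid, reply})
```

On a cluster, the same lookup and send calls can be made from any connected node.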

Important notes

For this explanatory purpose, we used dynamic atom naming above. However, naming dynamic processes with atoms is a terrible idea! If we use atoms, we would need to convert the name (often received from an external client) to atoms, and we should never convert user input to atoms. This is because atoms are not garbage collected. Once an atom is created, it is never reclaimed. Generating atoms from user input would mean the user can inject enough different names to exhaust our system memory!

In practice, it is more likely you will reach the Erlang VM limit for the maximum number of atoms before you run out of memory, which will bring your system down regardless. Moreover, the hand-rolled supervisor model used above can lead to inconsistent conventions across modules and libraries. Elixir therefore provides a standard behaviour, GenServer, for defining, starting and maintaining workers, with standard handle_call, handle_cast and handle_info callbacks for the various operations.
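As a rough sketch of where this leads, the caching hash actor could be expressed as a GenServer along the following lines. The HashItServer module name and the shape of its state are assumptions for illustration; this is one possible arrangement, not the definitive one.

```elixir
defmodule HashItServer do
  use GenServer

  # Client API: callers hold on to the pid, so no atom names are created.
  def start_link(crypto_type) do
    GenServer.start_link(__MODULE__, crypto_type)
  end

  def compute(server, value) do
    GenServer.call(server, {:compute, value})
  end

  # Server callbacks

  @impl true
  def init(crypto_type) do
    # State: the hash algorithm plus an initially empty cache.
    {:ok, {crypto_type, %{}}}
  end

  @impl true
  def handle_call({:compute, value}, _from, {crypto_type, cache}) do
    case Map.fetch(cache, value) do
      {:ok, hash} ->
        # Cache hit: reply without recomputing.
        {:reply, hash, {crypto_type, cache}}

      :error ->
        hash = :crypto.hash(crypto_type, value) |> Base.encode16()
        {:reply, hash, {crypto_type, Map.put(cache, value, hash)}}
    end
  end
end

{:ok, server} = HashItServer.start_link(:md5)
first = HashItServer.compute(server, "bipin")
# The second call for the same value is answered from the cache.
second = HashItServer.compute(server, "bipin")
IO.puts(first)
```

GenServer.call takes care of the reference, the reply matching and a default timeout that the hand-written compute function had to manage itself.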

#elixir #phoenix #actor-model #design-patterns #functional-programming #technology
