Parallel and Distributed Computing with Julia

Przemysław Szufel

Before running the Jupyter notebook, set the number of threads in Julia. This should be done before actually running the notebook() command. The number of threads can also be set in the Julia options of Visual Studio Code (if that is used to run this notebook).

# run this code from Julia console just before starting Jupyter Notebook
ENV["JULIA_NUM_THREADS"]=4
println("Number of threads that your Julia is run: ## $(Threads.nthreads())")
Number of threads that your Julia is run: ## 4
using BenchmarkTools, Distributed

Parallelizing via Single Instruction, Multiple Data (SIMD)

function dot1(x, y)
    s = 0.0
    for i in 1:length(x)
        @inbounds s += x[i]*y[i]
    end
    s
end
dot1 (generic function with 1 method)
function dot2(x, y)
    s = 0.0
    @simd for i in 1:length(x)
        @inbounds s += x[i]*y[i]
    end
    s
end
dot2 (generic function with 1 method)
x = 100*rand(10000)
y = 100*rand(10000);

#res1 = @btime dot1($x, $y)
#res2 = @btime dot2($x, $y)

#println(res1)
#println(res2)
res1 =  dot1(x, y)
2.491428520518866e7
res2 =  dot2(x, y)
2.491428520518861e7
res1 == res2
false
@show res1 
@show res2
res1 = 2.491428520518866e7
res2 = 2.491428520518861e7
2.491428520518861e7
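
The two sums differ only in the last digits because @simd lets the compiler reorder (reassociate) the floating-point additions, and floating-point addition is not associative. A minimal illustration of the same effect:

(0.1 + 0.2) + 0.3 == 0.1 + (0.2 + 0.3)  # false - the grouping changes the last bits
res1 ≈ res2  # true - use isapprox (≈) to compare results of reordered summations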

Green threading

@time sleep(2)
  2.009545 seconds (63 allocations: 1.625 KiB)
@time t = @async sleep(2)
  0.052863 seconds (10.22 k allocations: 646.190 KiB, 56.06% compilation time)
Task (runnable) @0x000001fdbbb45f10
t
Task (runnable) @0x000001fdbbb45f10
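
The return value of a task can be retrieved with fetch, which blocks until the task has finished; a minimal sketch:

t2 = @async (sleep(1); 42)
fetch(t2)   # blocks for about a second, then returns 42
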
function dojob(i)
    val = round(rand(), digits=2)
    sleep(val)   # this could be external computations with I/O
    i, val
end
dojob (generic function with 1 method)
result = Vector{Tuple{Int,Float64}}(undef, 8)
8-element Vector{Tuple{Int64, Float64}}:
 (2189270652080, 1.081643418665e-311)
 (2189270652176, 1.0816434187123e-311)
 (2189270652272, 1.08164341876e-311)
 (2189270652368, 1.081643418807e-311)
 (2189270652464, 1.0816434188546e-311)
 (2189270652560, 1.081643418902e-311)
 (2189270652752, 1.081643418997e-311)
 (2189270652848, 1.0816434190443e-311)
@time for i=1:8
    result[i] = dojob(i)
end
result
  4.136902 seconds (1.19 k allocations: 73.610 KiB, 0.52% compilation time)
8-element Vector{Tuple{Int64, Float64}}:
 (1, 0.65)
 (2, 0.31)
 (3, 0.75)
 (4, 0.3)
 (5, 0.67)
 (6, 0.64)
 (7, 0.25)
 (8, 0.46)
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time for i=1:8
   @async result[i] = dojob(i)
end
result
  0.000174 seconds (85 allocations: 7.208 KiB)
8-element Vector{Tuple{Int64, Float64}}:
 (2, 1.5e-323)
 (5, 8.0e-323)
 (18, 9.4e-323)
 (21, 1.2e-322)
 (25, 1.5e-322)
 (31, 1.6e-322)
 (33, 1.7e-322)
 (35, 1.9e-322)
result
8-element Vector{Tuple{Int64, Float64}}:
 (2, 1.5e-323)
 (5, 8.0e-323)
 (18, 9.4e-323)
 (21, 1.2e-322)
 (25, 1.5e-322)
 (31, 1.6e-322)
 (33, 1.7e-322)
 (35, 1.9e-322)
Because @async returns immediately, the loop above finishes before any task has written its result, so result still holds uninitialized memory. Wrapping the loop in @sync makes @time wait until all tasks spawned inside it have completed:

result = Vector{Tuple{Int,Float64}}(undef, 8);
@time @sync for i=1:8
   @async result[i] = dojob(i)
end
result
  0.793057 seconds (2.71 k allocations: 170.346 KiB, 9.69% compilation time)
8-element Vector{Tuple{Int64, Float64}}:
 (1, 0.1)
 (2, 0.91)
 (3, 0.71)
 (4, 0.72)
 (5, 0.63)
 (6, 0.05)
 (7, 0.01)
 (8, 0.76)
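
The same spawn-and-wait pattern is available as a one-liner with asyncmap from Base, which starts one green thread per element and returns the results in order; a sketch using dojob from above:

result = asyncmap(dojob, 1:8)   # total time is roughly the longest single job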

Programming a simple web server

You should be able to connect using the address http://localhost:9992/3+4

To stop web server click http://localhost:9992/stopme

using Sockets
println("Starting the web server...")
server = Sockets.listen(9992)
Starting the web server...
Sockets.TCPServer(Base.Libc.WindowsRawSocket(0x00000000000003e4) active)
@async begin
    contt = Ref(true)
    while contt[]
        sock = Sockets.accept(server)
        @async begin
            data = readline(sock)
            print("Got request:\n", data, "\n")
            # extract the request path from "GET /3+4 HTTP/1.1", dropping the leading "/"
            cmd = split(data, " ")[2][2:end]
            # status line and headers, followed by a blank line before the body
            println(sock, "HTTP/1.1 200 OK\nContent-Type: text/html\n")
            contt[] = contt[] && (!occursin("stopme", data))
            if contt[]
                 println(sock, string("<html><body>", cmd, "=", 
                     eval(Meta.parse(cmd)), "</body></html>"))
            else
                println(sock,"<html><body>stopping</body></html>")
            end
            close(sock)
        end
    end
    println("Handling requests stopped")
end
Task (runnable) @0x000001fdbb1eee90

Multithreading

Threads.nthreads()
4
function ssum(x)
    r, c = size(x)
    y = zeros(c)
    for i in 1:c
        for j in 1:r
            @inbounds y[i] += x[j, i]
        end
    end
    y
end
ssum (generic function with 1 method)
function tsum(x)
    r, c = size(x)
    y = zeros(c)
    Threads.@threads for i in 1:c
        for j in 1:r
            @inbounds y[i] += x[j, i]
        end
    end
    y
end
tsum (generic function with 1 method)
x = rand(1000,10000);
@time ssum(x)
@time ssum(x);
  0.048094 seconds (13.70 k allocations: 1.009 MiB, 62.07% compilation time)
  0.017480 seconds (2 allocations: 78.172 KiB)
@time tsum(x)
@time tsum(x);
  0.253809 seconds (41.53 k allocations: 2.933 MiB, 230.17% compilation time)
  0.011025 seconds (35 allocations: 81.469 KiB)
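
Besides Threads.@threads, individual tasks can be scheduled onto threads with Threads.@spawn and their results collected with fetch; a minimal sketch that sums the two halves of x in parallel:

t1 = Threads.@spawn sum(x[:, 1:5000])
t2 = Threads.@spawn sum(x[:, 5001:end])
fetch(t1) + fetch(t2)   # the halves are computed on separate threads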

Locking mechanisms for threads

The expression x += 1 is a read-modify-write sequence; when several threads execute it concurrently on the same variable, their updates interleave and some are lost, which is why f_bad below returns far fewer than 10^6.

function f_bad()
    x = 0
    Threads.@threads for i in 1:10^6
        x += 1
    end
    return x
end
f_bad (generic function with 1 method)
@time f_bad()
  0.168872 seconds (1.01 M allocations: 15.892 MiB, 90.33% compilation time)
251079
function f_add()
    x = 0 
    for i in 1:10^7
        x += 1
    end
    x
end
@btime f_add()
  3.400 ns (0 allocations: 0 bytes)
10000000

Note the nanosecond timing: the compiler folds this single-threaded loop into a constant, so no actual looping happens at run time.
function f_atomic()
    x = Threads.Atomic{Int}(0)
    Threads.@threads for i in 1:10^6
        Threads.atomic_add!(x, 1)
    end
    return x[]
end
f_atomic()
1000000


function f_spin()
    l = Threads.SpinLock()
    x = 0
    Threads.@threads for i in 1:10^6
        Threads.lock(l) do
            x += 1
        end
    end
    return x
end

function f_reentrant()
    l = ReentrantLock()
    x = 0
    Threads.@threads for i in 1:10^6
        Threads.lock(l) do
            x += 1
        end
    end
    return x
end
f_reentrant (generic function with 1 method)
using DataFrames
stats = DataFrame()
for f in [f_bad, f_atomic, f_spin, f_reentrant]
    for i = 1:2
        value, elapsedtime  = @timed f()
        push!(stats, (f=string(f),i=i, value=value, timems=elapsedtime*1000))
    end
end
println(stats)
8×4 DataFrame
 Row │ f            i      value    timems   
     │ String       Int64  Int64    Float64  
─────┼───────────────────────────────────────
   1 │ f_bad            1   507942   74.5333
   2 │ f_bad            2   495913   50.5647
   3 │ f_atomic         1  1000000   31.4744
   4 │ f_atomic         2  1000000   33.8758
   5 │ f_spin           1  1000000  693.007
   6 │ f_spin           2  1000000  586.934
   7 │ f_reentrant      1  1000000  847.592
   8 │ f_reentrant      2  1000000  521.148
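
Locks and atomics carry overhead, as the timings above show. A common lock-free alternative is to give each task a private accumulator and combine the partial results at the end; a minimal sketch (f_chunked is an illustrative name, not part of the benchmark above):

function f_chunked()
    chunks = Iterators.partition(1:10^6, cld(10^6, Threads.nthreads()))
    tasks = map(chunks) do chunk
        Threads.@spawn begin
            s = 0
            for _ in chunk
                s += 1       # private accumulator - nothing is shared between tasks
            end
            s
        end
    end
    return sum(fetch.(tasks))   # combine the per-task partial sums
end
f_chunked()   # 1000000, with no locking at all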

Multi-processing and distributed computing

using Distributed

This code adds enough workers to reach a total of 4 (and avoids adding more when the cell is re-run)

addprocs(max(0, 5-nprocs()));
workers()
4-element Vector{Int64}:
 2
 3
 4
 5
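
Note that nprocs() also counts the master process, while nworkers() counts only the workers; workers can be shut down again with rmprocs:

nprocs(), nworkers()     # (5, 4) with the setup above
# rmprocs(workers())     # would remove all four workers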
function s_rand()
    n = 10^4
    x = 0.0
    for i in 1:n
        x += sum(rand(10^4))
    end
    x / n
end
 
@time s_rand()
@time s_rand()
  1.206733 seconds (20.00 k allocations: 763.397 MiB, 18.47% gc time)
  1.018381 seconds (20.00 k allocations: 763.397 MiB, 17.30% gc time)
5000.101742284563
using Distributed
 
 
function p_rand()
    n = 10^4
    x = @distributed (+) for i in 1:n
        # the loop body may contain arbitrary code,
        # but the value of its last line is what gets aggregated with (+)
        sum(rand(10^4))
    end
    x / n
end

@time p_rand()
@time p_rand()
  7.204575 seconds (508.98 k allocations: 33.710 MiB, 24.34% compilation time)
  0.784540 seconds (564 allocations: 34.305 KiB)
4999.629326079839
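
For heavier or uneven workloads, pmap is often more convenient than @distributed, because it load-balances the iterations across workers and returns each iteration's result; a sketch of the same computation:

partial = pmap(i -> sum(rand(10^4)), 1:10^4)   # one partial sum per iteration
sum(partial) / 10^4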
workers()'
1×4 adjoint(::Vector{Int64}) with eltype Int64:
 2  3  4  5
fetch(@spawnat 3 4+3)
7
@everywhere function f() 
    println("I am on worker ", myid())
    rand()
end
f()
I am on worker 1
0.37079910289627693
fetch(@spawnat 4 f())
      From worker 4:    I am on worker 4
0.22025454198281003
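
@spawnat is a macro front end over the remotecall machinery; the same remote invocation can be written explicitly with remotecall, which returns a Future:

fut = remotecall(f, 4)   # schedule f() on worker 4
fetch(fut)               # block until the result is available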
vec(collect(Iterators.product(1:4, 1:5)))
20-element Vector{Tuple{Int64, Int64}}:
 (1, 1)
 (2, 1)
 (3, 1)
 (4, 1)
 (1, 2)
 (2, 2)
 (3, 2)
 (4, 2)
 (1, 3)
 (2, 3)
 (3, 3)
 (4, 3)
 (1, 4)
 (2, 4)
 (3, 4)
 (4, 4)
 (1, 5)
 (2, 5)
 (3, 5)
 (4, 5)
using Distributed
@everywhere using Pkg
@everywhere Pkg.activate(".")
@everywhere using Distributed, Random, DataFrames

@everywhere function calc(x, y)
    2x + y
end

@everywhere function init_worker()
    Random.seed!(myid())
    # e.g. reading a CSV file or other one-time per-worker setup
end

@sync for wid in workers()
    @async fetch(@spawnat wid init_worker())
end
      From worker 5:      Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
      From worker 3:      Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
      From worker 4:      Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
      From worker 2:      Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
  Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`

Typically, the results are collected into a DataFrame:

data = @distributed (append!) for (i, j) = vec(collect(Iterators.product(1:4, 1:5)))
    a = rand(1:499)
    b = rand(1:9)*1000
    c = calc(a, b)
    DataFrame(;i,j,a,b,c,procid = myid())
end
20×6 DataFrame
 Row │ i      j      a      b      c      procid
     │ Int64  Int64  Int64  Int64  Int64  Int64
─────┼───────────────────────────────────────────
   1 │     1      1    143   9000   9286       2
   2 │     2      1    291   4000   4582       2
   3 │     3      1    320   8000   8640       2
   4 │     4      1    377   3000   3754       2
   5 │     1      2     30   7000   7060       2
   6 │     2      2    351   3000   3702       3
   7 │     3      2    118   8000   8236       3
   8 │     4      2    415   9000   9830       3
   9 │     1      3    397   1000   1794       3
  10 │     2      3    260   9000   9520       3
  11 │     3      3    132   8000   8264       4
  12 │     4      3    420   1000   1840       4
  13 │     1      4    304   4000   4608       4
  14 │     2      4    112   3000   3224       4
  15 │     3      4    349   6000   6698       4
  16 │     4      4    466   8000   8932       5
  17 │     1      5    413   6000   6826       5
  18 │     2      5    426   5000   5852       5
  19 │     3      5    326   1000   1652       5
  20 │     4      5    481   8000   8962       5
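
A sketch of an equivalent, load-balanced way to build such a table with pmap and a vcat reduction (data2 is an illustrative name):

rows = pmap(vec(collect(Iterators.product(1:4, 1:5)))) do (i, j)
    a = rand(1:499)
    b = rand(1:9)*1000
    DataFrame(;i, j, a, b, c = calc(a, b), procid = myid())
end
data2 = reduce(vcat, rows)   # a 20×6 DataFrame like the one above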

Advanced interprocess communication: a cellular automaton example

using Distributed
@everywhere using ParallelDataTransfer, Distributed


@everywhere function rule30()
    # one step of the rule 30 cellular automaton on this worker's chunk;
    # caa[1] and caa[end] are ghost cells owned by the neighbouring workers
    lastv = Main.caa[1]
    for i in 2:(length(Main.caa)-1)
        current = Main.caa[i]
        # rule 30: new cell = left XOR (center OR right)
        Main.caa[i] = xor(lastv, Main.caa[i] || Main.caa[i+1])
        lastv = current
    end
end


@everywhere function getcaa()
    Main.caa
end
@everywhere function getsetborder()
    # refresh the ghost cells with the boundary values of the two neighbours
    Main.caa[1] = (@fetchfrom Main.neighbours[1] getcaa()[15+1])
    Main.caa[end] = (@fetchfrom Main.neighbours[2] getcaa()[2])
end

function printsimdist(workers::Array{Int})
    for w in workers
        dat = @fetchfrom w caa
        for b in dat[2:end-1]   # skip the ghost cells at both ends
            print(b ? "#" : " ")
        end
    end
    println()
    flush(stdout)
end

function runca(steps::Int, visualize::Bool)
    @sync for w in workers()
        @async @fetchfrom w fill!(caa, false)
    end
    @fetchfrom wks[nwks ÷ 2 + 1] caa[2] = true  # seed a single live cell near the middle
    visualize && printsimdist(workers())
    for i in 1:steps
        @sync for w in workers()
            @async @fetchfrom w getsetborder()
        end
        @sync for w in workers()
            @async @fetchfrom w rule30()
        end
        visualize && printsimdist(workers())
    end
end
runca (generic function with 1 method)
wks = workers()
nwks = length(wks)
for i in 1:nwks
    # each worker gets its left and right neighbour (periodic boundary conditions)
    sendto(wks[i], neighbours = (i==1 ? wks[nwks] : wks[i-1],
                                i==nwks ? wks[1] : wks[i+1]))
    # 15 cells per worker plus 2 ghost cells shared with the neighbours
    fetch(@defineat wks[i] const caa = zeros(Bool, 15+2));
end

runca(20,true)
                              #                             
                             ###                            
                            ##  #                           
                           ## ####                          
                          ##  #   #                         
                         ## #### ###                        
                        ##  #    #  #                       
                       ## ####  ######                      
                      ##  #   ###     #                     
                     ## #### ##  #   ###                    
                    ##  #    # #### ##  #                   
                   ## ####  ## #    # ####                  
                  ##  #   ###  ##  ## #   #                 
                 ## #### ##  ### ###  ## ###                
                ##  #    # ###   #  ###  #  #               
               ## ####  ## #  # #####  #######              
              ##  #   ###  #### #    ###      #             
             ## #### ##  ###    ##  ##  #    ###            
            ##  #    # ###  #  ## ### ####  ##  #           
           ## ####  ## #  ######  #   #   ### ####          
          ##  #   ###  ####     #### ### ##   #   #