println("Number of threads that your Julia is run: ## $(Threads.nthreads())")
Number of threads that your Julia is run: ## 4
Przemysław Szufel
Before running Jupyter notebook set in Julia number of threads. This should be done before actually running the notebook()
command. The number of threads can be also set up in Julia options in Visual Studio code (if this is used to run this notebook).
# run this code from Julia console just before starting Jupyter Notebook
ENV["JULIA_NUM_THREADS"]=4
Number of threads that your Julia is run: ## 4
dot1 (generic function with 1 method)
dot2 (generic function with 1 method)
0.052863 seconds (10.22 k allocations: 646.190 KiB, 56.06% compilation time)
Task (runnable) @0x000001fdbbb45f10
function dojob(i)
val = round(rand(), digits=2)
sleep(val) # this could be external computations with I/O
i, val
end
dojob (generic function with 1 method)
8-element Vector{Tuple{Int64, Float64}}:
(2189270652080, 1.081643418665e-311)
(2189270652176, 1.0816434187123e-311)
(2189270652272, 1.08164341876e-311)
(2189270652368, 1.081643418807e-311)
(2189270652464, 1.0816434188546e-311)
(2189270652560, 1.081643418902e-311)
(2189270652752, 1.081643418997e-311)
(2189270652848, 1.0816434190443e-311)
4.136902 seconds (1.19 k allocations: 73.610 KiB, 0.52% compilation time)
8-element Vector{Tuple{Int64, Float64}}:
(1, 0.65)
(2, 0.31)
(3, 0.75)
(4, 0.3)
(5, 0.67)
(6, 0.64)
(7, 0.25)
(8, 0.46)
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time for i=1:8
@async result[i] = dojob(i)
end
result
0.000174 seconds (85 allocations: 7.208 KiB)
8-element Vector{Tuple{Int64, Float64}}:
(2, 1.5e-323)
(5, 8.0e-323)
(18, 9.4e-323)
(21, 1.2e-322)
(25, 1.5e-322)
(31, 1.6e-322)
(33, 1.7e-322)
(35, 1.9e-322)
8-element Vector{Tuple{Int64, Float64}}:
(2, 1.5e-323)
(5, 8.0e-323)
(18, 9.4e-323)
(21, 1.2e-322)
(25, 1.5e-322)
(31, 1.6e-322)
(33, 1.7e-322)
(35, 1.9e-322)
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time @sync for i=1:8
@async result[i] = dojob(i)
end
result
0.793057 seconds (2.71 k allocations: 170.346 KiB, 9.69% compilation time)
8-element Vector{Tuple{Int64, Float64}}:
(1, 0.1)
(2, 0.91)
(3, 0.71)
(4, 0.72)
(5, 0.63)
(6, 0.05)
(7, 0.01)
(8, 0.76)
You should be able to connect using the address http://localhost:9992/3+4
To stop web server click http://localhost:9992/stopme
Starting the web server...
Sockets.TCPServer(Base.Libc.WindowsRawSocket(0x00000000000003e4) active)
@async begin
contt = Ref(true)
while contt[]
sock = Sockets.accept(server)
@async begin
data = readline(sock)
print("Got request:\n", data, "\n")
cmd = split(data, " ")[2][2:end]
println(sock, "\nHTTP/1.1 200 OK\nContent-Type: text/html\n")
contt[] = contt[] && (!occursin("stopme", data))
if contt[]
println(sock, string("<html><body>", cmd, "=",
eval(Meta.parse(cmd)), "</body></html>"))
else
println(sock,"<html><body>stopping</body></html>")
end
close(sock)
end
end
println("Handling requests stopped")
end
Task (runnable) @0x000001fdbb1eee90
function ssum(x)
r, c = size(x)
y = zeros(c)
for i in 1:c
for j in 1:r
@inbounds y[i] += x[j, i]
end
end
y
end
ssum (generic function with 1 method)
function tsum(x)
r, c = size(x)
y = zeros(c)
Threads.@threads for i in 1:c
for j in 1:r
@inbounds y[i] += x[j, i]
end
end
y
end
tsum (generic function with 1 method)
0.048094 seconds (13.70 k allocations: 1.009 MiB, 62.07% compilation time)
0.017480 seconds (2 allocations: 78.172 KiB)
0.253809 seconds (41.53 k allocations: 2.933 MiB, 230.17% compilation time)
0.011025 seconds (35 allocations: 81.469 KiB)
f_bad (generic function with 1 method)
3.400 ns (0 allocations: 0 bytes)
10000000
function f_atomic()
x = Threads.Atomic{Int}(0)
Threads.@threads for i in 1:10^6
Threads.atomic_add!(x, 1)
end
return x[]
end
f_atomic()
1000000
function f_spin()
l = Threads.SpinLock()
x = 0
Threads.@threads for i in 1:10^6
Threads.lock(l) do
x += 1
end
end
return x
end
function f_reentrant()
l = ReentrantLock()
x = 0
Threads.@threads for i in 1:10^6
Threads.lock(l) do
x += 1
end
end
return x
end
f_reentrant (generic function with 1 method)
using DataFrames
stats = DataFrame()
for f in [f_bad, f_atomic, f_spin, f_reentrant]
for i = 1:2
value, elapsedtime = @timed f()
push!(stats, (f=string(f),i=i, value=value, timems=elapsedtime*1000))
end
end
println(stats)
8×4 DataFrame
Row │ f i value timems
│ String Int64 Int64 Float64
─────┼───────────────────────────────────────
1 │ f_bad 1 507942 74.5333
2 │ f_bad 2 495913 50.5647
3 │ f_atomic 1 1000000 31.4744
4 │ f_atomic 2 1000000 33.8758
5 │ f_spin 1 1000000 693.007
6 │ f_spin 2 1000000 586.934
7 │ f_reentrant 1 1000000 847.592
8 │ f_reentrant 2 1000000 521.148
This code adds 4 workers (and avoids adding more)
function s_rand()
n = 10^4
x = 0.0
for i in 1:n
x += sum(rand(10^4))
end
x / n
end
@time s_rand()
@time s_rand()
1.206733 seconds (20.00 k allocations: 763.397 MiB, 18.47% gc time)
1.018381 seconds (20.00 k allocations: 763.397 MiB, 17.30% gc time)
5000.101742284563
using Distributed
function p_rand()
n = 10^4
x = @distributed (+) for i in 1:n
#line
# but the last line will be aggregated
sum(rand(10^4))
end
x / n
end
@time p_rand()
@time p_rand()
7.204575 seconds (508.98 k allocations: 33.710 MiB, 24.34% compilation time)
0.784540 seconds (564 allocations: 34.305 KiB)
4999.629326079839
I am on worker 1
0.37079910289627693
20-element Vector{Tuple{Int64, Int64}}:
(1, 1)
(2, 1)
(3, 1)
(4, 1)
(1, 2)
(2, 2)
(3, 2)
(4, 2)
(1, 3)
(2, 3)
(3, 3)
(4, 3)
(1, 4)
(2, 4)
(3, 4)
(4, 4)
(1, 5)
(2, 5)
(3, 5)
(4, 5)
using Distributed
@everywhere using Pkg
@everywhere Pkg.activate(".")
@everywhere using Distributed, Random, DataFrames
@everywhere function calc(x, y)
2x + y
end
@everywhere function init_worker()
Random.seed!(myid())
# readding CSV file
end
@sync for wid in workers()
@async fetch(@spawnat wid init_worker())
end
From worker 5: Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
From worker 3: Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
From worker 4: Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
From worker 2: Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
Activating project at `C:\AAABIBLIOTEKA\MIT_Boston\Stuttgart`
Typically results are collected to a DataFrame
data = @distributed (append!) for (i, j) = vec(collect(Iterators.product(1:4, 1:5)))
a = rand(1:499)
b = rand(1:9)*1000
c = calc(a, b)
DataFrame(;i,j,a,b,c,procid = myid())
end
Row | i | j | a | b | c | procid |
---|---|---|---|---|---|---|
Int64 | Int64 | Int64 | Int64 | Int64 | Int64 | |
1 | 1 | 1 | 143 | 9000 | 9286 | 2 |
2 | 2 | 1 | 291 | 4000 | 4582 | 2 |
3 | 3 | 1 | 320 | 8000 | 8640 | 2 |
4 | 4 | 1 | 377 | 3000 | 3754 | 2 |
5 | 1 | 2 | 30 | 7000 | 7060 | 2 |
6 | 2 | 2 | 351 | 3000 | 3702 | 3 |
7 | 3 | 2 | 118 | 8000 | 8236 | 3 |
8 | 4 | 2 | 415 | 9000 | 9830 | 3 |
9 | 1 | 3 | 397 | 1000 | 1794 | 3 |
10 | 2 | 3 | 260 | 9000 | 9520 | 3 |
11 | 3 | 3 | 132 | 8000 | 8264 | 4 |
12 | 4 | 3 | 420 | 1000 | 1840 | 4 |
13 | 1 | 4 | 304 | 4000 | 4608 | 4 |
14 | 2 | 4 | 112 | 3000 | 3224 | 4 |
15 | 3 | 4 | 349 | 6000 | 6698 | 4 |
16 | 4 | 4 | 466 | 8000 | 8932 | 5 |
17 | 1 | 5 | 413 | 6000 | 6826 | 5 |
18 | 2 | 5 | 426 | 5000 | 5852 | 5 |
19 | 3 | 5 | 326 | 1000 | 1652 | 5 |
20 | 4 | 5 | 481 | 8000 | 8962 | 5 |
using Distributed
@everywhere using ParallelDataTransfer, Distributed
@everywhere function rule30()
lastv = Main.caa[1]
for i in 2:(length(Main.caa)-1)
current = Main.caa[i]
Main.caa[i] = xor(lastv, Main.caa[i] || Main.caa[i+1])
lastv = current
end
end
@everywhere function getcaa()
Main.caa
end
@everywhere function getsetborder()
#println(myid(),"gs");flush(stdout)
Main.caa[1] = (@fetchfrom Main.neighbours[1] getcaa()[15+1])
#println(myid(),"gs1");flush(stdout)
Main.caa[end] = (@fetchfrom Main.neighbours[2] getcaa()[2])
#println(myid(),"gse");flush(stdout)
end
function printsimdist(workers::Array{Int})
for w in workers
dat = @fetchfrom w caa
for b in dat[2:end-1]
print(b ? "#" : " ")
end
end
println()
flush(stdout)
end
function runca(steps::Int, visualize::Bool)
@sync for w in workers()
@async @fetchfrom w fill!(caa, false)
end
@fetchfrom wks[Int(nwks/2)+1] caa[2]=true
visualize && printsimdist(workers())
for i in 1:steps
@sync for w in workers()
@async @fetchfrom w getsetborder()
end
@sync for w in workers()
@async @fetchfrom w rule30()
end
visualize && printsimdist(workers())
end
end
runca (generic function with 1 method)
wks = workers()
nwks = length(wks)
for i in 1:nwks
sendto(wks[i], neighbours = (i==1 ? wks[nwks] : wks[i-1],
i==nwks ? wks[1] : wks[i+1]))
fetch(@defineat wks[i] const caa = zeros(Bool, 15+2));
end
runca(20,true)
#
###
## #
## ####
## # #
## #### ###
## # # #
## #### ######
## # ### #
## #### ## # ###
## # # #### ## #
## #### ## # # ####
## # ### ## ## # #
## #### ## ### ### ## ###
## # # ### # ### # #
## #### ## # # ##### #######
## # ### #### # ### #
## #### ## ### ## ## # ###
## # # ### # ## ### #### ## #
## #### ## # ###### # # ### ####
## # ### #### #### ### ## # #