Discussion:
[Rd] [parallel] fixes load balancing of parLapplyLB
Christian Krause
2018-02-12 19:08:15 UTC
Dear R-Devel List,

**TL;DR:** The function **parLapplyLB** of the parallel package has [reportedly][1] (see also attached RRD output) not
been doing its job, i.e. not actually balancing the load. My colleague Dirk Sarpe and I found the cause of the problem
and we also have a patch to fix it (attached). A similar fix has also been provided [here][2].

[1]: https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
[2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792


## The Call Chain

First, we traced the relevant R function calls through the code, beginning with `parLapplyLB`:

1. **parLapplyLB:** clusterApply.R:177, calls **splitList**, then **clusterApplyLB**
2. **splitList:** clusterApply.R:157
3. **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
4. **dynamicClusterApply:** clusterApply.R:39


## splitList

We used both our whiteboard and an R session to manually *run* a few examples. We were using lists of 100 elements and 5
workers. First, let's take a look at **splitList**:

```r
sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
sapply(parallel:::splitList(1:97, 5), length)
[1] 20 19 19 19 20
sapply(parallel:::splitList(1:97, 20), length)
[1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```

As we can see in the examples, the work is distributed as evenly as possible.


## dynamicClusterApply

**dynamicClusterApply** works this way (simplified):

1. it first gives a chunk to each worker
2. once a worker comes back with the result, it is given the next chunk

**This is the important part:** As long as there are **more** chunks than workers, there will be load balancing. If
there are fewer chunks than workers, each worker will get **at most one chunk** and there is **no** load balancing.
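
To see that dispatch rule in action without a cluster, here is a toy, single-process simulation (my own sketch, not code from the parallel package): workers are modelled as accumulated finish times, and each chunk goes to whichever worker comes back first.

```r
# Toy simulation of dynamic scheduling (greedy dispatch), not the real
# dynamicClusterApply; chunk_times are per-chunk runtimes in seconds.
simulate_dynamic <- function(chunk_times, n_workers) {
    busy_until <- numeric(n_workers)        # each worker's finish time
    for (t in chunk_times) {
        w <- which.min(busy_until)          # the worker that returns first ...
        busy_until[w] <- busy_until[w] + t  # ... receives the next chunk
    }
    max(busy_until)                         # resulting wall-clock time
}

simulate_dynamic(c(0, 0, 0, 0, 2, 2), 2)  # 6 chunks, 2 workers -> 2 (balanced)
simulate_dynamic(c(0, 4), 2)              # 2 chunks, 2 workers -> 4 (nothing to balance)
```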


## parLapplyLB

This is how **parLapplyLB** splits the input list (with a bit of refactoring, for readability):

```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)

    chunks <- splitList(X, length(cl))

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```

For our examples, the chunks have these sizes:

```r
sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
```

There we have it: 5 chunks, 5 workers. With this work distribution, there can't possibly be any load balancing: each
worker is given a single chunk and then stops working, because there are no more chunks left to hand out.
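
This is easy to demonstrate with tasks of uneven runtime (an illustrative sketch assuming a local two-worker cluster; the timings in the comments are what one would expect, not measurements from the report above):

```r
library(parallel)
cl <- makeCluster(2)

# Current parLapplyLB: 6 tasks are pre-split into 2 chunks for 2 workers, so
# one worker gets both slow tasks while the other idles -> roughly 4 s wall clock.
system.time(parLapplyLB(cl, c(0, 0, 0, 0, 2, 2), Sys.sleep))

# Genuine load balancing (one element per chunk): the slow tasks end up on
# different workers -> roughly 2 s wall clock.
system.time(clusterApplyLB(cl, c(0, 0, 0, 0, 2, 2), Sys.sleep))

stopCluster(cl)
```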

Instead, **parLapplyLB** should look like this (patch is attached):

```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)

    ## note: splitList()'s second argument is the *number* of chunks; this
    ## yields chunks of roughly length(cl) elements each, but never fewer
    ## chunks than there are workers
    chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))

    chunks <- splitList(X, chunkSize)

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```

Examples with a cluster of 5 workers:

```r
# length(cl) < length(X)
sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
[1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5

# length(cl) >= length(X)
sapply(parallel:::splitList(1:4, 4), length)
[1] 1 1 1 1
# one worker idles here, but we can't do better than that
```

With this patch, the number of chunks is larger than the number of workers whenever possible, and then load balancing
should work.

Best Regards
--
Christian Krause

Scientific Computing Administration and Support
German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig
Deutscher Platz 5e, 04103 Leipzig, Germany
Phone: +49 341 97 33144
Email: ***@idiv.de
Christian Krause
2018-02-19 18:21:24 UTC
Dear R-Devel List,

I have installed R 3.4.3 with the patch applied on our cluster and ran a *real-world* job of one of our users to confirm that the patch works to my satisfaction. Here are the results.

The original was a series of jobs, all essentially doing the same stuff using bootstrapped data, so for the original there is more data and I show the arithmetic mean with standard deviation. The confirmation with the patched R was only a single instance of that series of jobs.

## Job Efficiency

The job efficiency is defined as (this is what the `qacct-efficiency` tool below does):

```
efficiency = cputime / cores / wallclocktime * 100%
```

In simpler words: how well the job utilized its CPU cores. It shows the percentage of time the job was actually doing work, as opposed to the difference:

```
wasted = 100% - efficiency
```

... which, essentially, tells us how much of the resources were wasted, i.e. CPU cores just idling without being used by anyone. We care a lot about that because, for our scientific computing cluster, wasted resources are like burning money.
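
For reference, the same calculation as a small R helper (`qacct-efficiency` itself is a site-local script; this equivalent is assumed, and the example numbers are made up):

```r
# Job efficiency in percent, as defined above.
efficiency <- function(cputime, cores, wallclock) {
    100 * cputime / (cores * wallclock)
}

# Hypothetical job: 28 cores, each busy for 2400 s of a 3600 s wall-clock run.
efficiency(cputime = 28 * 2400, cores = 28, wallclock = 3600)  # ~66.7
```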

### original

This takes the entire series from our job accounting database, filters for the successful jobs, calculates the efficiency, and then shows the average and standard deviation of the efficiency:

```
$ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd
n=945 ∅ 61.7276 ± 7.78719
```

This takes the entire series from our job accounting database, filters for the successful jobs, calculates the efficiency, and does histogram-like binning before calculating mean and standard deviation, to get a more detailed impression of the distribution when the standard deviation of the previous command is comparatively high:

```
$ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd-bin -w 10 | sort -gk1 | column -t
10 - 20 -> n=3 ∅ 19.21666666666667 ± 0.9112811494447459
20 - 30 -> n=6 ∅ 26.418333333333333 ± 2.665996374091058
30 - 40 -> n=12 ∅ 35.11583333333334 ± 2.8575783082671196
40 - 50 -> n=14 ∅ 45.35285714285715 ± 2.98623361591005
50 - 60 -> n=344 ∅ 57.114593023255814 ± 2.1922005551774415
60 - 70 -> n=453 ∅ 64.29536423841049 ± 2.8334788433963856
70 - 80 -> n=108 ∅ 72.95592592592598 ± 2.5219474143639276
80 - 90 -> n=5 ∅ 81.526 ± 1.2802265424525452
```

I attached an example graph from our monitoring system of a single instance in my previous mail. There you can see that the load balancing does not actually work, i.e. the behavior is the same as with `parLapply`. This is reflected in the job efficiency.

### patch applied

This is the single instance I used to confirm that the patch works:

```
$ qacct -j 4562202 | qacct-efficiency
97.36
```

The graph from our monitoring system is attached. As you can see, the load balancing works to a satisfying degree and the efficiency is well above 90%, which was what I had hoped for :-)

## Additional Notes

The list used in this job's `parLapplyLB` call is 5812 elements long. With the `splitList` chunking from the patch, you get 208 chunks of about 28 elements each. The job ran on 28 CPU cores and had a wallclock time of 120351.590 seconds, i.e. 33.43 hours. Thus, the function we apply to our list takes about 580 seconds per list element, i.e. about 10 minutes. I suppose, for that runtime, we would get even better load balancing if we reduced the chunk size even further, maybe even down to 1, thus getting our efficiency even closer to 100%.
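
As a quick check of that chunk layout, one can apply the same expression as in the patch to 5812 elements and 28 workers:

```r
chunks <- parallel:::splitList(1:5812, max(28, ceiling(5812 / 28)))
length(chunks)                 # 208 chunks ...
range(sapply(chunks, length))  # ... of 27 or 28 elements each
```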

Of course, for really short-running functions, a larger chunk size may be more efficient because of the overhead. In our case, the overhead is negligible, which is why the small chunk size works really well. In contrast, for smallish lists with short-running functions, you might not even need load balancing and `parLapply` suffices. It only becomes an issue when the runtime of the function is long and/or varying.

In our case, the total runtime of the entire series of jobs was:

```
$ qacct -j 4433299 | awk '$1 == "wallclock" { sum += $2 } END { print sum, "seconds" }'
4.72439e+09 seconds
```

That's about 150 years on a single core, or 7.5 years on a 20-core server! Our user was constantly using about 500 cores, so this took about 110 days. If you compare this to my 97% efficiency example, the jobs could have been finished in 75 days instead ;-)
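
For reference, the unit conversions behind these numbers:

```r
secs <- 4.72439e9
secs / (60 * 60 * 24 * 365)       # ~150 years on a single core
secs / (60 * 60 * 24 * 365) / 20  # ~7.5 years on a 20-core server
secs / 500 / (60 * 60 * 24)       # ~110 days spread over ~500 cores
```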

## Upcoming Patch

If this patch gets applied to the R code base (and I hope it will :-)), my colleague and I will submit another patch that adds the chunk size as an optional parameter to all of the load-balancing functions. With that parameter, users of these functions *can* decide for themselves which chunk size they prefer for their code. As mentioned before, the most efficient chunk size depends on the applied function's runtime, which is the only thing R does not know, so users really should be allowed to specify it explicitly. The default of this new optional parameter would be the one we used here, which would make that upcoming patch fully source-compatible.
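
A minimal sketch of what that optional parameter could look like (hypothetical; the argument name is invented here, and this is not the submitted patch):

```r
parLapplyLB <- function(cl = NULL, X, fun, ..., nChunks = NULL)
{
    cl <- defaultCluster(cl)

    ## default as proposed in this thread; note that splitList() takes the
    ## number of chunks, so nChunks is a chunk count, not a chunk size
    if (is.null(nChunks))
        nChunks <- max(length(cl), ceiling(length(X) / length(cl)))

    do.call(c,
            clusterApplyLB(cl, x = splitList(X, nChunks), fun = lapply, fun, ...),
            quote = TRUE)
}
```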

Best Regards
--
Christian Krause

Henrik Bengtsson
2018-02-19 21:11:04 UTC
Hi, I'm trying to understand the rationale for your proposed amount of
splitting and more precisely why that one is THE one.

If I put labels on your example numbers from one of your previous posts:

```r
nbrOfElements <- 97
nbrOfWorkers <- 5
```

With these, there are two extremes in how you can split up the processing.
(A) One chunk per element:

```r
nbrOfChunks <- nbrOfElements
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[30] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[59] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[88] 1 1 1 1 1 1 1 1 1 1
```

(B) One chunk per worker:

```r
nbrOfChunks <- nbrOfWorkers
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 20 19 19 19 20
```

I understand that neither of these two extremes may be the best when
it comes to orchestration overhead and load balancing. Instead, the
best might be somewhere in-between, e.g. (C):

```r
nbrOfChunks <- nbrOfElements / nbrOfWorkers
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```

However, there are multiple alternatives between the two extremes, e.g.

```r
nbrOfChunks <- scale * nbrOfElements / nbrOfWorkers
```

So, is there a reason why you argue for scale = 1.0 to be the optimal?
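
For illustration (an added example, not from the original mail), scale = 2 would double the number of chunks relative to the proposed default:

```r
scale <- 2
nbrOfChunks <- ceiling(scale * 97 / 5)  # 39 chunks of 2-3 elements each
sapply(parallel:::splitList(1:97, nbrOfChunks), length)
```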

FYI, in future.apply::future_lapply(X, FUN, ...) there is a
'future.scheduling' scale factor(*) argument where the default
future.scheduling = 1 corresponds to (B) and future.scheduling = +Inf
to (A). Using future.scheduling = 4 achieves the amount of
load balancing you propose in (C). (*) Different definition from the
above 'scale'. (Disclaimer: I'm the author)

/Henrik

Christian Krause
2018-02-20 10:45:38 UTC
Dear Henrik,

The rationale is just that it lies between these extremes and that it is really simple to calculate, without making any assumptions, knowing that it won't be perfect.

The extremes A and B you mention are special cases based on assumptions. Case A is based on the assumption that the function has a long or varying runtime; then you are likely to get the best load balancing with really small chunks. Case B is based on the assumption that the function runtime is the same for each list element, i.e. you don't actually need load balancing and can just use `parLapply` instead.

This new default is **not the best one**. It's just a better one than we had before. There is no best default because **we don't know the function's runtime and how it varies**. The user needs to decide that because he/she knows the function. As mentioned before, I will write a patch that makes the chunk size an optional argument, so the user can decide, because only he/she has all the information to choose the best chunk size, just like you did with the `future.scheduling` parameter.

Best Regards
Tomas Kalibera
2018-02-26 15:01:21 UTC
Dear Christian and Henrik,

thank you for spotting the problem and for the suggested fixes. We'll
probably add a chunk.size argument to parLapplyLB and parLapply to
follow OpenMP terminology, which has already been an inspiration for the
present code: parLapply already implements static scheduling via the
internal function staticClusterApply, yet with a fixed chunk size;
parLapplyLB already implements dynamic scheduling via the internal
function dynamicClusterApply, but with a fixed chunk size set to an
unlucky value, so that it behaves like static scheduling. The default
chunk size for parLapplyLB will be set so that there is some dynamism
in the schedule even by default. I am now testing a patch with these changes.
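
In OpenMP-like terms, such an argument would expose the granularity of both schedules to callers. A hypothetical usage sketch (argument placement assumed; the final interface may differ):

```r
library(parallel)
cl <- makeCluster(4)
X <- 1:1000

res.static  <- parLapply(cl, X, sqrt, chunk.size = 50)    # static schedule, chunks of 50
res.dynamic <- parLapplyLB(cl, X, sqrt, chunk.size = 10)  # dynamic schedule, chunks of 10

stopCluster(cl)
```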

Best
Tomas
Post by Christian Krause
Dear Henrik,
The rationale is just that it is within these extremes and that it is really simple to calculate, without making any assumptions and knowing that it won't be perfect.
The extremes A and B you are mentioning are special cases based on assumptions. Case A is based on the assumption that the function has a long runtime or varying runtime, then you are likely to get the best load balancing with really small chunks. Case B is based on the assumption that the function runtime is the same for each list element, i.e. where you don't actually need load balancing, i.e. just use `parLapply` without load balancing.
This new default is **not the best one**. It's just a better one than we had before. There is no best one we can use as default because **we don't know the function runtime and how it varies**. The user needs to decide that because he/she knows the function. As mentioned before, I will write a patch that makes the chunk size an optional argument, so the user can decide because only he/she has all the information to choose the best chunk size, just like you did with the `future.scheduling` parameter.
Best Regards
Post by Henrik Bengtsson
Hi, I'm trying to understand the rationale for your proposed amount of
splitting and more precisely why that one is THE one.
nbrOfElements <- 97
nbrOfWorkers <- 5
With these, there are two extremes in how you can split up the
Post by Henrik Bengtsson
nbrOfElements <- 97
nbrOfWorkers <- 5
nbrOfChunks <- nbrOfElements
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[30] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[59] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[88] 1 1 1 1 1 1 1 1 1 1
Post by Henrik Bengtsson
nbrOfElements <- 97
nbrOfWorkers <- 5
nbrOfChunks <- nbrOfWorkers
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 20 19 19 19 20
I understand that neither of these two extremes may be the best when
it comes to orchestration overhead and load balancing. Instead, the
best might be somewhere in-between, e.g.
Post by Henrik Bengtsson
nbrOfElements <- 97
nbrOfWorkers <- 5
nbrOfChunks <- nbrOfElements / nbrOfWorkers
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
However, there are multiple alternatives between the two extremes, e.g.
Post by Henrik Bengtsson
nbrOfChunks <- scale * nbrOfElements / nbrOfWorkers
So, is there a reason why you argue for scale = 1.0 to be the optimal?
FYI, In future.apply::future_lapply(X, FUN, ...) there is a
'future.scheduling' scale factor(*) argument where default
future.scheduling = 1 corresponds to (B) and future.scheduling = +Inf
to (A). Using future.scheduling = 4 achieves the amount of
load-balancing you propose in (C). (*) Different definition from the
above 'scale'. (Disclaimer: I'm the author)
/Henrik
On Mon, Feb 19, 2018 at 10:21 AM, Christian Krause
Post by Henrik Bengtsson
Dear R-Devel List,
I have installed R 3.4.3 with the patch applied on our cluster and
ran a *real-world* job of one of our users to confirm that the patch
works to my satisfaction. Here are the results.
Post by Henrik Bengtsson
The original was a series of jobs, all essentially doing the same
stuff using bootstrapped data, so for the original there is more data
and I show the arithmetic mean with standard deviation. The
confirmation with the patched R was only a single instance of that
series of jobs.
Post by Henrik Bengtsson
## Job Efficiency
The job efficiency is defined as (this is what the `qacct-efficiency`
```
efficiency = cputime / cores / wallclocktime * 100%
```
In simpler words: how well did the job utilize its CPU cores. It
shows the percentage of time the job was actually doing stuff, as
Post by Henrik Bengtsson
```
wasted = 100% - efficiency
```
... which, essentially, tells us how much of the resources were
wasted, i.e. CPU cores just idling, without being used by anyone. We
care a lot about that because, for our scientific computing cluster,
wasted resources is like burning money.
Post by Henrik Bengtsson
### original
This is the entire series from our job accounting database, filteres
the successful jobs, calculates efficiency and then shows the average
Post by Henrik Bengtsson
```
$ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd
n=945 ∅ 61.7276 ± 7.78719
```
This is the entire series from our job accounting database, filteres
the successful jobs, calculates efficiency and does sort of a
histogram-like binning before calculation of mean and standard
deviation (to get a more detailed impression of the distribution when
Post by Henrik Bengtsson
```
$ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd-bin -w
10 | sort -gk1 | column -t
Post by Henrik Bengtsson
10 - 20 -> n=3 ∅ 19.21666666666667 ± 0.9112811494447459
20 - 30 -> n=6 ∅ 26.418333333333333 ± 2.665996374091058
30 - 40 -> n=12 ∅ 35.11583333333334 ± 2.8575783082671196
40 - 50 -> n=14 ∅ 45.35285714285715 ± 2.98623361591005
50 - 60 -> n=344 ∅ 57.114593023255814 ± 2.1922005551774415
60 - 70 -> n=453 ∅ 64.29536423841049 ± 2.8334788433963856
70 - 80 -> n=108 ∅ 72.95592592592598 ± 2.5219474143639276
80 - 90 -> n=5 ∅ 81.526 ± 1.2802265424525452
```
I have attached an example graph from our monitoring system of a
single instance in my previous mail. There you can see that the load
balancing does not actually work, i.e. same as `parLapply`. This
reflects in the job efficiency.
Post by Henrik Bengtsson
### patch applied
```
$ qacct -j 4562202 | qacct-efficiency
97.36
```
The graph from our monitoring system is attached. As you can see, the
load balancing works to a satisfying degree and the efficiency is well
above 90% which was what I had hoped for :-)
Post by Henrik Bengtsson
## Additional Notes
The list used in this jobs `parLapplyLB` is 5812 elements long. With
the `splitList`-chunking from the patch, you'll get 208 lists of about
28 elements (208 chunks of size 28). The job ran on 28 CPU cores and
had a wallclock time of 120351.590 seconds, i.e. 33.43 hours. Thus, the
function we apply to our list takes about 580 seconds per list element,
i.e. about 10 minutes. I suppose, for that runtime, we would get even
better load balancing if we would reduce the chunk size even further,
maybe even down to 1, thus getting our efficiency even closer to 100%.
Post by Henrik Bengtsson
Of course, for really short-running functions, a higher chunk size
may be more efficient because of the overhead. In our case, the
overhead is negligible and that is why the low chunk size works really
well. In contrast, for smallish lists with short-running functions, you
might not even need load balancing and `parLapply` suffices. It only
becomes an issue, when the runtime of the function is high and / or
varying.
Post by Henrik Bengtsson
```
$ qacct -j 4433299 | awk '$1 == "wallclock" { sum += $2 } END { print
sum, "seconds" }'
Post by Henrik Bengtsson
4.72439e+09 seconds
```
Thats about 150 years on a single core or 7.5 years on a 20 core
server! Our user was constantly using about 500 cores, so this took
about 110 days. If you compare this to my 97% efficiency example, the
jobs could have been finished in 75 days instead ;-)
Post by Henrik Bengtsson
## Upcoming Patch
If this patch gets applied to the R code base (and I hope it will
:-)) my colleague and I will submit another patch that adds the chunk
size as an optional parameter to all off the load balancing functions.
With that parameter, users of these functions *can* decide for
themselves which chunk size they prefer for their code. As mentioned
before, the most efficient chunk size depends on the used functions
runtime, which is the only thing R does not know and users really
should be allowed to specify explicitly. The default of this new
optional parameter would be the one we used here and this would make
that upcoming patch fully source-compatible.
Post by Henrik Bengtsson
Best Regards
--
Christian Krause
Scientific Computing Administration and Support
Christian Krause
2018-03-01 08:19:33 UTC
Permalink
Dear Tomas,

Thanks for your commitment to fix this issue and also to add the chunk size as an argument. If you want our input, let us know ;)

Best Regards
Post by Tomas Kalibera
Dear Christian and Henrik,
thank you for spotting the problem and for your suggestions for a fix. We'll probably add a chunk.size argument to parLapplyLB and parLapply, following OpenMP terminology, which has already been an inspiration for the present code. parLapply already implements static scheduling via the internal function staticClusterApply, yet with a fixed chunk size; parLapplyLB already implements dynamic scheduling via the internal function dynamicClusterApply, but with a fixed chunk size set to an unlucky value, so that it effectively behaves like static scheduling. The default chunk size for parLapplyLB will be set so that there is some dynamism in the schedule even by default. I am now testing a patch with these changes.
Best
Tomas
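To see the "unlucky value" Tomas describes in `splitList` terms, a small sketch (assuming 5 workers):

```r
# With exactly as many chunks as workers, dynamic scheduling degenerates
# to static: each worker gets one chunk and nothing is left to balance.
sapply(parallel:::splitList(1:100, 5), length)    # 5 chunks for 5 workers
# With more chunks than workers, there is room for dynamism:
sapply(parallel:::splitList(1:100, 20), length)   # 20 chunks of 5
```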
Post by Christian Krause
Dear Henrik,
The rationale is just that it lies between these extremes and that it is really simple to calculate, without making any assumptions, knowing that it won't be perfect.
The extremes A and B you are mentioning are special cases based on assumptions. Case A is based on the assumption that the function has a long or varying runtime; then you are likely to get the best load balancing with really small chunks. Case B is based on the assumption that the function runtime is the same for each list element, i.e. a case where you don't actually need load balancing at all and could just use `parLapply`.
This new default is **not the best one**. It's just a better one than we had before. There is no best default because **we don't know the function's runtime and how it varies**. The user needs to decide, because only he/she knows the function. As mentioned before, I will write a patch that makes the chunk size an optional argument, so the user, who alone has all the information, can choose the best chunk size, just like you did with the `future.scheduling` parameter.
Best Regards
Post by Henrik Bengtsson
Hi, I'm trying to understand the rationale for your proposed amount of
splitting and more precisely why that one is THE one.

Given

```r
nbrOfElements <- 97
nbrOfWorkers <- 5
```

there are two extremes in how you can split up the list into chunks.

(A) One chunk per element:

```r
nbrOfChunks <- nbrOfElements
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[30] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[59] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
[88] 1 1 1 1 1 1 1 1 1 1
```

(B) One chunk per worker:

```r
nbrOfChunks <- nbrOfWorkers
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
[1] 20 19 19 19 20
```

I understand that neither of these two extremes may be the best when
it comes to orchestration overhead and load balancing. Instead, the
best might be somewhere in-between, e.g.

(C)

```r
nbrOfChunks <- nbrOfElements / nbrOfWorkers
sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
 [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```

However, there are multiple alternatives between the two extremes, e.g.

```r
nbrOfChunks <- scale * nbrOfElements / nbrOfWorkers
```

So, is there a reason why you argue for scale = 1.0 to be the optimal?

FYI, in future.apply::future_lapply(X, FUN, ...) there is a
'future.scheduling' scale factor(*) argument where the default
future.scheduling = 1 corresponds to (B) and future.scheduling = +Inf
to (A). Using future.scheduling = 4 achieves the amount of
load-balancing you propose in (C). (*) Different definition from the
above 'scale'. (Disclaimer: I'm the author)
/Henrik
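As a small aside, here is a sketch of the chunk counts Henrik's 'scale' knob would produce (the chosen values are illustrative):

```r
nbrOfElements <- 97
nbrOfWorkers  <- 5
for (scale in c(1, 2, 4)) {
  nbrOfChunks <- ceiling(scale * nbrOfElements / nbrOfWorkers)
  cat("scale =", scale, "->", nbrOfChunks, "chunks\n")
}
# scale = 1 -> 20 chunks   (C)
# scale = 2 -> 39 chunks
# scale = 4 -> 78 chunks
```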
--
Christian Krause

Scientific Computing Administration and Support
Tomas Kalibera
2018-03-13 08:26:22 UTC
Permalink
Chunk size support has been added in R-devel 74353. Please let me know
if you find any problems.

Thanks,
Tomas
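For completeness, a usage sketch of the new argument (assuming the `chunk.size` interface discussed in this thread; see `?parLapplyLB` in a current R-devel for the authoritative signature):

```r
library(parallel)

cl <- makeCluster(5)
# Explicitly request chunks of 5 elements; 97 elements then yield 20
# chunks, so the dynamic scheduler has work left to hand out.
res <- parLapplyLB(cl, 1:97, function(x) x^2, chunk.size = 5)
stopCluster(cl)
```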