Handling large objects with Ray Pool map

I want to use Ray's Pool map API to run a CPU-heavy task over a large amount of data. My first attempt looked like this:

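```python
import ray
from ray.util.multiprocessing import Pool
from functools import partial

def compute(series1, series2):
    # ***computationally heavy task*** goes here
    raise NotImplementedError

def preprocess(seriesseq, indices):
    # look up the two series referenced by this index pair
    series1 = seriesseq[indices[0]]
    series2 = seriesseq[indices[1]]
    return compute(series1, series2)

# datablock (the full dataset) is bound into every call via partial
func = partial(preprocess, datablock)
with Pool() as p:
    # imap is lazy: consume it inside the with-block, before the pool shuts down
    results = list(p.imap(func, combos, chunksize=10000))
```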

The problem with this implementation is that, in my case, datablock is 27 GB and combos is 1.2 GB, which caused my machine with 64 GB of RAM to run out of memory. According to the Ray dashboard, it took around 40 minutes for the computation to start, after which memory usage climbed rapidly until an OOM condition locked up the system. I assume it was copying the 27 GB dataset to every worker, which caused the issue.
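A back-of-the-envelope check of that assumption, using the sizes above: if the driver keeps its inputs and every pool worker ends up holding its own copy of datablock (assumed, not measured), a second worker already pushes the estimate past 64 GB. The worker count uses os.cpu_count() as a stand-in for Pool()'s default of roughly one worker per CPU; the helper names are hypothetical.

```python
import os

# Sizes from the question, in GB.
DATABLOCK_GB = 27.0
COMBOS_GB = 1.2
RAM_GB = 64.0


def estimated_peak_gb(num_workers: int) -> float:
    """Peak memory if the driver keeps its inputs and every worker holds
    its own copy of datablock (an assumption, not a measurement)."""
    driver = DATABLOCK_GB + COMBOS_GB
    return driver + num_workers * DATABLOCK_GB


def max_workers_that_fit(ram_gb: float = RAM_GB) -> int:
    """Largest worker count whose estimated peak stays within ram_gb."""
    n = 0
    while estimated_peak_gb(n + 1) <= ram_gb:
        n += 1
    return n


workers = os.cpu_count() or 1  # stand-in for Pool()'s default worker count
print(f"{workers} workers: ~{estimated_peak_gb(workers):.1f} GB vs {RAM_GB:.0f} GB RAM")
print("workers that fit:", max_workers_that_fit())  # workers that fit: 1
```

Under this assumption only one worker fits (about 55.2 GB), so any multi-core pool would exhaust RAM.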

To get around the memory issue, I then modified the function to take a Ray object reference instead:


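```python
def preprocess_reference(seriesref, indices):
    # fetch the shared dataset from Ray's object store
    seriesseq = ray.get(seriesref)
    series1 = seriesseq[indices[0]]
    series2 = seriesseq[indices[1]]
    return compute(series1, series2)

datablock_ref = ray.put(datablock)  # assumed: how the reference was created
func = partial(preprocess_reference, datablock_ref)
with Pool() as p:
    # consume the lazy iterator before the pool shuts down
    results = list(p.imap(func, combos, chunksize=10000))
```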


This kept memory usage in check, and I suspect only the 1.2 GB combos object was copied rather than the 27 GB dataset. However, compared with smaller datasets that I could run in memory with the first snippet, it took far longer than the increase in data size would explain. Strangely, the CPU was pinned at 100% for the entire computation, yet the computer stayed fully responsive, and starting other tasks actually decreased the reported CPU usage. These counter-intuitive observations suggest to me that the workers spend a lot of time simply waiting on ray.get() calls, which show up as CPU usage without any real computation happening. That would be very inefficient and is likely the reason the computation took so long.
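The ray.get() suspicion can be made concrete. preprocess_reference runs once per element of combos, so it calls ray.get() once per index pair; chunksize=10000 changes how many elements are shipped to a worker at a time, not how many times the function runs. Ray can read NumPy arrays from its object store without copying, but other Python objects are generally deserialized into the worker's memory on ray.get(), so if datablock is, say, a list of pandas Series, each call may rebuild a large object. The sketch below only counts fetches for the per-pair pattern versus fetching once per chunk; FetchCounter is a hypothetical stand-in for ray.get(seriesref) and compute() is a placeholder, so it runs without Ray.

```python
from typing import Callable, Iterable, List, Sequence, Tuple


class FetchCounter:
    """Hypothetical stand-in for ray.get(seriesref): returns the dataset
    and counts how many times it was fetched."""

    def __init__(self, data: Sequence):
        self.data = data
        self.calls = 0

    def __call__(self):
        self.calls += 1
        return self.data


def compute(series1, series2):
    # placeholder for the computationally heavy task
    return series1 + series2


def per_item(fetch: Callable, pairs: Iterable[Tuple[int, int]]) -> List:
    # mirrors preprocess_reference: one fetch for every index pair
    results = []
    for i, j in pairs:
        seriesseq = fetch()
        results.append(compute(seriesseq[i], seriesseq[j]))
    return results


def per_chunk(fetch: Callable, pairs: Sequence[Tuple[int, int]]) -> List:
    # fetch once, then reuse the same object for every pair in the chunk
    seriesseq = fetch()
    return [compute(seriesseq[i], seriesseq[j]) for i, j in pairs]


data = [10, 20, 30, 40]
chunk = [(0, 1), (1, 2), (2, 3), (0, 3)]
a, b = FetchCounter(data), FetchCounter(data)
per_item(a, chunk)
per_chunk(b, chunk)
print(a.calls, b.calls)  # 4 1
```

In a Ray version of this idea, combos would be pre-split into lists of index pairs and the mapped function would take one such list, call ray.get() once, and loop over the pairs.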

My questions are: is this the correct approach for situations like mine, where Ray is operating on datasets that do not fit into memory, and am I right that this method is slowed down by the ray.get() calls? If so, is there some way to asynchronously queue up chunks so the preprocess/compute functions work through the data faster?
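One common shape for "queue up chunks asynchronously" is a bounded in-flight loop: submit chunks until a limit is reached, then wait for any one to finish before submitting the next. In Ray this would typically use @ray.remote tasks that receive the object reference plus a chunk of index pairs, with ray.wait(pending, num_returns=1) doing the waiting. The sketch swaps in the standard library's concurrent.futures so it is self-contained; chunked(), bounded_map() and square_chunk() are hypothetical helpers, and ThreadPoolExecutor is only for the demo, since CPU-bound Python code needs processes (or Ray tasks) to run in parallel because of the GIL.

```python
from concurrent.futures import FIRST_COMPLETED, Executor, ThreadPoolExecutor, wait
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def chunked(items: Iterable, size: int) -> Iterator[List]:
    # split an iterable into lists of at most `size` elements
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


def bounded_map(executor: Executor, fn: Callable[[List], List],
                chunks: Iterable[List], max_in_flight: int) -> List:
    """Run fn over chunks with at most max_in_flight chunks submitted at once.
    Results are collected in completion order, not submission order."""
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be at least 1")
    results: List = []
    pending = set()
    for chunk in chunks:
        if len(pending) >= max_in_flight:
            # block until at least one chunk finishes, then top the queue back up
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                results.extend(fut.result())
        pending.add(executor.submit(fn, chunk))
    # drain whatever is still running
    for fut in wait(pending).done:
        results.extend(fut.result())
    return results


def square_chunk(chunk: List[int]) -> List[int]:
    # placeholder for per-chunk work (e.g. one fetch, then many compute() calls)
    return [x * x for x in chunk]


with ThreadPoolExecutor(max_workers=4) as pool:
    out = bounded_map(pool, square_chunk, chunked(range(10), 3), max_in_flight=2)
print(sorted(out))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Capping the number of in-flight chunks keeps workers busy while bounding how many pending inputs and results sit in memory at once.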

