One strategy for speeding up or scaling a machine learning workflow is parallel/distributed processing. In Python, the multiprocessing module can serve this purpose. However, it falls short (and can even harm overall performance) for parallel functions that require heavy inputs or costly initialization, because of the data copying between processes and the overhead of serializing/deserializing inputs.

Meanwhile, Ray is perfectly suited to such scenarios. Let’s work on two toy examples to illustrate that.

Two core concepts (among others) that make Ray powerful for distributed programming are:

  • Task: like an asynchronous function that can be executed in a separate process or on a remote machine.
  • Actor: like an asynchronous stateful class that can run in a separate process or on a remote machine, together with its own methods. In particular, other actors and tasks from different processes can access and mutate an actor's state. A minimal sketch of both primitives follows right after this list.
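
Here is a minimal sketch of both primitives (the function and class names here are illustrative only, not part of the examples below):

import ray

ray.init()

@ray.remote
def square(n):  # task: runs asynchronously in a separate worker process
    return n * n

@ray.remote
class Counter:  # actor: a stateful worker process with its own methods
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1  # state persists across calls
        return self.count

print(ray.get(square.remote(4)))  # 16
counter = Counter.remote()
print(ray.get(counter.increment.remote()))  # 1
print(ray.get(counter.increment.remote()))  # 2: the actor kept its state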

Table of Contents

1. Parallelize a bundle of matrix multiplication functions
2. Parallelize a bunch of Named Entity Recognition (NER) models

1. Parallelize a bundle of matrix multiplication functions

Let’s parallelize a set of functions described by \(f_i = x \cdot y_i\), where \(x\) is a fixed 10240x10240 float matrix (~800MB) representing a heavy input, and \(y_i\) denotes a variable 10240x128 float matrix (~10MB) for each function \(f_i\).

With Multiprocessing:

import time
import tracemalloc
from multiprocessing import Pool

import numpy as np
import psutil

num_cpus = psutil.cpu_count(logical=False)  # 8
num_workers = num_cpus // 2
pool = Pool(num_workers) # multiprocessing pool


np.random.seed(1234)
x = np.random.randn(10240, 10240)  # x takes 800MB

def task(x, y):  # noqa: D103
    z = np.matmul(x, y)
    return z

def run_multiple_tasks_in_parallel(i_trial):
    np.random.seed(i_trial)
    y_s = [np.random.rand(10240, 128) for _ in range(num_workers)]  # each y takes 10MB
    results = pool.starmap(task, zip([x] * len(y_s), y_s, strict=True))
    return results

if __name__ == "__main__":
    # benchmark
    num_trials = 10
    start_time = time.perf_counter()

    # tracemalloc.start()
    for i_trial in range(num_trials):
        run_multiple_tasks_in_parallel(i_trial)
    # current_mem, peak_mem = tracemalloc.get_traced_memory()
    # tracemalloc.stop()

    end_time = time.perf_counter()
    print(
        f"Avg Time: {(end_time - start_time)/num_trials:.2f} (s)"
    )  # while benchmarking time, disable mem_usage to avoid additional calculation.
    # print(f"Peak memory: {peak_mem/(1024*1024):.2f} (MB)")
Avg Time: 7.31 (s)
Peak memory: 1770.71 (MB)

With Ray:

import time

import numpy as np
import psutil
import ray

num_cpus = psutil.cpu_count(logical=False)  # 8
num_workers = num_cpus // 2  # max parallel tasks
ray.init(num_cpus=num_workers)  # init ray

np.random.seed(1234)
x = np.random.randn(10240, 10240)  # x takes 800MB
x_ref = ray.put(x)  # put x in ray's object store and return its reference.

@ray.remote  # convert func to ray's remote task
def task(x, y):  # noqa: D103
    z = np.matmul(x, y)
    return z

def run_multiple_tasks_in_parallel(i_run):
    np.random.seed(i_run)
    y_s = [np.random.rand(10240, 128) for _ in range(num_workers)]  # each y takes 10MB
    ray_task_list = [task.remote(x_ref, y) for y in y_s]  # pass reference of x instead of x itself
    results = ray.get(ray_task_list)
    return results
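
The benchmark driver is the same as in the multiprocessing version:

if __name__ == "__main__":
    # benchmark
    num_trials = 10
    start_time = time.perf_counter()

    for i_trial in range(num_trials):
        run_multiple_tasks_in_parallel(i_trial)

    end_time = time.perf_counter()
    print(f"Avg Time: {(end_time - start_time)/num_trials:.2f} (s)")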

Avg Time: 0.95 (s)
Peak memory: 44.38 (MB)

Clearly, Ray is much faster and much more memory-efficient than multiprocessing. In multiprocessing, each worker process receives its own copy of the expensive input \(x\) from the main process, which incurs serialization/deserialization overhead and high memory usage. In Ray, by contrast, the main process puts the fixed matrix \(x\) in a shared object store once and passes only its reference to each worker, so all workers read the same object without duplication, which reduces memory usage. Moreover, for objects with primitive data types, such as NumPy arrays, Ray stores them in shared memory in a zero-copy format, so workers can read them directly without deserialization, leading to significant performance gains.
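
The zero-copy behavior is easy to observe: fetching a NumPy array from the object store returns a read-only view backed by shared memory rather than a private copy. A small sketch, reusing x_ref from above:

x_again = ray.get(x_ref)  # no copy: a view into the shared object store
print(x_again.flags.writeable)  # False: the array is read-only shared memory
x_private = x_again.copy()  # make a writable private copy only if mutation is needed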

2. Parallelize a bunch of Named Entity Recognition (NER) models

Consider a scenario where a server is asked to tag named entities in batches of texts. The server calls on workers, each of which loads an NER model and processes one text from the batch. When a batch is completed, another batch arrives and the workers continue their work.

With Multiprocessing:

import time
import tracemalloc
from multiprocessing import Pool

import numpy as np
import psutil
import spacy

num_cpus = psutil.cpu_count(logical=False)  # 8
num_workers = num_cpus // 2
pool = Pool(num_workers) # multiprocessing pool

def task(text):
    ner = spacy.load("en_core_web_sm")  # load NER model on every call
    doc = ner(text)
    return [(ent.text, ent.label_) for ent in doc.ents]  # plain tuples pickle cheaply

def run_multiple_tasks_in_parallel(i_batch):
    np.random.seed(i_batch)
    text_s = ["Paris is the capital of France."] * num_workers
    results = pool.map(task, text_s)
    return results

if __name__ == "__main__":
    # benchmark
    num_batches = 10
    start_time = time.perf_counter()

    # tracemalloc.start()
    for i_batch in range(num_batches): # one batch is completed, another arrives.
        run_multiple_tasks_in_parallel(i_batch)
    # current_mem, peak_mem = tracemalloc.get_traced_memory()
    # tracemalloc.stop()

    end_time = time.perf_counter()
    print(
        f"Total Time for processing {num_batches} batches of texts: {end_time - start_time:.2f} (s)"
    )  # while benchmarking time, disable mem_usage to avoid additional calculation.
    # print(f"Peak memory: {peak_mem/(1024*1024):.2f} (MB)")
Total Time for processing 10 batches of texts: 38.17 (s)
Peak memory: 143.63 (MB)

With Ray:

import time

import numpy as np
import psutil
import ray
import spacy

num_cpus = psutil.cpu_count(logical=False)  # 8
num_workers = num_cpus // 2
ray.init(num_cpus=num_workers)

@ray.remote
class NER:
    def __init__(self):
        self.ner = spacy.load("en_core_web_sm")  # load NER model

    def tag(self, text):  # named-entity tagging method
        doc = self.ner(text)
        return [(ent.text, ent.label_) for ent in doc.ents]

# create Ray workers via actors; the NER models are loaded once, at actor construction time
ner_actors = [NER.remote() for _ in range(num_workers)]

def run_multiple_tasks_in_parallel(i_batch):
    np.random.seed(i_batch)
    y_s = ["Paris is the capital of France."] * num_workers
    results = ray.get([actor.tag.remote(y) for actor, y in zip(ner_actors, y_s)])
    return results
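
As before, the benchmark driver mirrors the multiprocessing version:

if __name__ == "__main__":
    # benchmark
    num_batches = 10
    start_time = time.perf_counter()

    for i_batch in range(num_batches):  # one batch is completed, another arrives.
        run_multiple_tasks_in_parallel(i_batch)

    end_time = time.perf_counter()
    print(
        f"Total Time for processing {num_batches} batches of texts: {end_time - start_time:.2f} (s)"
    )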
Total Time for processing 10 batches of texts: 8.73 (s)
Peak memory: 144.62 (MB)

Worker processes in multiprocessing.Pool are stateless; thus, for every pool.map call, i.e. for every batch, the NER models have to be reloaded. Ray's actors, by contrast, are stateful: the NER models are loaded only once, at actor construction time (i.e. in the __init__ function), and subsequent batches are processed by simply calling the tag method. This explains Ray's outperformance of multiprocessing here. In terms of memory usage, both frameworks show similar peaks, as the task does not involve any large data objects.
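
To see the one-time initialization cost in isolation, one can time successive calls on a fresh actor (an illustrative sketch; exact timings depend on the machine):

actor = NER.remote()  # returns immediately; the model loads inside __init__
for i in range(3):
    start = time.perf_counter()
    ray.get(actor.tag.remote("Paris is the capital of France."))
    print(f"call {i}: {time.perf_counter() - start:.2f} (s)")
# The first call waits for actor startup and spacy.load; subsequent calls reuse
# the already-loaded model and return much faster.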