Ray is a high-performance distributed execution framework open-sourced by UC Berkeley's RISELab (formerly the AMPLab). It was first released as a distributed computing framework and was repositioned as a new-generation distributed application framework in 2020. Ray aims to handle complex scenarios with a single engine, improve efficiency through dynamic computation and state sharing, and unify development, runtime, and disaster recovery.
Ray includes the following libraries for accelerating machine learning workloads:
- Tune: scalable hyperparameter tuning
- RLlib: scalable reinforcement learning
- Train: distributed deep learning (beta)
- Datasets: distributed data loading and compute
and libraries for putting ML and distributed applications into production:
- Serve: scalable and programmable serving
- Workflows: fast, durable application flows (alpha)
Business objectives
Ray is positioned as a distributed application framework, and its main goal is to enable the development and operation of distributed applications.
Ray's API makes it easy for developers to combine multiple libraries in a single distributed application. For example, Ray tasks and actors may call into, or be called from, distributed training (e.g. torch.distributed) or online serving workloads running on Ray. In this scenario, Ray acts as a "distributed glue" system, because it provides a common API and supports many different workload types.
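As a minimal sketch of this "glue" role (the ModelServer and preprocess names below are illustrative, not part of Ray's API), a single driver can mix a stateful actor standing in for an online service with stateless tasks standing in for data processing, all scheduled on the same cluster:

import ray

ray.init()

@ray.remote
class ModelServer:
    """Stateful actor standing in for an online serving workload."""
    def __init__(self):
        self.num_requests = 0

    def predict(self, features):
        self.num_requests += 1
        # Placeholder "model": sum the features.
        return sum(features)

@ray.remote
def preprocess(raw):
    """Stateless task standing in for a data-processing workload."""
    return [x * 2 for x in raw]

server = ModelServer.remote()
# Tasks and actors compose through object refs: the output of the
# preprocessing task is passed directly to the actor method.
features_ref = preprocess.remote([1, 2, 3])
print(ray.get(server.predict.remote(features_ref)))  # -> 12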
Key concepts
- Task: Ray enables arbitrary functions to be executed asynchronously on a separate Python worker. These asynchronous Ray functions are called “tasks”. Ray enables tasks to specify their resource requirements in terms of CPU, GPU, and custom resources. The cluster scheduler uses these resource requests to distribute tasks across the cluster for parallel execution.
- Object: In Ray, tasks and actors create and compute on objects. We refer to these objects as remote objects because they can be stored anywhere in a Ray cluster, and we refer to them using object refs. Remote objects are cached in Ray's distributed shared-memory object store, and each node in the cluster has an object store. In a cluster setting, a remote object can live on one or more nodes, regardless of who holds the object ref(s).
- Actor: Actor extends the Ray API from functions (tasks) to classes. Actors are essentially stateful workers (or services). When a new actor is instantiated, a new worker is created, and the actor’s methods are scheduled on that particular worker and can access and change the state of that worker. Like tasks, actors support CPU, GPU, and custom resource requirements.
- Placement Groups: Placement groups allow users to atomically reserve groups of resources across multiple nodes (i.e. gang scheduling). They can then be used to schedule Ray tasks and actors packed as close together as possible (PACK) or spread apart (SPREAD). Placement groups are commonly used for gang-scheduling actors, but also support tasks (see the sketch after this list).
- Environment Dependencies: When Ray executes tasks and actors on a remote machine, their environment dependencies (e.g. Python packages, local files, environment variables) must be available for the code to run.
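Below is a minimal sketch of reserving resources with a placement group and scheduling a task into it. It uses the Ray 1.x-style options(placement_group=...) argument (newer releases express the same idea through a scheduling-strategy option), and the train_shard function is purely illustrative:

import ray
from ray.util.placement_group import placement_group

ray.init(num_cpus=4)

# Atomically reserve two bundles of 2 CPUs each, packed onto as few nodes as possible.
pg = placement_group([{"CPU": 2}, {"CPU": 2}], strategy="PACK")
ray.get(pg.ready())  # Block until the reservation has been granted.

@ray.remote(num_cpus=2)
def train_shard():
    return "running inside the placement group"

# Schedule the task onto the reserved resources.
ref = train_shard.options(placement_group=pg).remote()
print(ray.get(ref))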
Architecture
A Ray cluster consists of a set of homogeneous worker nodes and a centralized Global Control Store (GCS) instance. Part of the system metadata is managed by the GCS, a service built on a pluggable data store; this metadata, such as the addresses of actors, is also cached locally by workers. The GCS manages metadata that is accessed less frequently but may be used by most or all workers in the cluster, for example the current node membership of the cluster. This keeps GCS performance from having a significant impact on application performance.
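As a minimal sketch of how a driver attaches to such a cluster (assuming a head node has already been started on the same machine with the ray start --head CLI command), ray.init can connect to the existing GCS instead of launching a new local instance:

import ray

# Connect to the running cluster (started e.g. with `ray start --head`)
# instead of starting a fresh single-node instance.
ray.init(address="auto")

# Cluster-wide resource information is part of the metadata tracked by the GCS.
print(ray.cluster_resources())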
Execute Python functions in parallel
import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))
Use Ray's actor model
import ray
ray.init()

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))
Tune quick start
Tune is a library for hyperparameter tuning at any scale.
- Launch a multi-node distributed hyperparameter sweep in fewer than 10 lines of code.
- Supports any deep learning framework, including PyTorch, PyTorch Lightning, TensorFlow, and Keras.
- Visualize results with TensorBoard.
- Choose among scalable state-of-the-art algorithms such as Population Based Training (PBT), Vizier's median stopping rule, and HyperBand/ASHA.
- Tune integrates with many optimization libraries, such as Facebook Ax, HyperOpt, and Bayesian Optimization, and enables you to scale them transparently.
To run this example, you need to install the following:
$ pip install "ray[tune]"
This example runs a parallel grid search to optimize the example objective function.
from ray import tune


def objective(step, alpha, beta):
    return (0.1 + alpha * step / 100)**(-1) + beta * 0.1


def training_function(config):
    # Hyperparameters
    alpha, beta = config["alpha"], config["beta"]
    for step in range(10):
        # Iterative training function - can be any arbitrary training procedure.
        intermediate_score = objective(step, alpha, beta)
        # Feed the score back to Tune.
        tune.report(mean_loss=intermediate_score)


analysis = tune.run(
    training_function,
    config={
        "alpha": tune.grid_search([0.001, 0.01, 0.1]),
        "beta": tune.choice([1, 2, 3])
    })

print("Best config: ", analysis.get_best_config(metric="mean_loss", mode="min"))

# Get a dataframe for analyzing trial results.
df = analysis.results_df
If TensorBoard is installed, all trial results are automatically visualized:
tensorboard --logdir ~/ray_results
RLlib
RLlib is an industrial-grade reinforcement learning (RL) library built on top of Ray. It provides a highly scalable and unified API for a variety of industry and research applications.
$ pip install "ray[rllib]" tensorflow # or torch
import gym
from ray.rllib.agents.ppo import PPOTrainer


# Define your problem using python and openAI's gym API:
class SimpleCorridor(gym.Env):
    """Corridor in which an agent must learn to move right to reach the exit.

    ---------------------
    | S | 1 | 2 | 3 | G |   S=start; G=goal; corridor_length=5
    ---------------------

    Possible actions to choose from are: 0=left; 1=right
    Observations are floats indicating the current field index, e.g. 0.0 for
    the starting position, 1.0 for the field next to the starting position, etc.
    Rewards are -0.1 for all steps, except when reaching the goal (+1.0).
    """

    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = gym.spaces.Discrete(2)  # left and right
        self.observation_space = gym.spaces.Box(0.0, self.end_pos, shape=(1,))

    def reset(self):
        """Resets the episode and returns the initial observation of the new one."""
        self.cur_pos = 0
        # Return initial observation.
        return [self.cur_pos]

    def step(self, action):
        """Takes a single step in the episode given `action`.

        Returns:
            New observation, reward, done-flag, info-dict (empty).
        """
        # Walk left.
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        # Walk right.
        elif action == 1:
            self.cur_pos += 1
        # Set the `done` flag when the end of the corridor (goal) is reached.
        done = self.cur_pos >= self.end_pos
        # +1.0 when the goal is reached, -0.1 otherwise.
        reward = 1.0 if done else -0.1
        return [self.cur_pos], reward, done, {}


# Create an RLlib Trainer instance to learn how to act in the above environment.
trainer = PPOTrainer(
    config={
        # Env class to use (here: our gym.Env sub-class from above).
        "env": SimpleCorridor,
        # Config dict to be passed to our custom env's constructor.
        "env_config": {
            # Use corridor with 20 fields (including S and G).
            "corridor_length": 20
        },
        # Parallelize environment rollouts.
        "num_workers": 3,
    })

# Train for n iterations and report the results (mean episode reward).
# Since we have to move right at least 19 times to reach the goal, and
# each step yields a -0.1 reward (except the final step: +1.0),
# we can expect an optimal episode reward of -0.1*18 + 1.0 = -0.8.
for i in range(5):
    results = trainer.train()
    print(f"Iter: {i}; avg. reward={results['episode_reward_mean']}")
After training, you may want to perform action computation (inference) in your environment. This is a minimal example of how to do this.
# Perform inference (action computation) based on given environment observations.
# Note that we are using a slightly different environment here (length 10 instead of 20),
# however, this should still work because the agent has (hopefully) learned
# "always walk right!"
env = SimpleCorridor({"corridor_length": 10})

# Get the initial observation (should be: [0.0] for the starting position).
obs = env.reset()
done = False
total_reward = 0.0

# Play one episode.
while not done:
    # Compute a single action, given the current observation
    # from the environment.
    action = trainer.compute_single_action(obs)
    # Apply the computed action in the environment.
    obs, reward, done, info = env.step(action)
    # Sum up rewards for reporting purposes.
    total_reward += reward

# Report results.
print(f"Played 1 episode; total-reward={total_reward}")
Ray Serve
Ray Serve is a scalable model serving library built on Ray. It is:
- Framework agnostic: Use a single toolkit to serve everything from deep learning models built with frameworks like PyTorch, TensorFlow, or Keras, to scikit-learn models, to arbitrary business logic.
- Python first: Configure model serving declaratively in pure Python, without YAML or JSON configuration files.
- Performance oriented: Turn on batching, pipelining, and GPU acceleration to increase the throughput of your model.
- Composition native: Allows you to create "model pipelines" by composing multiple models together to drive a single prediction.
- Horizontally scalable: Serve scales linearly as you add more machines, enabling your ML-powered service to handle growing traffic.
To run this example, you need to install the following:
$ pip install scikit-learn
$ pip install "ray[serve]"
More
- Ray AI Runtime (alpha)
- Distributed Scikit-learn / Joblib
- Distributed LightGBM on Ray
- Distributed multiprocessing.Pool
- Ray Collective Communication Lib
- Distributed PyTorch Lightning Training on Ray
- Using Ray with Pytorch Lightning
- Distributed XGBoost on Ray
- XGBoost-Ray with Dask
- XGBoost-Ray with Modin
Open source: Apache-2.0 License