# Copyright (c) 2024, EleutherAI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import re
import time
import shutil
import inspect
import itertools
import subprocess
from pathlib import Path
from abc import ABC, abstractmethod

from deepspeed.accelerator import get_accelerator
import pytest
from _pytest.outcomes import Skipped
from _pytest.fixtures import FixtureLookupError, FixtureFunctionMarker

import random
import train

import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.multiprocessing as mp

from yaml import load

try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

from copy import deepcopy
import deepspeed

TEST_CHECKPOINT_DIR = "test_checkpoint"
TEST_LOG_DIR = "test_logs"
TEST_TENSORBOARD_DIR = "test_tensorboard"

# Worker timeout *after* the first worker has completed.
DEEPSPEED_UNIT_WORKER_TIMEOUT = 120

DEEPSPEED_TEST_TIMEOUT = 600


def get_xdist_worker_id():
    xdist_worker = os.environ.get("PYTEST_XDIST_WORKER", None)
    if xdist_worker is not None:
        xdist_worker_id = xdist_worker.replace("gw", "")
        return int(xdist_worker_id)
    return None


def get_master_port():
    master_port = os.environ.get("DS_TEST_PORT", "29503")
    xdist_worker_id = get_xdist_worker_id()
    if xdist_worker_id is not None:
        master_port = str(int(master_port) + xdist_worker_id)
    return master_port


_num_gpus = None


def is_rocm_pytorch():
    # Minimal local check for a ROCm (HIP) build of PyTorch; used by
    # set_accelerator_visible() below, which otherwise has no such helper in scope.
    return getattr(torch.version, "hip", None) is not None


def set_accelerator_visible():
    cuda_visible = os.environ.get("CUDA_VISIBLE_DEVICES", None)
    xdist_worker_id = get_xdist_worker_id()
    if xdist_worker_id is None:
        xdist_worker_id = 0
    if cuda_visible is None:
        # CUDA_VISIBLE_DEVICES is not set, discover it using an accelerator-specific command instead
        if get_accelerator().device_name() == "cuda":
            if is_rocm_pytorch():
                rocm_smi = subprocess.check_output(["rocm-smi", "--showid"])
                gpu_ids = filter(
                    lambda s: "GPU" in s, rocm_smi.decode("utf-8").strip().split("\n")
                )
                num_accelerators = len(list(gpu_ids))
            else:
                nvidia_smi = subprocess.check_output(["nvidia-smi", "--list-gpus"])
                num_accelerators = len(nvidia_smi.decode("utf-8").strip().split("\n"))
        elif get_accelerator().device_name() == "xpu":
            clinfo = subprocess.check_output(["clinfo"])
            lines = clinfo.decode("utf-8").strip().split("\n")
            num_accelerators = 0
            for line in lines:
                match = re.search("Device Type.*GPU", line)
                if match:
                    num_accelerators += 1
        elif get_accelerator().device_name() == "npu":
            npu_smi = subprocess.check_output(["npu-smi", "info", "-l"])
            num_accelerators = int(
                npu_smi.decode("utf-8").strip().split("\n")[0].split(":")[1].strip()
            )
        else:
            assert get_accelerator().device_name() == "cpu"
            cpu_sockets = int(
                subprocess.check_output(
                    'cat /proc/cpuinfo | grep "physical id" | sort -u | wc -l',
                    shell=True,
                )
            )
            num_accelerators = cpu_sockets
        cuda_visible = ",".join(map(str, range(num_accelerators)))

    # rotate list based on xdist worker id, example below
    # wid=0 -> ['0', '1', '2', '3']
    # wid=1 -> ['1', '2', '3', '0']
    # wid=2 -> ['2', '3', '0', '1']
    # wid=3 -> ['3', '0', '1', '2']
    dev_id_list = cuda_visible.split(",")
    dev_id_list = dev_id_list[xdist_worker_id:] + dev_id_list[:xdist_worker_id]
    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list)


def count_gpus():
    global _num_gpus
    if _num_gpus is None:
        nvidia_smi = subprocess.check_output(["nvidia-smi", "--list-gpus"])
        _num_gpus = len(nvidia_smi.decode("utf-8").strip().split("\n"))
    return _num_gpus


def set_cuda_visibile():
    cuda_visible = os.environ.get("CUDA_VISIBLE_DEVICES", None)
    xdist_worker_id = get_xdist_worker_id()
    if xdist_worker_id is None:
        xdist_worker_id = 0
    if cuda_visible is None:
        # CUDA_VISIBLE_DEVICES is not set, discover it from nvidia-smi instead
        nvidia_smi = subprocess.check_output(["nvidia-smi", "--list-gpus"])
        num_gpus = len(nvidia_smi.decode("utf-8").strip().split("\n"))
        cuda_visible = ",".join(map(str, range(num_gpus)))

    # rotate list based on xdist worker id, example below
    # wid=0 -> ['0', '1', '2', '3']
    # wid=1 -> ['1', '2', '3', '0']
    # wid=2 -> ['2', '3', '0', '1']
    # wid=3 -> ['3', '0', '1', '2']
    dev_id_list = cuda_visible.split(",")
    dev_id_list = dev_id_list[xdist_worker_id:] + dev_id_list[:xdist_worker_id]
    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list)


def get_root_directory():
    return Path(__file__).parents[1]


def get_config_directory():
    return get_root_directory() / "configs"


def get_configs_with_path(configs):
    return [str(get_config_directory() / cfg) for cfg in configs]


def clear_test_dirs():
    log_dir = os.path.join(get_root_directory(), TEST_LOG_DIR)
    if os.path.isdir(log_dir):
        shutil.rmtree(log_dir)

    checkpoint_dir = os.path.join(get_root_directory(), TEST_CHECKPOINT_DIR)
    if os.path.isdir(checkpoint_dir):
        shutil.rmtree(checkpoint_dir)

    tensorboard_dir = os.path.join(get_root_directory(), TEST_TENSORBOARD_DIR)
    if os.path.isdir(tensorboard_dir):
        shutil.rmtree(tensorboard_dir)


class DistributedExec(ABC):
    """
    Base class for distributed execution of functions/methods. Contains common
    methods needed for DistributedTest and DistributedFixture.
    """

    world_size = 2
    backend = get_accelerator().communication_backend_name()
    init_distributed = True
    set_dist_env = True
    requires_cuda_env = True
    reuse_dist_env = False
    _pool_cache = {}
    exec_timeout = DEEPSPEED_TEST_TIMEOUT

    @abstractmethod
    def run(self):
        ...

    def __call__(self, request=None):
        self._fixture_kwargs = self._get_fixture_kwargs(request, self.run)
        world_size = self.world_size
        if self.requires_cuda_env and not get_accelerator().is_available():
            pytest.skip("only supported in accelerator environments.")

        if isinstance(world_size, int):
            world_size = [world_size]
        for procs in world_size:
            self._launch_procs(procs)

    def _get_fixture_kwargs(self, request, func):
        if not request:
            return {}
        # Grab fixture / parametrize kwargs from the pytest request object
        fixture_kwargs = {}
        params = inspect.getfullargspec(func).args
        params.remove("self")
        for p in params:
            try:
                fixture_kwargs[p] = request.getfixturevalue(p)
            except FixtureLookupError:
                pass  # test methods can have kwargs that are not fixtures
        return fixture_kwargs

    def _launch_procs(self, num_procs):
        # Verify we have enough accelerator devices to run this test
        if (
            get_accelerator().is_available()
            and get_accelerator().device_count() < num_procs
        ):
            pytest.skip(
                f"Skipping test because not enough GPUs are available: {num_procs} required, {get_accelerator().device_count()} available"
            )

        mp.set_start_method("spawn", force=True)

        # Create process pool or use cached one
        master_port = None
        if self.reuse_dist_env:
            if num_procs not in self._pool_cache:
                self._pool_cache[num_procs] = mp.Pool(processes=num_procs)
                master_port = get_master_port()
            pool = self._pool_cache[num_procs]
        else:
            pool = mp.Pool(processes=num_procs)
            master_port = get_master_port()

        # Run the test
        args = [(local_rank, num_procs, master_port) for local_rank in range(num_procs)]
        skip_msgs_async = pool.starmap_async(self._dist_run, args)

        try:
            skip_msgs = skip_msgs_async.get(self.exec_timeout)
        except mp.TimeoutError:
            # Shortcut to exit pytest in the case of a hung test. This usually
            # means an environment error, and the remaining tests would also
            # hang (causing very long unit test runtimes).
            pytest.exit("Test hanged, exiting", returncode=0)

        # Tear down distributed environment and close process pools
        self._close_pool(pool, num_procs)

        # If we skipped a test, propagate that to this process
        if any(skip_msgs):
            assert len(set(skip_msgs)) == 1, "Multiple different skip messages received"
            pytest.skip(skip_msgs[0])

    def _dist_run(self, local_rank, num_procs, master_port):
        skip_msg = ""
        if not dist.is_initialized():
            # Initialize deepspeed.comm and execute the user function.
            if self.set_dist_env:
                os.environ["MASTER_ADDR"] = "127.0.0.1"
                os.environ["MASTER_PORT"] = str(master_port)
                os.environ["LOCAL_RANK"] = str(local_rank)
                # NOTE: unit tests don't support multi-node, so local_rank == global rank
                os.environ["RANK"] = str(local_rank)
                # In case of multiprocess launching, LOCAL_SIZE should be the same as WORLD_SIZE
                # (the DeepSpeed single-node launcher would also set LOCAL_SIZE accordingly)
                os.environ["LOCAL_SIZE"] = str(num_procs)
                os.environ["WORLD_SIZE"] = str(num_procs)

            # turn off NCCL logging if set
            os.environ.pop("NCCL_DEBUG", None)

            if get_accelerator().is_available():
                set_accelerator_visible()

            if get_accelerator().is_available():
                get_accelerator().set_device(local_rank)

            if self.init_distributed:
                deepspeed.init_distributed(dist_backend=self.backend)
                dist.barrier()

        try:
            self.run(**self._fixture_kwargs)
        except BaseException as e:
            if isinstance(e, Skipped):
                skip_msg = e.msg
            else:
                raise e

        return skip_msg

    def _dist_destroy(self):
        if (dist is not None) and dist.is_initialized():
            dist.barrier()
            dist.destroy_process_group()

    def _close_pool(self, pool, num_procs, force=False):
        if force or not self.reuse_dist_env:
            msg = pool.starmap(self._dist_destroy, [() for _ in range(num_procs)])
            pool.close()
            pool.join()
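

# A minimal sketch (illustrative only, not used by the test suite) of the subclass
# pattern that DistributedFixture and DistributedTest below build on: a subclass
# sets `world_size` and implements `run()`; calling the instance then spawns one
# process per rank, sets up the distributed environment, and invokes `run()` on
# each rank. The class name here is hypothetical.
#
#     class _ExampleDistributedExec(DistributedExec):
#         world_size = 2
#
#         def run(self):
#             # executed once per spawned rank, with deepspeed.comm initialized
#             assert int(os.environ["WORLD_SIZE"]) == self.world_size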


class DistributedFixture(DistributedExec):
    """
    Implementation that extends @pytest.fixture to allow for distributed execution.
    This is primarily meant to be used when a test requires executing two pieces of
    code with different world sizes.

    There are 2 parameters that can be modified:
        - world_size: int = 2 -- the number of processes to launch
        - backend: Literal['nccl','mpi','gloo'] = 'nccl' -- which backend to use

    Features:
        - able to call pytest.skip() inside fixture
        - can be reused by multiple tests
        - can accept other fixtures as input

    Limitations:
        - cannot use @pytest.mark.parametrize
        - world_size cannot be modified after definition and only one world_size value is accepted
        - any fixtures used must also be used in the test that uses this fixture (see example below)
        - cannot return values; to pass data to a DistributedTest object, write to a
          file under class_tmpdir (see example below)

    Usage:
        - must implement a run(self, ...) method
        - fixture can be used by making the class name an input to a test function

    Example:
        @pytest.fixture(params=[10,20])
        def regular_pytest_fixture(request):
            return request.param

        class distributed_fixture_example(DistributedFixture):
            world_size = 4

            def run(self, regular_pytest_fixture, class_tmpdir):
                assert int(os.environ["WORLD_SIZE"]) == self.world_size
                local_rank = os.environ["LOCAL_RANK"]
                print(f"Rank {local_rank} with value {regular_pytest_fixture}")
                with open(os.path.join(class_tmpdir, f"{local_rank}.txt"), "w") as f:
                    f.write(f"{local_rank},{regular_pytest_fixture}")

        class TestExample(DistributedTest):
            world_size = 1

            def test(self, distributed_fixture_example, regular_pytest_fixture, class_tmpdir):
                assert int(os.environ["WORLD_SIZE"]) == self.world_size
                for rank in range(4):
                    with open(os.path.join(class_tmpdir, f"{rank}.txt"), "r") as f:
                        assert f.read() == f"{rank},{regular_pytest_fixture}"
    """

    is_dist_fixture = True

    # These values are just placeholders so that pytest recognizes this as a fixture
    _pytestfixturefunction = FixtureFunctionMarker(scope="function", params=None)
    __name__ = ""

    def __init__(self):
        assert isinstance(
            self.world_size, int
        ), "Only one world size is allowed for distributed fixtures"
        self.__name__ = type(self).__name__
        _pytestfixturefunction = FixtureFunctionMarker(
            scope="function", params=None, name=self.__name__
        )


class DistributedTest(DistributedExec):
    """
    Implementation for running pytest with distributed execution.

    There are 2 parameters that can be modified:
        - world_size: Union[int,List[int]] = 2 -- the number of processes to launch
        - backend: Literal['nccl','mpi','gloo'] = 'nccl' -- which backend to use

    Features:
        - able to call pytest.skip() inside tests
        - works with pytest fixtures, parametrize, mark, etc.
        - can contain multiple tests (each of which can be parametrized separately)
        - class methods can be fixtures (usable by tests in this class only)
        - world_size can be changed for individual tests using @pytest.mark.world_size(world_size)
        - class_tmpdir is a fixture that can be used to get a tmpdir shared among
          all tests (including DistributedFixture)

    Usage:
        - class name must start with "Test"
        - must implement one or more test*(self, ...) methods

    Example:
        @pytest.fixture(params=[10,20])
        def val1(request):
            return request.param

        @pytest.mark.fast
        @pytest.mark.parametrize("val2", [30,40])
        class TestExample(DistributedTest):
            world_size = 2

            @pytest.fixture(params=[50,60])
            def val3(self, request):
                return request.param

            def test_1(self, val1, val2, str1="hello world"):
                assert int(os.environ["WORLD_SIZE"]) == self.world_size
                assert all((val1, val2, str1))

            @pytest.mark.world_size(1)
            @pytest.mark.parametrize("val4", [70,80])
            def test_2(self, val1, val2, val3, val4):
                assert int(os.environ["WORLD_SIZE"]) == 1
                assert all((val1, val2, val3, val4))
    """

    def __init__(self):
        self.is_dist_test = True

    # Temporary directory that is shared among test methods in a class
    @pytest.fixture(autouse=True, scope="class")
    def class_tmpdir(self, tmpdir_factory):
        fn = tmpdir_factory.mktemp(self.__class__.__name__)
        return fn

    def run(self, **fixture_kwargs):
        self._current_test(**fixture_kwargs)

    def __call__(self, request):
        self._current_test = self._get_current_test_func(request)
        self._fixture_kwargs = self._get_fixture_kwargs(request, self._current_test)

        if self.requires_cuda_env and not get_accelerator().is_available():
            pytest.skip("only supported in accelerator environments.")

        # Catch world_size override pytest mark
        for mark in getattr(request.function, "pytestmark", []):
            if mark.name == "world_size":
                world_size = mark.args[0]
                break
        else:
            world_size = self.world_size

        if isinstance(world_size, int):
            world_size = [world_size]
        for procs in world_size:
            self._launch_procs(procs)
            time.sleep(0.5)

    def _get_current_test_func(self, request):
        # DistributedTest subclasses may have multiple test methods
        func_name = request.function.__name__
        return getattr(self, func_name)


def get_test_path(filename):
    curr_path = Path(__file__).parent
    return str(curr_path.joinpath(filename))


def model_setup(yaml_list=None, param_dict=None, clear_data=True):
    from megatron.neox_arguments import NeoXArgs
    from megatron.mpu import destroy_model_parallel
    from megatron import initialize_megatron
    from megatron.training import setup_model_and_optimizer

    destroy_model_parallel()  # mpu model parallel contains remaining global vars
    if clear_data and (
        not torch.distributed.is_initialized()
        or torch.distributed.get_world_size() == 1
        or torch.distributed.get_rank() == 0
    ):
        clear_test_dirs()

    overwrite_values = {
        "user_script": str(get_root_directory() / "train.py"),
        "save": TEST_CHECKPOINT_DIR,
        "load": TEST_CHECKPOINT_DIR,
        "log_dir": TEST_LOG_DIR,
        "tensorboard_dir": TEST_TENSORBOARD_DIR,
    }

    # yaml_list and param_dict should not both be None
    assert yaml_list is not None or param_dict is not None

    # initially load config from files, as would be the case in deepy.py
    if yaml_list is not None:
        args_loaded = NeoXArgs.from_ymls(yaml_list, overwrite_values=overwrite_values)
    else:
        p_dict = param_dict.copy()
        p_dict.update(overwrite_values)
        args_loaded = NeoXArgs.from_dict(p_dict)

    args_loaded.build_tokenizer()

    initialize_megatron(neox_args=args_loaded)
    model, optimizer, lr_scheduler = setup_model_and_optimizer(
        neox_args=args_loaded, use_cache=True
    )
    return model, optimizer, lr_scheduler, args_loaded


def simulate_deepy_env(monkeypatch, input_args):
    from megatron.neox_arguments import NeoXArgs

    monkeypatch.setenv("WORLD_SIZE", "1")
    monkeypatch.setenv("RANK", "0")
    neox_args = NeoXArgs.consume_deepy_args(input_args)
    deepspeed_main_args = neox_args.get_deepspeed_main_args()
    return deepspeed_main_args
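

# Illustrative usage (a sketch; the argument list and directories below are
# assumptions, not fixed by this module): tests that need the flattened DeepSpeed
# launcher arguments typically build them once with pytest's `monkeypatch` fixture
# and then feed them to the helpers below, e.g.
#
#     input_args = ["train.py", "tests/config/test_setup.yml"]  # hypothetical arg list
#     deepspeed_main_args = simulate_deepy_env(monkeypatch, input_args)
#     save_random_model(deepspeed_main_args, model_dir=str(tmp_path), train_iters=32)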


def save_random_model(input_args, model_dir, train_iters=0):
    # Save a randomly initialised model
    train_args = {
        "do_train": False,
        "train_iters": train_iters,
        "save": model_dir,
        "extra_save_iters": [train_iters],
    }
    train.main(input_args=input_args, overwrite_values=train_args)


def bounded_product(sequence, n=None, seed=None):
    """
    Returns a shuffled, bounded cartesian product of the input sequence.
    Designed to cover as wide a range of permutations as possible with a limited
    number of iterations. Materializes the whole product in memory, so it is not
    suitable for very large input sequences.

    :param sequence: iterable
    :param n: length of returned list
    :param seed: random seed for reproducibility
    :return: list
    """
    p = list(itertools.product(*sequence))
    if seed is not None:
        random.seed(seed)
    random.shuffle(p)
    return p if n is None else p[:n]


def model_setup_simple(deepspeed_main_args, overwrite_values, iteration=None):
    from megatron.neox_arguments import NeoXArgs
    from megatron import initialize_megatron
    from megatron.training import setup_model_and_optimizer

    neox_args = NeoXArgs.consume_neox_args(
        input_args=deepspeed_main_args, overwrite_values=overwrite_values
    )
    neox_args.configure_distributed_args()
    neox_args.build_tokenizer()
    initialize_megatron(neox_args=neox_args)
    model, optimizer, lr_scheduler = setup_model_and_optimizer(
        neox_args=neox_args, use_cache=False
    )
    return model, optimizer, lr_scheduler, neox_args


def parametrize(
    params_to_test: dict, max_tests: int = 50, seed: int = None, with_names=True
):
    """
    Generates a random sample of up to `max_tests` combinations of the values in
    `params_to_test`.

    Each key in `params_to_test` can either be a single parameter name mapped to all
    of its candidate values, or several parameter names separated by commas mapped to
    lists of values that vary in tandem,
        e.g. "hidden_size,num_heads": [[768,12], [1024,32], [2048, 64]]
        where the first item in each list is a value of `hidden_size` and the second a
        value of `num_heads`. This is useful for shrinking the test space for values we
        already expect not to interact, since the full cartesian product can grow very large.

    :param params_to_test: dict of neox params
    :param max_tests: maximum number of tests to run
    :param seed: random seed
    :return: a list of neox param dicts to pass to a parametrized unit test
    """
    keys, values = zip(*params_to_test.items())
    ret = []
    if with_names:
        experiments = []
    for p in bounded_product(values, n=max_tests, seed=seed):
        experiment = dict(zip(keys, p))
        to_pop = []
        to_add = {}
        for k, v in experiment.items():
            if "," in k:
                keys_split = [i.strip() for i in k.split(",")]
                values_separated = experiment[k]
                to_pop.append(k)
                assert len(values_separated) == len(keys_split)
                new_dict = dict(zip(keys_split, values_separated))
                to_add.update(new_dict)
        experiment.update(to_add)
        for k in to_pop:
            experiment.pop(k)
        base = deepcopy(BASE_CONFIG)
        base.update(experiment)
        ret.append(base)
        if with_names:
            experiments.append(experiment)
    if with_names:
        return ret, [dict_repr(d) for d in experiments]
    return ret


def dict_repr(d):
    return " ".join([f"{str(k)} : {str(v)}" for k, v in d.items()])


binary = [True, False]

with open("tests/config/test_setup.yml", "r") as f:
    BASE_CONFIG = load(f, Loader=Loader)

print(f"Base Config:\n{BASE_CONFIG}")
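

# Example (a sketch, not executed here): `parametrize` above is typically consumed
# via `pytest.mark.parametrize`. The parameter keys below are illustrative NeoX
# config names; any keys present in BASE_CONFIG can be combined the same way.
#
#     param_dicts, param_names = parametrize(
#         {
#             "norm": ["layernorm", "rmsnorm"],
#             "hidden_size,num_attention_heads": [[64, 4], [128, 8]],
#         },
#         max_tests=4,
#         seed=42,
#     )
#
#     @pytest.mark.parametrize("param_dict", param_dicts, ids=param_names)
#     def test_something(param_dict):
#         ...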