|
import logging |
|
import os |
|
import re |
|
import socket |
|
import sys |
|
import time |
|
import warnings |
|
from datetime import datetime |
|
from enum import Enum |
|
from itertools import cycle, islice |
|
from pathlib import Path |
|
from queue import Queue |
|
from threading import Thread |
|
from typing import Any, Callable, Dict, Optional, Union |
|
|
|
import boto3 |
|
import botocore.exceptions as boto_exceptions |
|
import rich |
|
from botocore.config import Config |
|
from rich.console import Console, ConsoleRenderable |
|
from rich.highlighter import NullHighlighter |
|
from rich.progress import Progress |
|
from rich.text import Text |
|
from rich.traceback import Traceback |
|
|
|
from .aliases import PathOrStr |
|
from .exceptions import ( |
|
OLMoCliError, |
|
OLMoEnvironmentError, |
|
OLMoError, |
|
OLMoNetworkError, |
|
OLMoThreadError, |
|
) |
|
from .torch_util import get_global_rank, get_local_rank, get_node_rank, is_distributed |
|
|
|
try: |
|
from functools import cache |
|
except ImportError: |
|
from functools import lru_cache as cache |
|
|
|
|
|
class StrEnum(str, Enum): |
|
""" |
|
This is equivalent to Python's :class:`enum.StrEnum` since version 3.11. |
|
We include this here for compatibility with older version of Python. |
|
""" |
|
|
|
def __str__(self) -> str: |
|
return self.value |
|
|
|
def __repr__(self) -> str: |
|
return f"'{str(self)}'" |
|
|
|
|
|
_log_extra_fields: Dict[str, Any] = {} |
|
log = logging.getLogger(__name__) |
|
|
|
|
|
class LogFilterType(StrEnum): |
|
rank0_only = "rank0_only" |
|
local_rank0_only = "local_rank0_only" |
|
all_ranks = "all_ranks" |
|
|
|
|
|
def log_extra_field(field_name: str, field_value: Any) -> None: |
|
global _log_extra_fields |
|
if field_value is None: |
|
if field_name in _log_extra_fields: |
|
del _log_extra_fields[field_name] |
|
else: |
|
_log_extra_fields[field_name] = field_value |
|
|
|
|
|
def setup_logging(log_filter_type: LogFilterType = LogFilterType.rank0_only) -> None: |
|
""" |
|
:param rank0_only: INFO and below messages will only be emitted on the rank0 process. |
|
""" |
|
log_extra_field("hostname", socket.gethostname()) |
|
if is_distributed(): |
|
log_extra_field("node_rank", get_node_rank()) |
|
log_extra_field("local_rank", get_local_rank()) |
|
log_extra_field("global_rank", get_global_rank()) |
|
else: |
|
log_extra_field("node_rank", 0) |
|
log_extra_field("local_rank", 0) |
|
log_extra_field("global_rank", 0) |
|
|
|
old_log_record_factory = logging.getLogRecordFactory() |
|
|
|
def log_record_factory(*args, **kwargs) -> logging.LogRecord: |
|
record = old_log_record_factory(*args, **kwargs) |
|
for field_name, field_value in _log_extra_fields.items(): |
|
setattr(record, field_name, field_value) |
|
return record |
|
|
|
logging.setLogRecordFactory(log_record_factory) |
|
|
|
handler: logging.Handler |
|
if ( |
|
os.environ.get("OLMo_NONINTERACTIVE", False) |
|
or os.environ.get("DEBIAN_FRONTEND", None) == "noninteractive" |
|
or not sys.stdout.isatty() |
|
): |
|
handler = logging.StreamHandler(sys.stdout) |
|
formatter = logging.Formatter( |
|
"%(asctime)s\t%(hostname)s:%(local_rank)s\t%(name)s:%(lineno)s\t%(levelname)s\t%(message)s" |
|
) |
|
formatter.default_time_format = "%Y-%m-%d %H:%M:%S" |
|
formatter.default_msec_format = "%s.%03d" |
|
handler.setFormatter(formatter) |
|
else: |
|
handler = RichHandler() |
|
|
|
def rank0_filter(record: logging.LogRecord) -> int: |
|
if record.levelno > logging.INFO: |
|
return 1 |
|
if getattr(record, "global_rank", 0) == 0: |
|
return 1 |
|
else: |
|
return 0 |
|
|
|
def local_rank0_filter(record: logging.LogRecord) -> int: |
|
if record.levelno > logging.INFO: |
|
return 1 |
|
if getattr(record, "local_rank", 0) == 0: |
|
return 1 |
|
else: |
|
return 0 |
|
|
|
if log_filter_type == LogFilterType.rank0_only: |
|
filter = rank0_filter |
|
elif log_filter_type == LogFilterType.local_rank0_only: |
|
filter = local_rank0_filter |
|
elif log_filter_type == LogFilterType.all_ranks: |
|
filter = None |
|
else: |
|
raise ValueError(log_filter_type) |
|
|
|
if filter is not None: |
|
handler.addFilter(filter) |
|
logging.basicConfig(handlers=[handler], level=logging.INFO) |
|
|
|
logging.captureWarnings(True) |
|
logging.getLogger("urllib3").setLevel(logging.ERROR) |
|
|
|
|
|
def excepthook(exctype, value, traceback): |
|
""" |
|
Used to patch `sys.excepthook` in order to log exceptions. |
|
""" |
|
if issubclass(exctype, KeyboardInterrupt): |
|
sys.__excepthook__(exctype, value, traceback) |
|
elif issubclass(exctype, OLMoCliError): |
|
rich.get_console().print(f"[yellow]{value}[/]", highlight=False) |
|
elif issubclass(exctype, OLMoError): |
|
rich.get_console().print(Text(f"{exctype.__name__}:", style="red"), value, highlight=False) |
|
else: |
|
log.critical("Uncaught %s: %s", exctype.__name__, value, exc_info=(exctype, value, traceback)) |
|
|
|
|
|
def install_excepthook(): |
|
sys.excepthook = excepthook |
|
|
|
|
|
def filter_warnings(): |
|
|
|
warnings.filterwarnings( |
|
action="ignore", |
|
category=UserWarning, |
|
message="torch.distributed.*_base is a private function and will be deprecated.*", |
|
) |
|
warnings.filterwarnings( |
|
action="ignore", |
|
category=UserWarning, |
|
message="TypedStorage is deprecated.*", |
|
) |
|
warnings.filterwarnings( |
|
action="ignore", |
|
category=UserWarning, |
|
message="Please use DTensor instead.*", |
|
) |
|
|
|
warnings.filterwarnings( |
|
action="ignore", |
|
message="failed to load.*", |
|
module="torchvision.io.image", |
|
) |
|
|
|
|
|
def set_env_variables(): |
|
os.environ["TOKENIZERS_PARALLELISM"] = "false" |
|
|
|
|
|
def prepare_cli_environment(log_filter_type: Optional[LogFilterType] = None): |
|
if log_filter_type is None: |
|
log_filter_type = LogFilterType(os.environ.get("LOG_FILTER_TYPE", "rank0_only")) |
|
rich.reconfigure(width=max(rich.get_console().width, 180), soft_wrap=True) |
|
setup_logging(log_filter_type=log_filter_type) |
|
install_excepthook() |
|
filter_warnings() |
|
set_env_variables() |
|
|
|
|
|
def clean_opt(arg: str) -> str: |
|
if "=" not in arg: |
|
arg = f"{arg}=True" |
|
name, val = arg.split("=", 1) |
|
name = name.strip("-").replace("-", "_") |
|
return f"{name}={val}" |
|
|
|
|
|
class RichHandler(logging.Handler): |
|
""" |
|
A simplified version of rich.logging.RichHandler from |
|
https://github.com/Textualize/rich/blob/master/rich/logging.py |
|
""" |
|
|
|
def __init__( |
|
self, |
|
*, |
|
level: Union[int, str] = logging.NOTSET, |
|
console: Optional[Console] = None, |
|
markup: bool = False, |
|
) -> None: |
|
super().__init__(level=level) |
|
self.console = console or rich.get_console() |
|
self.highlighter = NullHighlighter() |
|
self.markup = markup |
|
|
|
def emit(self, record: logging.LogRecord) -> None: |
|
try: |
|
if hasattr(record.msg, "__rich__") or hasattr(record.msg, "__rich_console__"): |
|
self.console.print(record.msg) |
|
else: |
|
msg: Any = record.msg |
|
if isinstance(record.msg, str): |
|
msg = self.render_message(record=record, message=record.getMessage()) |
|
renderables = [ |
|
self.get_time_text(record), |
|
self.get_level_text(record), |
|
self.get_location_text(record), |
|
msg, |
|
] |
|
if record.exc_info is not None: |
|
tb = Traceback.from_exception(*record.exc_info) |
|
renderables.append(tb) |
|
self.console.print(*renderables) |
|
except Exception: |
|
self.handleError(record) |
|
|
|
def render_message(self, *, record: logging.LogRecord, message: str) -> ConsoleRenderable: |
|
use_markup = getattr(record, "markup", self.markup) |
|
message_text = Text.from_markup(message) if use_markup else Text(message) |
|
|
|
highlighter = getattr(record, "highlighter", self.highlighter) |
|
if highlighter: |
|
message_text = highlighter(message_text) |
|
|
|
return message_text |
|
|
|
def get_time_text(self, record: logging.LogRecord) -> Text: |
|
log_time = datetime.fromtimestamp(record.created) |
|
time_str = log_time.strftime("[%Y-%m-%d %X]") |
|
return Text(time_str, style="log.time", end=" ") |
|
|
|
def get_level_text(self, record: logging.LogRecord) -> Text: |
|
level_name = record.levelname |
|
level_text = Text.styled(level_name.ljust(8), f"logging.level.{level_name.lower()}") |
|
level_text.style = "log.level" |
|
level_text.end = " " |
|
return level_text |
|
|
|
def get_location_text(self, record: logging.LogRecord) -> Text: |
|
name_and_line = f"{record.name}:{record.lineno}" if record.name != "root" else "root" |
|
text = f"[{name_and_line}, rank={record.local_rank}]" |
|
return Text(text, style="log.path") |
|
|
|
|
|
def wait_for(condition: Callable[[], bool], description: str, timeout: float = 10.0): |
|
"""Wait for the condition function to return True.""" |
|
start_time = time.monotonic() |
|
while not condition(): |
|
time.sleep(0.5) |
|
if time.monotonic() - start_time > timeout: |
|
raise TimeoutError(f"{description} timed out") |
|
|
|
|
|
def is_url(path: PathOrStr) -> bool: |
|
return re.match(r"[a-z0-9]+://.*", str(path)) is not None |
|
|
|
|
|
def dir_is_empty(dir: PathOrStr) -> bool: |
|
dir = Path(dir) |
|
if not dir.is_dir(): |
|
return True |
|
try: |
|
next(dir.glob("*")) |
|
return False |
|
except StopIteration: |
|
return True |
|
|
|
|
|
def get_progress_bar() -> Progress: |
|
from cached_path import get_download_progress |
|
|
|
return get_download_progress() |
|
|
|
|
|
def resource_path( |
|
folder: PathOrStr, fname: str, local_cache: Optional[PathOrStr] = None, progress: Optional[Progress] = None |
|
) -> Path: |
|
if local_cache is not None and (local_path := Path(local_cache) / fname).is_file(): |
|
log.info(f"Found local cache of {fname} at {local_path}") |
|
return local_path |
|
else: |
|
from cached_path import cached_path |
|
|
|
return cached_path(f"{str(folder).rstrip('/')}/{fname}", progress=progress) |
|
|
|
|
|
def file_size(path: PathOrStr) -> int: |
|
""" |
|
Get the size of a local or remote file in bytes. |
|
""" |
|
if is_url(path): |
|
from urllib.parse import urlparse |
|
|
|
parsed = urlparse(str(path)) |
|
if parsed.scheme == "gs": |
|
return _gcs_file_size(parsed.netloc, parsed.path.strip("/")) |
|
elif parsed.scheme in ("s3", "r2"): |
|
return _s3_file_size(parsed.scheme, parsed.netloc, parsed.path.strip("/")) |
|
elif parsed.scheme == "file": |
|
return file_size(str(path).replace("file://", "", 1)) |
|
else: |
|
raise NotImplementedError(f"file size not implemented for '{parsed.scheme}' files") |
|
else: |
|
return os.stat(path).st_size |
|
|
|
|
|
def upload(source: PathOrStr, target: str, save_overwrite: bool = False): |
|
"""Upload source file to a target location on GCS or S3.""" |
|
from urllib.parse import urlparse |
|
|
|
source = Path(source) |
|
assert source.is_file() |
|
parsed = urlparse(target) |
|
if parsed.scheme == "gs": |
|
_gcs_upload(source, parsed.netloc, parsed.path.strip("/"), save_overwrite=save_overwrite) |
|
elif parsed.scheme in ("s3", "r2"): |
|
_s3_upload(source, parsed.scheme, parsed.netloc, parsed.path.strip("/"), save_overwrite=save_overwrite) |
|
else: |
|
raise NotImplementedError(f"Upload not implemented for '{parsed.scheme}' scheme") |
|
|
|
|
|
def get_bytes_range(source: PathOrStr, bytes_start: int, num_bytes: int) -> bytes: |
|
if is_url(source): |
|
from urllib.parse import urlparse |
|
|
|
parsed = urlparse(str(source)) |
|
if parsed.scheme == "gs": |
|
return _gcs_get_bytes_range(parsed.netloc, parsed.path.strip("/"), bytes_start, num_bytes) |
|
elif parsed.scheme in ("s3", "r2"): |
|
return _s3_get_bytes_range( |
|
parsed.scheme, parsed.netloc, parsed.path.strip("/"), bytes_start, num_bytes |
|
) |
|
elif parsed.scheme == "file": |
|
return get_bytes_range(str(source).replace("file://", "", 1), bytes_start, num_bytes) |
|
else: |
|
raise NotImplementedError(f"file size not implemented for '{parsed.scheme}' files") |
|
else: |
|
with open(source, "rb") as f: |
|
f.seek(bytes_start) |
|
return f.read(num_bytes) |
|
|
|
|
|
def find_latest_checkpoint(dir: PathOrStr) -> Optional[PathOrStr]: |
|
if is_url(dir): |
|
from urllib.parse import urlparse |
|
|
|
parsed = urlparse(str(dir)) |
|
if parsed.scheme == "gs": |
|
raise NotImplementedError |
|
elif parsed.scheme in ("s3", "r2"): |
|
return _s3_find_latest_checkpoint(parsed.scheme, parsed.netloc, parsed.path.strip("/")) |
|
elif parsed.scheme == "file": |
|
return find_latest_checkpoint(str(dir).replace("file://", "", 1)) |
|
else: |
|
raise NotImplementedError(f"find_latest_checkpoint not implemented for '{parsed.scheme}' files") |
|
else: |
|
latest_step = 0 |
|
latest_checkpoint: Optional[Path] = None |
|
for path in Path(dir).glob("step*"): |
|
if path.is_dir(): |
|
try: |
|
step = int(path.name.replace("step", "").replace("-unsharded", "")) |
|
except ValueError: |
|
continue |
|
|
|
if step > latest_step or (step == latest_step and not path.name.endswith("-unsharded")): |
|
latest_step = step |
|
latest_checkpoint = path |
|
return latest_checkpoint |
|
|
|
|
|
def _gcs_upload(source: Path, bucket_name: str, key: str, save_overwrite: bool = False): |
|
from google.cloud import storage as gcs |
|
|
|
storage_client = gcs.Client() |
|
bucket = storage_client.bucket(bucket_name) |
|
blob = bucket.blob(key) |
|
if not save_overwrite and blob.exists(): |
|
raise FileExistsError(f"gs://{bucket_name}/{key} already exists. Use save_overwrite to overwrite it.") |
|
blob.upload_from_filename(source) |
|
|
|
|
|
def _gcs_file_size(bucket_name: str, key: str) -> int: |
|
from google.api_core.exceptions import NotFound |
|
from google.cloud import storage as gcs |
|
|
|
storage_client = gcs.Client() |
|
bucket = storage_client.bucket(bucket_name) |
|
blob = bucket.blob(key) |
|
try: |
|
blob.reload() |
|
except NotFound: |
|
raise FileNotFoundError(f"gs://{bucket_name}/{key}") |
|
assert blob.size is not None |
|
return blob.size |
|
|
|
|
|
def _gcs_get_bytes_range(bucket_name: str, key: str, bytes_start: int, num_bytes: int) -> bytes: |
|
from google.api_core.exceptions import NotFound |
|
from google.cloud import storage as gcs |
|
|
|
storage_client = gcs.Client() |
|
bucket = storage_client.bucket(bucket_name) |
|
blob = bucket.blob(key) |
|
try: |
|
blob.reload() |
|
except NotFound: |
|
raise FileNotFoundError(f"gs://{bucket_name}/{key}") |
|
return blob.download_as_bytes(start=bytes_start, end=bytes_start + num_bytes - 1) |
|
|
|
|
|
def _get_s3_profile_name(scheme: str) -> Optional[str]: |
|
if scheme == "s3": |
|
|
|
return os.environ.get("S3_PROFILE") |
|
if scheme == "r2": |
|
profile_name = os.environ.get("R2_PROFILE") |
|
if profile_name is None: |
|
raise OLMoEnvironmentError( |
|
"R2 profile name is not set. Did you forget to set the 'R2_PROFILE' env var?" |
|
) |
|
|
|
return profile_name |
|
|
|
raise NotImplementedError(f"Cannot get profile name for scheme {scheme}") |
|
|
|
|
|
def _get_s3_endpoint_url(scheme: str) -> Optional[str]: |
|
if scheme == "s3": |
|
return None |
|
if scheme == "r2": |
|
r2_endpoint_url = os.environ.get("R2_ENDPOINT_URL") |
|
if r2_endpoint_url is None: |
|
raise OLMoEnvironmentError( |
|
"R2 endpoint url is not set. Did you forget to set the 'R2_ENDPOINT_URL' env var?" |
|
) |
|
|
|
return r2_endpoint_url |
|
|
|
raise NotImplementedError(f"Cannot get endpoint url for scheme {scheme}") |
|
|
|
|
|
@cache |
|
def _get_s3_client(scheme: str): |
|
session = boto3.Session(profile_name=_get_s3_profile_name(scheme)) |
|
return session.client( |
|
"s3", |
|
endpoint_url=_get_s3_endpoint_url(scheme), |
|
config=Config(retries={"max_attempts": 10, "mode": "standard"}), |
|
use_ssl=not int(os.environ.get("OLMO_NO_SSL", "0")), |
|
) |
|
|
|
|
|
def _wait_before_retry(attempt: int): |
|
time.sleep(min(0.5 * 2**attempt, 3.0)) |
|
|
|
|
|
def _s3_upload( |
|
source: Path, scheme: str, bucket_name: str, key: str, save_overwrite: bool = False, max_attempts: int = 3 |
|
): |
|
err: Optional[Exception] = None |
|
if not save_overwrite: |
|
for attempt in range(1, max_attempts + 1): |
|
try: |
|
_get_s3_client(scheme).head_object(Bucket=bucket_name, Key=key) |
|
raise FileExistsError( |
|
f"s3://{bucket_name}/{key} already exists. Use save_overwrite to overwrite it." |
|
) |
|
except boto_exceptions.ClientError as e: |
|
if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404: |
|
err = None |
|
break |
|
err = e |
|
|
|
if attempt < max_attempts: |
|
log.warning("%s failed attempt %d with retriable error: %s", _s3_upload.__name__, attempt, err) |
|
_wait_before_retry(attempt) |
|
|
|
if err is not None: |
|
raise OLMoNetworkError(f"Failed to check object existence during {scheme} upload") from err |
|
|
|
try: |
|
_get_s3_client(scheme).upload_file(source, bucket_name, key) |
|
except boto_exceptions.ClientError as e: |
|
raise OLMoNetworkError(f"Failed to upload to {scheme}") from e |
|
|
|
|
|
def _s3_file_size(scheme: str, bucket_name: str, key: str, max_attempts: int = 3) -> int: |
|
err: Optional[Exception] = None |
|
for attempt in range(1, max_attempts + 1): |
|
try: |
|
return _get_s3_client(scheme).head_object(Bucket=bucket_name, Key=key)["ContentLength"] |
|
except boto_exceptions.ClientError as e: |
|
if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404: |
|
raise FileNotFoundError(f"s3://{bucket_name}/{key}") from e |
|
err = e |
|
|
|
if attempt < max_attempts: |
|
log.warning("%s failed attempt %d with retriable error: %s", _s3_file_size.__name__, attempt, err) |
|
_wait_before_retry(attempt) |
|
|
|
raise OLMoNetworkError(f"Failed to get {scheme} file size") from err |
|
|
|
|
|
def _s3_get_bytes_range( |
|
scheme: str, bucket_name: str, key: str, bytes_start: int, num_bytes: int, max_attempts: int = 3 |
|
) -> bytes: |
|
err: Optional[Exception] = None |
|
for attempt in range(1, max_attempts + 1): |
|
try: |
|
return ( |
|
_get_s3_client(scheme) |
|
.get_object( |
|
Bucket=bucket_name, Key=key, Range=f"bytes={bytes_start}-{bytes_start + num_bytes - 1}" |
|
)["Body"] |
|
.read() |
|
) |
|
except boto_exceptions.ClientError as e: |
|
if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404: |
|
raise FileNotFoundError(f"{scheme}://{bucket_name}/{key}") from e |
|
err = e |
|
except (boto_exceptions.HTTPClientError, boto_exceptions.ConnectionError) as e: |
|
|
|
|
|
|
|
err = e |
|
|
|
if attempt < max_attempts: |
|
log.warning( |
|
"%s failed attempt %d with retriable error: %s", _s3_get_bytes_range.__name__, attempt, err |
|
) |
|
_wait_before_retry(attempt) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
raise OLMoNetworkError(f"Failed to get bytes range from {scheme}") from err |
|
|
|
|
|
def _s3_find_latest_checkpoint(scheme: str, bucket_name: str, prefix: str) -> Optional[str]: |
|
if not prefix.endswith("/"): |
|
prefix = f"{prefix}/" |
|
response = _get_s3_client(scheme).list_objects(Bucket=bucket_name, Prefix=prefix, Delimiter="/") |
|
assert not response["IsTruncated"] |
|
latest_step = 0 |
|
latest_checkpoint: Optional[str] = None |
|
for item in response["CommonPrefixes"]: |
|
prefix = item["Prefix"].strip("/") |
|
checkpoint_name = os.path.split(prefix)[-1] |
|
if not checkpoint_name.startswith("step"): |
|
continue |
|
try: |
|
step = int(checkpoint_name.replace("step", "").replace("-unsharded", "")) |
|
except ValueError: |
|
continue |
|
|
|
|
|
try: |
|
_s3_file_size(scheme, bucket_name, f"{prefix}/config.yaml") |
|
except FileNotFoundError: |
|
continue |
|
|
|
if step > latest_step or (step == latest_step and not checkpoint_name.endswith("-unsharded")): |
|
latest_step = step |
|
latest_checkpoint = f"{scheme}://ai2-llm/{prefix}" |
|
return latest_checkpoint |
|
|
|
|
|
def default_thread_count() -> int: |
|
return int(os.environ.get("OLMO_NUM_THREADS") or min(32, (os.cpu_count() or 1) + 4)) |
|
|
|
|
|
def pass_through_fn(fn, *args, **kwargs): |
|
return fn(*args, **kwargs) |
|
|
|
|
|
def threaded_generator(g, maxsize: int = 16, thread_name: Optional[str] = None): |
|
q: Queue = Queue(maxsize=maxsize) |
|
|
|
sentinel = object() |
|
|
|
def fill_queue(): |
|
try: |
|
for value in g: |
|
q.put(value) |
|
except Exception as e: |
|
q.put(e) |
|
finally: |
|
q.put(sentinel) |
|
|
|
thread_name = thread_name or repr(g) |
|
thread = Thread(name=thread_name, target=fill_queue, daemon=True) |
|
thread.start() |
|
|
|
for x in iter(q.get, sentinel): |
|
if isinstance(x, Exception): |
|
raise OLMoThreadError(f"generator thread {thread_name} failed") from x |
|
else: |
|
yield x |
|
|
|
|
|
def roundrobin(*iterables): |
|
""" |
|
Call the given iterables in a round-robin fashion. For example: |
|
``roundrobin('ABC', 'D', 'EF') --> A D E B F C`` |
|
""" |
|
|
|
num_active = len(iterables) |
|
nexts = cycle(iter(it).__next__ for it in iterables) |
|
while num_active: |
|
try: |
|
for next in nexts: |
|
yield next() |
|
except StopIteration: |
|
|
|
num_active -= 1 |
|
nexts = cycle(islice(nexts, num_active)) |
|
|