Reputation: 31
When I try to train on a single machine with two GPUs using the PyTorch framework, the program gets stuck at the _init_dist_pytorch('nccl')
step. Single-step debugging shows that the program actually gets stuck at
return TCPStore(
hostname, port, world_size, start_daemon, timeout, multi_tenant=True, use_libuv=use_libuv
)
Here, if I set world_size=1, the program can run normally and successfully initialize the entire process group. However, when world_size=2, the program gets stuck and triggers the error DistStoreError: Timed out after 300 seconds waiting for clients. 1/2 clients joined.
I have tried the following to solve this problem:
Expected output:
Additional information:
I am hoping to find a solution that will allow me to train on a single machine with two GPUs without encountering this error.
While trying to reproduce this bug in a Jupyter Notebook, I found that if I call the TCPStore constructor directly, like this:
tmp = TCPStore('localhost', 2313, 1, True, timeout=default_pg_timeout,
multi_tenant=True,use_libuv=False)
print('world_size=1 done')
tmp2 = TCPStore('localhost', 2313, 2, True, timeout=default_pg_timeout,
multi_tenant=True,use_libuv=False)
print('world_size=2 done')
Both constructor calls execute normally. But if the TCPStore is instead created indirectly, through _init_dist_pytorch, with the following code:
os.environ['RANK']='0'
os.environ['WORLD_SIZE']='2'
os.environ['MASTER_ADDR']='localhost'
os.environ['MASTER_PORT']='12340'
os.environ['CUDA_VISIBLE_DEVICES']='0,1'
_init_dist_pytorch('nccl')
print('world_size=2 process group initialized!')
then the timeout error is triggered.
Here is the minimum reproducible example (To make it easier to reproduce the problem, I set default_pg_timeout = timedelta(seconds=60) to set the wait time to 60 seconds. This variable is 300 seconds by default. If necessary, please adjust this statement directly.)
In my environment, the output is as follows: output 1 output 2
Entire output 2:
{
"name": "DistStoreError",
"message": "Timed out after 61 seconds waiting for clients. 1/2 clients joined.",
"stack": "---------------------------------------------------------------------------
DistStoreError Traceback (most recent call last)
Cell In[10], line 15
13 os.environ['MASTER_PORT']='12340'
14 os.environ['CUDA_VISIBLE_DEVICES']='0,1'
---> 15 _init_dist_pytorch('nccl')
16 print('world_size=2 process group initialized!')
Cell In[9], line 6, in _init_dist_pytorch(backend, **kwargs)
4 num_gpus = torch.cuda.device_count()
5 torch.cuda.set_device(rank % num_gpus)
----> 6 dist.init_process_group(backend=backend, rank=rank, world_size=world_size)
File ~/miniconda3/envs/visualtext/lib/python3.10/site-packages/torch/distributed/c10d_logger.py:86, in _time_logger.<locals>.wrapper(*args, **kwargs)
83 @functools.wraps(func)
84 def wrapper(*args, **kwargs):
85 t1 = time.time_ns()
---> 86 func_return = func(*args, **kwargs)
87 time_spent = time.time_ns() - t1
89 msg_dict = _get_msg_dict(func.__name__, *args, **kwargs)
File ~/miniconda3/envs/visualtext/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py:1177, in init_process_group(backend, init_method, timeout, world_size, rank, store, group_name, pg_options)
1173 if store is None:
1174 rendezvous_iterator = rendezvous(
1175 init_method, rank, world_size, timeout=timeout
1176 )
-> 1177 store, rank, world_size = next(rendezvous_iterator)
1178 store.set_timeout(timeout)
1180 # Use a PrefixStore to avoid accidental overrides of keys used by
1181 # different systems (e.g. RPC) in case the store is multi-tenant.
File ~/miniconda3/envs/visualtext/lib/python3.10/site-packages/torch/distributed/rendezvous.py:246, in _env_rendezvous_handler(url, timeout, **kwargs)
243 master_port = int(_get_env_or_raise(\"MASTER_PORT\"))
244 use_libuv = query_dict.get(\"use_libuv\", os.environ.get(\"USE_LIBUV\", \"0\")) == \"1\"
--> 246 store = _create_c10d_store(master_addr, master_port, rank, world_size, timeout, use_libuv)
248 yield (store, rank, world_size)
250 # If this configuration is invalidated, there is nothing we can do about it
File ~/miniconda3/envs/visualtext/lib/python3.10/site-packages/torch/distributed/rendezvous.py:174, in _create_c10d_store(hostname, port, rank, world_size, timeout, use_libuv)
172 else:
173 start_daemon = rank == 0
--> 174 return TCPStore(
175 hostname, port, world_size, start_daemon, timeout=datetime.timedelta(seconds=60), multi_tenant=True, use_libuv=use_libuv
176 )
DistStoreError: Timed out after 61 seconds waiting for clients. 1/2 clients joined."
}
Code:
try:
from urllib.parse import urlparse, urlunparse
except ImportError as e:
raise ImportError(
"urllib cannot be found, urlparse from python2 is no longer supported."
) from e
import numbers
import os
import sys
from datetime import timedelta
from typing import Dict, Optional
from torch.distributed import FileStore, PrefixStore, Store, TCPStore
# Default rendezvous timeout used by the env:// handler below. It is set to
# 60 seconds in this reproduction so the timeout surfaces quickly.
default_pg_timeout = timedelta(seconds=60)
# from .constants import default_pg_timeout
# Registry mapping a URL scheme (e.g. "tcp") to its rendezvous handler;
# filled in by register_rendezvous_handler().
_rendezvous_handlers = {}
def register_rendezvous_handler(scheme, handler):
    """
    Add ``handler`` to the rendezvous registry under the URL ``scheme``.

    Rendezvous is how participating processes find each other and exchange
    the information they need to communicate before any collective
    algorithm can run. Its outcome is a triplet: a shared key/value store,
    the rank of the process, and the total number of participating
    processes.

    When none of the bundled rendezvous methods fits the execution
    environment, a custom handler can be registered here. Choose a unique
    scheme name and use it as the URL scheme when calling ``rendezvous()``.

    Args:
        scheme (str): URL scheme that identifies the rendezvous handler.
        handler (function): Generator function invoked when
            ``rendezvous()`` is called with a URL using ``scheme``; it
            must yield the triplet.

    Raises:
        RuntimeError: If a handler is already registered for ``scheme``.
    """
    # The registry is only mutated, never rebound, so no ``global`` is needed.
    already_registered = scheme in _rendezvous_handlers
    if already_registered:
        raise RuntimeError(
            f"Rendezvous handler for {scheme}:// already registered"
        )
    _rendezvous_handlers[scheme] = handler
# The query has the format "rank=0&world_size=1" and is converted into
# {"rank": "0", "world_size": "1"} (values stay strings).
def _query_to_dict(query: str) -> Dict[str, str]:
    """Parse a URL query string into a ``{key: value}`` dict of strings.

    Pairs are separated by ``&`` and empty pairs are skipped. Each pair is
    split on its FIRST ``=`` only, so a value that itself contains ``=`` is
    kept intact, and a bare key with no ``=`` maps to the empty string
    instead of raising ``IndexError``. When a key repeats, the last
    occurrence wins. No percent-decoding is performed.

    Args:
        query: The query component of a URL, without the leading ``?``.

    Returns:
        Mapping from each key to its (string) value.
    """
    parsed: Dict[str, str] = {}
    for pair in filter(None, query.split("&")):
        key, _, value = pair.partition("=")
        parsed[key] = value
    return parsed
def _rendezvous_helper(url: str, rank: int, world_size_opt: Optional[int], **kwargs):
    """Fold rank / world size into ``url`` and dispatch to its scheme handler.

    Args:
        url: Rendezvous URL; its scheme selects the registered handler.
        rank: Rank to embed in the query, or ``-1`` to leave it out.
        world_size_opt: World size to embed. ``None`` starts from ``-1``; for
            the ``env`` scheme, ``RANK`` / ``WORLD_SIZE`` are then read from
            the environment when present.
        **kwargs: Forwarded unchanged to the handler.

    Returns:
        Whatever the handler registered for the URL scheme returns.

    Raises:
        AssertionError: If the URL query already carries ``rank`` or
            ``world_size`` while one of them has to be added.
        RuntimeError: If no handler is registered for the URL scheme.
    """
    parsed = urlparse(url)
    size_unspecified = world_size_opt is None

    if size_unspecified:
        world_size = -1
        if parsed.scheme == "env":
            rank = int(os.environ.get("RANK", rank))
            # If the world_size env variable is not present then it is a dynamic group
            world_size = int(os.environ.get("WORLD_SIZE", world_size))
    else:
        world_size = world_size_opt

    # world_size is written into the query whenever it is known, and also
    # (as -1) whenever the caller passed no world size at all.
    emit_world_size = world_size != -1 or size_unspecified
    if rank != -1 or emit_world_size:
        params = _query_to_dict(parsed.query)
        assert (
            "rank" not in params and "world_size" not in params
        ), f"The url: {url} has node-specific arguments(rank, world_size) already."
        if rank != -1:
            params["rank"] = str(rank)
        if emit_world_size:
            params["world_size"] = str(world_size)
        new_query = "&".join(f"{key}={value}" for key, value in params.items())
        parsed = parsed._replace(query=new_query)
        url = urlunparse(parsed)

    scheme = parsed.scheme
    if scheme not in _rendezvous_handlers:
        raise RuntimeError(f"No rendezvous handler for {scheme}://")
    handler = _rendezvous_handlers[scheme]
    return handler(url, **kwargs)
def rendezvous(url: str, rank: int = -1, world_size: int = -1, **kwargs):
    """Validate the arguments and start a rendezvous for ``url``.

    Args:
        url: Rendezvous URL (``str`` or ``bytes``).
        rank: Rank of this process; ``-1`` means "not given".
        world_size: Number of participating processes; ``-1`` means
            "not given".
        **kwargs: Forwarded to the rendezvous handler.

    Returns:
        The result of ``_rendezvous_helper`` for the validated arguments.

    Raises:
        RuntimeError: If ``url`` is not a string/bytes, or ``rank`` or
            ``world_size`` is not an integer.
    """
    url_is_text = isinstance(url, (str, bytes))
    if not url_is_text:
        raise RuntimeError(f"`url` must be a string. {type(url)}: {url}")

    integral = numbers.Integral
    if not isinstance(rank, integral):
        raise RuntimeError(f"`rank` must be an integer. {rank}")
    if not isinstance(world_size, integral):
        raise RuntimeError(f"`world_size` must be an integer. {world_size}")

    return _rendezvous_helper(url, rank, world_size, **kwargs)
def _create_store_from_options(backend_options, rank):
    """Return the store from rendezvousing on ``backend_options.init_method``.

    The world size is passed as ``None``; only the store from the first
    yielded ``(store, rank, world_size)`` triplet is kept.
    """
    triplets = _rendezvous_helper(backend_options.init_method, rank, None)
    store, _rank, _world_size = next(triplets)
    return store
def _rendezvous_error(msg):
    """Build (without raising) the ``ValueError`` for a failed rendezvous.

    ``msg`` is appended to a fixed "Error initializing torch.distributed"
    prefix; the caller is responsible for raising the returned exception.
    """
    full_message = "Error initializing torch.distributed using " + msg
    return ValueError(full_message)
def _file_rendezvous_handler(url: str, **kwargs):
    """Rendezvous handler for ``file://`` URLs, backed by a ``FileStore``.

    The URL must carry a path plus ``rank`` and ``world_size`` query
    parameters. Yields a single ``(store, rank, world_size)`` triplet;
    resuming the generator afterwards raises ``RuntimeError`` because
    re-rendezvous is not supported for this method.

    Raises:
        ValueError: If the path, ``rank`` or ``world_size`` is missing.
    """

    def _error(msg):
        return _rendezvous_error("file:// rendezvous: " + msg)

    parsed = urlparse(url)
    if sys.platform == "win32":
        import urllib.request

        path = urllib.request.url2pathname(parsed.netloc + parsed.path)
        if path:
            # Normalizing an empty string produces ".", which is not expected.
            path = os.path.normpath(path)
    else:
        path = parsed.path

    if not path:
        raise _error("path missing")

    params = _query_to_dict(parsed.query)
    if "rank" not in params:
        raise _error("rank parameter missing")
    if "world_size" not in params:
        raise _error("world size parameter missing")

    rank = int(params["rank"])
    world_size = int(params["world_size"])
    yield (FileStore(path, world_size), rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform rerendezvous using file:// method")
def _torchelastic_use_agent_store() -> bool:
    """Return True only when ``TORCHELASTIC_USE_AGENT_STORE`` is exactly ``"True"``.

    The comparison is case-sensitive; an unset variable yields ``False``.
    """
    flag = os.getenv("TORCHELASTIC_USE_AGENT_STORE")
    return flag == "True"
import datetime
def _create_c10d_store(hostname, port, rank, world_size, timeout, use_libuv=False) -> Store:
    """
    Smartly creates a c10d Store object on ``rank`` based on whether we need to re-use agent store.

    The TCPStore server is assumed to be hosted on ``hostname:port``.

    If ``torchelastic_use_agent_store()`` is ``True``, then it is assumed that
    the agent leader (node rank 0) hosts the TCPStore server (for which the
    endpoint is specified by the given ``hostname:port``). Hence
    ALL ranks will create and return a TCPStore client (e.g. ``start_daemon=False``).

    If ``torchelastic_use_agent_store()`` is ``False``, then rank 0 will host
    the TCPStore (with multi-tenancy) and it is assumed that rank 0's hostname
    and port are correctly passed via ``hostname`` and ``port``. All
    non-zero ranks will create and return a TCPStore client.

    Args:
        hostname: Host of the TCPStore server.
        port: Port of the TCPStore server; must fit in an unsigned 16-bit int.
        rank: Rank of the calling process; rank 0 starts the server daemon
            when the agent store is not in use.
        world_size: Number of participants handed to the TCPStore.
        timeout: Store timeout. It is honoured on both code paths (the
            non-agent path previously ignored it and always used 60 seconds).
        use_libuv: Forwarded to the TCPStore on the non-agent path.

    Returns:
        A ``PrefixStore`` wrapping a TCPStore client (agent store), or a
        plain ``TCPStore`` otherwise.

    Raises:
        ValueError: If ``port`` is outside ``0..65535``.
        KeyError: If the agent store is in use but
            ``TORCHELASTIC_RESTART_COUNT`` is not set.
    """
    # check if port is uint16_t
    if not 0 <= port < 2**16:
        raise ValueError(f"port must have value from 0 to 65535 but was {port}.")

    if _torchelastic_use_agent_store():
        attempt = os.environ["TORCHELASTIC_RESTART_COUNT"]
        tcp_store = TCPStore(hostname, port, world_size, False, timeout)
        # Namespace keys per restart attempt on the shared agent store.
        return PrefixStore(f"/worker/attempt_{attempt}", tcp_store)

    start_daemon = rank == 0
    return TCPStore(
        hostname,
        port,
        world_size,
        start_daemon,
        timeout=timeout,
        multi_tenant=True,
        use_libuv=use_libuv,
    )
def _tcp_rendezvous_handler(
    url: str, timeout: Optional[timedelta] = None, **kwargs
):
    """Rendezvous handler for ``tcp://host:port?rank=R&world_size=N`` URLs.

    Yields a single ``(store, rank, world_size)`` triplet; resuming the
    generator afterwards raises ``RuntimeError`` because re-rendezvous is
    not supported for this method.

    Args:
        url: Rendezvous URL. It must contain a port and the ``rank`` and
            ``world_size`` query parameters; ``use_libuv=1`` is optional.
        timeout: Store timeout. When omitted (``None``) it falls back to
            ``default_pg_timeout``, matching the env:// handler. The
            previous default was the ``timedelta`` class itself, which is
            not a usable duration.
        **kwargs: Accepted for handler-signature compatibility; unused.

    Raises:
        ValueError: If the port, ``rank`` or ``world_size`` is missing.
    """

    def _error(msg):
        return _rendezvous_error("tcp:// rendezvous: " + msg)

    if timeout is None:
        timeout = default_pg_timeout

    result = urlparse(url)
    if not result.port:
        raise _error("port number missing")
    query_dict = _query_to_dict(result.query)
    if "rank" not in query_dict:
        raise _error("rank parameter missing")
    if "world_size" not in query_dict:
        raise _error("world size parameter missing")

    rank = int(query_dict["rank"])
    world_size = int(query_dict["world_size"])
    use_libuv = query_dict.get("use_libuv", "0") == "1"
    assert result.hostname is not None

    store = _create_c10d_store(result.hostname, result.port, rank, world_size, timeout, use_libuv)

    yield (store, rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform re-rendezvous using tcp:// method")
def _env_rendezvous_handler(
    url: str, timeout: timedelta = default_pg_timeout, **kwargs
):
    """Rendezvous handler for ``env://`` URLs.

    ``rank`` and ``world_size`` come from the URL query when present and
    otherwise from the ``RANK`` / ``WORLD_SIZE`` environment variables.
    ``MASTER_ADDR`` and ``MASTER_PORT`` are always read from the
    environment. libuv is enabled when the ``use_libuv`` query parameter
    (or, failing that, the ``USE_LIBUV`` environment variable) equals "1".

    Yields a single ``(store, rank, world_size)`` triplet; resuming the
    generator afterwards raises ``RuntimeError`` because re-rendezvous is
    not supported for this method.

    Raises:
        ValueError: If a required environment variable is unset or empty.
    """

    def _error(msg):
        return _rendezvous_error("env:// rendezvous: " + msg)

    def _env_error(var):
        return _error(f"environment variable {var} expected, but not set")

    def _get_env_or_raise(env_var: str) -> str:
        # An empty value is treated the same as a missing variable.
        value = os.environ.get(env_var)
        if value:
            return value
        raise _env_error(env_var)

    params = _query_to_dict(urlparse(url).query)

    # Lookup order matters: it decides which missing variable is reported.
    rank = int(params["rank"] if "rank" in params else _get_env_or_raise("RANK"))
    world_size = int(
        params["world_size"]
        if "world_size" in params
        else _get_env_or_raise("WORLD_SIZE")
    )
    master_addr = _get_env_or_raise("MASTER_ADDR")
    master_port = int(_get_env_or_raise("MASTER_PORT"))

    libuv_flag = params.get("use_libuv", os.environ.get("USE_LIBUV", "0"))
    use_libuv = libuv_flag == "1"

    store = _create_c10d_store(master_addr, master_port, rank, world_size, timeout, use_libuv)

    yield (store, rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform re-rendezvous using env:// method")
# Register the bundled handlers for the tcp://, env:// and file:// URL schemes.
# Each scheme may be registered only once; a second registration raises
# RuntimeError in register_rendezvous_handler().
register_rendezvous_handler("tcp", _tcp_rendezvous_handler)
register_rendezvous_handler("env", _env_rendezvous_handler)
register_rendezvous_handler("file", _file_rendezvous_handler)
# Bare string literal (no runtime effect) holding a copy of the TCPStore call
# made at the end of _create_c10d_store, kept here for reference.
""" return TCPStore(
hostname, port, world_size, start_daemon, timeout=datetime.timedelta(seconds=60), multi_tenant=True, use_libuv=use_libuv
) """
# Direct construction on localhost:2313. Positional arguments follow the
# order used in _create_c10d_store: hostname, port, world_size, start_daemon.
# First with world_size=1 ...
tmp = TCPStore('localhost', 2313, 1, True, timeout=default_pg_timeout,
multi_tenant=True,use_libuv=False)
print('world_size=1 done')
# ... then the same endpoint with world_size=2. The accompanying text reports
# that both constructor calls return normally.
tmp2 = TCPStore('localhost', 2313, 2, True, timeout=default_pg_timeout,
multi_tenant=True,use_libuv=False)
print('world_size=2 done')
import time
import accelerate
import torch.distributed as dist
import argparse
import logging
import os
import os.path as osp
import torch
import fastai
from basicsr.utils import (get_env_info, get_root_logger, get_time_str,
scandir)
from basicsr.utils.options import copy_opt_file, dict2str
from omegaconf import OmegaConf
from ldm.data.dataset_depth import DepthDataset
from ldm.data.dataset_sketch import SketchDataset
from basicsr.utils.dist_util import get_dist_info, init_dist, master_only
from ldm.modules.encoders.adapter import Adapter
from ldm.util import load_model_from_config
import random
def _init_dist_pytorch(backend, **kwargs):
    """Bind this process to a GPU and initialise the default process group.

    The rank and world size are read from the ``RANK`` and ``WORLD_SIZE``
    environment variables. The process selects CUDA device
    ``rank % num_gpus`` before calling ``dist.init_process_group``.

    Args:
        backend: Backend name handed to ``dist.init_process_group``
            (for example ``'nccl'``).
        **kwargs: Extra keyword arguments forwarded to
            ``dist.init_process_group``. They were previously accepted but
            silently dropped. An explicit ``rank`` or ``world_size`` here
            takes precedence over the environment value.

    Raises:
        KeyError: If ``RANK`` or ``WORLD_SIZE`` is not set.
        RuntimeError: If no CUDA device is visible.
    """
    rank = int(os.environ['RANK'])
    world_size = int(os.environ['WORLD_SIZE'])

    num_gpus = torch.cuda.device_count()
    if num_gpus == 0:
        # Fail with a clear message instead of ZeroDivisionError from the
        # modulo below.
        raise RuntimeError(
            'No CUDA devices are visible; cannot select a GPU for rank '
            f'{rank}. Check CUDA_VISIBLE_DEVICES.'
        )
    torch.cuda.set_device(rank % num_gpus)

    init_kwargs = {'rank': rank, 'world_size': world_size, **kwargs}
    dist.init_process_group(backend=backend, **init_kwargs)
# world_size=1 process group could be initialized successfully:
# os.environ['RANK']='0'
# os.environ['WORLD_SIZE']='1'
# os.environ['MASTER_ADDR']='localhost'
# os.environ['MASTER_PORT']='23144'
# os.environ['CUDA_VISIBLE_DEVICES']='0'
# _init_dist_pytorch('nccl')
# print('world_size=1 process group initialized!')

# world_size=2 case.
# `setdefault` keeps any values already exported by a launcher (for example
# `torchrun --nproc_per_node=2 this_script.py`), so each worker process keeps
# its own RANK. Unconditional assignment would make every process claim rank 0.
# When run as a single plain process these defaults apply as before, and the
# rendezvous then waits for a second process that never joins until it times
# out ("1/2 clients joined").
os.environ.setdefault('RANK', '0')
os.environ.setdefault('WORLD_SIZE', '2')
os.environ.setdefault('MASTER_ADDR', 'localhost')
os.environ.setdefault('MASTER_PORT', '12340')
os.environ.setdefault('CUDA_VISIBLE_DEVICES', '0,1')
_init_dist_pytorch('nccl')
print('world_size=2 process group initialized!')
Environment:
Python 3.10.14
requirements:
transformers==4.19.2
diffusers==0.11.1
invisible_watermark==0.1.5
basicsr==1.4.2
einops==0.6.0
omegaconf==2.3.0
pytorch_lightning==1.5.9
gradio
opencv-python
pudb
imageio
imageio-ffmpeg
k-diffusion
webdataset
open-clip-torch
kornia
safetensors
timm
torch
torchvision
numpy
matplotlib
accelerate
Upvotes: 3
Views: 2468
Reputation: 21
I solved a similar problem using
torchrun your_script.py
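According to https://pytorch.org/docs/stable/elastic/run.html, torchrun launches the worker processes itself and sets RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT for each of them, so you don't need to set those environment variables yourself (for two GPUs on one machine, add --nproc_per_node=2 so that one process is started per GPU).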
Upvotes: 2