Question: How do I run the TensorFlow distributed MNIST example?


I am new to distributed TensorFlow. I found this distributed MNIST test here: https://github.com/tensorflow/tensorflow/blob/master/tensorflow/tools/dist_test/python/mnist_replica.py

But I don't know how to get it to run. I used the following script:

  python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=0 --worker_grpc_url="grpc://tf-worker0:2222"\
  & python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=1 --worker_grpc_url="grpc://tf-worker1:2222"\
  & python distributed_mnist.py  --num_workers=3 --num_parameter_servers=1 --worker_index=2 --worker_grpc_url="grpc://tf-worker2:2222"

I found that these arguments were missing, so I passed them to the program. This is what happened:

I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
Worker GRPC URL: grpc://tf-worker0:2222
Worker index = 0
Number of workers = 3
Worker GRPC URL: grpc://tf-worker2:2222
Worker index = 2
Number of workers = 3
Worker GRPC URL: grpc://tf-worker1:2222
Worker index = 1
Number of workers = 3
Worker 0: Initializing session...
Worker 2: Waiting for session to be initialized...
Worker 1: Waiting for session to be initialized...
E0608 20:37:13.514249023    7501 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.514287961    7501 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:13.548052986    7502 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.548091527    7502 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:13.555449386    7503 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:13.555473898    7503 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
^CE0608 20:37:28.517451603    7504 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.517491102    7504 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:28.551002331    7505 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.551029795    7505 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds
E0608 20:37:28.556681378    7506 resolve_address_posix.c:126] getaddrinfo: Name or service not known
D0608 20:37:28.556709728    7506 dns_resolver.c:189]         dns resolution failed: retrying in 15 seconds

Does anyone know how to run it correctly? Thanks a lot!


12433
2018-06-08 20:40





Answer:


The values of the --worker_grpc_url flags in your command line refer to addresses that do not exist.

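This script is designed to run in a particular Kubernetes environment, not standalone. In particular, tf-worker0:2222, tf-worker1:2222, and tf-worker2:2222 refer to the names of Kubernetes containers created by an automated version of this test. It would need considerable changes to work as a standalone test.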
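
One quick way to confirm this diagnosis is to check whether each hostname resolves from the machine running the script. The sketch below is only an illustration and is not part of mnist_replica.py: the helper names split_grpc_url, resolves, and report are hypothetical, and it uses nothing beyond the standard-library socket module. A False result corresponds to the "getaddrinfo: Name or service not known" lines in the log above.

```python
import socket


def split_grpc_url(url):
    """Split 'grpc://host:port' into (host, port). Hypothetical helper."""
    prefix = "grpc://"
    if url.startswith(prefix):
        url = url[len(prefix):]
    host, _, port = url.rpartition(":")
    return host, int(port)


def resolves(host, port):
    """Return True if the OS resolver can look up host, else False."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        return False


def report(urls):
    """Map each gRPC URL to whether its hostname resolves."""
    return {url: resolves(*split_grpc_url(url)) for url in urls}


# Example call with the URLs from the question (not executed here):
# report(["grpc://tf-worker%d:2222" % i for i in range(3)])
```

Outside the Kubernetes test environment, the three tf-worker names would be expected to map to False unless something else on the network happens to define them.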

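The documentation for distributed TensorFlow includes code for an example trainer program. The easiest way to try MNIST on distributed TensorFlow is to paste the model into that template. For example, something like the following should work: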

import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
                            "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/tmp/mnist-data",
                           "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")

FLAGS = tf.app.flags.FLAGS

IMAGE_PIXELS = 28

def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Variables of the hidden layer
      hid_w = tf.Variable(
          tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                              stddev=1.0 / IMAGE_PIXELS), name="hid_w")
      hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

      # Variables of the softmax layer
      sm_w = tf.Variable(
          tf.truncated_normal([FLAGS.hidden_units, 10],
                              stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
          name="sm_w")
      sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

      x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
      y_ = tf.placeholder(tf.float32, [None, 10])

      hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
      hid = tf.nn.relu(hid_lin)

      y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
      loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

      global_step = tf.Variable(0)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      summary_op = tf.summary.merge_all()
      init_op = tf.global_variables_initializer()

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:
      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.

        batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
        train_feed = {x: batch_xs, y_: batch_ys}

        _, step = sess.run([train_op, global_step], feed_dict=train_feed)
        if step % 100 == 0:
          print("Done step %d" % step)

    # Ask for all the services to stop.
    sv.stop()

if __name__ == "__main__":
  tf.app.run()
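
The trainer above has to be started once per task, with every process given the same --ps_hosts and --worker_hosts lists and its own --job_name and --task_index. As a sketch for a single-machine trial, the snippet below builds those command lines for one parameter server and three workers on localhost. The file name trainer.py, the port numbers, and the helper launch_commands are assumptions for illustration; substitute whatever name the script was saved under.

```python
def launch_commands(script, ps_hosts, worker_hosts):
    """Return one command line per task in the cluster.

    Every task receives identical host lists; only job_name and
    task_index differ.
    """
    common = "python %s --ps_hosts=%s --worker_hosts=%s" % (
        script, ",".join(ps_hosts), ",".join(worker_hosts))
    commands = []
    for job_name, hosts in (("ps", ps_hosts), ("worker", worker_hosts)):
        for task_index in range(len(hosts)):
            commands.append("%s --job_name=%s --task_index=%d" % (
                common, job_name, task_index))
    return commands


# Assumed layout: 1 parameter server and 3 workers, all on this machine.
PS_HOSTS = ["localhost:2222"]
WORKER_HOSTS = ["localhost:2223", "localhost:2224", "localhost:2225"]

for command in launch_commands("trainer.py", PS_HOSTS, WORKER_HOSTS):
    print(command)
```

Each printed line would be run in its own terminal (or backgrounded with &). The ps task blocks in server.join() and does not exit on its own, and worker 0 acts as chief because the script sets is_chief=(FLAGS.task_index == 0).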

16
2018-06-14 22:46



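Thank you very much for the explanation! I will try it. - xyd
Thanks a lot! Just contributing my 2 cents to your wonderful post. tf.merge_all_summaries() seems to be deprecated and gives an error on the latest version; use either tf.merge_all or tf.contrib.deprecated.merge_all_summaries. - sunil manikani
@sunilmanikani Thanks for pointing that out... I have updated the code to use tf.summary.merge_all(). - mrry
FYI, tf.train.Supervisor is now deprecated and tf.train.MonitoredTrainingSession should be used instead. See tensorflow.org/api_docs/python/tf/train/Supervisor. - erwaman
My understanding of the code is that each worker reads the full input data; there is no partitioning of the input data. Is my understanding correct? - erwaman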
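
As posted, each worker calls input_data.read_data_sets and then mnist.train.next_batch on its own copy of the whole training set, so nothing in the script partitions the input. If disjoint shards are wanted, one option is to give each worker every num_workers-th example, offset by its task index. The sketch below illustrates this on plain Python lists; shard_for_worker is a hypothetical helper, not a TensorFlow API, and wiring it into the trainer is left as an assumption.

```python
def shard_for_worker(examples, task_index, num_workers):
    """Return the slice of examples assigned to one worker.

    Worker k takes examples k, k + num_workers, k + 2 * num_workers, ...
    so the shards are disjoint and together cover the whole input.
    """
    if not 0 <= task_index < num_workers:
        raise ValueError("task_index must be in [0, num_workers)")
    return examples[task_index::num_workers]


# Ten stand-in example ids split across three workers.
examples = list(range(10))
shards = [shard_for_worker(examples, k, 3) for k in range(3)]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

The same slicing works on NumPy arrays such as mnist.train.images and mnist.train.labels, provided both are sliced with the same task_index and num_workers.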

