Commit aa3b8775 authored by cypof's avatar cypof
Browse files

Distributed training

parent f7f33dc8
PROJECT := caffe
CONFIG_FILE := Makefile.config
CONFIG_FILE ?= Makefile.config
include $(CONFIG_FILE)
BUILD_DIR_LINK := $(BUILD_DIR)
......@@ -269,7 +269,7 @@ endif
# Debugging
ifeq ($(DEBUG), 1)
COMMON_FLAGS += -DDEBUG -g -O0
COMMON_FLAGS += -DDEBUG -g -O0 -DBOOST_NOINLINE='__attribute__ ((noinline))'
NVCCFLAGS += -G
else
COMMON_FLAGS += -DNDEBUG -O2
......@@ -291,6 +291,11 @@ ifeq ($(CPU_ONLY), 1)
COMMON_FLAGS += -DCPU_ONLY
endif
ifeq ($(RDMA), 1)
COMMON_FLAGS += -DRDMA
LIBRARIES += ibverbs ibumad
endif
# BLAS configuration (default = ATLAS)
BLAS ?= atlas
ifeq ($(BLAS), mkl)
......@@ -549,6 +554,13 @@ $(GTEST_OBJ): $(GTEST_SRC) | $(GTEST_BUILD_DIR)
@ cat $@.$(WARNS_EXT)
@ echo
$(OBJ_BUILD_DIR)/%.cuo: src/$(PROJECT)/%.cu $(HXX_SRCS) \
| $(LAYER_BUILD_DIR)
$(CUDA_DIR)/bin/nvcc $(NVCCFLAGS) $(CUDA_ARCH) -c $< -o $@ 2> $@.$(WARNS_EXT) \
|| (cat $@.$(WARNS_EXT); exit 1)
@ cat $@.$(WARNS_EXT)
@ echo
$(LAYER_BUILD_DIR)/%.cuo: src/$(PROJECT)/layers/%.cu $(HXX_SRCS) \
| $(LAYER_BUILD_DIR)
$(CUDA_DIR)/bin/nvcc $(NVCCFLAGS) $(CUDA_ARCH) -c $< -o $@ 2> $@.$(WARNS_EXT) \
......
......@@ -7,6 +7,9 @@
# CPU-only switch (uncomment to build without GPU support).
# CPU_ONLY := 1
# Parallelization over InfiniBand or RoCE
# RDMA := 1
# To customize your choice of compiler, uncomment and set the following.
# N.B. the default for Linux is g++ and the default for OSX is clang++
# CUSTOM_CXX := g++
......
#include <caffe/parallel.hpp>
#include <caffe/proto/caffe.pb.h>
#include <caffe/solver.hpp>
#include <glog/logging.h>
#include <unistd.h>
#include <sstream>
#include <string>
#include <vector>
using namespace std;
using namespace caffe;
// Shared code for parallel examples. Should be replaced by some kind of cluster
// deployment and visualization solution.
// Context for a solver running in a thread. Both initialization and run
// of the solver are done on the thread, to point to the same instance of the
// thread-local Caffe singleton.
class SolverContext : public Threaded {
 public:
  // Main solver does testing, display, snapshots etc.
  // If 'solver' is NULL this context is a worker: it will create (and own)
  // its solver lazily on its thread, with display/snapshot disabled.
  SolverContext(const Params<float>& params,
                const SolverParameter& solver_param, Solver<float>* solver)
      : params_(params),
        solver_param_(solver_param),
        worker_(solver == NULL),
        solver_(solver) {
    if (worker_) {
      solver_param_.clear_display();
      solver_param_.clear_snapshot();
    }
  }

  // Fix: contexts are deleted through SolverContext* (see the examples'
  // main functions); without a virtual destructor that is undefined behavior.
  virtual ~SolverContext() {
  }

  // Must run on the context's thread so the solver binds to that thread's
  // Caffe singleton.
  virtual void create_solver() {
    if (worker_) {
      solver_ = new SGDSolver<float>(solver_param_, true);
      CHECK(!solver_->test_nets().size());  // Only training
    }
  }

  virtual void delete_solver() {
    if (worker_) {
      delete solver_;
      solver_ = NULL;  // Fix: avoid a dangling pointer being read via solver()
    }
  }

  inline Solver<float>* solver() const {
    return solver_;
  }

  // Writes context-specific stats to 's'; base implementation prints nothing.
  virtual void stats(ostream& s) const {
  }

 protected:
  const Params<float>& params_;   // Shared CPU weights
  SolverParameter solver_param_;  // Local copy, mutated for workers
  const bool worker_;             // True if this context owns its solver
  Solver<float>* solver_;
};
// Runs a CPU solver on a thread
class CPUContext : public SolverContext {
 public:
  CPUContext(const Params<float>& params, const SolverParameter& solver_param,
             Solver<float>* solver = NULL)
      : SolverContext(params, solver_param, solver) {
  }

  // Thread body: build the solver on this thread, attach it to the shared
  // weights, and solve.
  void run() {
    create_solver();
    params_.configure(solver_);
    solver_->Solve();
    // A worker keeps its solver alive until asked to stop: the monitor
    // thread might still be reading its fields.
    if (worker_) {
      while (!must_stop()) {
        sleep(1);
      }
    }
    delete_solver();
  }
};
#ifndef CPU_ONLY
// Runs a GPU solver on a thread
class GPUContext : public SolverContext {
 public:
  GPUContext(const Params<float>& params, const SolverParameter& solver_param,
             GPUParams<float>* gpu_params, Solver<float>* solver = NULL)
      : SolverContext(params, solver_param, solver),
        gpu_params_(gpu_params) {
  }

  // Thread body: build the solver on this thread, point it at the GPU-side
  // weights, and solve.
  void run() {
    create_solver();
    gpu_params_->configure(solver_);
    solver_->Solve();
    // A worker keeps its solver alive until asked to stop: the monitor
    // thread might still be reading its fields.
    if (worker_) {
      while (!must_stop()) {
        sleep(1);
      }
    }
    delete_solver();
  }

 protected:
  GPUParams<float>* gpu_params_;  // Not owned; provided by the caller
};
// Runs a GPU solver on a thread with CPU sync.
// Unlike GPUContext, this context creates and owns its GPUParams (GPU copy
// of the weights) and a CPUGPUSync thread that keeps them in sync with the
// shared CPU weights. Both are allocated lazily in run() and are NULL before.
class CPUGPUContext : public SolverContext {
public:
CPUGPUContext(const Params<float>& params,
const SolverParameter& solver_param, Solver<float>* solver =
NULL)
: SolverContext(params, solver_param, solver),
gpu_params_(),
sync_() {
}
// Thread body: allocate GPU state and the sync thread, solve, then
// (workers only) linger until asked to stop before tearing down.
void run() {
create_solver();
gpu_params_ = new GPUParams<float>(params_, solver_param_.device_id());
sync_ = new CPUGPUSync<float>(*gpu_params_);
gpu_params_->configure(solver_);
sync_->start();
solver_->Solve();
// Wait until asked to stop before destroying, monitor might
// still be accessing fields
if (worker_)
while (!must_stop())
sleep(1);
// NOTE(review): sync_ is deleted without being cleared, and stats() below
// reads it from the monitor thread without locking - presumably callers
// stop the monitor before stopping contexts (gpus.cpp does); verify.
delete sync_;
delete gpu_params_;
delete_solver();
}
// Prints GPU id plus sync meters, or "starting" while run() has not yet
// allocated sync_. Called from the monitor thread.
virtual void stats(ostream& s) const {
s << "GPU " << solver_param_.device_id() << " ";
if (sync_) {
sync_->calls().show(s);
s << ", ";
sync_->cycles().show(s);
} else
s << "starting";
s << ", ";
}
protected:
GPUParams<float>* gpu_params_;  // Owned; NULL until run() allocates it
CPUGPUSync<float>* sync_;  // Owned sync thread; NULL until run() allocates it
};
#endif
// Displays stats about a set of solvers. Also keeps track and updates
// the global count of iterations (needed to adjust hyperparams).
class Monitor : public Threaded {
public:
Monitor(Params<float>& params, const vector<SolverContext*>& solvers)
: params_(params),
solvers_(solvers),
total_iters_("total") {
}
virtual ~Monitor() {
}
void step(ostream* s = NULL) {
*s << "Monitor - iters: ";
int total = 0;
bool all = true; // TODO remove
for (int i = 0; i < solvers_.size(); ++i) {
SolverContext* ctx = solvers_[i];
int n = ctx->solver() ? ctx->solver()->iter() : 0;
total += n;
if (s)
*s << n << ", ";
if (!n)
all = false;
}
if (all) {
//cudaProfilerStart();
//LOG(INFO)<< "Started profiler\n";
}
params_.iterations(total);
total_iters_.value(total);
if (s) {
total_iters_.show(*s);
*s << ", ";
for (int i = 0; i < solvers_.size(); ++i)
solvers_[i]->stats(*s);
}
}
void run() {
int every_seconds = 10;
time_t start = time(0);
while (!must_stop()) {
sleep(every_seconds);
ostringstream s;
step(&s);
s << "\n";
LOG(INFO)<< s.str();
LOG(INFO)<< "Training time: " << (time(0) - start);
}
}
protected:
Params<float>& params_;
const vector<SolverContext*>& solvers_;
Meter total_iters_;
};
name: "CIFAR10_full"
layers {
name: "cifar"
type: DATA
top: "data"
top: "label"
data_param {
source: "/scratch/cifar10_train"
backend: LMDB
rand_skip: 10000
batch_size: 16
}
transform_param {
mean_file: "/data/shared/cifar10_mean"
}
include: { phase: TRAIN }
}
layers {
name: "cifar"
type: DATA
top: "data"
top: "label"
data_param {
source: "/scratch/cifar10_val"
backend: LMDB
batch_size: 100
}
transform_param {
mean_file: "/data/shared/cifar10_mean"
}
include: { phase: TEST }
}
layers {
name: "conv1"
type: CONVOLUTION
bottom: "data"
top: "conv1"
blobs_lr: 1
blobs_lr: 2
convolution_param {
num_output: 32
pad: 2
kernel_size: 5
stride: 1
weight_filler {
type: "gaussian"
std: 0.0001
}
bias_filler {
type: "constant"
}
}
}
layers {
name: "pool1"
type: POOLING
bottom: "conv1"
top: "pool1"
pooling_param {
pool: MAX
kernel_size: 3
stride: 2
}
}
layers {
name: "relu1"
type: RELU
bottom: "pool1"
top: "pool1"
}
layers {
name: "norm1"
type: LRN
bottom: "pool1"
top: "norm1"
lrn_param {
norm_region: WITHIN_CHANNEL
local_size: 3
alpha: 5e-05
beta: 0.75
}
}
layers {
name: "conv2"
type: CONVOLUTION
bottom: "norm1"
top: "conv2"
blobs_lr: 1
blobs_lr: 2
convolution_param {
num_output: 32
pad: 2
kernel_size: 5
stride: 1
weight_filler {
type: "gaussian"
std: 0.01
}
bias_filler {
type: "constant"
}
}
}
layers {
name: "relu2"
type: RELU
bottom: "conv2"
top: "conv2"
}
layers {
name: "pool2"
type: POOLING
bottom: "conv2"
top: "pool2"
pooling_param {
pool: AVE
kernel_size: 3
stride: 2
}
}
layers {
name: "norm2"
type: LRN
bottom: "pool2"
top: "norm2"
lrn_param {
norm_region: WITHIN_CHANNEL
local_size: 3
alpha: 5e-05
beta: 0.75
}
}
layers {
name: "conv3"
type: CONVOLUTION
bottom: "norm2"
top: "conv3"
convolution_param {
num_output: 64
pad: 2
kernel_size: 5
stride: 1
weight_filler {
type: "gaussian"
std: 0.01
}
bias_filler {
type: "constant"
}
}
}
layers {
name: "relu3"
type: RELU
bottom: "conv3"
top: "conv3"
}
layers {
name: "pool3"
type: POOLING
bottom: "conv3"
top: "pool3"
pooling_param {
pool: AVE
kernel_size: 3
stride: 2
}
}
layers {
name: "ip1"
type: INNER_PRODUCT
bottom: "pool3"
top: "ip1"
blobs_lr: 1
blobs_lr: 2
weight_decay: 250
weight_decay: 0
inner_product_param {
num_output: 10
weight_filler {
type: "gaussian"
std: 0.01
}
bias_filler {
type: "constant"
}
}
}
layers {
name: "accuracy"
type: ACCURACY
bottom: "ip1"
bottom: "label"
top: "accuracy"
include: { phase: TEST }
}
layers {
name: "loss"
type: SOFTMAX_LOSS
bottom: "ip1"
bottom: "label"
top: "loss"
}
# The train/test net protocol buffer definition
net: "examples/parallel/cifar.prototxt"
# test_iter specifies how many forward passes the test should carry out.
# In the case of CIFAR10, we have test batch size 100 and 100 test iterations,
# covering the full 10,000 testing images.
test_iter: 100
# Carry out testing every 1000 training iterations.
test_interval: 1000
# The base learning rate, momentum and the weight decay of the network.
base_lr: 0.001
momentum: 0.0
weight_decay: 0.004
# The learning rate policy
lr_policy: "fixed"
# Display every 200 iterations
display: 200
# The maximum number of iterations
max_iter: 60000
# snapshot intermediate results
snapshot: 0
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/detail/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <caffe/net.hpp>
#include <caffe/parallel.hpp>
#include <caffe/proto/caffe.pb.h>
#include <caffe/solver.hpp>
#include <caffe/util/io.hpp>
#include <glog/logging.h>
#include <stdio.h>
#include <cstdlib>
#include <string>
#include <vector>
#include "base.hpp"
using namespace std;
using namespace caffe;
#ifndef CPU_ONLY
// Trains a net on multiple GPUs on one box. Cf. GPUSync in parallel.hpp.
//
// Example launch on GPU 0 and 1:
// make -j
// export LD_LIBRARY_PATH=/usr/local/lib:/usr/local/cuda/lib64
// export GLOG_logtostderr=1
// build/examples/parallel/gpus.bin examples/parallel/mnist_solver.prototxt 0:1
// Entry point: parses the solver proto and a colon-separated GPU id list,
// then trains with the main solver on this thread (first GPU) and one
// worker context per additional GPU.
int main(int argc, char** argv) {
::google::InitGoogleLogging(argv[0]);
::google::InstallFailureSignalHandler();
if (argc != 3) {
printf("Usage: gpus.bin solver_proto_file gpu_id[:gpu_id][...]\n");
return 1;
}
SolverParameter solver_param;
ReadProtoFromTextFile(argv[1], &solver_param);
// Parse GPU ids, e.g. "0:1" -> {0, 1}. NOTE(review): atoi silently maps
// non-numeric ids to 0 - consider validating.
vector<int> gpus;
vector<string> gpu_strings;
boost::split(gpu_strings, argv[2], boost::is_any_of(":"));
for (int i = 0; i < gpu_strings.size(); ++i)
gpus.push_back(atoi(gpu_strings[i].c_str()));
solver_param.set_device_id(gpus[0]);
// The main solver (testing, display, snapshots) lives on this thread.
SGDSolver<float> main(solver_param);
// Shared network weights
Params<float> params(main.net()->params());
// Create contexts
vector<SolverContext*> solvers(gpus.size());
solvers[0] = new CPUGPUContext(params, solver_param, &main);
// Workers create their own solvers lazily on their own threads.
for (int i = 1; i < gpus.size(); ++i) {
solver_param.set_device_id(gpus[i]);
solvers[i] = new CPUGPUContext(params, solver_param);
solvers[i]->start();
}
// Start monitor
Monitor monitor(params, solvers);
monitor.start();
// Run main on current thread
solvers[0]->run();
// Shutdown order matters: stop the monitor first so it no longer reads
// solver fields, then stop and delete the worker contexts.
monitor.stop();
LOG(INFO)<< "Monitor stop\n";
for (int i = 1; i < solvers.size(); ++i)
solvers[i]->stop();
for (int i = 1; i < solvers.size(); ++i)
delete solvers[i];
// NOTE(review): solvers[0] is never deleted - benign at process exit, but
// worth freeing for symmetry.
}
#else
// GPU parallel training is unavailable in CPU_ONLY builds: no-op stub.
int main(int argc, char *argv[]) {
  return 0;
}
#endif
#include <cstdlib>
#include <string>
#include <stdio.h>
#include <iostream>
#include <cstring>
#include <sstream>
#include <pthread.h>
#include <glog/logging.h>
#include <boost/shared_ptr.hpp>
#include <boost/algorithm/string.hpp>
#include <sys/socket.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <netdb.h>
#include <caffe/caffe.hpp>
#include "caffe/filler.hpp"
#include "caffe/parallel.hpp"
#include "base.hpp"
using namespace std;
using namespace caffe;
// Trains a net in parallel on multiple CPU cores. Cf. CPUSync in parallel.hpp.
//
// Your BLAS library needs to let the application manage its threads, e.g.
// for OpenBLAS, compile with no threading (USE_THREAD = 0 in Makefile.rule).
// Performance is linear at first, but then plateaus on large nets as the number
// of cores is increased, probably as the CPU runs out of memory bandwidth.
//
// Example launch on 4 cores:
// make -j
// export LD_LIBRARY_PATH=<single thread BLAS>:/usr/local/lib:/usr/local/cuda/lib64
// export GLOG_logtostderr=1
// build/examples/parallel/hogwild.bin examples/parallel/mnist_solver.prototxt 4
int main(int argc, char** argv) {
::google::InitGoogleLogging(argv[0]);
::google::InstallFailureSignalHandler();
if (argc < 2 || argc > 3) {
printf("Usage: hogwild.bin solver_proto_file [number_of_cores]\n");
return 1;
}
SolverParameter solver_param;
ReadProtoFromTextFile(argv[1], &solver_param);
int cores = argc == 3 ? atoi(argv[2]) : sysconf(_SC_NPROCESSORS_ONLN);
// Override in code so that proto file can be shared with other examples
solver_param.set_solver_mode(SolverParameter::CPU);
// Main solver
SGDSolver<float> main(solver_param);
// Shared network weights
Params<float> params(main.net()->params());