
Commit 6e3e8f0

peterfu0 authored and facebook-github-bot committed
Reduce bulk init time and fix OOM (pytorch#3828)
Summary: X-link: facebookresearch/FBGEMM#911 X-link: facebookresearch/FBGEMM#909 X-link: facebookresearch/FBGEMM#908

Disable RocksDB compaction while bulk-initializing the TBE on SSD; this reduces the initialization time from over 5 minutes to 2-3 minutes. Also, express the chunk size in bytes rather than rows: since rows may have different dimensions in different TBEs, a fixed row count can blow up memory, so a byte budget avoids the OOM issue.

Differential Revision: D70921864
1 parent b7a4e51 commit 6e3e8f0
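
The byte-based chunking converts the configured byte budget into a row count sized for the widest row in the table. A minimal sketch of that arithmetic, assuming a hypothetical `rows_per_chunk` helper (not part of this commit) and torch dtypes exposing `itemsize`:

```python
# Hypothetical helper illustrating the byte-based chunk sizing above;
# `rows_per_chunk` is not an FBGEMM API, just a sketch of the arithmetic.
from math import floor

import torch

def rows_per_chunk(chunk_bytes: int, max_d: int, dtype: torch.dtype) -> int:
    """Turn a byte budget into a per-chunk row count, sized for the widest row."""
    bytes_per_row = max_d * dtype.itemsize  # worst-case row footprint in bytes
    return floor(chunk_bytes / bytes_per_row)

# e.g. a 1 GiB budget with 256-wide fp16 rows -> 2_097_152 rows per chunk
print(rows_per_chunk(1 << 30, 256, torch.float16))
```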

6 files changed (+276, -51 lines)

fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py

Lines changed: 15 additions & 8 deletions
@@ -16,7 +16,7 @@
 import tempfile
 import threading
 import time
-from math import log2
+from math import floor, log2
 from typing import Any, Callable, List, Optional, Tuple, Type, Union
 import torch  # usort:skip

@@ -148,7 +148,8 @@ def __init__(
         # Set to False to use cudaMallocManaged
         uvm_host_mapped: bool = False,
         enable_async_update: bool = True,  # whether enable L2/rocksdb write to async background thread
-        # if > 0, insert all kv pairs to rocksdb at init time, in chunks of *bulk_init_chunk_size* rows
+        # if > 0, insert all kv pairs to rocksdb at init time, in chunks of *bulk_init_chunk_size* bytes
+        # number of rows will be decided by bulk_init_chunk_size / size_of_each_row
         bulk_init_chunk_size: int = 0,
         lazy_bulk_init_enabled: bool = False,
     ) -> None:
@@ -245,7 +246,7 @@ def __init__(
             f"{cache_size / 1024.0 / 1024.0 / 1024.0 : .2f}GB, "
             f"weights precision: {weights_precision}, "
             f"output dtype: {output_dtype}, "
-            f"chunk size in bulk init: {bulk_init_chunk_size} rows"
+            f"chunk size in bulk init: {bulk_init_chunk_size} bytes"
         )
         self.register_buffer(
             "lxu_cache_state",
@@ -766,21 +767,24 @@ def _insert_all_kv(self) -> None:
         initailization time.
         """
         row_offset = 0
-        chunk_size = self.bulk_init_chunk_size
+        row_count = floor(
+            self.bulk_init_chunk_size
+            / (self.max_D * self.weights_precision.as_dtype().itemsize)
+        )
         total_dim0 = 0
         for dim0, _ in self.embedding_specs:
             total_dim0 += dim0

         start_ts = time.time()
         chunk_tensor = torch.empty(
-            chunk_size,
+            row_count,
             self.max_D,
             dtype=self.weights_precision.as_dtype(),
             device="cuda",
         )
         cpu_tensor = torch.empty_like(chunk_tensor, device="cpu")
-        for row_offset in range(0, total_dim0, chunk_size):
-            actual_dim0 = min(total_dim0 - row_offset, chunk_size)
+        for row_offset in range(0, total_dim0, row_count):
+            actual_dim0 = min(total_dim0 - row_offset, row_count)
             chunk_tensor.uniform_(
                 self.ssd_uniform_init_lower, self.ssd_uniform_init_upper
             )
@@ -789,9 +793,12 @@ def _insert_all_kv(self) -> None:
             # This code is intentionally not calling through the getter property
             # to avoid the lazy initialization thread from joining with itself.
             self._ssd_db.set_range_to_storage(rand_val, row_offset, actual_dim0)
+        self.ssd_db.toggle_compaction(True)
         end_ts = time.time()
         elapsed = int((end_ts - start_ts) * 1e6)
-        logging.info(f"TBE bulk initialization took {elapsed:_} us")
+        logging.info(
+            f"TBE bulk initialization took {elapsed:_} us, bulk_init_chunk_size={self.bulk_init_chunk_size}, each batch of {row_count} rows, total rows of {total_dim0}"
+        )

     @torch.jit.ignore
     def _report_duration(
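
Taken together, the Python-side change amounts to the chunked loop below. This is a hedged sketch under a simplified storage interface: `store.set_range` is a hypothetical stand-in for `set_range_to_storage`, and tensors stay on CPU for brevity.

```python
# Sketch of the chunked bulk-init loop; `store` is a hypothetical stand-in
# for the RocksDB wrapper, not the real FBGEMM API.
import torch

def bulk_init(store, total_rows: int, max_d: int, chunk_bytes: int,
              dtype: torch.dtype, low: float, high: float) -> None:
    row_count = chunk_bytes // (max_d * dtype.itemsize)  # bytes -> rows
    chunk = torch.empty(row_count, max_d, dtype=dtype)   # one reusable buffer
    for row_offset in range(0, total_rows, row_count):
        actual = min(total_rows - row_offset, row_count)
        chunk.uniform_(low, high)                        # random init values
        store.set_range(chunk[:actual], row_offset, actual)
```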

fbgemm_gpu/src/ssd_split_embeddings_cache/embedding_rocksdb_wrapper.h

Lines changed: 4 additions & 0 deletions
@@ -82,6 +82,10 @@ class EmbeddingRocksDBWrapper : public torch::jit::CustomClassHolder {
     return impl_->set_range_to_storage(weights, start, length);
   }

+  void toggle_compaction(bool enable) {
+    impl_->toggle_compaction(enable);
+  }
+
   void get(
       at::Tensor indices,
       at::Tensor weights,

fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_split_table_batched_embeddings.cpp

Lines changed: 1 addition & 0 deletions
@@ -460,6 +460,7 @@ static auto embedding_rocks_db_wrapper =
         .def(
             "set_range_to_storage",
             &EmbeddingRocksDBWrapper::set_range_to_storage)
+        .def("toggle_compaction", &EmbeddingRocksDBWrapper::toggle_compaction)
         .def(
             "get",
             &EmbeddingRocksDBWrapper::get,
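
Once registered on the TorchScript custom class, the method is callable from Python on the wrapper instance, which is how `_insert_all_kv` re-enables compaction above. A hedged usage sketch, assuming `ssd_db` is an already-constructed wrapper:

```python
# Usage sketch; `ssd_db` stands for an already-constructed
# EmbeddingRocksDBWrapper instance, obtained elsewhere.
ssd_db.toggle_compaction(False)  # pause auto compactions before bulk writes
# ... bulk set_range_to_storage() writes ...
ssd_db.toggle_compaction(True)   # re-enable compactions once init is done
```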

fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_table_batched_embeddings.h

Lines changed: 109 additions & 42 deletions
@@ -290,12 +290,67 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
     options.memtable_prefix_bloom_size_ratio = 0.05;
     options.memtable_whole_key_filtering = true;
     options.max_background_jobs = num_threads;
+    // disable auto compactions during bulk init, re-enable once done
+    // maximum number of concurrent flush operations
+    options.max_background_flushes = num_threads;
+    options.disable_auto_compactions = true;
     options.env->SetBackgroundThreads(4, rocksdb::Env::HIGH);
     options.env->SetBackgroundThreads(1, rocksdb::Env::LOW);
-
     options.max_open_files = -1;

+    initialize_dbs(num_shards, path, options, use_passed_in_path);
+    initialize_initializers(
+        num_shards,
+        max_D,
+        uniform_init_lower,
+        uniform_init_upper,
+        row_storage_bitwidth);
+    executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(num_shards);
+    ro_.verify_checksums = false;
+    ro_.async_io = true;
+    wo_.disableWAL = true;
+    wo_.sync = false;
+
+    // Setup staggered manual compaction data members
+    memtable_flush_period_ = memtable_flush_period;
+    if (memtable_flush_period_ > 0) {
+      done_staggered_flushes_ = false;
+      memtable_flush_offset_ = memtable_flush_offset;
+      l0_files_per_compact_ = l0_files_per_compact;
+      compaction_period_ = memtable_flush_period_ * l0_files_per_compact *
+          options.min_write_buffer_number_to_merge;
+      int64_t period_per_shard = memtable_flush_period_ / num_shards;
+      CHECK_GT(period_per_shard, 0);
+      // We want to stagger memory flushes (and then later
+      // stagger all compactions)
+
+      for (int64_t i = 0; i < num_shards; i++) {
+        shard_flush_compaction_deadlines_.push_back(
+            memtable_flush_offset_ + (i * period_per_shard));
+      }
+    }
+  }
+
+  ~EmbeddingRocksDB() override {
+    // clear all the snapshots if not released
+    if (snapshots_.size() > 0) {
+      LOG(WARNING)
+          << snapshots_.size()
+          << " snapshots have not been released when db is closing. Releasing them now.";
+    }
+    snapshots_.clear();
+    for (auto shard = 0; shard < dbs_.size(); ++shard) {
+      dbs_[shard]->Close();
+    }
+  }
+
+  void initialize_dbs(
+      int64_t num_shards,
+      std::string path,
+      rocksdb::Options& options,
+      bool use_passed_in_path) {
 #ifdef FBGEMM_FBCODE
+    std::string used_path = "";
     auto serviceInfo = std::make_shared<facebook::fb_rocksdb::ServiceInfo>();
     serviceInfo->oncall = "pyper_training";
     serviceInfo->service_name = "ssd_offloading_rocksb";
@@ -307,7 +362,6 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
       path = ssd_mount_point;
       tbe_uuid = facebook::strings::generateUUID();
     }
-    std::string used_path = "";
 #endif
     for (auto i = 0; i < num_shards; ++i) {
 #ifdef FBGEMM_FBCODE
@@ -350,6 +404,19 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
       }
       CHECK(s.ok()) << s.ToString();
       dbs_.emplace_back(db);
+    }
+#ifdef FBGEMM_FBCODE
+    LOG(INFO) << "TBE actual used_path: " << used_path;
+#endif
+  }
+
+  void initialize_initializers(
+      int64_t num_shards,
+      int64_t max_D,
+      float uniform_init_lower,
+      float uniform_init_upper,
+      int64_t row_storage_bitwidth) {
+    for (auto i = 0; i < num_shards; ++i) {
       auto* gen = at::check_generator<at::CPUGeneratorImpl>(
           at::detail::getDefaultCPUGenerator());
       {
@@ -362,46 +429,6 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
             row_storage_bitwidth));
       }
     }
-#ifdef FBGEMM_FBCODE
-    LOG(INFO) << "TBE actual used_path: " << used_path;
-#endif
-    executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(num_shards);
-    ro_.verify_checksums = false;
-    ro_.async_io = true;
-    wo_.disableWAL = true;
-    wo_.sync = false;
-
-    // Setup staggered manual compaction data members
-    memtable_flush_period_ = memtable_flush_period;
-    if (memtable_flush_period_ > 0) {
-      done_staggered_flushes_ = false;
-      memtable_flush_offset_ = memtable_flush_offset;
-      l0_files_per_compact_ = l0_files_per_compact;
-      compaction_period_ = memtable_flush_period_ * l0_files_per_compact *
-          options.min_write_buffer_number_to_merge;
-      int64_t period_per_shard = memtable_flush_period_ / num_shards;
-      CHECK_GT(period_per_shard, 0);
-      // We want to stagger memory flushes (and then later
-      // stagger all compactions)
-
-      for (int64_t i = 0; i < num_shards; i++) {
-        shard_flush_compaction_deadlines_.push_back(
-            memtable_flush_offset_ + (i * period_per_shard));
-      }
-    }
-  }
-
-  ~EmbeddingRocksDB() override {
-    // clear all the snapshots if not released
-    if (snapshots_.size() > 0) {
-      LOG(WARNING)
-          << snapshots_.size()
-          << " snapshots have not been released when db is closing. Releasing them now.";
-    }
-    snapshots_.clear();
-    for (auto shard = 0; shard < dbs_.size(); ++shard) {
-      dbs_[shard]->Close();
-    }
-  }
   }

   folly::SemiFuture<std::vector<folly::Unit>> get_kv_db_async(
@@ -549,6 +576,46 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
     folly::coro::blockingWait(set_kv_db_async(seq_indices, weights, count));
   }

+  virtual rocksdb::Status set_rocksdb_option(
+      int shard,
+      const std::string& key,
+      const std::string& value) {
+    return dbs_[shard]->SetOptions({{key, value}});
+  }
+
+  void toggle_compaction(bool enable) {
+    int max_retries = 10;
+    std::vector<folly::Future<bool>> futures;
+    for (auto shard = 0; shard < dbs_.size(); ++shard) {
+      auto f = folly::via(executor_.get()).thenValue([=](folly::Unit) -> bool {
+        for (int attempt = 0; attempt < max_retries; ++attempt) {
+          auto val = enable ? "false" : "true";
+          auto s = set_rocksdb_option(shard, "disable_auto_compactions", val);
+          if (s.ok()) {
+            return true;
+          }
+          LOG(WARNING) << "Failed to toggle compaction to " << enable
+                       << " for shard " << shard << ", attempt=" << attempt
+                       << ", max_retries=" << max_retries << std::endl;
+          std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        }
+        return false;
+      });
+      futures.push_back(std::move(f));
+    }
+    auto results = folly::coro::blockingWait(folly::collectAll(futures));
+    for (auto& result : results) {
+      if (result.hasValue()) {
+        CHECK(result.value())
+            << "Failed to toggle compaction to " << enable << std::endl;
+      } else {
+        CHECK(false) << "Failed to toggle compaction to " << enable
+                     << " with exception " << result.exception().what()
+                     << std::endl;
+      }
+    }
+  }
+
   int64_t get_max_D() {
     return max_D_;
   }
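
The `toggle_compaction` above follows a fan-out/retry protocol: one task per shard flips RocksDB's dynamic `disable_auto_compactions` option (note the inversion: enabling compaction sets the option to "false"), retrying up to 10 times with a 100 ms pause, and the caller aborts if any shard never succeeds. A hedged Python rendering of the same pattern, where `set_option` is a hypothetical stand-in for RocksDB's `SetOptions`:

```python
# Hedged sketch of the fan-out/retry protocol used by toggle_compaction();
# `set_option(shard, key, value) -> bool` is a hypothetical stand-in for
# RocksDB's SetOptions, not an FBGEMM or python-rocksdb API.
import time
from concurrent.futures import ThreadPoolExecutor

def toggle_compaction(set_option, num_shards: int, enable: bool,
                      max_retries: int = 10) -> None:
    def toggle_shard(shard: int) -> bool:
        val = "false" if enable else "true"  # option is "disable_...", hence inverted
        for _attempt in range(max_retries):
            if set_option(shard, "disable_auto_compactions", val):
                return True
            time.sleep(0.1)  # brief pause before retrying this shard
        return False

    # One toggle task per shard, run in parallel like the folly executor.
    with ThreadPoolExecutor(max_workers=num_shards) as pool:
        results = list(pool.map(toggle_shard, range(num_shards)))
    assert all(results), f"failed to toggle compaction to {enable}"
```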
