Skip to content

Commit 072200f

Browse files
pcmoritzrobertnishihara
authored andcommitted
ARROW-4296: [Plasma] Use one mmap file by default, prevent crash with -f
This PR is similar to apache#3434 but also makes sure we only have one well-tested code path to go through. Author: Philipp Moritz <pcmoritz@gmail.com> Closes apache#3490 from pcmoritz/one-mmap-file and squashes the following commits: c447af5 <Philipp Moritz> remove --verbose f885f49 <Philipp Moritz> reduce plasma store size for test 6072e8f <Philipp Moritz> add verbose flag 990700f <Philipp Moritz> use only one mmapped file
1 parent de84293 commit 072200f

File tree

4 files changed

+33
-41
lines changed

4 files changed

+33
-41
lines changed

cpp/src/plasma/store.cc

+19-23
Original file line numberDiff line numberDiff line change
@@ -905,21 +905,22 @@ class PlasmaStoreRunner {
905905
PlasmaStoreRunner() {}
906906

907907
void Start(char* socket_name, int64_t system_memory, std::string directory,
908-
bool hugepages_enabled, bool use_one_memory_mapped_file) {
908+
bool hugepages_enabled) {
909909
// Create the event loop.
910910
loop_.reset(new EventLoop);
911911
store_.reset(
912912
new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
913913
plasma_config = store_->GetPlasmaStoreInfo();
914914

915-
// If the store is configured to use a single memory-mapped file, then we
916-
// achieve that by mallocing and freeing a single large amount of space.
917-
// that maximum allowed size up front.
918-
if (use_one_memory_mapped_file) {
919-
void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
920-
ARROW_CHECK(pointer != nullptr);
921-
plasma::dlfree(pointer);
922-
}
915+
// We are using a single memory-mapped file by mallocing and freeing a single
916+
// large amount of space up front. According to the documentation,
917+
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
918+
// bookkeeping.
919+
void* pointer = plasma::dlmemalign(kBlockSize, system_memory - 256 * sizeof(size_t));
920+
ARROW_CHECK(pointer != nullptr);
921+
// This will unmap the file, but the next one created will be as large
922+
// as this one (this is an implementation detail of dlmalloc).
923+
plasma::dlfree(pointer);
923924

924925
int socket = BindIpcSock(socket_name, true);
925926
// TODO(pcm): Check return value.
@@ -955,15 +956,14 @@ void HandleSignal(int signal) {
955956
}
956957

957958
void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory,
958-
bool hugepages_enabled, bool use_one_memory_mapped_file) {
959+
bool hugepages_enabled) {
959960
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
960961
// to a client that has already died, the store could die.
961962
signal(SIGPIPE, SIG_IGN);
962963

963964
g_runner.reset(new PlasmaStoreRunner());
964965
signal(SIGTERM, HandleSignal);
965-
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled,
966-
use_one_memory_mapped_file);
966+
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
967967
}
968968

969969
} // namespace plasma
@@ -975,11 +975,9 @@ int main(int argc, char* argv[]) {
975975
// Directory where plasma memory mapped files are stored.
976976
std::string plasma_directory;
977977
bool hugepages_enabled = false;
978-
// True if a single large memory-mapped file should be created at startup.
979-
bool use_one_memory_mapped_file = false;
980978
int64_t system_memory = -1;
981979
int c;
982-
while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) {
980+
while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
983981
switch (c) {
984982
case 'd':
985983
plasma_directory = std::string(optarg);
@@ -994,14 +992,16 @@ int main(int argc, char* argv[]) {
994992
char extra;
995993
int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
996994
ARROW_CHECK(scanned == 1);
995+
// Set system memory, potentially rounding it to a page size
996+
// Also make it so dlmalloc fails if we try to request more memory than
997+
// is available.
998+
system_memory =
999+
plasma::dlmalloc_set_footprint_limit(static_cast<size_t>(system_memory));
9971000
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
9981001
<< static_cast<double>(system_memory) / 1000000000
9991002
<< "GB of memory.";
10001003
break;
10011004
}
1002-
case 'f':
1003-
use_one_memory_mapped_file = true;
1004-
break;
10051005
default:
10061006
exit(-1);
10071007
}
@@ -1051,12 +1051,8 @@ int main(int argc, char* argv[]) {
10511051
SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
10521052
}
10531053
#endif
1054-
// Make it so dlmalloc fails if we try to request more memory than is
1055-
// available.
1056-
plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
10571054
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
1058-
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled,
1059-
use_one_memory_mapped_file);
1055+
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled);
10601056
plasma::g_runner->Shutdown();
10611057
plasma::g_runner = nullptr;
10621058

cpp/src/plasma/test/client_tests.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class TestPlasmaStore : public ::testing::Test {
6060
std::string plasma_directory =
6161
test_executable.substr(0, test_executable.find_last_of("/"));
6262
std::string plasma_command = plasma_directory +
63-
"/plasma_store_server -m 1000000000 -s " +
63+
"/plasma_store_server -m 10000000 -s " +
6464
store_socket_name_ + " 1> /dev/null 2> /dev/null &";
6565
system(plasma_command.c_str());
6666
ARROW_CHECK_OK(client_.Connect(store_socket_name_, ""));

python/pyarrow/plasma.py

-5
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ def build_plasma_tensorflow_op():
7878
@contextlib.contextmanager
7979
def start_plasma_store(plasma_store_memory,
8080
use_valgrind=False, use_profiler=False,
81-
use_one_memory_mapped_file=False,
8281
plasma_directory=None, use_hugepages=False):
8382
"""Start a plasma store process.
8483
Args:
@@ -87,8 +86,6 @@ def start_plasma_store(plasma_store_memory,
8786
of valgrind. If this is True, use_profiler must be False.
8887
use_profiler (bool): True if the plasma store should be started inside
8988
a profiler. If this is True, use_valgrind must be False.
90-
use_one_memory_mapped_file: If True, then the store will use only a
91-
single memory-mapped file.
9289
plasma_directory (str): Directory where plasma memory mapped files
9390
will be stored.
9491
use_hugepages (bool): True if the plasma store should use huge pages.
@@ -107,8 +104,6 @@ def start_plasma_store(plasma_store_memory,
107104
command = [plasma_store_executable,
108105
"-s", plasma_store_name,
109106
"-m", str(plasma_store_memory)]
110-
if use_one_memory_mapped_file:
111-
command += ["-f"]
112107
if plasma_directory:
113108
command += ["-d", plasma_directory]
114109
if use_hugepages:

python/pyarrow/tests/test_plasma.py

+13-12
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
3939
USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
40+
SMALL_OBJECT_SIZE = 9000
4041

4142

4243
def random_name():
@@ -110,15 +111,11 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
110111
class TestPlasmaClient(object):
111112

112113
def setup_method(self, test_method):
113-
use_one_memory_mapped_file = (test_method ==
114-
self.test_use_one_memory_mapped_file)
115-
116114
import pyarrow.plasma as plasma
117115
# Start Plasma store.
118116
self.plasma_store_ctx = plasma.start_plasma_store(
119117
plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
120-
use_valgrind=USE_VALGRIND,
121-
use_one_memory_mapped_file=use_one_memory_mapped_file)
118+
use_valgrind=USE_VALGRIND)
122119
self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
123120
# Connect to Plasma.
124121
self.plasma_client = plasma.connect(self.plasma_store_name)
@@ -471,22 +468,26 @@ def assert_create_raises_plasma_full(unit_test, size):
471468
memory_buffers.append(memory_buffer)
472469
# Remaining space is 50%. Make sure that we can't create an
473470
# object of size 50% + 1, but we can create one of size 20%.
474-
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
471+
assert_create_raises_plasma_full(
472+
self, 50 * PERCENT + SMALL_OBJECT_SIZE)
475473
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
476474
del memory_buffer
477475
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
478476
del memory_buffer
479-
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
477+
assert_create_raises_plasma_full(
478+
self, 50 * PERCENT + SMALL_OBJECT_SIZE)
480479

481480
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
482481
memory_buffers.append(memory_buffer)
483482
# Remaining space is 30%.
484-
assert_create_raises_plasma_full(self, 30 * PERCENT + 1)
483+
assert_create_raises_plasma_full(
484+
self, 30 * PERCENT + SMALL_OBJECT_SIZE)
485485

486486
_, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
487487
memory_buffers.append(memory_buffer)
488488
# Remaining space is 20%.
489-
assert_create_raises_plasma_full(self, 20 * PERCENT + 1)
489+
assert_create_raises_plasma_full(
490+
self, 20 * PERCENT + SMALL_OBJECT_SIZE)
490491

491492
def test_contains(self):
492493
fake_object_ids = [random_object_id() for _ in range(100)]
@@ -838,7 +839,7 @@ def test_subscribe_deletions(self):
838839
assert -1 == recv_dsize
839840
assert -1 == recv_msize
840841

841-
def test_use_one_memory_mapped_file(self):
842+
def test_use_full_memory(self):
842843
# Fill the object store up with a large number of small objects and let
843844
# them go out of scope.
844845
for _ in range(100):
@@ -851,8 +852,8 @@ def test_use_one_memory_mapped_file(self):
851852
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
852853
# Verify that an object that is too large does not fit.
853854
with pytest.raises(pa.lib.PlasmaStoreFull):
854-
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
855-
0)
855+
create_object(self.plasma_client2,
856+
DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)
856857

857858
def test_client_death_during_get(self):
858859
import pyarrow.plasma as plasma

0 commit comments

Comments
 (0)