blob: 7aaa577057d12f36fb975d9df0cb2e082321ebea [file] [log] [blame]
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/fuchsia/async_dispatcher.h"
#include <lib/async/default.h>
#include <lib/async/task.h>
#include <lib/async/wait.h>
#include <lib/zx/job.h>
#include <lib/zx/socket.h>
#include "base/callback.h"
#include "base/process/launch.h"
#include "base/test/multiprocess_test.h"
#include "base/test/test_timeouts.h"
#include "base/time/time.h"
#include "starboard/types.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/multiprocess_func_list.h"
namespace base {
namespace {
struct TestTask : public async_task_t {
explicit TestTask() {
state = ASYNC_STATE_INIT;
handler = &TaskProc;
deadline = 0;
}
static void TaskProc(async_dispatcher_t* async,
async_task_t* task,
zx_status_t status);
int num_calls = 0;
int repeats = 1;
OnceClosure on_call;
zx_status_t last_status = ZX_OK;
};
// static
void TestTask::TaskProc(async_dispatcher_t* async,
async_task_t* task,
zx_status_t status) {
EXPECT_EQ(async, async_get_default_dispatcher());
EXPECT_TRUE(status == ZX_OK || status == ZX_ERR_CANCELED)
<< "status: " << status;
auto* test_task = static_cast<TestTask*>(task);
test_task->num_calls++;
test_task->last_status = status;
if (test_task->on_call)
std::move(test_task->on_call).Run();
if (test_task->num_calls < test_task->repeats)
async_post_task(async, task);
};
struct TestWait : public async_wait_t {
TestWait(zx_handle_t handle,
zx_signals_t signals) {
state = ASYNC_STATE_INIT;
handler = &HandleProc;
object = handle;
trigger = signals;
}
static void HandleProc(async_dispatcher_t* async,
async_wait_t* wait,
zx_status_t status,
const zx_packet_signal_t* signal);
int num_calls = 0;
OnceClosure on_call;
zx_status_t last_status = ZX_OK;
};
// static
void TestWait::HandleProc(async_dispatcher_t* async,
async_wait_t* wait,
zx_status_t status,
const zx_packet_signal_t* signal) {
EXPECT_EQ(async, async_get_default_dispatcher());
EXPECT_TRUE(status == ZX_OK || status == ZX_ERR_CANCELED)
<< "status: " << status;
auto* test_wait = static_cast<TestWait*>(wait);
test_wait->num_calls++;
test_wait->last_status = status;
if (test_wait->on_call)
std::move(test_wait->on_call).Run();
}
struct TestException : public async_exception_t {
TestException(zx_handle_t handle) {
state = ASYNC_STATE_INIT;
handler = &HandleProc;
task = handle;
options = 0;
}
static void HandleProc(async_dispatcher_t* async,
async_exception_t* wait,
zx_status_t status,
const zx_port_packet_t* packet);
int num_calls = 0;
OnceClosure on_call;
zx_status_t last_status = ZX_OK;
};
// static
void TestException::HandleProc(async_dispatcher_t* async,
async_exception_t* wait,
zx_status_t status,
const zx_port_packet_t* packet) {
EXPECT_EQ(async, async_get_default_dispatcher());
auto* test_wait = static_cast<TestException*>(wait);
test_wait->num_calls++;
test_wait->last_status = status;
if (test_wait->on_call)
std::move(test_wait->on_call).Run();
}
} // namespace
class AsyncDispatcherTest : public MultiProcessTest {
public:
AsyncDispatcherTest() {
dispatcher_ = std::make_unique<AsyncDispatcher>();
async_ = async_get_default_dispatcher();
EXPECT_TRUE(async_);
EXPECT_EQ(zx::socket::create(ZX_SOCKET_DATAGRAM, &socket1_, &socket2_),
ZX_OK);
}
~AsyncDispatcherTest() override = default;
void RunUntilIdle() {
while (true) {
zx_status_t status = dispatcher_->DispatchOrWaitUntil(0);
if (status != ZX_OK) {
EXPECT_EQ(status, ZX_ERR_TIMED_OUT);
break;
}
}
}
protected:
std::unique_ptr<AsyncDispatcher> dispatcher_;
async_dispatcher_t* async_ = nullptr;
zx::socket socket1_;
zx::socket socket2_;
};
TEST_F(AsyncDispatcherTest, PostTask) {
TestTask task;
ASSERT_EQ(async_post_task(async_, &task), ZX_OK);
dispatcher_->DispatchOrWaitUntil(0);
EXPECT_EQ(task.num_calls, 1);
EXPECT_EQ(task.last_status, ZX_OK);
}
TEST_F(AsyncDispatcherTest, TaskRepeat) {
TestTask task;
task.repeats = 2;
ASSERT_EQ(async_post_task(async_, &task), ZX_OK);
RunUntilIdle();
EXPECT_EQ(task.num_calls, 2);
EXPECT_EQ(task.last_status, ZX_OK);
}
TEST_F(AsyncDispatcherTest, DelayedTask) {
TestTask task;
constexpr auto kDelay = TimeDelta::FromMilliseconds(5);
TimeTicks started = TimeTicks::Now();
task.deadline = zx_deadline_after(kDelay.InNanoseconds());
ASSERT_EQ(async_post_task(async_, &task), ZX_OK);
zx_status_t status = dispatcher_->DispatchOrWaitUntil(zx_deadline_after(
(kDelay + TestTimeouts::tiny_timeout()).InNanoseconds()));
EXPECT_EQ(status, ZX_OK);
EXPECT_GE(TimeTicks::Now() - started, kDelay);
}
TEST_F(AsyncDispatcherTest, CancelTask) {
TestTask task;
ASSERT_EQ(async_post_task(async_, &task), ZX_OK);
ASSERT_EQ(async_cancel_task(async_, &task), ZX_OK);
RunUntilIdle();
EXPECT_EQ(task.num_calls, 0);
}
TEST_F(AsyncDispatcherTest, TaskObserveShutdown) {
TestTask task;
ASSERT_EQ(async_post_task(async_, &task), ZX_OK);
dispatcher_.reset();
EXPECT_EQ(task.num_calls, 1);
EXPECT_EQ(task.last_status, ZX_ERR_CANCELED);
}
TEST_F(AsyncDispatcherTest, Wait) {
TestWait wait(socket1_.get(), ZX_SOCKET_READABLE);
EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK);
// Handler shouldn't be called because the event wasn't signaled.
RunUntilIdle();
EXPECT_EQ(wait.num_calls, 0);
char byte = 0;
EXPECT_EQ(socket2_.write(/*options=*/0, &byte, sizeof(byte),
/*actual=*/nullptr),
ZX_OK);
zx_status_t status = dispatcher_->DispatchOrWaitUntil(
zx_deadline_after(TestTimeouts::tiny_timeout().InNanoseconds()));
EXPECT_EQ(status, ZX_OK);
EXPECT_EQ(wait.num_calls, 1);
EXPECT_EQ(wait.last_status, ZX_OK);
}
TEST_F(AsyncDispatcherTest, CancelWait) {
TestWait wait(socket1_.get(), ZX_SOCKET_READABLE);
EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK);
char byte = 0;
EXPECT_EQ(socket2_.write(/*options=*/0, &byte, sizeof(byte),
/*actual=*/nullptr),
ZX_OK);
EXPECT_EQ(async_cancel_wait(async_, &wait), ZX_OK);
RunUntilIdle();
EXPECT_EQ(wait.num_calls, 0);
}
TEST_F(AsyncDispatcherTest, WaitShutdown) {
TestWait wait(socket1_.get(), ZX_SOCKET_READABLE);
EXPECT_EQ(async_begin_wait(async_, &wait), ZX_OK);
RunUntilIdle();
dispatcher_.reset();
EXPECT_EQ(wait.num_calls, 1);
EXPECT_EQ(wait.last_status, ZX_ERR_CANCELED);
}
// Sub-process which crashes itself, to generate an exception-port event.
MULTIPROCESS_TEST_MAIN(AsyncDispatcherCrashingChild) {
IMMEDIATE_CRASH();
return 0;
}
TEST_F(AsyncDispatcherTest, BindExceptionPort) {
zx::job child_job;
ASSERT_EQ(zx::job::create(*zx::job::default_job(), 0, &child_job), ZX_OK);
// Bind |child_job|'s exception port to the dispatcher.
TestException exception(child_job.get());
EXPECT_EQ(async_bind_exception_port(async_, &exception), ZX_OK);
// Launch a child process in the job, that will immediately crash.
LaunchOptions options;
options.job_handle = child_job.get();
Process child =
SpawnChildWithOptions("AsyncDispatcherCrashingChild", options);
ASSERT_TRUE(child.IsValid());
// Wait for the exception event to be handled.
EXPECT_EQ(
dispatcher_->DispatchOrWaitUntil(
(TimeTicks::Now() + TestTimeouts::action_max_timeout()).ToZxTime()),
ZX_OK);
EXPECT_EQ(exception.num_calls, 1);
EXPECT_EQ(exception.last_status, ZX_OK);
EXPECT_EQ(async_unbind_exception_port(async_, &exception), ZX_OK);
ASSERT_EQ(child_job.kill(), ZX_OK);
}
TEST_F(AsyncDispatcherTest, CancelExceptionPort) {
zx::job child_job;
ASSERT_EQ(zx::job::create(*zx::job::default_job(), 0, &child_job), ZX_OK);
// Bind |child_job|'s exception port to the dispatcher.
TestException exception(child_job.get());
EXPECT_EQ(async_bind_exception_port(async_, &exception), ZX_OK);
// Tear-down the dispatcher, and verify that the |exception| is cancelled.
dispatcher_ = nullptr;
EXPECT_EQ(exception.num_calls, 1);
EXPECT_EQ(exception.last_status, ZX_ERR_CANCELED);
ASSERT_EQ(child_job.kill(), ZX_OK);
}
} // namespace base