Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ grpc::Status FlashService::IsAlive(
return check_result;

auto & tmt_context = context->getTMTContext();
response->set_available(tmt_context.checkRunning());
response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable());
response->set_mpp_version(DB::GetMppVersion());
return grpc::Status::OK;
}
Expand Down
42 changes: 42 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPTask.h>
Expand Down Expand Up @@ -82,6 +83,47 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
return ptr;
}

void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & context)
{
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
static constexpr const char * GRACEFUL_WAIT_SHUTDOWN_TIMEOUT = "flash.graceful_wait_shutdown_timeout";
// The default value of flash.graceful_wait_shutdown_timeout
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_SHUTDOWN_TIMEOUT = 600;
auto graceful_wait_shutdown_timeout
= context->getUsersConfig()->getUInt64(GRACEFUL_WAIT_SHUTDOWN_TIMEOUT, DEFAULT_GRACEFUL_WAIT_SHUTDOWN_TIMEOUT);
LOG_INFO(log, "Start to wait all MPP tasks to finish, timeout={}s", graceful_wait_shutdown_timeout);
UInt64 graceful_wait_shutdown_timeout_ms = graceful_wait_shutdown_timeout * 1000;
Stopwatch watch;
// The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched
std::this_thread::sleep_for(std::chrono::seconds(1));
bool all_tasks_finished = false;
while (true)
{
auto elapsed_ms = watch.elapsedMilliseconds();
if (!all_tasks_finished)
{
std::unique_lock lock(mu);
if (monitored_tasks.empty())
all_tasks_finished = true;
}
if (all_tasks_finished)
{
// Also needs to check if all MPP gRPC connections are finished
if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0)
{
LOG_INFO(log, "All MPP tasks have finished after {}ms", elapsed_ms);
break;
}
}
if (elapsed_ms >= graceful_wait_shutdown_timeout_ms)
{
LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}

MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ struct MPPTaskMonitor
return monitored_tasks.find(task_unique_id) != monitored_tasks.end();
}

void waitAllMPPTasksFinish(const std::unique_ptr<Context> & context);

std::mutex mu;
std::condition_variable cv;
bool is_shutdown = false;
Expand Down Expand Up @@ -220,6 +222,8 @@ class MPPTaskManager : private boost::noncopyable

std::shared_ptr<MPPTaskMonitor> monitor;

std::atomic<bool> is_available{true};

public:
explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler);

Expand Down Expand Up @@ -270,6 +274,9 @@ class MPPTaskManager : private boost::noncopyable

bool isTaskExists(const MPPTaskId & id);

void setUnavailable() { is_available = false; }
bool isAvailable() { return is_available; }

private:
MPPQueryPtr addMPPQuery(
const MPPQueryId & query_id,
Expand Down
20 changes: 4 additions & 16 deletions dbms/src/Server/FlashGrpcServerHolder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void handleRpcs(grpc::ServerCompletionQueue * curcq, const LoggerPtr & log)
// tells us whether there is any kind of event or cq is shutting down.
if (!curcq->Next(&tag, &ok))
{
LOG_INFO(log, "CQ is fully drained and shut down");
LOG_DEBUG(log, "CQ is fully drained and shut down");
break;
}
GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Increment();
Expand Down Expand Up @@ -217,21 +217,9 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
{
/// Shut down grpc server.
LOG_INFO(log, "Begin to shut down flash grpc server");
flash_grpc_server->Shutdown();
Stopwatch watch;
*is_shutdown = true;
// Wait all existed MPPTunnels done to prevent crash.
// If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done.
const int max_wait_cnt = 300;
int wait_cnt = 0;
while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt))
std::this_thread::sleep_for(std::chrono::seconds(1));
if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1)
LOG_WARNING(
log,
"Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak",
wait_cnt);
else
LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt);
flash_grpc_server->Shutdown();

for (auto & cq : cqs)
cq->Shutdown();
Expand All @@ -249,7 +237,7 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
GRPCCompletionQueuePool::global_instance->markShutdown();

GRPCCompletionQueuePool::global_instance = nullptr;
LOG_INFO(log, "Shut down flash grpc server");
LOG_INFO(log, "Shut down flash grpc server after {}ms", watch.elapsedMilliseconds());

/// Close flash service.
LOG_INFO(log, "Begin to shut down flash service");
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,12 @@ try
LOG_INFO(log, "Start to wait for terminal signal");
waitForTerminationRequest();

// Note: `waitAllMPPTasksFinish` must be called before stopping the proxy.
// Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully.
LOG_INFO(log, "Set unavailable for MPPTask");
tmt_context.getMPPTaskManager()->setUnavailable();
tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context);

{
// Set limiters stopping and wakeup threads in waitting queue.
global_context->getIORateLimiter().setStop();
Expand Down