加入收藏 | 设为首页 | 会员中心 | 我要投稿 银川站长网 (https://www.0951zz.com/)- 云通信、基础存储、云上网络、机器学习、视觉智能!
当前位置: 首页 > 站长学院 > MySql教程 > 正文

mysql的thread_running数量解析

发布时间:2023-07-15 13:55:06 所属栏目:MySql教程 来源:
导读:本篇内容主要讲解“mysql的thread_running数量分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“mysql的thread_running数量分析”吧!thread

本篇内容主要讲解“mysql的thread_running数量分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“mysql的thread_running数量分析”吧!

thread pool的原理:

已在server层完成解析;

层创建多组常驻线程,用于接收客户端连接发送的query并代为执行,而不是为每个连接单独创建一个线程。

层进行running thread数量判断,如果达到阈值则直接报错或sleep。

状态变量记录了当前并发执行stmt/command的数量,执行前加1执行后减1;

突然飙高的诱因:

客户端连接暴增;

系统性能瓶颈,如CPU,IO或者mem swap;

异常sql;

会表现出hang住的假象。

执行,为此引入两个阈值low_watermark和high_watermark,以及变量threads_running_ctl_mode(selects或者all );

前,检查thread_running,

若其已达high_watermark阈值则直接拒绝执行并返回错误:mysql server is too busy

若其位于low和high之间,则sleep 5ms,然后继续尝试,累计等待100ms后则执行

对于已经开启事务和super用户,不做限制

控制query类型:SELECTS/ALL,默认为SELECTS,表示只影响SELECT语句

源码见注1

优化为基于FIFO的cond-wait/signal(实现8个FIFO);

高水位限流(这点保持不变);

低水位优化;其他解决方案:mariadb开发thread pool,percona在其上实现了优先队列;

优势:思路与thread pool一致,但代码更简洁(不到1000行);而且增加了特定query的过滤;

代码见注2

新增thread_active记录并发线程数,位于mysql_execute_command(sql解析之后),高水位则在query解析之前判断;

只统计select/DML,而commit/rollback则放过。

采用FIFO,当thread_active >= thread_running_low_watermark时进程进入FIFO等待,其他线程执行完sql后唤醒FIFO;

内,同时引入threads_running_wait_timeout控制线程在FIFO最大等待时间,超时则直接报错返回。

引入8个FIFO,降低了进出FIFO的锁竞争,线程采用RR分配到不同fifo,每个队列限制并发运行线程为threads_running_low_watermark/8。

,开始执行query,[解析后进行低水位判断,若通过则执行],执行当前sql完毕后,thread可能发起新query,则重复[]过程。

:进入FIFO排队最长时间,等待超时后sql被拒,默认100,单位为毫秒ms。

当前并发SELECT/INSERT/UPDATE/DELETE执行的线程数目;

:当前进入到FIFO中等待的线程数目;

未打补丁版本,设置innodb_thread_concurrency=0

未打补丁版本,innodb_thread_concurrency=32

低水位限流补丁版本(活跃线程数不超过64)

注1

http://www.gpfeng.com/wp-content/uploads/2013/09/threads_running_control.txt

+static my_bool thread_running_control(THD *thd, ulong tr)

+{

+ int slept_cnt= 0;

+ ulong tr_low, tr_high;

+ DBUG_ENTER("thread_running_control");

+ /* 

+  Super user/slave thread will not be affected at any time,

+  transactions that have already started will continue.

+ */

+ if ( thd->security_ctx->master_access & SUPER_ACL|| --对于super权限的用户和已经开启的事务不做限制

+    thd->in_active_multi_stmt_transaction() ||

+   thd->slave_thread) 

+  DBUG_RETURN(FALSE);

+

+ /* 

+  To promise that tr_low will never be greater than tr_high, 

+  as values may be changed between these two statements.

+  eg. 

+    (low, high) = (200, 500)

+    1. read low = 200

+    2. other sessions: set low = 20; set high = 80

+    3. read high = 80

+  Don't take a lock here to avoid lock contention.

+ */

+ do 

+ {

+  tr_low= thread_running_low_watermark;

+  tr_high= thread_running_high_watermark;

+

+ } while (tr_low > tr_high);

+

+check_buzy:

+ /* tr_high is promised to be non-zero.*/ 

+ if ((tr_low == 0 && tr < tr_high) || (tr_low != 0 && tr < tr_low))

+  DBUG_RETURN(FALSE);

+ if (tr >= tr_high)

+ { 

+  int can_reject= 1;

+

+  /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */

+  if (thread_running_ctl_mode == 0)

+  {

+   int query_is_select= 0;

+   if (thd->query_length() >= 8)

+   {

+    char *p= thd->query(); --读取query text的前6个字符,以判断是否为select

+    if (my_toupper(system_charset_info, p[0]) == 'S' &&

+      my_toupper(system_charset_info, p[1]) == 'E' &&

+      my_toupper(system_charset_info, p[2]) == 'L' &&

+      my_toupper(system_charset_info, p[3]) == 'E' &&

+      my_toupper(system_charset_info, p[4]) == 'C' &&

+      my_toupper(system_charset_info, p[5]) == 'T')

+

+     query_is_select= 1;

+   }

+

+   if (!query_is_select)

+    can_reject= 0;

+  }

+

+  if (can_reject)

+  {

+   inc_thread_rejected();

+   DBUG_RETURN(TRUE);

+  }

+  else

+   DBUG_RETURN(FALSE);

+ }

+  

+ if (tr_low != 0 && tr >= tr_low)

+ {

+  /* 

+   If total slept time exceed 100ms and thread running does not

+   reach high watermark, let it in.

+  */

+  if (slept_cnt >= 20)

+   DBUG_RETURN(FALSE);

+  

+  dec_thread_running()

+  

+  /* wait for 5ms. */

+  my_sleep(5000UL); 

+

+  slept_cnt++;

+  tr= inc_thread_running() - 1;

+  

+  goto check_buzy;

+ }

+

+ DBUG_RETURN(FALSE);

+}

+

+/**

  Perform one connection-level (COM_XXXX) command.

  @param command    type of command to perform

@@ -1016,7 +1126,8 @@

  thd->set_query_id(get_query_id());

  if (!(server_command_flags[command] & CF_SKIP_QUERY_ID))

   next_query_id();

- inc_thread_running();

+ /* remember old value of thread_running for *thread_running_control*. */

+ int32 tr= inc_thread_running() - 1;

  if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))

   statistic_increment(thd->status_var.questions, &LOCK_status);

@@ -1129,6 +1240,13 @@

  {

   if (alloc_query(thd, packet, packet_length))

    break;                // fatal error is set

+

+  if (thread_running_control(thd, (ulong)tr))

+  {

+   my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));

+   break;

+  }

+

   MYSQL_QUERY_START(thd->query(), thd->thread_id, (char *) (thd->db ? thd->db : ""), &thd->security_ctx->priv_user[0])

注2 

http://www.gpfeng.com/wp-content/uploads/2014/01/tr-control.diff_.txt 

+/**

  Perform one connection-level (COM_XXXX) command.

  @param command    type of command to perform

@@ -1177,7 +1401,7 @@

   command= COM_SHUTDOWN;

  }

  thd->set_query_id(next_query_id());

- inc_thread_running();

+ int32 tr= inc_thread_running();

 

  if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))

   statistic_increment(thd->status_var.questions, &LOCK_status);

@@ -1209,6 +1433,15 @@

   goto done;

  }

+ if (command == COM_QUERY && alloc_query(thd, packet, packet_length))

+  goto endof_case;        // fatal error is set

+

+ if (thread_running_control_high(thd, tr))

+ {

+  my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));

+  goto endof_case;

+ }

+

  switch (command) {

  case COM_INIT_DB:

  {

@@ -1311,8 +1544,6 @@

  }

  case COM_QUERY:

  {

-  if (alloc_query(thd, packet, packet_length))

-   break;                // fatal error is set

   MYSQL_QUERY_START(thd->query(), thd->thread_id,

            (char *) (thd->db ? thd->db : ""),

            &thd->security_ctx->priv_user[0],

@@ -1751,6 +1982,7 @@

   my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0));

   break;

  }

+endof_case:

 done:

  DBUG_ASSERT(thd->derived_tables == NULL &&

@@ -2502,12 +2734,37 @@

  Opt_trace_array trace_command_steps(&thd->opt_trace, "steps");

  DBUG_ASSERT(thd->transaction.stmt.cannot_safely_rollback() == FALSE);

+ bool count_active= false;

  if (need_traffic_control(thd, lex->sql_command))

  {

   thd->killed = THD::KILL_QUERY;

   goto error;

  }

+

+ switch (lex->sql_command) {

+

+ case SQLCOM_SELECT:

+ case SQLCOM_UPDATE:

+ case SQLCOM_UPDATE_MULTI:

+ case SQLCOM_DELETE:

+ case SQLCOM_DELETE_MULTI:

+ case SQLCOM_INSERT:

+ case SQLCOM_INSERT_SELECT:

+ case SQLCOM_REPLACE:

+ case SQLCOM_REPLACE_SELECT:

+  count_active= true;

+  break;

+ default:

+  break;

+ }

+

+ if (count_active && thread_running_control_low_enter(thd))

+ {

+  my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, myf(0));

+  goto error;

+ }

+

  status_var_increment(thd->status_var.com_stat[lex->sql_command]);

  switch (gtid_pre_statement_checks(thd))

@@ -4990,6 +5247,9 @@

 finish:

+ if (count_active)

+  thread_running_control_low_exit(thd);

+

  DBUG_ASSERT(!thd->in_active_multi_stmt_transaction() ||

        thd->in_multi_stmt_transaction_mode());

+static my_bool thread_running_control_high(THD *thd, int32 tr)

+{

+ int32 tr_high;

+ DBUG_ENTER("thread_running_control_high");

+

+ tr_high= (int32)thread_running_high_watermark;

+

+ /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */

+ if ((!tr_high || tr <= tr_high) ||

+   thd->transaction.is_active() ||

+   thd->get_command() != COM_QUERY ||

+   thd->security_ctx->master_access & SUPER_ACL ||

+   thd->slave_thread)

+  DBUG_RETURN(FALSE);

+

+ const char *query= thd->query();

+ uint32 len= thd->query_length();

+

+ if ((!has_prefix(query, len, "SELECT", 6) && thread_running_ctl_mode == 0) || --不再是逐个字符判断

+   has_prefix(query, len, "COMMIT", 6) ||

+   has_prefix(query, len, "ROLLBACK", 8))

+  DBUG_RETURN(FALSE);

+

+ /* confirm again*/

+ if (tr > tr_high && get_thread_running() > tr_high)

+ {

+  __sync_add_and_fetch(&thread_rejected, 1);

+  DBUG_RETURN(TRUE);

+ }

+

+ DBUG_RETURN(FALSE);

+}

+

+static my_bool thread_running_control_low_enter(THD *thd)

+{

+ int res= 0;

+ int32 tr_low;

+ my_bool ret= FALSE;

+ my_bool slept= FALSE;

+ struct timespec timeout;

+ Thread_conc_queue *queue;

+ DBUG_ENTER("thread_running_control_low_enter");

+

+ /* update global status */

+ __sync_add_and_fetch(&thread_active, 1);

+

+ tr_low= (int32)queue_tr_low_watermark;

+ queue= thread_conc_queues + thd->query_id % N_THREAD_CONC_QUEUE;

+

+ queue->lock();--问1:在进行低水位判断前,先锁定FIFO,避免低水位验证失败时无法获取FIFO锁进而不能放入FIFO;

+

+retry:

+

+ if ((!tr_low || queue->thread_active < tr_low) ||

+   (thd->lex->sql_command != SQLCOM_SELECT && thread_running_ctl_mode == 0) ||

+   (!slept && (thd->transaction.is_active() ||

+    thd->security_ctx->master_access & SUPER_ACL || thd->slave_thread)))

+ {

+  queue->thread_active++; --判断是否满足进入FIFO条件,如不满足则立即更新thread_active++,解锁queue并退出;

+  queue->unlock();

+  DBUG_RETURN(ret);

+ }

+

+ if (!slept)

+ {

+  queue->unlock();

+

+  /* sleep for 500 us */

+  my_sleep(500);

+  slept= TRUE;

+  queue->lock();

+

+  goto retry;

+ }

+

+ /* get a free wait-slot */

+ Thread_wait_slot *slot= queue->pop_free();

+

+ /* can't find a free wait slot, must let the query enter */

+ if (!slot)-- 当FIFO都满了,即无法把当前线程放入,则必须放行让该sql正常执行

+ {

+  queue->thread_active++;

+  queue->unlock();

+  DBUG_RETURN(ret);

+ }

+

+ slot->signaled= false;

+ slot->wait_ended= false;

+

+ /* put slot into waiting queue. */

+ queue->push_back_wait(slot);

+ queue->thread_wait++;

+

+ queue->unlock();

+

+ /* update global status */

+ thd_proc_info(thd, "waiting in server fifo");

+ __sync_sub_and_fetch(&thread_active, 1);

+ __sync_add_and_fetch(&thread_wait, 1);

+

+ /* cond-wait for at most thread_running_wait_timeout(ms). */

+ set_timespec_nsec(timeout, thread_running_wait_timeout_ns);

+

+ mysql_mutex_lock(&slot->mutex);

+ while (!slot->signaled)

+ {

+  res= mysql_cond_timedwait(&slot->cond, &slot->mutex, &timeout);

+  /* no need to signal if cond-wait timedout */

+  slot->signaled= true;

+ }

+ mysql_mutex_unlock(&slot->mutex);

+

+ queue->lock();

+ queue->thread_wait--;

+ queue->thread_active++;

+

+ /* remove slot from waiting queue. */

+ queue->remove_wait(slot);

+ /* put slot into the free queue for reuse. */

+ queue->push_back_free(slot);

+

+ queue->unlock();

+

+ /* update global status */

+ __sync_sub_and_fetch(&thread_wait, 1);

+ __sync_add_and_fetch(&thread_active, 1);

+ thd_proc_info(thd, 0);

+

+ if (res == ETIMEDOUT || res == ETIME)

+ {

+  ret= TRUE; // indicate that query is rejected.

+  __sync_add_and_fetch(&thread_rejected, 1);

+ }

+

+ DBUG_RETURN(ret);

+}

到此,相信大家对“mysql的thread_running数量分析”有了更深的了解,不妨来实际操作一番吧!

(编辑:银川站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章