Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4630] Fix concurrency problem and split task handle threadpool #4679

Merged
merged 3 commits into from
Dec 19, 2023

Conversation

lrhkobe
Copy link
Contributor

@lrhkobe lrhkobe commented Dec 19, 2023

Fixes #4630.

Motivation

  • Fix the concurrency problem of clients in the same group frequently subscribing and unsubscribing.
  • Split the task handle thread pool to avoid mutual influence between operation commands and message sending and receiving

Modifications

  • Add concurrency control for operation of ClientGroupWrapper when close session;
  • Split task handle thread pool to istinguish between operation commands and message processing, add sendExecutorService,replyExecutorService and ackExecutorService for msg processing.

Comment on lines 61 to +72
@ConfigFiled(field = "tcp.taskHandleExecutorPoolSize")
private int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();
private int eventMeshTcpTaskHandleExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.sendExecutorPoolSize")
private int eventMeshTcpMsgSendExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.replyExecutorPoolSize")
private int eventMeshTcpMsgReplyExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.ackExecutorPoolSize")
private int eventMeshTcpMsgAckExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this is an IO-intensive scenario, with the original single thread pool being split into four, and each thread pool having twice the coreSize as before, the number of threads is 8 times the original. Considering that the 1-RTT time in EventMesh is relatively short, I suggest setting the coreSize of each thread pool to the number of physical CPU cores and the maxSize to double that. What do you think?

@Pil0tXia
Copy link
Member

Pil0tXia commented Dec 19, 2023

CI failed; please have a check.

Besides, the Fixes #<4630>. in your PR text should be Fixes #4630.~

Copy link
Member

@mxsm mxsm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lrhkobe CI needs to fix it

log.error("client purpose config is error:{}", session.getClient().getPurpose());
final String clientGroup = session.getClient().getGroup();
if (!lockMap.containsKey(clientGroup)) {
lockMap.putIfAbsent(clientGroup, new Object());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can if (!lockMap.containsKey(clientGroup)) be omitted here?

eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorPoolSize(),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorQueueSize()),
new EventMeshThreadFactory("eventMesh-tcp-msg-ack", true));

broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change the queue length from a hard-coded value to a configurable value for this ExecutorService, like that of ExecutorServices above?

Copy link

codecov bot commented Dec 19, 2023

Codecov Report

Attention: 55 lines in your changes are missing coverage. Please review.

Comparison is base (03aa825) 17.39% compared to head (fd97a7c) 17.40%.
Report is 1 commits behind head on master.

Files Patch % Lines
...che/eventmesh/runtime/boot/TCPThreadPoolGroup.java 0.00% 19 Missing ⚠️
...che/eventmesh/runtime/boot/EventMeshTCPServer.java 0.00% 11 Missing ⚠️
...ol/tcp/client/group/ClientSessionGroupMapping.java 0.00% 11 Missing ⚠️
...tmesh/runtime/metrics/tcp/EventMeshTcpMonitor.java 0.00% 9 Missing ⚠️
...ntime/configuration/EventMeshTCPConfiguration.java 73.33% 4 Missing ⚠️
...cp/client/rebalance/EventMeshRebalanceService.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4679      +/-   ##
============================================
+ Coverage     17.39%   17.40%   +0.01%     
- Complexity     1759     1760       +1     
============================================
  Files           797      797              
  Lines         29795    29844      +49     
  Branches       2578     2579       +1     
============================================
+ Hits           5182     5194      +12     
- Misses        24132    24170      +38     
+ Partials        481      480       -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@pandaapo pandaapo merged commit e056d7a into apache:master Dec 19, 2023
12 of 13 checks passed
@xwm1992 xwm1992 added this to the 1.11.0 milestone Dec 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug]requests lost
5 participants