[Improve][Kafka] kafka source refactored some reader read logic #6408
Please fix the conflict.
2ae71dd to c6f8cc2
c6f8cc2 to 30a76e5
Done.
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
done
 */
package org.apache.seatunnel.common.utils;

public final class TemporaryClassLoaderContext implements AutoCloseable {
I like this tool!
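For readers unfamiliar with the pattern, here is a minimal sketch of how an `AutoCloseable` class-loader scope of this kind can work. The class name `ClassLoaderScope` and its `of` factory are hypothetical and chosen for illustration; the actual `TemporaryClassLoaderContext` in this PR may differ in naming and details.

```java
// Hypothetical sketch, not the class from this PR.
final class ClassLoaderScope implements AutoCloseable {
    private final Thread thread;
    private final ClassLoader original;

    private ClassLoaderScope(Thread thread, ClassLoader original) {
        this.thread = thread;
        this.original = original;
    }

    /** Swaps the current thread's context class loader and remembers the previous one. */
    static ClassLoaderScope of(ClassLoader temporary) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(temporary);
        return new ClassLoaderScope(thread, original);
    }

    /** Restores the class loader that was active before of() was called. */
    @Override
    public void close() {
        thread.setContextClassLoader(original);
    }
}
```

Used in a try-with-resources statement, such as `try (ClassLoaderScope ignored = ClassLoaderScope.of(pluginClassLoader)) { ... }`, the original class loader is restored even if the body throws.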
}

private void enqueueTaskUnsafe(SplitFetcherTask task) {
    assert lock.isHeldByCurrentThread();
By default, `assert` statements are disabled at runtime, so this check does nothing unless the JVM is started with `-ea`. See https://docs.oracle.com/cd/E19683-01/806-7930/6jgp65ikq/index.html
OK
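One way to address the `assert` concern is an explicit check that runs regardless of JVM flags. The sketch below is an assumption about how that could look, not the code that was merged; `GuardedTaskQueue` and its `Runnable` tasks are hypothetical stand-ins for the fetcher and `SplitFetcherTask`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the fetcher's task queue.
final class GuardedTaskQueue {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();

    /** Public entry point: acquires the lock, then delegates to the unsafe variant. */
    void addTask(Runnable task) {
        lock.lock();
        try {
            addTaskUnsafe(task);
        } finally {
            lock.unlock();
        }
    }

    /** Must be called with the lock held; checked explicitly instead of with assert. */
    void addTaskUnsafe(Runnable task) {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalStateException("caller must hold the lock");
        }
        taskQueue.add(task);
        nonEmpty.signal();
    }

    /** Returns the number of queued tasks. */
    int size() {
        lock.lock();
        try {
            return taskQueue.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Unlike `assert`, the `IllegalStateException` path is always active, at the cost of one cheap check per call.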
Could you share more about why the Kafka source needs to be refactored, and what we gain from the refactor?
+1
First, I think the previous approach left room for improvement: it created a new consumer thread for each split, and each consumer thread polled in a loop. Improvements include having one consumer thread per unit of parallelism, making offset management clearer, and promptly removing fully consumed partitions to reduce overhead.
Makes sense to me. Thanks @Carl-Zhou-CN!
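To illustrate the idea described above, here is a rough sketch of the loop that a single fetcher thread might run over all of its assigned splits, dropping each split as soon as it is exhausted. It does not use the Kafka client; the in-memory lists, the `SingleThreadFetcherSketch` class, and the `batchSize` parameter are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each split is an in-memory list of records.
final class SingleThreadFetcherSketch {

    /** Reads every split from one loop, removing splits once they are fully consumed. */
    static List<String> drain(Map<String, List<String>> splits, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Map<String, Iterator<String>> assigned = new LinkedHashMap<>();
        splits.forEach((id, records) -> assigned.put(id, records.iterator()));

        List<String> out = new ArrayList<>();
        while (!assigned.isEmpty()) {
            Iterator<Map.Entry<String, Iterator<String>>> it = assigned.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Iterator<String>> entry = it.next();
                Iterator<String> records = entry.getValue();
                // Take at most batchSize records from this split per round.
                for (int i = 0; i < batchSize && records.hasNext(); i++) {
                    out.add(entry.getKey() + ":" + records.next());
                }
                // Finished splits are removed so later rounds skip them.
                if (!records.hasNext()) {
                    it.remove();
                }
            }
        }
        return out;
    }
}
```

One loop serves every split, so the thread count stays fixed per reader rather than growing with the number of splits.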
Checking...
...-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
},
{
  rule_type = MAX
  rule_value = 149
  rule_value = 99
Any reason for changing the test case config?
The original test case was ineffective because it never actually read the data.
@@ -234,4 +234,19 @@ private void addTaskUnsafe(SplitFetcherTask task) {
        taskQueue.add(task);
        nonEmpty.signal();
    }

    public void enqueueTask(SplitFetcherTask task) {
Why not use the existing method directly?
Line 108 in c6f627f
public void addTask(@NonNull SplitFetcherTask task) {
OK, the new method is redundant.
LGTM. Thanks @Carl-Zhou-CN
} else {
    deserializationSchema.deserialize(consumerRecord.value(), outputCollector);
}
splitState.setCurrentOffset(consumerRecord.offset() + 1);
Why do we need +1?
Please add a comment
LGTM
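On the `+ 1` question: in Kafka, a consumer's position (and the offset it commits) refers to the next record to be read, not the last one processed. Assuming `currentOffset` in the split state is later used as the starting position after a restore, storing `offset + 1` avoids re-reading the last processed record. The sketch below models this with a list index standing in for the Kafka offset; `OffsetResumeSketch` and `SplitState` here are hypothetical and not the classes from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: the list index plays the role of the Kafka offset.
final class OffsetResumeSketch {

    /** Holds the offset of the next record to read, mirroring Kafka's position semantics. */
    static final class SplitState {
        long currentOffset;
    }

    /** Reads up to maxRecords starting at state.currentOffset and advances the state. */
    static List<String> read(List<String> log, SplitState state, int maxRecords) {
        List<String> out = new ArrayList<>();
        long end = Math.min(log.size(), state.currentOffset + maxRecords);
        for (long offset = state.currentOffset; offset < end; offset++) {
            out.add(log.get((int) offset));
            // Store offset + 1 so that a resume starts at the next unread record.
            state.currentOffset = offset + 1;
        }
        return out;
    }
}
```

If the state stored `offset` instead of `offset + 1`, the second call to `read` would begin with the record that the first call had already emitted.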
LGTM
LGTM if CI passes. Thanks @Carl-Zhou-CN!
An attempt to improve the Kafka source, with reference to the Flink Kafka source.
Purpose of this pull request
Does this PR introduce any user-facing change?
How was this patch tested?
Check list