Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Kafka] kafka source refactored some reader read logic #6408

Merged
merged 14 commits into from
Sep 5, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.common.utils;

public final class TemporaryClassLoaderContext implements AutoCloseable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this tool!


/**
* Sets the context class loader to the given ClassLoader and returns a resource that sets it
* back to the current context ClassLoader when the resource is closed.
*
* <pre>{@code
* try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classloader)) {
* // code that needs the context class loader
* }
* }</pre>
*/
public static TemporaryClassLoaderContext of(ClassLoader cl) {
final Thread t = Thread.currentThread();
final ClassLoader original = t.getContextClassLoader();

t.setContextClassLoader(cl);

return new TemporaryClassLoaderContext(t, original);
}

private final Thread thread;

private final ClassLoader originalContextClassLoader;

private TemporaryClassLoaderContext(Thread thread, ClassLoader originalContextClassLoader) {
this.thread = thread;
this.originalContextClassLoader = originalContextClassLoader;
}

@Override
public void close() {
thread.setContextClassLoader(originalContextClassLoader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,19 @@ private void addTaskUnsafe(SplitFetcherTask task) {
taskQueue.add(task);
nonEmpty.signal();
}

public void enqueueTask(SplitFetcherTask task) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, the new method is redundant

lock.lock();
try {
enqueueTaskUnsafe(task);
} finally {
lock.unlock();
}
}

private void enqueueTaskUnsafe(SplitFetcherTask task) {
assert lock.isHeldByCurrentThread();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
I believe that this ReentrantLock usage is the same as depicted in the diagram, only occurring during testing and debugging. It can be disabled in production, but of course, it can also be enabled for added safety.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

taskQueue.add(task);
nonEmpty.signal();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class ConsumerMetadata implements Serializable {
private String topic;
private boolean isPattern = false;
private Properties properties;
private String consumerGroup;
private StartMode startMode = StartMode.GROUP_OFFSETS;
private Map<TopicPartition, Long> specificStartOffsets;
private Long startOffsetsTimestamp;
Expand Down

This file was deleted.

Loading
Loading