Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-19409 Ensure Avro serde caches per subject #2387

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 12 additions & 36 deletions src/Confluent.SchemaRegistry.Serdes.Avro/GenericSerializerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ namespace Confluent.SchemaRegistry.Serdes
{
internal class GenericSerializerImpl : AsyncSerializer<GenericRecord, Avro.Schema>
{
private Dictionary<Avro.Schema, string> knownSchemas = new Dictionary<global::Avro.Schema, string>();
private HashSet<KeyValuePair<string, string>> registeredSchemas = new HashSet<KeyValuePair<string, string>>();
private Dictionary<string, int> schemaIds = new Dictionary<string, int>();
private Dictionary<Avro.Schema, string> knownSchemas =
new Dictionary<global::Avro.Schema, string>();
private Dictionary<KeyValuePair<string, string>, int> registeredSchemas =
new Dictionary<KeyValuePair<string, string>, int>();

public GenericSerializerImpl(
ISchemaRegistryClient schemaRegistryClient,
Expand Down Expand Up @@ -99,12 +100,10 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
// something more sophisticated than the below + not allow
// the misuse to keep happening without warning.
if (knownSchemas.Count > schemaRegistryClient.MaxCachedSchemas ||
registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas ||
schemaIds.Count > schemaRegistryClient.MaxCachedSchemas)
registeredSchemas.Count > schemaRegistryClient.MaxCachedSchemas)
{
knownSchemas.Clear();
registeredSchemas.Clear();
schemaIds.Clear();
}

// Determine a schema string corresponding to the schema object.
Expand Down Expand Up @@ -139,41 +138,18 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
{
schemaId = latestSchema.Id;
}
else if (!registeredSchemas.Contains(subjectSchemaPair))
else if (!registeredSchemas.TryGetValue(subjectSchemaPair, out schemaId))
{
int newSchemaId;

// first usage: register/get schema to check compatibility
if (autoRegisterSchema)
{
newSchemaId = await schemaRegistryClient
schemaId = autoRegisterSchema
? await schemaRegistryClient
.RegisterSchemaAsync(subject, writerSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient
.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);
}
else
{
newSchemaId = await schemaRegistryClient.GetSchemaIdAsync(subject, writerSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);
}

if (!schemaIds.ContainsKey(writerSchemaString))
{
schemaIds.Add(writerSchemaString, newSchemaId);
}
else if (schemaIds[writerSchemaString] != newSchemaId)
{
schemaIds.Clear();
registeredSchemas.Clear();
throw new KafkaException(new Error(isKey ? ErrorCode.Local_KeySerialization : ErrorCode.Local_ValueSerialization, $"Duplicate schema registration encountered: Schema ids {schemaIds[writerSchemaString]} and {newSchemaId} are associated with the same schema."));
}

registeredSchemas.Add(subjectSchemaPair);

schemaId = schemaIds[writerSchemaString];
}
else
{
schemaId = schemaIds[writerSchemaString];
registeredSchemas.Add(subjectSchemaPair, schemaId);
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,8 @@ internal class SerializerSchemaData
{
private string writerSchemaString;
private global::Avro.Schema writerSchema;

/// <remarks>
/// A given schema is uniquely identified by a schema id, even when
/// registered against multiple subjects.
/// </remarks>
private int? writerSchemaId;

private SpecificWriter<T> avroWriter;

private HashSet<string> subjectsRegistered = new HashSet<string>();

public HashSet<string> SubjectsRegistered
{
get => subjectsRegistered;
set => subjectsRegistered = value;
}

public string WriterSchemaString
{
get => writerSchemaString;
Expand All @@ -64,12 +49,6 @@ public Avro.Schema WriterSchema
set => writerSchema = value;
}

public int? WriterSchemaId
{
get => writerSchemaId;
set => writerSchemaId = value;
}

public SpecificWriter<T> AvroWriter
{
get => avroWriter;
Expand All @@ -79,20 +58,14 @@ public SpecificWriter<T> AvroWriter

private Dictionary<Type, SerializerSchemaData> multiSchemaData =
new Dictionary<Type, SerializerSchemaData>();

private SerializerSchemaData singleSchemaData;
private Dictionary<KeyValuePair<string, string>, int> registeredSchemas =
new Dictionary<KeyValuePair<string, string>, int>();

public SpecificSerializerImpl(
ISchemaRegistryClient schemaRegistryClient,
AvroSerializerConfig config,
RuleRegistry ruleRegistry) : base(schemaRegistryClient, config, ruleRegistry)
{
Type writerType = typeof(T);
if (writerType != typeof(ISpecificRecord))
{
singleSchemaData = ExtractSchemaData(writerType);
}

if (config == null) { return; }

if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
Expand Down Expand Up @@ -177,24 +150,18 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
{
try
{
int schemaId;
string subject;
RegisteredSchema latestSchema = null;
SerializerSchemaData currentSchemaData;
await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
if (singleSchemaData == null)
{
var key = data.GetType();
if (!multiSchemaData.TryGetValue(key, out currentSchemaData))
{
currentSchemaData = ExtractSchemaData(key);
multiSchemaData[key] = currentSchemaData;
}
}
else
var key = data != null ? data.GetType() : typeof(Null);
if (!multiSchemaData.TryGetValue(key, out currentSchemaData))
{
currentSchemaData = singleSchemaData;
currentSchemaData = ExtractSchemaData(key);
multiSchemaData[key] = currentSchemaData;
}

string fullname = null;
Expand All @@ -204,25 +171,26 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
}

subject = GetSubjectName(topic, isKey, fullname);
var subjectSchemaPair = new KeyValuePair<string, string>(subject, currentSchemaData.WriterSchemaString);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);

if (latestSchema != null)
{
currentSchemaData.WriterSchemaId = latestSchema.Id;
schemaId = latestSchema.Id;
}
else if (!currentSchemaData.SubjectsRegistered.Contains(subject))
else if (!registeredSchemas.TryGetValue(subjectSchemaPair, out schemaId))
{
// first usage: register/get schema to check compatibility
currentSchemaData.WriterSchemaId = autoRegisterSchema
schemaId = autoRegisterSchema
? await schemaRegistryClient
.RegisterSchemaAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false)
: await schemaRegistryClient
.GetSchemaIdAsync(subject, currentSchemaData.WriterSchemaString, normalizeSchemas)
.ConfigureAwait(continueOnCapturedContext: false);

currentSchemaData.SubjectsRegistered.Add(subject);
registeredSchemas.Add(subjectSchemaPair, schemaId);
}
}
finally
Expand All @@ -248,7 +216,7 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
{
stream.WriteByte(Constants.MagicByte);

writer.Write(IPAddress.HostToNetworkOrder(currentSchemaData.WriterSchemaId.Value));
writer.Write(IPAddress.HostToNetworkOrder(schemaId));
currentSchemaData.AvroWriter.Write(data, new BinaryEncoder(stream));

// TODO: maybe change the ISerializer interface so that this copy isn't necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public BaseSerializeDeserializeTests()
schemaRegistryMock.Setup(x => x.RegisterSchemaAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(
(string subject, string schema, bool normalize) => store.TryGetValue(schema, out int id) ? id : store[schema] = store.Count + 1
);
schemaRegistryMock.Setup(x => x.GetSchemaIdAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(
(string subject, string schema, bool normalize) =>
{
return subjectStore[subject].First(x =>
x.SchemaString == schema
).Id;
}
);
schemaRegistryMock.Setup(x => x.LookupSchemaAsync(It.IsAny<string>(), It.IsAny<Schema>(), It.IsAny<bool>(), It.IsAny<bool>())).ReturnsAsync(
(string subject, Schema schema, bool ignoreDeleted, bool normalize) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,43 @@ public void ISpecificRecord()
Assert.Equal(user.favorite_number, result.favorite_number);
}

[Fact]
public void ISpecificRecordStrings()
{
var schemaStr = "{\"type\":\"string\"}";
var schema = new RegisteredSchema("topic1-value", 1, 1, schemaStr, SchemaType.Avro, null);
store[schemaStr] = 1;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is not required, given that AutoRegisterSchemas = false.

subjectStore["topic1-value"] = new List<RegisteredSchema> { schema };

schema = new RegisteredSchema("topic2-value", 1, 2, schemaStr, SchemaType.Avro, null);
schema.Metadata = new Metadata(null, new Dictionary<string, string>
{
{ "confluent:version", "1" }
}, null);
store[schemaStr] = 2;
subjectStore["topic2-value"] = new List<RegisteredSchema> { schema };

var config = new AvroSerializerConfig
{
AutoRegisterSchemas = false,
SubjectNameStrategy = SubjectNameStrategy.Topic
};
var serializer = new AvroSerializer<String>(schemaRegistryClient, config);

Headers headers = new Headers();
var bytes = serializer.SerializeAsync("hi", new SerializationContext(MessageComponentType.Value, "topic1", headers)).Result;
Assert.Equal(1, bytes[4]);

bytes = serializer.SerializeAsync("world", new SerializationContext(MessageComponentType.Value, "topic2", headers)).Result;
Assert.Equal(2, bytes[4]);

bytes = serializer.SerializeAsync("hi", new SerializationContext(MessageComponentType.Value, "topic1", headers)).Result;
Assert.Equal(1, bytes[4]);

bytes = serializer.SerializeAsync("world", new SerializationContext(MessageComponentType.Value, "topic2", headers)).Result;
Assert.Equal(2, bytes[4]);
}

[Fact]
public void ISpecificRecordRecordNameStrategy()
{
Expand Down