Protobuf schema validation issue #6697
-
|
Hi @EricWittmann, I am facing an issue during serialization when the validation flag is enabled. I am able to reproduce the issue with the test class below. Could you please help us resolve it? Thanks in advance!
APICurio - 3.1.0 version
ERROR -
person.proto
syntax = "proto3";
package com.example.person;
// Optionally specify where to generate Java classes
option java_package = "com.example.person";
option java_outer_classname = "PersonProto";
message Person {
  // --- Basic information ---
  string name = 1;
  int32 id = 2;
  string email = 3;
  // --- Only one contact detail can be set at a time ---
  oneof contact_info {
    string phone_number = 4;
    string social_handle = 5;
    string address = 6;
  }
  // --- Only one status field can be set ---
  oneof status {
    bool is_active = 7;
    string last_seen = 8;
  }
  // --- Reserved fields (deprecated or removed) ---
  reserved 9, 10;
  reserved "old_field", "nickname";
}
Test Class -
public class APICurioRegisterAndSer {
    private static final String API_CURIO_REGISTRY_URL = "http://localhost:8082/apis/registry/v3";
    private static final String SERVERS = "localhost:9093";
    private static final String TOPIC_NAME = "person";
    private static final RegistryClient registerClient = RegistryClientFactory.create(RegistryClientOptions.create(API_CURIO_REGISTRY_URL));

    public static void main(String[] args) throws Exception {
        System.out.println("Starting example " + APICurioRegisterAndSer.class.getSimpleName());
        registerSchemas("person", "person.proto");
        Producer<String, PersonProto.Person> producer = createKafkaProducer();
        try {
            System.out.println("Producing (2) messages.");
            PersonProto.Person person = PersonProto.Person.newBuilder()
                    .setName("ABC")
                    .setId(101)
                    .setEmail("[email protected]")
                    .setPhoneNumber("1234")
                    .setIsActive(true)
                    .build();
            ProducerRecord<String, PersonProto.Person> producedRecord = new ProducerRecord<>(TOPIC_NAME, person);
            producer.send(producedRecord);
            Thread.sleep(100);
        } finally {
            System.out.println("Closing the producer.");
            producer.flush();
            producer.close();
        }
    }

    static void registerSchemas(String subjectName, String protoFile) {
        if (schemaExists(subjectName)) {
            setCompatibility(subjectName);
        } else {
            try (InputStream inputStream = APICurioRegisterAndSer.class.getClassLoader().getResourceAsStream(protoFile)) {
                if (inputStream == null) {
                    return;
                }
                byte[] schemaBytes = inputStream.readAllBytes();
                String protoContent = new String(schemaBytes, StandardCharsets.UTF_8);
                VersionContent versionContent = new VersionContent();
                versionContent.setContent(protoContent);
                versionContent.setContentType("application/protobuf");
                CreateVersion firstVersion = new CreateVersion();
                firstVersion.setContent(versionContent);
                CreateArtifact artifact = new CreateArtifact();
                artifact.setArtifactId(subjectName);
                artifact.setName("Person Schema " + subjectName);
                artifact.setDescription("Person Schema " + subjectName);
                artifact.setArtifactType("PROTOBUF");
                artifact.setFirstVersion(firstVersion);
                registerClient.groups().byGroupId("default").artifacts().post(artifact);
                setCompatibility(subjectName);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }

    static boolean schemaExists(String artifactName) {
        try {
            registerClient.groups().byGroupId("default").artifacts().byArtifactId(artifactName).get();
        } catch (Exception e) {
            if (e instanceof io.apicurio.registry.rest.client.models.ProblemDetails problem
                    && problem.getStatus() != null && problem.getStatus() == 404) {
                return false;
            }
        }
        return true;
    }

    static void setCompatibility(String subjectName) {
        CreateRule rule = new CreateRule();
        rule.setRuleType(RuleType.COMPATIBILITY);
        rule.setConfig("NONE");
        try {
            registerClient.groups().byGroupId("default").artifacts().byArtifactId(subjectName).rules().post(rule);
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    private static Producer<String, PersonProto.Person> createKafkaProducer() {
        Properties props = new Properties();
        // Configure kafka settings
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufKafkaSerializer.class.getName());
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, API_CURIO_REGISTRY_URL);
        props.putIfAbsent(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "default");
        props.putIfAbsent(SerdeConfig.EXPLICIT_ARTIFACT_ID, "person");
        props.putIfAbsent(SerdeConfig.SEND_TYPE_REF, false);
        props.putIfAbsent(SerdeConfig.SEND_INDEXES, true);
        props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, true);
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.FALSE);
        props.putIfAbsent(SerdeConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE);
        return new KafkaProducer<>(props);
    }
}
Replies: 4 comments 9 replies
-
|
Hi, when validation is enabled, a compatibility check is made between the Protobuf schema from Apicurio Registry and the descriptor from the data you are sending. I guess this means that the proto file you have used to generate the Java class
-
|
I spent a bit of time with this today.
Root Cause Analysis
The Problem
In ProtobufSerializer.java, the validation compares two ProtobufFile objects, one of which is built from the FileDescriptor using toProtoFileElement().
The issue is in FileDescriptorUtils.fileDescriptorToProtoFile(). When converting a FileDescriptor (from compiled Java classes) back to a ProtoFileElement, it processes reserved fields:
for (String reservedName : descriptor.getReservedNameList()) {
    ReservedElement reservedElem = new ReservedElement(DEFAULT_LOCATION, "",
            Collections.singletonList(reservedName));
    reserved.add(reservedElem);
}
for (DescriptorProto.ReservedRange reservedRange : descriptor.getReservedRangeList()) {
    List<IntRange> values = new ArrayList<>();
    int start = reservedRange.getStart();
    // The descriptor stores the range end as exclusive; IntRange is inclusive.
    int end = reservedRange.getEnd() - 1;
    values.add(new IntRange(start, end));
    ReservedElement reservedElem = new ReservedElement(DEFAULT_LOCATION, "", values);
    reserved.add(reservedElem);
}
However, when you compile a .proto file with
So when validating in the serializer:
What to do?
We could have an option in the protobuf compatibility checker to skip the call to
WDYT?
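To make the proposal a bit more concrete, here is a rough sketch of how a "skip reserved" flag could behave. It uses plain Java stand-ins, not the real Apicurio classes: `ReservedSkipSketch`, `DescriptorRange`, `MessageView`, `reservedFromDescriptor`, `differences` and `skipReserved` are all hypothetical names. The way the two sides represent the reserved entries is assumed purely for illustration (grouped on the registry side, one entry per name or range on the descriptor side, as in the loop quoted above) and may not match the actual mismatch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; none of these types exist in Apicurio Registry.
class ReservedSkipSketch {

    /** A reserved range as a descriptor stores it: start inclusive, end exclusive. */
    record DescriptorRange(int start, int end) {}

    /** Simplified view of a message: field names plus reserved entries as printable strings. */
    record MessageView(List<String> fields, List<String> reserved) {}

    /** Mirrors the quoted loop: one entry per name and per range, with the range end made inclusive. */
    static List<String> reservedFromDescriptor(List<DescriptorRange> ranges, List<String> names) {
        List<String> reserved = new ArrayList<>();
        for (String name : names) {
            reserved.add("name:" + name);
        }
        for (DescriptorRange range : ranges) {
            reserved.add("range:" + range.start() + "-" + (range.end() - 1));
        }
        return reserved;
    }

    /** Lists the differences between two views; reserved entries are ignored when skipReserved is true. */
    static List<String> differences(MessageView registry, MessageView compiled, boolean skipReserved) {
        List<String> diffs = new ArrayList<>();
        if (!registry.fields().equals(compiled.fields())) {
            diffs.add("fields differ");
        }
        if (!skipReserved && !registry.reserved().equals(compiled.reserved())) {
            diffs.add("reserved entries differ");
        }
        return diffs;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("name", "id", "email");
        // Assumed representation of `reserved 9, 10;` and `reserved "old_field", "nickname";` as parsed text.
        MessageView registry = new MessageView(fields, List.of("values:9,10", "values:old_field,nickname"));
        // Assumed descriptor content: one half-open range per reserved number.
        MessageView compiled = new MessageView(fields, reservedFromDescriptor(
                List.of(new DescriptorRange(9, 10), new DescriptorRange(10, 11)),
                List.of("old_field", "nickname")));
        System.out.println("strict:        " + differences(registry, compiled, false));
        System.out.println("skip reserved: " + differences(registry, compiled, true));
    }
}
```

With the flag off, any mismatch in how reserved entries are represented fails validation; with it on, only the real fields are compared. The trade-off is that a genuine conflict involving a reserved number or name would no longer be caught by the serializer.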
-
|
Thanks for the reply. Yes, the option to skip looks fine. Does validation work correctly if we skip the check for reserved fields?
-
|
Thanks for the support!