Unable to send Log Events via the KafkaAppender with existing Kafka Schema

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Unable to send Log Events via the KafkaAppender with existing Kafka Schema

Michael Williams
Hi

I’m trying to send logs to Kafka via log4j2. The Topic I’m sending to
already has a Schema and I cannot find a way for the KafkaAppender to get
the Schema Id as it always passes the string “bytes” to the Schema Registry.

In (1) KafkaAppender.tryAppend a byte[] is created representing the
LogEvent to be sent to Kafka. It is always a byte[], never a subclass of
GenericContainer.

Further down the call chain the Schema string is obtained by calling (2)
AvroSchemaUtils.getSchema(). This returns “bytes” as the object passed in
is the byte[] from tryAppend. This is then used in a call to the Schema
Registry which expects a string representation of the Schema and not the
string "bytes".

I haven’t found a way to set the Schema Id for the KafkaAppender or have
the appender use a GenericContainer subclass of my choosing. Is either
possible or is there another way?

Thanks
Michael

1) Creating the byte[] to be sent to Kafka:

private void tryAppend(final LogEvent event) throws ExecutionException,
 InterruptedException, TimeoutException {
    final Layout<? extends Serializable> layout = getLayout();
    byte[] data;
    if (layout instanceof SerializedLayout) {
        final byte[] header = layout.getHeader();
        final byte[] body = layout.toByteArray(event);
        data = new byte[header.length + body.length];
        System.arraycopy(header, 0, data, 0, header.length);
        System.arraycopy(body, 0, data, header.length, body.length);
    } else {
        data = layout.toByteArray(event);
    }
    manager.send(data);
}


2) Getting the Schema string to pass to the Schema Registry to get the
Schema Id:

public static Schema getSchema(Object object) {
  if (object == null) {
    return (Schema)primitiveSchemas.get("Null");
  } else if (object instanceof Boolean) {
    return (Schema)primitiveSchemas.get("Boolean");
  } else if (object instanceof Integer) {
    return (Schema)primitiveSchemas.get("Integer");
  } else if (object instanceof Long) {
    return (Schema)primitiveSchemas.get("Long");
  } else if (object instanceof Float) {
    return (Schema)primitiveSchemas.get("Float");
  } else if (object instanceof Double) {
    return (Schema)primitiveSchemas.get("Double");
  } else if (object instanceof CharSequence) {
    return (Schema)primitiveSchemas.get("String");
  } else if (object instanceof byte[]) {
    return (Schema)primitiveSchemas.get("Bytes");
  } else if (object instanceof GenericContainer) {
    return ((GenericContainer)object).getSchema();
  } else {
    throw new IllegalArgumentException("Unsupported Avro type. Supported
types are null, Boolean, Integer, Long, Float, Double, String, byte[] and
IndexedRecord");
  }
}