Skip to content

Commit

Permalink
AVRO-4069: Remove Reader String Cache from Generic Datum Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
belugabehr committed Jan 4, 2025
1 parent 8040078 commit 3f2cc32
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public enum StringType {
private final ClassLoader classLoader;

/**
* Set the Java type to be used when reading this schema. Meaningful only for
* string schemas and map schemas (for the keys).
*/
public static void setStringType(Schema s, StringType stringType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversion;
Expand All @@ -46,8 +44,8 @@ public class GenericDatumReader<D> implements DatumReader<D> {
private Schema actual;
private Schema expected;
private DatumReader<D> fastDatumReader = null;

private ResolvingDecoder creatorResolver = null;
private final Map<Class, Constructor> stringCtorCache;
private final Thread creator;

public GenericDatumReader() {
Expand All @@ -73,6 +71,7 @@ public GenericDatumReader(Schema writer, Schema reader, GenericData data) {
/**
 * Creates a reader backed by the given {@link GenericData}. The constructing
 * thread is recorded in {@code creator}, and the string-constructor cache
 * starts out as an empty {@link HashMap}.
 *
 * @param data the {@link GenericData} implementation this reader uses
 */
protected GenericDatumReader(GenericData data) {
  this.stringCtorCache = new HashMap<>();
  this.creator = Thread.currentThread();
  this.data = data;
}

/** Return the {@link GenericData} implementation. */
Expand Down Expand Up @@ -452,13 +451,15 @@ protected Object newMap(Object old, int size) {
* representation. By default, this calls {@link #readString(Object,Decoder)}.
*/
protected Object readString(Object old, Schema expected, Decoder in) throws IOException {
  // Resolve the Java string representation exactly once, through the
  // overridable findStringClass hook. (The local was previously declared twice,
  // which does not compile, and the String.class branch was duplicated.)
  final Class stringClass = this.findStringClass(expected);

  // Default is CharSequence / UTF8 so check it first
  if (stringClass == CharSequence.class) {
    return readString(old, in);
  }
  if (stringClass == String.class) {
    return in.readString();
  }
  // Any other class is built from the decoded text by newInstanceFromString.
  return this.newInstanceFromString(stringClass, in.readString());
}

Expand Down Expand Up @@ -487,99 +488,34 @@ protected Object createString(String value) {
*/
protected Class findStringClass(Schema schema) {
String name = schema.getProp(GenericData.STRING_PROP);
if (name == null)
return CharSequence.class;

switch (GenericData.StringType.valueOf(name)) {
case String:
return String.class;
default:
if (name == null) {
return CharSequence.class;
}
}

/**
* This class is used to reproduce part of IdentityHashMap in ConcurrentHashMap
* code.
*/
private static final class IdentitySchemaKey {
private final Schema schema;

private final int hashcode;

public IdentitySchemaKey(Schema schema) {
this.schema = schema;
this.hashcode = System.identityHashCode(schema);
}

@Override
public int hashCode() {
return this.hashcode;
}

@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof GenericDatumReader.IdentitySchemaKey)) {
return false;
}
IdentitySchemaKey key = (IdentitySchemaKey) obj;
return this == key || this.schema == key.schema;
if (GenericData.StringType.String.name().equals(name)) {
return String.class;
}
return CharSequence.class;
}

// VisibleForTesting
static class ReaderCache {
private final Map<IdentitySchemaKey, Class> stringClassCache = new ConcurrentHashMap<>();

private final Map<Class, Function<String, Object>> stringCtorCache = new ConcurrentHashMap<>();

private final Function<Schema, Class> findStringClass;

public ReaderCache(Function<Schema, Class> findStringClass) {
this.findStringClass = findStringClass;
}

public Object newInstanceFromString(Class c, String s) {
final Function<String, Object> ctor = stringCtorCache.computeIfAbsent(c, this::buildFunction);
return ctor.apply(s);
}

private Function<String, Object> buildFunction(Class c) {
@SuppressWarnings("unchecked")
protected Object newInstanceFromString(Class c, String s) {
final Constructor cachedCtor = stringCtorCache.computeIfAbsent(c, clazz -> {
final Constructor ctor;
try {
ctor = c.getDeclaredConstructor(String.class);
ctor = clazz.getDeclaredConstructor(String.class);
ctor.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new AvroRuntimeException(e);
}
ctor.setAccessible(true);

return (String s) -> {
try {
return ctor.newInstance(s);
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException(e);
}
};
}

public Class getStringClass(final Schema s) {
final IdentitySchemaKey key = new IdentitySchemaKey(s);
return this.stringClassCache.computeIfAbsent(key, (IdentitySchemaKey k) -> this.findStringClass.apply(k.schema));
return ctor;
});
try {
return cachedCtor.newInstance(s);
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException(e);
}
}

private final ReaderCache readerCache = new ReaderCache(this::findStringClass);

// VisibleForTesting
/** Returns the {@code ReaderCache} held in this reader's {@code readerCache} field. */
ReaderCache getReaderCache() {
  return this.readerCache;
}

/**
 * Builds an instance of {@code c} from the string {@code s} by delegating to
 * the {@code newInstanceFromString} method of the cache returned by
 * {@link #getReaderCache()}.
 *
 * @param c the class to instantiate
 * @param s the string value handed to the cache
 * @return the object produced by the cache
 */
@SuppressWarnings("unchecked")
protected Object newInstanceFromString(Class c, String s) {
  final ReaderCache cache = this.getReaderCache();
  return cache.newInstanceFromString(c, s);
}

/**
* Called to read byte arrays. Subclasses may override to use a different byte
* array representation. By default, this calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
Expand All @@ -33,27 +31,9 @@ public class TestGenericDatumReader {

private static final Random r = new Random(System.currentTimeMillis());

/**
 * Hits a single shared {@code ReaderCache} from 200 threads. Each thread runs
 * {@code WithSchema.test()} for the schema built for its index (1..200), and
 * the test waits for every thread to finish.
 */
@Test
void readerCache() {
  final GenericDatumReader.ReaderCache cache = new GenericDatumReader.ReaderCache(this::findStringClass);

  // Build all workers first so they can be started as close together as possible.
  final List<Thread> threads = new ArrayList<>();
  for (int index = 1; index <= 200; index++) {
    final WithSchema s = new WithSchema(this.build(index), cache);
    threads.add(new Thread(s::test));
  }

  threads.forEach(Thread::start);
  for (Thread t : threads) {
    try {
      t.join();
    } catch (InterruptedException e) {
      // Restore the interrupt flag before propagating so the interruption is
      // not silently lost by wrapping it in an unchecked exception.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}

@Test
void newInstanceFromString() {
final GenericDatumReader.ReaderCache cache = new GenericDatumReader.ReaderCache(this::findStringClass);
final GenericDatumReader cache = new GenericDatumReader();

Object object = cache.newInstanceFromString(StringBuilder.class, "Hello");
assertEquals(StringBuilder.class, object.getClass());
Expand All @@ -62,21 +42,6 @@ void newInstanceFromString() {

}

/**
 * Pairs a schema with a reader cache so that a string-class lookup for that
 * schema can be run as a standalone task.
 */
static class WithSchema {
  private final GenericDatumReader.ReaderCache cache;

  private final Schema schema;

  public WithSchema(Schema schema, GenericDatumReader.ReaderCache cache) {
    this.cache = cache;
    this.schema = schema;
  }

  /** Asks the cache for the string class of the held schema; the result is discarded. */
  public void test() {
    cache.getStringClass(schema);
  }
}

private List<Schema> list = new ArrayList<>();

private Schema build(int index) {
Expand Down

0 comments on commit 3f2cc32

Please sign in to comment.