Skip to content

Extensions

Mirro Mutth edited this page Feb 15, 2024 · 3 revisions

CodecRegistrar

It is an Extension of custom driver behavior for encoding or decoding field data.

Example for JSON

Add an extending Codec of JSON field, which is based on Jackson.

Implement codec

Firstly, implement a Codec, ParametrizedCodec, MassiveCodec or MassiveParametrizedCodec.

  • Codec: the normal codec
    • Data type is Class<?>
    • Data byte size is less than or equal to Integer.MAX_VALUE
  • ParametrizedCodec
    • Data type is Class<?> or ParametrizedType
    • Data byte size is less than or equal to Integer.MAX_VALUE
  • MassiveCodec
    • Data type is Class<?>
    • Data byte size is less than or equal to 4,294,967,295, the maximum value of unsigned int32
  • MassiveParametrizedCodec
    • Data type is Class<?> or ParametrizedType
    • Data byte size is less than or equal to 4,294,967,295

Actually, JSON field can store a large json data which byte size can be 4,294,967,295. Only Codec is implemented just to simplify the example.

public final class JacksonCodec implements Codec<Object> {

    /**
     * Just for example, should configure it in real applications.
     */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final ByteBufAllocator allocator;

    /**
     * Used for encoding/decoding mode, see also registrar in second step.
     */
    private final boolean encoding;

    public JacksonCodec(ByteBufAllocator allocator, boolean encoding) {
        this.allocator = allocator;
        this.encoding = encoding;
    }

    @Override
    public Object decode(ByteBuf value, FieldInformation info, Class<?> target, boolean binary, CodecContext context) {
        // If you ensure server is using UTF-8, you can just use InputStream
        try (Reader r = new InputStreamReader(new ByteBufInputStream(value), CharCollation.fromId(info.getCollationId(), context.getServerVersion()).getCharset())) {
            return MAPPER.readValue(r, target);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Parameter encode(Object value, CodecContext context) {
        return new JacksonParameter(allocator, value, context);
    }

    @Override
    public boolean canDecode(FieldInformation info, Class<?> target) {
        return !encoding && info.getType() == MySqlType.JSON && info.getCollationId() != CharCollation.BINARY_ID;
    }

    @Override
    public boolean canEncode(Object value) {
        return encoding;
    }

    private static final class JacksonParameter implements MySqlParameter {

        private final Object value;

        private final CodecContext context;

        private JacksonParameter(Object value, CodecContext context) {
            this.value = value;
            this.context = context;
        }

        @Override
        public Mono<ByteBuf> publishBinary(ByteBufAllocator allocator) {
            // JSON in binary protocol should be a var-integer sized encoded string.
            // That means we should write a var-integer as a size of following content
            // bytes firstly, then write the encoded string as content.
            //
            // Binary protocol may be different for each type of encoding, so if do not
            // use binary protocol, just return a Mono.error() instead.
            return Mono.fromSupplier(() -> {
                Charset charset = context.getClientCollation().getCharset();
                ByteBuf content = allocator.buffer();

                // Encode and calculate content bytes first, we should know bytes size.
                try (Writer w = new OutputStreamWriter(new ByteBufOutputStream(content), charset)) {
                    MAPPER.writeValue(w, value);
                } catch (IOException e) {
                    content.release();
                    throw new CustomRuntimeException(e);
                } catch (Throwable e) {
                    content.release();
                    throw e;
                }

                ByteBuf buf = null;
                try {
                    buf = allocator.buffer();
                    // Note: VarIntUtils is an unstable, internal utility.
                    VarIntUtils.writeVarInt(buf, content.readableBytes());
                    return buf.writeBytes(content);
                } catch (Throwable e) {
                    if (buf != null) {
                        buf.release();
                    }
                    throw e;
                } finally {
                    content.release();
                }
            });
        }

        @Override
        public Mono<Void> publishText(ParameterWriter writer) {
            return Mono.fromRunnable(() -> {
                try {
                    MAPPER.writeValue(writer, value);
                } catch (IOException e) {
                    throw new CustomRuntimeException(e);
                }
            });
        }

        @Override
        public short getType() {
            return DataTypes.VARCHAR;
        }

        /**
         * Optional, for statements/parameters logging.
         */
        @Override
        public String toString() {
            return value.toString();
        }
    }
}

Implement CodecRegistrar

Secondly, implement a CodecRegistrar for add codecs with correct ordering.

// It is just an example of package name and does not represent any company, individual or organization.
package org.example.demo.json;

// Some imports...

public final class JacksonCodecRegistrar implements CodecRegistrar {

    @Override
    public void register(ByteBufAllocator allocator, CodecRegistry registry) {
        // Decoding JSON by highest priority, encoding anything by lowest priority.
        registry.addFirst(new JacksonCodec(false))
            .addLast(new JacksonCodec(true));
    }
}

Register extension

There are 2 ways to register the Extension to configuration.

Discoverable

It can be used for all configuration ways.

Create a file in META-INF/services, which file name is io.asyncer.r2dbc.mysql.extension.Extension, it contains this line:

foo.bar.r2dbc.mysql.json.JacksonCodecRegistrar

It will make the JacksonCodecRegistrar discoverable, and the JacksonCodecRegistrar will be added automatically if the [autodetectExtensions][/asyncer-io/r2dbc-mysql/wiki/configuration-options#autodetectextensions] is not disabled.

Manually

It can only be used by [Programmatic Configuration][/asyncer-io/r2dbc-mysql/wiki/Getting-Started#programmatic-configuration].

MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
    .host("127.0.0.1")
    .user("root")
    .port(3306)
    .extendWith(new JacksonCodecRegistrar())
    .build()

This is only needed if user wants to create a special connection and does not want the current extension to affect other connections.