Class: Emq::Encoder

Inherits:
Object
  • Object
show all
Defined in:
app/messages/emq/encoder.rb

Overview

This class is responsible for encoding messages using an Avro schema stored in the Redpanda schema registry.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subject, version, registry_url) ⇒ Encoder

Initialize the encoder with the subject, version, and registry URL

Parameters:

  • subject (String)

    the subject of the schema

  • version (String)

    the version of the schema

  • registry_url (String)

    the URL of the schema registry



17
18
19
20
21
# File 'app/messages/emq/encoder.rb', line 17

# Build an encoder for one schema, identified by subject and version.
#
# Only stores the three arguments on the instance; this constructor does not
# validate them or perform any other work.
#
# @param subject [String] the subject of the schema
# @param version [String] the version of the schema
# @param registry_url [String] the URL of the schema registry
def initialize(subject, version, registry_url)
  @subject = subject
  @version = version
  @registry_url = registry_url
end

Instance Attribute Details

#schema_config ⇒ Object (readonly)

Returns the value of attribute schema_config.



11
12
13
# File 'app/messages/emq/encoder.rb', line 11

# Read-only accessor for the @schema_config instance variable.
#
# NOTE(review): the constructor shown for this class does not assign
# @schema_config, so this returns nil unless another method sets it —
# confirm where it is populated.
#
# @return [Object, nil] the current value of @schema_config
def schema_config
  @schema_config
end

#validate_obj ⇒ Object (readonly)

Returns the value of attribute validate_obj.



11
12
13
# File 'app/messages/emq/encoder.rb', line 11

# Read-only accessor for the @validate_obj instance variable.
#
# NOTE(review): the constructor shown for this class does not assign
# @validate_obj, so this returns nil unless another method sets it —
# confirm where it is populated.
#
# @return [Object, nil] the current value of @validate_obj
def validate_obj
  @validate_obj
end

Instance Method Details

#encode_message(message) ⇒ String

Encode a message using the schema

Parameters:

  • message (Hash)

    the message to encode

Returns:

  • (String)

    the encoded message



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'app/messages/emq/encoder.rb', line 26

# Encode a message using the schema, in Avro single-object encoding.
#
# Output layout:
#   1. the 2-byte single-object marker (C3 01),
#   2. the 8-byte little-endian CRC-64-AVRO fingerprint of the schema,
#   3. the Avro binary encoding of +message+.
#
# Every failure is logged and re-raised unchanged. A schema parse error
# is logged by the inner rescue and then again by the method-level rescue
# as it propagates.
#
# @param message [Hash] the message to encode
# @return [String] the encoded message
# @raise [Avro::SchemaParseError] when the schema definition cannot be parsed
# @raise [StandardError] any other error raised while encoding
def encode_message(message) # rubocop:disable Metrics/MethodLength
  # Schema definition to use for encoding (presumably JSON text, since it is
  # handed to Avro::Schema.parse — confirm against create_message_schema).
  raw_schema = create_message_schema
  begin
    schema = Avro::Schema.parse(raw_schema)
  rescue Avro::SchemaParseError => e
    Rails.logger.error("Schema parsing error: <#{e.message}>. Schema: #{raw_schema}")
    raise
  end
  stream = StringIO.new
  writer = Avro::IO::DatumWriter.new(schema)
  encoder = Avro::IO::BinaryEncoder.new(stream)
  encoder.write("\xC3\x01") # Avro single-object encoding marker
  # 8 byte schema fingerprint. 'Q<' forces little-endian as the Avro
  # specification requires; a bare 'Q' would follow the host's byte order.
  encoder.write([schema.crc_64_avro_fingerprint].pack('Q<'))
  writer.write(message, encoder)
  stream.string
rescue StandardError => e
  # NOTE(review): the wording below mentions "validating volume tracking"
  # although this method encodes arbitrary messages; the text is left
  # unchanged in case log searches or alerts match on it.
  Rails.logger.error("Error validating volume tracking message: <#{e.message}>")
  raise
end