Class: Emq::Encoder
- Inherits:
-
Object
- Object
- Emq::Encoder
- Defined in:
- app/messages/emq/encoder.rb
Overview
This class should be responsible for encoding messages using an Avro schema stored in RedPanda registry
Instance Attribute Summary collapse
-
#schema_config ⇒ Object
readonly
Returns the value of attribute schema_config.
-
#validate_obj ⇒ Object
readonly
Returns the value of attribute validate_obj.
Instance Method Summary collapse
-
#encode_message(message) ⇒ String
Encode a message using the schema.
-
#initialize(subject, version, registry_url) ⇒ Encoder
constructor
Initialize the validator with the subject, version and registry URL.
Constructor Details
#initialize(subject, version, registry_url) ⇒ Encoder
Initialize the validator with the subject, version and registry URL
17 18 19 20 21 |
# File 'app/messages/emq/encoder.rb', line 17 def initialize(subject, version, registry_url) @subject = subject @version = version @registry_url = registry_url end |
Instance Attribute Details
#schema_config ⇒ Object (readonly)
Returns the value of attribute schema_config.
11 12 13 |
# File 'app/messages/emq/encoder.rb', line 11 def schema_config @schema_config end |
#validate_obj ⇒ Object (readonly)
Returns the value of attribute validate_obj.
11 12 13 |
# File 'app/messages/emq/encoder.rb', line 11 def validate_obj @validate_obj end |
Instance Method Details
#encode_message(message) ⇒ String
Encode a message using the schema
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'app/messages/emq/encoder.rb', line 26 def () # rubocop:disable Metrics/MethodLength # Create schema the schema to use for encoding schema = begin schema = Avro::Schema.parse(schema) rescue Avro::SchemaParseError => e Rails.logger.error("Schema parsing error: <#{e.}>. Schema: #{schema}") raise end stream = StringIO.new writer = Avro::IO::DatumWriter.new(schema) encoder = Avro::IO::BinaryEncoder.new(stream) encoder.write("\xC3\x01") # Avro single-object container file header encoder.write([schema.crc_64_avro_fingerprint].pack('Q')) # 8 byte schema fingerprint writer.write(, encoder) stream.string rescue StandardError => e Rails.logger.error("Error validating volume tracking message: <#{e.}>") raise end |