Class: Emq::PublishingJob

Inherits:
Object
Defined in:
app/messages/emq/publishing_job.rb

Overview

This class is responsible for publishing messages to the EMQ. Each message is validated against an Avro schema stored in the RedPanda registry before it is sent.

Constant Summary

AVRO_SCHEMA_VERSION_KEY =

The prefix of the key that holds the version of the Avro schema used by the message builder.

'avro_schema_version_'
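As a rough illustration of how a prefix like this might be used, the sketch below assumes the lookup key is formed by appending the schema version to the prefix, and that the pipeline settings nest one entry per version under the schema key. The `PIPELINE_CONFIG` hash, the `version_key` helper and the `ExampleMessageBuilder` name are hypothetical; the real lookup lives in `message_builder_config`, which is not shown on this page.

```ruby
# The constant as documented above.
AVRO_SCHEMA_VERSION_KEY = 'avro_schema_version_'

# Hypothetical pipeline settings: one builder entry per schema version.
PIPELINE_CONFIG = {
  'volume_tracking' => {
    'avro_schema_version_1' => { 'message_class' => 'ExampleMessageBuilder' }
  }
}.freeze

# Assumption: the version from the bunny configuration is appended to the prefix.
def version_key(version)
  "#{AVRO_SCHEMA_VERSION_KEY}#{version}"
end

# An entry exists for version 1 but not for version 2.
BUILDER_CONFIG = PIPELINE_CONFIG.dig('volume_tracking', version_key(1))
MISSING_CONFIG = PIPELINE_CONFIG.dig('volume_tracking', version_key(2))
```

Under these assumptions, a version with no matching entry yields `nil`, which corresponds to the case where #publish logs "Message builder configuration not found" and returns.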

Constructor Details

#initialize ⇒ PublishingJob

Initializes the publishing job with the bunny configuration loaded from the Rails configuration.



# File 'app/messages/emq/publishing_job.rb', line 16

def initialize
  # Load the bunny configuration from the Rails configuration and convert it to an OpenStruct
  @bunny_config = PublishingJob.deep_open_struct(Rails.configuration.bunny)
end

Instance Attribute Details

#bunny_config ⇒ Object (readonly)

Returns the value of attribute bunny_config.



# File 'app/messages/emq/publishing_job.rb', line 9

def bunny_config
  @bunny_config
end

Class Method Details

.deep_open_struct(obj) ⇒ Object

Recursively converts a nested hash into an OpenStruct, allowing dot-notation access to its keys and values. Non-hash values, including arrays, are returned unchanged.



# File 'app/messages/emq/publishing_job.rb', line 86

def self.deep_open_struct(obj)
  return obj unless obj.is_a?(Hash)

  OpenStruct.new(obj.transform_values { |val| deep_open_struct(val) }) # rubocop:disable Style/OpenStructUse
end
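The conversion can be tried outside Rails with a standalone copy of the method. This is a minimal sketch: the configuration hash is hypothetical, with keys that mirror those read by #publish and invented values.

```ruby
require 'ostruct'

# Standalone copy of the class method shown above, so the sketch runs without Rails.
def deep_open_struct(obj)
  return obj unless obj.is_a?(Hash)

  OpenStruct.new(obj.transform_values { |val| deep_open_struct(val) })
end

# Hypothetical configuration; only the key names are taken from #publish.
EXAMPLE_CONFIG = deep_open_struct(
  amqp: {
    broker: { host: 'localhost' },
    schemas: {
      registry_url: 'http://registry.example/subjects/',
      subjects: { volume_tracking: { subject: 'example-subject', version: 1 } }
    }
  }
)

# Dot notation works at every level of nesting.
EXAMPLE_VERSION = EXAMPLE_CONFIG.amqp.schemas.subjects.volume_tracking.version

# OpenStruct#[] accepts a string or symbol name, which is how #publish looks up schema_key.
EXAMPLE_SUBJECT = EXAMPLE_CONFIG.amqp.schemas.subjects['volume_tracking'].subject

# An unknown key returns nil rather than raising, which #publish relies on for its early return.
EXAMPLE_MISSING = EXAMPLE_CONFIG.amqp.schemas.subjects['unknown_key']

# Hashes nested inside arrays are left as plain hashes.
EXAMPLE_LIST = deep_open_struct(items: [{ name: 'a' }]).items
```

Because only `Hash` values are recursed into, any hash that sits inside an array keeps bracket access (`EXAMPLE_LIST.first[:name]`) rather than dot access.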

Instance Method Details

#publish(objects, message_config, schema_key) ⇒ Object

Builds a message from each of the given object(s) and publishes it to the EMQ.

Note: The schema_key must exist within the subjects hash of the bunny configuration and must also have a matching configuration within the pipeline settings (see the 'volume_tracking' section in config/pipelines/pacbio.yml for reference). Any message published using the publishing job requires a corresponding entry in the pipeline configuration, identified by the schema key. If the schema key is missing from the subjects hash, the method returns without publishing; if the pipeline entry is missing, it logs an error and returns.

Parameters:

  • objects (Object)

    the object or objects to publish

  • message_config (Object)

    the pipeline configuration to construct the message

  • schema_key (String)

    the key of the schema to validate the message against



# File 'app/messages/emq/publishing_job.rb', line 34

def publish(objects, message_config, schema_key) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength,Metrics/CyclomaticComplexity
  # Check if the schema_key exists in the subjects hash and return early if it does not
  schema = bunny_config.amqp.schemas.subjects[schema_key]
  return if schema.nil?

  # Get the subject and version from the schema and return early if either is nil
  subject = schema.subject
  version = schema.version
  return if subject.nil? || version.nil?

  # Get the message builder configuration for the schema key and version
  # and create a message builder class from the configuration
  message_builder_config_obj = message_builder_config(message_config, schema_key, version)
  if message_builder_config_obj.nil?
    Rails.logger.error('Message builder configuration not found for ' \
                       "schema key: #{schema_key} and version: #{version}")
    return
  end
  message_builder_class = message_builder_config_obj.message_class.to_s.constantize

  # Create an encoder (which validates against the schema) and a sender for the subject and version
  encoder = Emq::Encoder.new(subject, version, bunny_config.amqp.schemas.registry_url)
  sender = Emq::Sender.new(bunny_config.amqp.broker, subject, version)

  begin
    # Publish each object to the EMQ
    Array(objects).each do |object|
      # Construct the message to publish from the object using the given configuration
      message_object = message_builder_class.new(object:,
                                                 configuration: message_builder_config_obj)
                                            .content

      # Skip the object if the built content has no entry for the schema_key
      publish_message = message_object[schema_key]
      next if publish_message.nil?

      # Encode the message against the schema and send it to the EMQ
      message = encoder.encode_message(publish_message)
      sender.send_message(message)
    end
    # Log success message after successful publishing
    Rails.logger.info('Published volume tracking message to EMQ')
  rescue StandardError => e
    # Do not raise if an error occurs; log it instead so that the calling job does not fail.
    # Note that the rescue wraps the whole loop, so an error stops publishing of any remaining objects.
    # These logs can be monitored through Kibana
    Rails.logger.error("Failed to publish message to EMQ: #{e.message}")
  end
end
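The control flow of the loop above can be sketched in isolation. The snippet below is a simplified model, not the real implementation: `StubBuilder`, `StubEncoder`, `StubSender` and `publish_all` are hypothetical stand-ins for the message builder, `Emq::Encoder`, `Emq::Sender` and #publish, and the log messages are shortened. It shows two behaviours that follow from the source: objects whose content has no entry for the schema key are skipped, and because the `rescue` wraps the whole loop, the first error is logged and the remaining objects are not published.

```ruby
require 'logger'
require 'stringio'

# Hypothetical message builder: content is keyed by schema key, or empty if there is nothing to send.
class StubBuilder
  def initialize(object:, configuration:)
    @object = object
    @configuration = configuration
  end

  def content
    @object.key?(:volume) ? { 'volume_tracking' => @object } : {}
  end
end

# Hypothetical encoder: raises when the payload does not fit the (imaginary) schema.
class StubEncoder
  def encode_message(payload)
    raise ArgumentError, 'volume must be numeric' unless payload[:volume].is_a?(Numeric)

    payload.to_s
  end
end

# Hypothetical sender: records messages instead of sending them to a broker.
class StubSender
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_message(message)
    @sent << message
  end
end

# Simplified model of the loop and rescue in #publish.
def publish_all(objects, schema_key, encoder:, sender:, logger:)
  Array(objects).each do |object|
    payload = StubBuilder.new(object: object, configuration: nil).content[schema_key]
    next if payload.nil?

    sender.send_message(encoder.encode_message(payload))
  end
  logger.info('Published messages')
rescue StandardError => e
  logger.error("Failed to publish message to EMQ: #{e.message}")
end

LOG_OUTPUT = StringIO.new
SENDER = StubSender.new
OBJECTS = [{ volume: 1.5 }, { name: 'no volume' }, { volume: 'bad' }, { volume: 2.0 }].freeze

publish_all(OBJECTS, 'volume_tracking',
            encoder: StubEncoder.new, sender: SENDER, logger: Logger.new(LOG_OUTPUT))
```

In this run only the first object is sent: the second is skipped, the third raises inside the encoder, and the fourth is never reached. If publishing the remaining objects after a failure matters, one option would be to move the `rescue` inside the loop, at the cost of one log entry per failing object.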