Class: Emq::PublishingJob
- Inherits: Object
  - Object
    - Emq::PublishingJob
- Defined in: app/messages/emq/publishing_job.rb
Overview
This class is responsible for publishing messages to the EMQ. Each message is validated against an Avro schema stored in the RedPanda registry before it is sent.
Constant Summary
- AVRO_SCHEMA_VERSION_KEY = 'avro_schema_version_'
  The prefix for the key which contains the version of the Avro schema used by the message builder.
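As a rough illustration of how a prefix like this is typically used, the sketch below appends a schema version to it and looks the result up in a settings hash. The shape of the settings hash, the `builder` entry, and the assumption that the full key is simply the prefix followed by the version number are all hypothetical; the source only states that the constant is a key prefix.

```ruby
# Value of AVRO_SCHEMA_VERSION_KEY, copied from the constant above.
prefix = 'avro_schema_version_'

# Hypothetical pipeline settings for a single schema key. The nested
# 'builder' entries are invented for illustration only.
pipeline_settings = {
  'avro_schema_version_1' => { 'builder' => 'ExampleBuilderV1' },
  'avro_schema_version_2' => { 'builder' => 'ExampleBuilderV2' }
}

# Assumption: the full key is the prefix followed by the schema version.
version = 2
key = "#{prefix}#{version}"
builder_settings = pipeline_settings[key]
```

If no entry exists for the requested version, the hash lookup returns nil, which matches the case where #publish logs that the message builder configuration was not found.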
Instance Attribute Summary
- #bunny_config ⇒ Object (readonly)
  Returns the value of attribute bunny_config.
Class Method Summary
- .deep_open_struct(obj) ⇒ Object
  Recursively converts a nested hash into an OpenStruct, allowing dot-notation access to hash keys and their values.
Instance Method Summary
- #initialize ⇒ PublishingJob (constructor)
  Initialize the publishing job with the bunny configuration.
- #publish(objects, message_config, schema_key) ⇒ Object
  Builds a message from each of the given object(s) and publishes it to the EMQ. Note: the schema_key must exist within the subjects hash of the bunny configuration and must also have a matching configuration within the pipeline settings.
Constructor Details
#initialize ⇒ PublishingJob
Initialize the publishing job with the bunny configuration
    # File 'app/messages/emq/publishing_job.rb', line 16

    def initialize
      # Load the bunny configuration from the Rails configuration and convert it to an OpenStruct
      @bunny_config = PublishingJob.deep_open_struct(Rails.configuration.bunny)
    end
Instance Attribute Details
#bunny_config ⇒ Object (readonly)
Returns the value of attribute bunny_config.
    # File 'app/messages/emq/publishing_job.rb', line 9

    def bunny_config
      @bunny_config
    end
Class Method Details
.deep_open_struct(obj) ⇒ Object
Recursively converts a nested hash into an OpenStruct, allowing dot-notation access to hash keys and their values.

    # File 'app/messages/emq/publishing_job.rb', line 86

    def self.deep_open_struct(obj)
      return obj unless obj.is_a?(Hash)

      OpenStruct.new(obj.transform_values { |val| deep_open_struct(val) }) # rubocop:disable Style/OpenStructUse
    end
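To make the conversion concrete, here is a small standalone sketch that copies the method body into a top-level method so it can run outside Rails. The sample hash and its values are invented for illustration and do not reflect the real bunny configuration.

```ruby
require 'ostruct'

# Standalone copy of the recursive conversion, defined at the top level so
# that it runs without the Emq::PublishingJob class.
def deep_open_struct(obj)
  return obj unless obj.is_a?(Hash)

  OpenStruct.new(obj.transform_values { |val| deep_open_struct(val) })
end

# Invented sample data; the key names are placeholders.
config = deep_open_struct(
  amqp: {
    broker: { host: 'localhost' },
    schemas: { registry_url: 'http://registry.example/' }
  },
  tags: [{ name: 'a' }]
)

host = config.amqp.broker.host     # dot access through nested hashes
first_tag = config.tags.first      # still a Hash: arrays are returned untouched
missing = config.missing           # nil: OpenStruct returns nil for unset attributes
```

Two behaviours are worth noting. Only Hash values are converted, so hashes nested inside arrays keep their bracket access. Unknown attributes return nil rather than raising, which is why #publish can check for a missing schema key with a plain nil test.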
Instance Method Details
#publish(objects, message_config, schema_key) ⇒ Object
Builds a message from each of the given object(s) and publishes it to the EMQ. Note: the schema_key must exist within the subjects hash of the bunny configuration and must also have a matching configuration within the pipeline settings (see the ‘volume_tracking’ section in config/pipelines/pacbio.yml for reference). Any message published using publishing_job requires a corresponding entry in the pipeline configuration, identified by the schema key.
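The first requirement in the note, that the schema_key resolves to a complete entry in the subjects hash, can be sketched as follows. The configuration values, the `incomplete` entry, and the helper name `resolve_schema` are hypothetical; only the `amqp.schemas.subjects[...]` access path and the `subject` and `version` attributes come from the method itself.

```ruby
require 'ostruct'

# Hypothetical configuration. The access path mirrors the one used by
# #publish, but every value here is invented for illustration.
bunny_config = OpenStruct.new(
  amqp: OpenStruct.new(
    schemas: OpenStruct.new(
      subjects: OpenStruct.new(
        volume_tracking: OpenStruct.new(subject: 'example-subject', version: 1),
        incomplete: OpenStruct.new(subject: 'example-subject')
      )
    )
  )
)

# Hypothetical helper: returns [subject, version] when the schema key is
# fully configured, or nil in the cases where #publish would return early.
def resolve_schema(bunny_config, schema_key)
  schema = bunny_config.amqp.schemas.subjects[schema_key]
  return if schema.nil?
  return if schema.subject.nil? || schema.version.nil?

  [schema.subject, schema.version]
end

found = resolve_schema(bunny_config, 'volume_tracking') # entry is complete
unknown = resolve_schema(bunny_config, 'unknown')       # key not in subjects
partial = resolve_schema(bunny_config, 'incomplete')    # version is missing
```

In the last two cases nothing is published and, as far as the method shows, nothing is logged, so a misspelled schema key fails silently.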
    # File 'app/messages/emq/publishing_job.rb', line 34

    def publish(objects, message_config, schema_key) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength,Metrics/CyclomaticComplexity
      # Check if the schema_key exists in the subjects hash and return early if it does not
      schema = bunny_config.amqp.schemas.subjects[schema_key]
      return if schema.nil?

      # Get the subject and version from the schema and return early if either is nil
      subject = bunny_config.amqp.schemas.subjects[schema_key].subject
      version = bunny_config.amqp.schemas.subjects[schema_key].version
      return if subject.nil? || version.nil?

      # Get the message builder configuration for the schema key and version
      # and create a message builder class from the configuration
      = (, schema_key, version)
      if .nil?
        Rails.logger.error('Message builder configuration not found for ' \
                           "schema key: #{schema_key} and version: #{version}")
        return
      end
      = ..to_s.constantize

      # Create a validator and sender for the subject and version
      encoder = Emq::Encoder.new(subject, version, bunny_config.amqp.schemas.registry_url)
      sender = Emq::Sender.new(bunny_config.amqp.broker, subject, version)

      begin
        # Publish each object to the EMQ
        Array(objects).each do |object|
          # Construct the message to publish from the object using the given configuration
          = .new(object:, configuration: )
            .content

          # Check if the schema_key is present in the payload
          next if [schema_key].nil?

          # Validate the message against the schema and send it to the EMQ
          = [schema_key]
          = encoder.()
          sender.()
        end

        # Log success message after successful publishing
        Rails.logger.info('Published volume tracking message to EMQ')
      rescue StandardError => e
        # DO NOT raise an exception if any error occurs; log the error instead.
        # This is to prevent the job from failing and to allow the job to continue.
        # These logs can be monitored through Kibana.
        Rails.logger.error("Failed to publish message to EMQ: #{e.message}")
      end
    end
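The error handling at the end of the method wraps the whole loop in a single begin/rescue, which has a consequence worth illustrating. The sketch below reproduces only that shape. The logger, the `send_payload` lambda, and the payload values are stand-ins invented so the example runs outside the application; they are not the real Emq::Sender or Rails.logger.

```ruby
require 'logger'
require 'stringio'

# Stand-in for Rails.logger that writes to an in-memory buffer.
log_output = StringIO.new
logger = Logger.new(log_output)
sent = []

# Hypothetical send step that fails on one particular payload.
send_payload = lambda do |payload|
  raise ArgumentError, "cannot encode #{payload}" if payload == :bad

  sent << payload
end

begin
  %i[first bad last].each { |payload| send_payload.call(payload) }
  logger.info('Published all messages')
rescue StandardError => e
  # Same shape as #publish: log and swallow the error instead of re-raising.
  logger.error("Failed to publish message to EMQ: #{e.message}")
end
```

After this runs, `sent` contains only `:first`. The failure on the second payload is logged and the caller carries on, but the loop itself is abandoned, so `:last` is never attempted and the success line is never written. Callers passing several objects should therefore treat an error log entry as meaning that some later objects may not have been published.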