Class: Emq::PublishingJob
- Inherits:
-
Object
- Object
- Emq::PublishingJob
- Defined in:
- app/messages/emq/publishing_job.rb
Overview
This class should be responsible for publishing messages to the EMQ which are validated against an Avro schema stored in the RedPanda registry before being sent
Constant Summary collapse
- AVRO_SCHEMA_VERSION_KEY =
The prefix for the key which contains the version of the Avro schema to use by the message builder
'avro_schema_version_'
Instance Attribute Summary collapse
-
#bunny_config ⇒ Object
readonly
Returns the value of attribute bunny_config.
Instance Method Summary collapse
-
#initialize ⇒ PublishingJob
constructor
Initialize the publishing job with the bunny configuration.
-
#publish(objects, message_config, schema_key) ⇒ Object
Publish a message to the EMQ to publish from the given object(s) Note:- The schema_key must exist within the subjects hash of the bunny configuration and must also have a matching configuration within the pipeline settings.
Constructor Details
#initialize ⇒ PublishingJob
Initialize the publishing job with the bunny configuration
14 15 16 17 |
# File 'app/messages/emq/publishing_job.rb', line 14 def initialize # Load the bunny configuration from the Rails configuration and convert it to a Struct @bunny_config = SuperStruct.new(Rails.configuration.bunny, deep: true) end |
Instance Attribute Details
#bunny_config ⇒ Object (readonly)
Returns the value of attribute bunny_config.
7 8 9 |
# File 'app/messages/emq/publishing_job.rb', line 7 def bunny_config @bunny_config end |
Instance Method Details
#publish(objects, message_config, schema_key) ⇒ Object
Publish a message to the EMQ to publish from the given object(s) Note:- The schema_key must exist within the subjects hash of the bunny configuration and must also have a matching configuration within the pipeline settings. (See the ‘volume_tracking’ section in config/pipelines/pacbio.yml for reference.) Any messages published using publishing_job require a corresponding entry in the pipeline configuration, identified by the schema key.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'app/messages/emq/publishing_job.rb', line 32 def publish(objects, , schema_key) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength,Metrics/CyclomaticComplexity # Check if the schema_key exists in the subjects hash and return early if it does not return unless bunny_config.amqp.schemas.subjects.key?(schema_key) # Get the subject and version from the schema and return early if either is nil subject = bunny_config.amqp.schemas.subjects[schema_key].subject version = bunny_config.amqp.schemas.subjects[schema_key].version return if subject.nil? || version.nil? # Get the message builder configuration for the schema key and version # and create a message builder class from the configuration = (, schema_key, version) if .nil? Rails.logger.error('Message builder configuration not found for ' \ "schema key: #{schema_key} and version: #{version}") return end = ..to_s.constantize # Create a validator and sender for the subject and version encoder = Emq::Encoder.new(subject, version, bunny_config.amqp.schemas.registry_url) sender = Emq::Sender.new(bunny_config.amqp.broker, subject, version) begin # Publish each object to the EMQ Array(objects).each do |object| # Construct the message to publish from the object using the given configuration = .new(object:, configuration: ) .content # check if the schema_key is present in the payload next if [schema_key].nil? # Validate the message against the schema and send it to the EMQ = [schema_key] = encoder.() sender.() end # Log success message after successful publishing Rails.logger.info('Published volume tracking message to EMQ') rescue StandardError => e # DO NOT Raise an exception if any error occurs; logs the error instead # This is to prevent the job from failing and to allow the job to continue # These logs can be monitored through Kibana Rails.logger.error("Failed to publish message to EMQ: #{e.}") end end |