mackeee

Reputation: 320

Can I use streaming pubsub on Google Cloud Platform?

I'm using GCP Pub/Sub in a Rails application. Environment:

Rails: v5.1.4
Ruby: v2.4.2
google-cloud-pubsub: v0.29.0
MacOS: 10.12.6 Sierra

This is my code:

require 'json'
require 'google/cloud/pubsub'

module ActiveJob
  module QueueAdapters
    class PubSubQueueAdapter

      def enqueue(job)
        Rails.logger.info "[PubSubQueueAdapter enqueue job #{job.inspect}"

        topic = PubSubQueueAdapter.pubsub.topic(job.queue_name)
        topic.publish(job.class.name, arg: job.arguments)
      end

      class << self
        def pubsub
          @pubsub ||= begin
            project_id = Rails.application.config.x.settings['project_id']
            Google::Cloud::Pubsub.new(
                project_id: project_id,
                credentials: "#{Rails.root.join('config')}/#{Rails.application.config.x.settings['auth_file']}"
            )
          end
        end

        def run_worker!(queue_name = 'default')
          p 'Running worker'
          topic = pubsub.topic(queue_name)
          subscription = topic.subscription("#{queue_name}_task") || topic.subscribe("#{queue_name}_task")

          listener = subscription.listen do |message|
            message.acknowledge!
          end

          listener.start
        end
      end
    end
  end
end

Then I run a rake task:

$ bundle exec rake run_worker

"Running worker"
E0121 21:39:49.713197000 140736594088896 backup_poller.cc:105]         run_poller: {"created":"@1516538389.713182000","description":"Shutting down timer system","file":"src/core/lib/iomgr/timer_generic.cc","file_line":630}

What does this error message mean?
I want to use a streaming subscriber.

I thought I could solve this by using the SubscriberClient described in the Google Cloud Pub/Sub documentation.
But streaming_pull did not work. In fact, I could not even use Google::Cloud::Pubsub::V1::Subscriber.new.
What should I do to use a streaming subscriber in Rails?

Upvotes: 0

Views: 423

Answers (1)

Max Dietz

Reputation: 178

I believe the problem is that listener is a local variable and you never block after starting it: run_worker! returns right after listener.start, so the listener is destroyed and the subscriber shuts down as soon as the method exits. StreamingPull is supported in Ruby, and I believe you are already using it; you are just leaving the method before the connection to Pub/Sub has been established. The error message you are seeing is, I believe, unrelated to Pub/Sub and comes from a lower level of the gRPC code.
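
A minimal sketch of the blocking pattern, offered as an illustration rather than a tested fix. The helper name run_listener_blocking and its wait: parameter are hypothetical, and the listener.stop.wait! call assumes the object returned by subscription.listen exposes stop and wait! as documented for the gem; check this against the installed version.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): start a listener and
# keep the calling thread alive so the background subscriber is not torn down.
# `wait` is injectable only so the blocking step can be swapped out in tests.
def run_listener_blocking(listener, wait: -> { sleep })
  listener.start        # returns immediately; callbacks run on background threads
  wait.call             # block here (a bare `sleep` sleeps until interrupted)
rescue Interrupt
  # Ctrl-C: fall through to the shutdown below
ensure
  listener.stop.wait!   # assumed gem API: stop pulling, then wait for shutdown
end
```

In the question's run_worker!, this would take the place of the bare listener.start call, for example run_listener_blocking(listener), so the rake task keeps running until it is interrupted.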

Upvotes: 1
