Flo

Reputation: 21

Dataflow Pub/Sub streaming jobs get stuck and keep modifying ACK deadline of messages

We’ve been using Dataflow with the Apache Beam Python SDK (2.34.0 and 2.35.0) for two different streaming jobs, both reading their input from Pub/Sub topics. One of the jobs windows the incoming messages and groups them before writing them to Cloud Storage. The other applies no windowing (but makes use of timers). Both jobs currently process very few messages (about 1 message per second) and run on a single worker each.
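For context, the windowed job is roughly shaped like the sketch below (heavily simplified; the topic, bucket, key and 60-second window are placeholders, not our real configuration):

import apache_beam as beam
from apache_beam import window
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

# Heavily simplified sketch of the windowed streaming job.
# Topic, bucket, key and window size are placeholders.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            topic="projects/my-project/topics/my-topic")
        | "Window" >> beam.WindowInto(window.FixedWindows(60))
        | "KeyMessages" >> beam.Map(lambda msg: ("all", msg.decode("utf-8")))
        # GroupByKey is the boundary between stage 1 and stage 2; as far as we
        # understand, Pub/Sub messages are only ACKed once stage 1 commits here.
        | "GroupMessages" >> beam.GroupByKey()
        | "FlattenGroups" >> beam.FlatMap(lambda kv: kv[1])
        | "WriteToGCS" >> fileio.WriteToFiles(
            path="gs://my-bucket/output/",
            sink=lambda destination: fileio.TextSink())
    )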

In the past few days, both these jobs have been getting stuck around once a day, causing outages in our system. The symptoms are:

Dataflow does not trigger any autoscaling in response to the messages stacking up in the subscription, probably because the CPU usage of the VMs stays low (messages are no longer processed anyway). From what I understand, messages are only ACKed once they have been safely committed at the end of a stage (both our jobs contain two stages), yet Dataflow does still seem to be reading the messages, since it keeps extending their ACK deadline. However, those messages never leave the ReadFromPubSub step. (A sketch of how this backlog can be checked via the Cloud Monitoring API follows the question below.)

Manually deleting the worker VM (from the Compute Engine console) triggers the automatic recreation of a VM and unsticks the job: consumption of the Pub/Sub subscription resumes and everything returns to normal.
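For reference, the same VM deletion can be scripted instead of done through the console, e.g. with the google-cloud-compute client library. A minimal sketch (project, zone and instance name are placeholders):

from google.cloud import compute_v1

def recreate_stuck_worker(project: str, zone: str, instance: str) -> None:
    """Delete the stuck worker VM; the Dataflow service then recreates it."""
    client = compute_v1.InstancesClient()
    # This does the same thing we otherwise do by hand in the console.
    operation = client.delete(project=project, zone=zone, instance=instance)
    operation.result()  # wait for the deletion to complete

# Placeholder values, for illustration only.
recreate_stuck_worker("my-project", "europe-west1-b", "my-dataflow-worker-vm")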

How can we solve this problem and ensure our jobs don’t get stuck? We’re out of ideas, as Dataflow produces only a few logs and our business code does not seem to be responsible for this behaviour.
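For completeness, here is roughly how the backlog age on the subscription can be checked via the Cloud Monitoring API (a minimal sketch assuming the google-cloud-monitoring client; project ID, subscription ID and the 10-minute threshold are placeholders):

import time
from google.cloud import monitoring_v3

def oldest_unacked_age_seconds(project_id: str, subscription_id: str) -> int:
    """Return the oldest unacked message age (seconds) seen in the last 5 minutes."""
    client = monitoring_v3.MetricServiceClient()
    now = int(time.time())
    interval = monitoring_v3.TimeInterval(
        {"end_time": {"seconds": now}, "start_time": {"seconds": now - 300}}
    )
    results = client.list_time_series(
        request={
            "name": f"projects/{project_id}",
            "filter": (
                'metric.type = "pubsub.googleapis.com/subscription/oldest_unacked_message_age" '
                f'AND resource.labels.subscription_id = "{subscription_id}"'
            ),
            "interval": interval,
            "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )
    ages = [point.value.int64_value for series in results for point in series.points]
    return max(ages, default=0)

# Placeholder threshold: alert if messages have sat unacked for over 10 minutes.
if oldest_unacked_age_seconds("my-project", "my-subscription") > 600:
    print("Subscription backlog is growing; the Dataflow job is probably stuck.")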

(1) Dataflow making requests to extend the messages’ ACK deadline.

(2) Error logs. ReadStream-process can also be MergeBuckets-process. NameOfAGroupByKeyStep is the step at which the execution passes from stage 1 to stage 2.

Stuck state: workflow-msec-<NameOfAGroupByKeyStep>/ReadStream-process, reporter: 0x4566bdcd4aa8, with stack:
--- Thread (name: futex-default-SDomainT/132) stack: ---
PC: @     0x55f23554d580  thread::(anonymous namespace)::FutexDomain::RawBlock()
@     0x55f23554d580  thread::(anonymous namespace)::FutexDomain::RawBlock()
@     0x55f23554cbbe  thread::(anonymous namespace)::FutexDomain::BlockCurrent()
@     0x55f2356e7aac  base::scheduling::Downcalls::UserSchedule()
@     0x55f2356e689e  AbslInternalPerThreadSemWait
@     0x55f235749e84  absl::CondVar::WaitCommon()
@     0x55f23554c221  thread::SelectUntil()
@     0x55f2345be1cb  dist_proc::dax::workflow::(anonymous namespace)::BatchingWindmillGetDataClient::GetData()
@     0x55f2345ac148  dist_proc::dax::workflow::StreamingRpcWindmillServiceStreamingServer::GetData()
@     0x55f234ae9f85  dist_proc::dax::workflow::WindmillServiceStreamingServerProxy::GetData()
@     0x55f234945ad3  dist_proc::dax::workflow::StateManager::PrefetchAll()
@     0x55f23494521b  dist_proc::dax::workflow::StateManager::ReadTag()
@     0x55f23493c3d6  dist_proc::dax::workflow::WindmillWindowingAPIDelegate::ReadKeyedStateImplVirtual()
@     0x55f2349420ed  dist_proc::dax::workflow::WindowingAPIDelegate::ReadKeyedStateImpl()
@     0x55f234941fd2  dist_proc::dax::workflow::WindmillCacheAccess::ReadKeyedStateImpl()
@     0x55f2346e6ec8  dist_proc::dax::workflow::CacheAccess::ReadStateFromCache<>()::{lambda()#1}::operator()()
@     0x55f2346e6e8e  absl::functional_internal::InvokeObject<>()
@     0x55f234942912  std::__u::__function::__policy_invoker<>::__call_impl<>()
@     0x55f2349c5927  dist_proc::dax::workflow::StateObjectsCache::ReadImpl()
@     0x55f2349c56f5  dist_proc::dax::workflow::StateObjectsCache::Read()

(3) Info-level logs that precede outages, about networking issues:

I0128 16:07:09.289409461     166 subchannel.cc:945]          subchannel 0x473cbc81a000 {address=ipv4:74.125.133.95:443, args=grpc.client_channel_factory=0x473cbfcb4690, grpc.default_authority=europe-west1-dataflowstreaming-pa.googleapis.com, grpc.dns_enable_srv_queries=1, grpc.http2_scheme=https, grpc.internal.channel_credentials=0x473cbf494f78, grpc.internal.security_connector=0x473cbb5f0230, grpc.internal.subchannel_pool=0x473cbf766870, grpc.keepalive_permit_without_calls=1, grpc.keepalive_time_ms=60000, grpc.keepalive_timeout_ms=60000, grpc.max_metadata_size=1048576, grpc.max_receive_message_length=-1, grpc.primary_user_agent=grpc-c++/1.44.0-dev, grpc.resource_quota=0x473cbf752ca8, grpc.server_uri=dns:///europe-west1-dataflowstreaming-pa.googleapis.com}: connect failed: {"created":"@1643386029.289272376","description":"Failed to connect to remote host: FD shutdown","file":"third_party/grpc/src/core/lib/iomgr/ev_poll_posix.cc","file_line":500,"grpc_status":14,"os_error":"Timeout occurred","referenced_errors":[{"created":"@1643386029.289234760","description":"connect() timed out","file":"third_party/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":114}],"target_address":"ipv4:74.125.133.95:443"}

(4) Dataflow correctly detecting the system lag increasing

Upvotes: 1

Views: 473

Answers (1)

Flo

Reputation: 21

After contacting Google's support team, we never got a clear answer as to what the problem was, but it stopped occurring. We concluded that it was an internal error that was eventually fixed by the Dataflow team.

Upvotes: 1
