Reputation: 21
We’ve been using Dataflow with the Python Beam SDK (2.34.0 and 2.35.0) for two different streaming jobs, both consuming Pub/Sub topics as input. One of those jobs windows input messages and groups them before writing them to Cloud Storage. The other does not apply any windowing (but makes use of timers). Both jobs currently process very few messages (about 1 message per second) and have a single worker each.
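For context, the windowing job is structurally close to the minimal sketch below. All names (project, subscription, bucket, key field) and the window size are placeholders, and the real business logic is omitted; only the overall shape of the pipeline matters here:

```python
import json

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# Streaming pipeline: read from Pub/Sub, window, group, write to Cloud Storage.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            subscription="projects/my-project/subscriptions/my-subscription")
        | "Parse" >> beam.Map(lambda raw: json.loads(raw.decode("utf-8")))
        | "Window" >> beam.WindowInto(window.FixedWindows(60))  # 1-minute fixed windows
        | "KeyByEntity" >> beam.Map(lambda record: (record["entity_id"], record))
        | "NameOfAGroupByKeyStep" >> beam.GroupByKey()  # stage 1 / stage 2 boundary
        | "Format" >> beam.Map(
            lambda kv: json.dumps({"key": kv[0], "records": list(kv[1])}))
        | "WriteToGCS" >> fileio.WriteToFiles(
            path="gs://my-bucket/output",
            sink=lambda destination: fileio.TextSink())
    )
```

The second job reads from Pub/Sub in the same way but, instead of windowing and grouping, relies on a stateful DoFn with timers.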
In the past few days, both these jobs have been getting stuck around once a day, causing outages in our system. The symptoms are:
- The first steps of the jobs (ReadFromPubSub) don’t process any messages.
- Dataflow does not trigger any autoscaling in response to the messages stacking up in the subscription. This is probably due to the CPU of the VMs staying low (as messages are no longer processed anyway).
- Also, from what I understand, messages are only ACK’ed once they’ve been safely committed at the end of a stage (both our jobs contain two stages). But Dataflow might still actually be reading the messages, as it pushes back their ACK deadline. However, those messages never leave the ReadFromPubSub step.
- Manually deleting the worker VM (from the Compute Engine console) triggers the automatic recreation of a VM and gets the job unstuck: consumption of the Pub/Sub subscription resumes and everything returns to normal (a scripted equivalent is sketched below).
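For reference, the same workaround can be scripted with the google-cloud-compute client library instead of going through the console. A rough sketch, where project, zone and instance name are placeholders (the real worker name is the one shown for the job’s worker pool in the Compute Engine console):

```python
from google.cloud import compute_v1

# Placeholder values; substitute the actual project, zone and worker VM name.
PROJECT = "my-project"
ZONE = "europe-west1-b"
WORKER_INSTANCE = "beamapp-job-012816-harness-abcd"

def recycle_stuck_worker():
    """Delete the stuck worker VM; Dataflow automatically recreates it."""
    client = compute_v1.InstancesClient()
    operation = client.delete(project=PROJECT, zone=ZONE, instance=WORKER_INSTANCE)
    operation.result()  # wait for the deletion to complete

if __name__ == "__main__":
    recycle_stuck_worker()
```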
How can we solve this problem and ensure our jobs don’t get stuck? We’re out of ideas, as Dataflow produces only a few logs and our business code does not seem to be responsible for this behaviour.
Here is what we could gather from the logs:
(1) Dataflow making requests to extend the messages’ ACK deadlines.
(2) Error logs (ReadStream-process can also be MergeBuckets-process; NameOfAGroupByKeyStep is the step at which execution passes from stage 1 to stage 2):
Stuck state: workflow-msec-<NameOfAGroupByKeyStep>/ReadStream-process, reporter: 0x4566bdcd4aa8, with stack:
--- Thread (name: futex-default-SDomainT/132) stack: ---
PC: @ 0x55f23554d580 thread::(anonymous namespace)::FutexDomain::RawBlock()
@ 0x55f23554d580 thread::(anonymous namespace)::FutexDomain::RawBlock()
@ 0x55f23554cbbe thread::(anonymous namespace)::FutexDomain::BlockCurrent()
@ 0x55f2356e7aac base::scheduling::Downcalls::UserSchedule()
@ 0x55f2356e689e AbslInternalPerThreadSemWait
@ 0x55f235749e84 absl::CondVar::WaitCommon()
@ 0x55f23554c221 thread::SelectUntil()
@ 0x55f2345be1cb dist_proc::dax::workflow::(anonymous namespace)::BatchingWindmillGetDataClient::GetData()
@ 0x55f2345ac148 dist_proc::dax::workflow::StreamingRpcWindmillServiceStreamingServer::GetData()
@ 0x55f234ae9f85 dist_proc::dax::workflow::WindmillServiceStreamingServerProxy::GetData()
@ 0x55f234945ad3 dist_proc::dax::workflow::StateManager::PrefetchAll()
@ 0x55f23494521b dist_proc::dax::workflow::StateManager::ReadTag()
@ 0x55f23493c3d6 dist_proc::dax::workflow::WindmillWindowingAPIDelegate::ReadKeyedStateImplVirtual()
@ 0x55f2349420ed dist_proc::dax::workflow::WindowingAPIDelegate::ReadKeyedStateImpl()
@ 0x55f234941fd2 dist_proc::dax::workflow::WindmillCacheAccess::ReadKeyedStateImpl()
@ 0x55f2346e6ec8 dist_proc::dax::workflow::CacheAccess::ReadStateFromCache<>()::{lambda()#1}::operator()()
@ 0x55f2346e6e8e absl::functional_internal::InvokeObject<>()
@ 0x55f234942912 std::__u::__function::__policy_invoker<>::__call_impl<>()
@ 0x55f2349c5927 dist_proc::dax::workflow::StateObjectsCache::ReadImpl()
@ 0x55f2349c56f5 dist_proc::dax::workflow::StateObjectsCache::Read()
(3) Info-level logs that precede the outages, reporting networking issues:
I0128 16:07:09.289409461 166 subchannel.cc:945] subchannel 0x473cbc81a000 {address=ipv4:74.125.133.95:443, args=grpc.client_channel_factory=0x473cbfcb4690, grpc.default_authority=europe-west1-dataflowstreaming-pa.googleapis.com, grpc.dns_enable_srv_queries=1, grpc.http2_scheme=https, grpc.internal.channel_credentials=0x473cbf494f78, grpc.internal.security_connector=0x473cbb5f0230, grpc.internal.subchannel_pool=0x473cbf766870, grpc.keepalive_permit_without_calls=1, grpc.keepalive_time_ms=60000, grpc.keepalive_timeout_ms=60000, grpc.max_metadata_size=1048576, grpc.max_receive_message_length=-1, grpc.primary_user_agent=grpc-c++/1.44.0-dev, grpc.resource_quota=0x473cbf752ca8, grpc.server_uri=dns:///europe-west1-dataflowstreaming-pa.googleapis.com}: connect failed: {"created":"@1643386029.289272376","description":"Failed to connect to remote host: FD shutdown","file":"third_party/grpc/src/core/lib/iomgr/ev_poll_posix.cc","file_line":500,"grpc_status":14,"os_error":"Timeout occurred","referenced_errors":[{"created":"@1643386029.289234760","description":"connect() timed out","file":"third_party/grpc/src/core/lib/iomgr/tcp_client_posix.cc","file_line":114}],"target_address":"ipv4:74.125.133.95:443"}
(4) Dataflow correctly detecting the increase in system lag.
Upvotes: 1
Views: 473
Reputation: 21
After contacting Google's support team, we never got a clear answer as to what the problem was, but it stopped occurring. We concluded it was an internal error that was eventually fixed by the Dataflow team.
Upvotes: 1