Isvara
Isvara

Reputation: 3463

Why is my Ignite data streamer failing?

I'm trying to stream data into Ignite via the following StreamVisitor:

val streamer = ignite.dataStreamer[ProductKey, Product]("products")
streamer.allowOverwrite(true)
streamer.autoFlushFrequency(100)

streamer.receiver(new StreamVisitor[ProductKey, Product] {
    val atomic = ignite.atomicLong(s"version@${input.inventoryId}", 0L, true)

    def apply(cache: IgniteCache[ProductKey, Product], entry: Entry[ProductKey, Product]): Unit = {
        def updateProduct(key: ProductKey, product: Product): Unit = {
            val version = atomic.incrementAndGet()
            println(s"Updating product ${product.productId} to version $version")
            cache.put(key, product.copy(version = version))
            //                versionChangeQueue.add(VersionChange(product.inventoryId, version))
        }

        val key = entry.getKey
        val product = entry.getValue
        val current = cache.get(key)
        if (current == null) {
            updateProduct(key, product)
        } else {
            if (attributesDiffer(product.attributes, current.attributes)) {
                updateProduct(key, product)
            } else {
                println(s"Product ${product.productId} hasn't changed")
            }
        }
    }

    private def attributesDiffer(newAttributes: Map[UUID, String], oldAttributes: Map[UUID, String]): Boolean = {
        newAttributes exists {
            case (id, value) => oldAttributes.getOrElse(id, "") != value
        }
    }
})

I repeatedly addData and then flush. Even though I see this:

18:31:04.672 INFO  o.a.i.i.m.d.GridDeploymentLocalStore - Class locally deployed: class io.livefeeds.api.pull.PullWorker$$anon$1

I often get a long stream of the following exceptions:

[18:41:07] (err) Failed to execute compound future reducer: GridCompoundFuture [rdc=null, initFlag=1, lsnrCalls=0, done=false, cancelled=false, err=null, futs=[true, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false]]class org.apache.ignite.IgniteCheckedException: DataStreamer request failed [node=a568919f-5f9a-4bdc-ae0a-3360a0179319]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:1857)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$3.onMessage(DataStreamerImpl.java:336)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
    at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
    at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
    at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to get deployment for request [sndId=2e500b52-0416-46bd-bcd9-b72c57c77f4a, req=DataStreamerRequest [reqId=1, cacheName=products, ignoreDepOwnership=true, skipStore=false, keepBinary=false, depMode=SHARED, sampleClsName=io.livefeeds.api.pull.PullWorker$$anon$1, userVer=0, ldrParticipants=null, clsLdrId=75eaedb7161-2e500b52-0416-46bd-bcd9-b72c57c77f4a, forceLocDep=false, topVer=AffinityTopologyVersion [topVer=10, minorTopVer=0], partId=-2147483648]]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:273)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:59)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
    at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
    at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
    at java.lang.Thread.run(Thread.java:748)
[18:41:07] (err) Failed to execute compound future reducer: GridCompoundFuture [rdc=null, initFlag=1, lsnrCalls=0, done=false, cancelled=false, err=null, futs=[true, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false]]class org.apache.ignite.IgniteCheckedException: DataStreamer request failed [node=a568919f-5f9a-4bdc-ae0a-3360a0179319]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:1857)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$3.onMessage(DataStreamerImpl.java:336)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
    at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
    at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
    at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to get deployment for request [sndId=2e500b52-0416-46bd-bcd9-b72c57c77f4a, req=DataStreamerRequest [reqId=1, cacheName=products, ignoreDepOwnership=true, skipStore=false, keepBinary=false, depMode=SHARED, sampleClsName=io.livefeeds.api.pull.PullWorker$$anon$1, userVer=0, ldrParticipants=null, clsLdrId=75eaedb7161-2e500b52-0416-46bd-bcd9-b72c57c77f4a, forceLocDep=false, topVer=AffinityTopologyVersion [topVer=10, minorTopVer=0], partId=-2147483648]]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:273)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:59)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
    at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
    at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
    at java.lang.Thread.run(Thread.java:748)

And the server log says:

[13:38:02,507][WARNING][data-streamer-stripe-5-#14%livefeeds-dev%][GridDeploymentCommunication] Failed to receive peer response from node within duration [node=ee600e02-bf03-4793-8ca1-623b8b0faa52, duration=5005]
[13:38:02,508][WARNING][data-streamer-stripe-5-#14%livefeeds-dev%][GridDeploymentPerVersionStore] Failed to get resource from node (is node alive?) [nodeId=ee600e02-bf03-4793-8ca1-623b8b0faa52, clsLdrId=039dbdb7161-ee600e02-bf03-4793-8ca1-623b8b0faa52, resName=io/livefeeds/api/pull/PullWorker$$anon$1.class, parentClsLdr=sun.misc.Launcher$AppClassLoader@764c12b6]

What is actually going wrong here?

Upvotes: 0

Views: 794

Answers (1)

Isvara
Isvara

Reputation: 3463

The problem here seemed to be closing over the Ignite instance. I created a standalone FeedVisitor class, with an @IgniteInstanceResource, in a JAR deployed to the server. The standalone class might also work with peer class loading, though.

Upvotes: 1

Related Questions