Alexander Ryasnyanskiy

Reputation: 111

Spark Streaming parallel processing for multiple receivers

I'm using Spark Streaming with multiple custom receivers (2 receivers for different UDP data sockets, 1 for HTTP data) in my app. The receivers' transformations don't share any resources.

When the amount of input data increased, I found that these 3 receivers work not in parallel, but one after another.

For example, if I set a batch interval of 20 seconds, each receiver on its own processes its data in about 5 seconds, but with all 3 receivers enabled together the total processing time is roughly 3 * 5 = 15 seconds instead of 5 seconds.

So I created the test below and saw the same behavior.

Environment: Core i5, 4 cores, 16 GB of memory. There are 2 UDP receivers for 4 cores, so that should be enough to both receive and process: each receiver permanently occupies one core, leaving two cores for processing. The DStream transformations are contrived and aren't cached (persisted), but they are for test purposes only.

Question: what's wrong, and how can I enable parallel processing?

The Spark web UI screenshot below shows that the receivers' jobs are processed one by one.

[Spark web UI screenshot]

@Slf4j
public class SparkApp {

    public static void main(String[] args) throws InterruptedException {

        SparkConf conf = new SparkConf().setMaster("local[*]")
                .setAppName("ParallelReceiver");

        // no observed effect on processing
        conf.set("spark.cores.max", "4");

        // undocumented; has some effect on parallel processing (Spark web UI),
        // but not on the overall processing time
        conf.set("spark.streaming.concurrentJobs", "10");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        RunCalc runCalc1 = new RunCalc(jssc, 5216, 2000, "1");
        runCalc1.service();

        RunCalc runCalc2 = new RunCalc(jssc, 5217, 2000, "2");
        runCalc2.service();

        jssc.start();
        jssc.awaitTermination();
    }

}
@Data
@Slf4j
public class RunCalc {

    private final JavaStreamingContext jssc;
    private final int port;
    private final Integer defaultBitrate;
    private final String suff;

    public void service() {

        // get a stream of nginx log data from UDP
        JavaReceiverInputDStream<NginxRaw> records = jssc.receiverStream(new UdpReceiver(port, defaultBitrate));

        records.print();
        calc(records, suff);
        records.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, suff));
    }

    private void calc(JavaReceiverInputDStream<NginxRaw> records, String suff) {

        // first operation: total bytes sent per batch
        JavaDStream<Integer> reduce = records.filter(r -> r.getChannelName() != null)
                .map(NginxRaw::getBytesSent)
                .reduce((r1, r2) -> r1 + r2);

        reduce.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "reduce" + "-" + suff));

        // second operation: count records per MAC over a one-minute window
        JavaPairDStream<String, NginxRaw> uidRawPairs = records.mapToPair(r -> new Tuple2<>(r.getMac()
                .toUpperCase(), r))
                .window(Durations.minutes(1), Durations.minutes(1));

        JavaPairDStream<String, Iterable<NginxRaw>> groups = uidRawPairs.groupByKey();

        JavaPairDStream<String, Long> uidSizePairs = groups.mapValues(v -> v.spliterator()
                .getExactSizeIfKnown());

        uidSizePairs.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "uidSizeWindowCalc" + "-" + suff));
    }

}
@Slf4j
public class UdpReceiver extends Receiver<NginxRaw> {

    private final int port;

    private final int defaultBitrate;

    private DatagramSocket socket;

    public UdpReceiver(int port, int defaultBitrate) {
        super(StorageLevel.MEMORY_AND_DISK());
        this.port = port;
        this.defaultBitrate = defaultBitrate;
    }

    @Override
    public void onStart() {
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {

    }

    private void receive() {

        try {

            log.debug("receive");
            log.debug("thread: {}", Thread.currentThread());

            String row;
            initSocket();
            byte[] receiveData = new byte[5000];

            // keep reading until stopped or the connection is broken
            while (!isStopped()) {

                DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);

                socket.receive(receivePacket);

                byte[] data = receivePacket.getData();

                row = new String(data, 0, receivePacket.getLength());

                NginxRaw rawLine = new NginxRaw(row, defaultBitrate);

                filterAndSave(rawLine);
            }

            socket.close();

            // restart in an attempt to connect again when the server is active again
            log.debug("Trying to connect again");
            restart("Trying to connect again");

        } catch (ConnectException e) {

            // restart if we could not connect to the server
            log.error("Could not connect", e);
            reportError("Could not connect: ", e);
            restart("Could not connect", e);

        } catch (Throwable e) {

            // restart on any other error
            log.error("Error receiving data", e);
            reportError("Error receiving data: ", e);
            restart("Error receiving data", e);
        }
    }

    /**
     * Connect to the server.
     */
    private void initSocket() {

        if (socket == null) {

            try {

                socket = new DatagramSocket(null);
                socket.setReuseAddress(true);
                socket.setBroadcast(true);
                socket.bind(new InetSocketAddress(port));

            } catch (SocketException e) {

                log.debug("Error = {}", e);
                e.printStackTrace();
            }
        }
    }

    private void filterAndSave(NginxRaw rawLine) {

        // store only records with a real MAC and channel name, skipping VOD
        // traffic and requests from localhost
        if (!rawLine.getMac()
                .equals(SyslogRaw.SYSLOG_NOT_FILLED_STRING)
                && !rawLine.getChannelName()
                        .equals(SyslogRaw.SYSLOG_NOT_FILLED_STRING)
                && !rawLine.getChannelName()
                        .equals("vod")
                && !rawLine.getIp()
                        .equals("127.0.0.1")) {

            store(rawLine);
        }
    }
}

Upvotes: 4

Views: 1676

Answers (1)

Constantino Cronemberger

Reputation: 2261

I had a similar issue: multiple receivers for the same queue, but the data was processed serially. The fix was very simple: I unioned and joined all the streams into a single one!

Before I had this:

  sizeStream.foreachRDD(rdd -> {
    ...
  });
  for (JavaPairDStream<String, Long> dstream : streams) {
    dstream.foreachRDD(rdd -> {
      ...
    });
  }

and now I have this:

  JavaPairDStream<String, Long> countStream = streamingContext.union(streams.get(0), streams.subList(1, streams.size()));
  JavaPairDStream<String, Tuple2<Long, Long>> joinStream = sizeStream.join(countStream);
  joinStream.foreachRDD(rdd -> {
    ...
  });
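
For comparison with the question's setup, here is a minimal, self-contained sketch of the same union idea applied to the two UDP streams (it reuses UdpReceiver and NginxRaw from the question; the class name UnionApp and the single reduce pipeline are illustrative assumptions, not tested against that code):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class UnionApp {

    public static void main(String[] args) throws InterruptedException {

        SparkConf conf = new SparkConf().setMaster("local[*]")
                .setAppName("UnionReceivers");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // one receiver stream per UDP port, as in the question
        JavaDStream<NginxRaw> stream1 = jssc.receiverStream(new UdpReceiver(5216, 2000));
        JavaDStream<NginxRaw> stream2 = jssc.receiverStream(new UdpReceiver(5217, 2000));

        // union the streams so each batch produces one job over the combined
        // data instead of separate, serially scheduled jobs per stream
        JavaDStream<NginxRaw> unioned = stream1.union(stream2);

        unioned.filter(r -> r.getChannelName() != null)
                .map(NginxRaw::getBytesSent)
                .reduce(Integer::sum)
                .print();

        jssc.start();
        jssc.awaitTermination();
    }
}

Note that the union only merges the processing side: each receiver still occupies its own core, so enough cores must remain for the single combined job.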

Upvotes: 2
