Reputation: 111
I'm using Spark Streaming with multiple custom receivers (2 receivers for different UDP data sockets, 1 for HTTP data) in my app. The receivers' transformations don't share any resources.
When the volume of input data increased, I found that these 3 receivers work not in parallel, but one after another.
For example, with a batch interval of 20 seconds, each receiver on its own processes its data in about 5 seconds, but with all 3 receivers enabled together the total processing time is roughly 3 * 5 = 15 seconds instead of 5 seconds.
So I created the test below and saw the same situation.
Environment: Core i5, 4 cores, 16 GB of memory.
2 UDP receivers on 4 cores (so there are enough cores to both receive and process). The transformations on the DStreams are strange and aren't cached (persisted), but they exist for test purposes only.
Question: what's wrong, and how can I enable parallel processing?
The Spark web UI screenshot shows that the receivers' jobs are processed one after another.
@Slf4j
public class SparkApp {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]")
                .setAppName("ParallelReceiver");

        // no changes in processing
        conf.set("spark.cores.max", "4");

        // undocumented, has some effect on parallel processing (spark web ui),
        // but not on the overall processing time
        conf.set("spark.streaming.concurrentJobs", "10");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        RunCalc runCalc1 = new RunCalc(jssc, 5216, 2000, "1");
        runCalc1.service();

        RunCalc runCalc2 = new RunCalc(jssc, 5217, 2000, "2");
        runCalc2.service();

        jssc.start();
        jssc.awaitTermination();
    }
}
@Data
@Slf4j
public class RunCalc {

    private final JavaStreamingContext jssc;
    private final int port;
    private final Integer defaultBitrate;
    private final String suff;

    public void service() {
        // get the nginx log data stream from UDP
        JavaReceiverInputDStream<NginxRaw> records = jssc.receiverStream(new UdpReceiver(port, defaultBitrate));
        records.print();

        calc(records, suff);
        records.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, suff));
    }

    private void calc(JavaReceiverInputDStream<NginxRaw> records, String suff) {
        // first operation
        JavaDStream<Integer> reduce = records.filter(r -> r.getChannelName() != null)
                .map(NginxRaw::getBytesSent)
                .reduce((r1, r2) -> r1 + r2);
        reduce.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "reduce" + "-" + suff));

        // second operation
        JavaPairDStream<String, NginxRaw> uidRawPairs = records.mapToPair(r -> new Tuple2<>(r.getMac()
                        .toUpperCase(), r))
                .window(Durations.minutes(1), Durations.minutes(1));
        JavaPairDStream<String, Iterable<NginxRaw>> groups = uidRawPairs.groupByKey();
        JavaPairDStream<String, Long> uidSizePairs = groups.mapValues(v -> v.spliterator()
                .getExactSizeIfKnown());
        uidSizePairs.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "uidSizeWindowCalc" + "-" + suff));
    }
}
@Slf4j
public class UdpReceiver extends Receiver<NginxRaw> {

    private final int port;
    private final int defaultBitrate;
    private DatagramSocket socket;

    public UdpReceiver(int port, int defaultBitrate) {
        super(StorageLevel.MEMORY_AND_DISK());
        this.port = port;
        this.defaultBitrate = defaultBitrate;
    }

    @Override
    public void onStart() {
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
    }

    private void receive() {
        try {
            log.debug("receive");
            log.debug("thread: {}", Thread.currentThread());
            String row;
            initSocket();
            byte[] receiveData = new byte[5000];

            // Until stopped or connection broken continue reading
            while (!isStopped()) {
                DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
                socket.receive(receivePacket);
                byte[] data = receivePacket.getData();
                row = new String(data, 0, receivePacket.getLength());
                NginxRaw rawLine = new NginxRaw(row, defaultBitrate);
                filterAndSave(rawLine);
            }
            socket.close();

            // Restart in an attempt to connect again when server is active again
            log.debug("Trying to connect again");
            restart("Trying to connect again");
        } catch (ConnectException e) {
            // restart if could not connect to server
            log.error("Could not connect", e);
            reportError("Could not connect: ", e);
            restart("Could not connect", e);
        } catch (Throwable e) {
            // restart if there is any other error
            log.error("Error receiving data", e);
            reportError("Error receiving data: ", e);
            restart("Error receiving data", e);
        }
    }

    /**
     * connect to the server
     */
    private void initSocket() {
        if (socket == null) {
            try {
                socket = new DatagramSocket(null);
                socket.setReuseAddress(true);
                socket.setBroadcast(true);
                socket.bind(new InetSocketAddress(port));
            } catch (SocketException e) {
                log.debug("Error = {}", e);
                e.printStackTrace();
            }
        }
    }

    private void filterAndSave(NginxRaw rawLine) {
        if (!rawLine.getMac()
                .equals(SyslogRaw.SYSLOG_NOT_FILLED_STRING)
                && !rawLine.getChannelName()
                .equals(SyslogRaw.SYSLOG_NOT_FILLED_STRING)
                && !rawLine.getChannelName()
                .equals("vod")
                && !rawLine.getIp()
                .equals("127.0.0.1")) {
            store(rawLine);
        }
    }
}
Upvotes: 4
Views: 1676
Reputation: 2261
I had a similar issue: multiple receivers for the same queue, but the data was processed serially. The fix was very simple: I unioned and joined all the streams into a single one!
Before I had this:
sizeStream.foreachRDD(rdd -> {
    ...
});

for (JavaPairDStream<String, Long> dstream : streams) {
    dstream.foreachRDD(rdd -> {
        ...
    });
}
and now I have this:
JavaPairDStream<String, Long> countStream = streamingContext.union(streams.get(0), streams.subList(1, streams.size()));
JavaPairDStream<String, Tuple2<Long, Long>> joinStream = sizeStream.join(countStream);

joinStream.foreachRDD(rdd -> {
    ...
});
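Applied to the question's setup, a minimal sketch of the same idea (assuming the UdpReceiver, NginxRaw and DebugUtil classes from the question; the "reduce-union" suffix is just an illustrative name) could look like this:

// Sketch only: union the two UDP receiver streams so both receivers keep
// ingesting in parallel, but the transformations run once per batch on a
// single combined DStream instead of once per receiver.
JavaReceiverInputDStream<NginxRaw> records1 = jssc.receiverStream(new UdpReceiver(5216, 2000));
JavaReceiverInputDStream<NginxRaw> records2 = jssc.receiverStream(new UdpReceiver(5217, 2000));

JavaDStream<NginxRaw> records = records1.union(records2);

// the existing calc() logic is then applied once, to the unioned stream
JavaDStream<Integer> reduce = records.filter(r -> r.getChannelName() != null)
        .map(NginxRaw::getBytesSent)
        .reduce((r1, r2) -> r1 + r2);
reduce.foreachRDD(rdd -> DebugUtil.saveTestDataToDisk(rdd, "reduce-union"));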
Upvotes: 2