Alessio Rossotti

Reputation: 314

Check logs with Spark

I'm new to Spark and I'm trying to develop a Python script that reads a CSV file with some logs:

userId,timestamp,ip,event
13,2016-12-29 16:53:44,86.20.90.121,login
43,2016-12-29 16:53:44,106.9.38.79,login
66,2016-12-29 16:53:44,204.102.78.108,logoff
101,2016-12-29 16:53:44,14.139.102.226,login
91,2016-12-29 16:53:44,23.195.2.174,logoff

And checks whether a user has shown some strange behavior, for example two consecutive 'login' events without a 'logoff' in between. I've loaded the CSV as a Spark DataFrame and I want to compare the log rows of each user, ordered by timestamp, checking whether two consecutive events are of the same type (login - login, logoff - logoff). I'd like to do this in a 'map-reduce' way, but at the moment I can't figure out how to use a reduce function that compares consecutive rows. The code I've written works, but its performance is very bad.

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "Data Check")
sqlContext = SQLContext(sc)

LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"
N_USERS = 10*1000

dataFrame = sqlContext.read.format("com.databricks.spark.csv").load(LOG_FILE_PATH)
dataFrame = dataFrame.selectExpr("C0 as userId","C1 as timestamp","C2 as ip","C3 as event")

wrongUsers = []

# one Spark job per user: filter that user's rows and sort them by timestamp
for i in range(0, N_USERS):

    userDataFrame = dataFrame.where(dataFrame['userId'] == i)
    userDataFrame = userDataFrame.sort('timestamp')

    prevEvent = ''

    # pull the user's rows to the driver and compare consecutive events
    for row in userDataFrame.rdd.collect():

        currEvent = row[3]
        if(prevEvent == currEvent):
            wrongUsers.append(row[0])

        prevEvent = currEvent

# wrap each userId in a tuple so Spark can infer a one-column schema
badUsers = sqlContext.createDataFrame([(u,) for u in wrongUsers], ['userId'])
badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)

Upvotes: 2

Views: 1128

Answers (1)

Florent Moiny

Reputation: 451

First (not directly related, but still): make sure the number of entries per user is not too big, because the collect in for row in userDataFrame.rdd.collect(): is dangerous; it pulls all of that user's rows back to the driver.

Second, you don't need to leave the DataFrame API here and fall back to plain Python; just stick to Spark.

Now, your problem. It's basically "for each row I want to know something from the previous row": that's exactly what window functions are for, and more precisely the lag function. Here are two interesting articles about window functions in Spark: one from Databricks with code in Python, and one from Xinh with (in my opinion easier to understand) examples in Scala.

I have a solution in Scala, but I think you'll have no trouble translating it to Python:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

import sqlContext.implicits._

val LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
val RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"

val data = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .option("inferSchema", "true")
  .option("header", "true") // use the header from your csv
  .load(LOG_FILE_PATH)

// window over each user's rows, ordered by timestamp
val wSpec = Window.partitionBy("userId").orderBy("timestamp")

// a user is "bad" if two consecutive events of his are identical
val badUsers = data
  .withColumn("previousEvent", lag($"event", 1).over(wSpec))
  .filter($"previousEvent" === $"event")
  .select("userId")
  .distinct

badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)

Basically you just retrieve the event value from the previous row and compare it to the value on the current row; if they match, that's a wrong behavior and you keep the userId. For the first row in each user's "block" of rows, the previous value will be null: when comparing it with the current value, the boolean expression is false, so there's no problem there.
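For completeness, here is roughly how the same approach could look in PySpark (a sketch I haven't run, reusing the sqlContext, LOG_FILE_PATH and RESULTS_FILE_PATH from your script):

from pyspark.sql import Window
from pyspark.sql.functions import col, lag

data = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(LOG_FILE_PATH)

# same per-user window, ordered by timestamp
wSpec = Window.partitionBy("userId").orderBy("timestamp")

badUsers = data \
    .withColumn("previousEvent", lag(col("event"), 1).over(wSpec)) \
    .filter(col("previousEvent") == col("event")) \
    .select("userId") \
    .distinct()

badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)

lag works the same way in both APIs, so the logic is identical to the Scala version above.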

Upvotes: 1
