Reputation: 1
I am using PySpark and es-hadoop to access the records, and I am getting an error saying that field names cannot contain dots (.). The logs are ingested from eve.json by Filebeat using the Suricata module and pushed to Elasticsearch.
I defined an ingest pipeline:
PUT _ingest/pipeline/dot-expander-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "field": "*"
      }
    }
  ]
}
and then set it as the final pipeline on the filebeat indices:
PUT /filebeat-*/_settings
{
  "settings": {
    "index.final_pipeline": "dot-expander-pipeline"
  }
}
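For reference, this is roughly how I check what the pipeline does to a sample document, via the _simulate endpoint (a minimal sketch using the requests library; the URL is a placeholder and the field names are made up):
import json
import requests

ES_URL = "http://localhost:9200"  # placeholder, point this at your cluster

# Simulate the pipeline against a doc that has a dotted key at the top level
# and another dotted key inside a nested object.
body = {
    "docs": [
        {"_source": {"top.level": 1, "nested": {"inner.dotted": 2}}}
    ]
}
resp = requests.post(
    f"{ES_URL}/_ingest/pipeline/dot-expander-pipeline/_simulate",
    json=body,
)
print(json.dumps(resp.json(), indent=2))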
It works: I added a test field to make sure. However, it only expands top-level fields, not nested ones. There are hundreds of dotted fields coming from Suricata, so specifying them manually is impractical, especially because they can be deeply nested.
I am looking for either a way to break every dotted field into nested objects (as JSON), so that no key contains a . in its name, OR a way to bypass this in PySpark somehow.
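To make concrete what I mean by breaking dotted fields into nested objects, this is the transformation I want applied per document, sketched in plain Python (the helper name and the sample keys are only illustrative, not the real Suricata schema):
def expand_dots(doc: dict) -> dict:
    """Turn {'a.b.c': 1} into {'a': {'b': {'c': 1}}}, recursing into nested dicts."""
    expanded = {}
    for key, value in doc.items():
        if isinstance(value, dict):
            value = expand_dots(value)
        parts = key.split(".")
        target = expanded
        for part in parts[:-1]:
            target = target.setdefault(part, {})
        target[parts[-1]] = value
    return expanded

# Illustrative input only, not the real Suricata document
print(expand_dots({"suricata.eve.dns.type": "query", "flow": {"pkts.toserver": 3}}))
# {'suricata': {'eve': {'dns': {'type': 'query'}}}, 'flow': {'pkts': {'toserver': 3}}}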
In PySpark I tried the following rename (replacing . with _), but it had no effect:
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Filter for DNS events and events from the past week
df = filebeat_df.filter((col("suricata.eve.event_type") == "dns") & (col("@timestamp") >= F.date_sub(F.current_date(), 7)))
# Replace dots with underscores in all column names
columns_transformed = [col(c).alias(c.replace('.', '_')) for c in df.columns]
df_transformed = df.select(*columns_transformed)
# Extract hour from timestamp
df_transformed = df_transformed.withColumn("hour", F.hour(df_transformed["@timestamp"]))
# Aggregate data by hour
agg_df = df_transformed.groupBy("hour").agg(F.count("hour").alias("event_count"))
# Create a VectorAssembler to assemble features
vector_assembler = VectorAssembler(inputCols=["event_count"], outputCol="features")
# Apply the VectorAssembler to the aggregated data
vectorized_data = vector_assembler.transform(agg_df)
# Train KMeans model
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(vectorized_data)
# Make predictions
predictions = model.transform(vectorized_data)
# Evaluate clustering performance (optional)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Display anomalies (cluster with fewer points)
anomalies = predictions.groupBy("prediction").count().filter(col("count") < your_threshold)
anomalies.show()
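For comparison, here is a toy sketch of the same rename on a locally built DataFrame (made-up column names, not the real filebeat schema); note that plain col(c) treats the dot as struct field access, so the literal dotted names have to be escaped with backticks before aliasing:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("dot-rename-sketch").getOrCreate()

# Toy data with literal dots in the column names (illustrative only)
toy_df = spark.createDataFrame(
    [("dns", "example.com"), ("tls", "example.org")],
    ["suricata.eve.event_type", "suricata.eve.dns.rrname"],
)

# Backticks make Spark treat the whole dotted string as one column name
# instead of interpreting the dots as nested struct access.
renamed = toy_df.select(
    [col(f"`{c}`").alias(c.replace(".", "_")) for c in toy_df.columns]
)
renamed.printSchema()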
Upvotes: 0
Views: 57