Reputation: 603
I am using Scala 2.12 with Elasticsearch 5.2.2. My requirement is to fetch/search only, based on some criteria. A search can return more than 10,000 documents or messages in one go, so I cannot use a regular search. Each document is a complex JSON message, which I can parse later. I need to fetch all such messages and store them in a single list of JSON (or similar). I am not very fluent in Scala. I can use elastic4s in Scala for the search, and I see it has scroll and scan options, but I didn't find any full working example, so I am looking for some help.
I found some sample code like the one below, but I need more help to fetch everything and collect it as described above:
client.execute {
  search in "index" / "type" query <yourquery> scroll "1m"
}

client.execute {
  search scroll <id>
}
But how do I get the scroll id, and how do I proceed to get all the data?
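For reference, the scroll pattern itself is just a loop: fetch the first page with a scroll timeout, keep the returned scroll id, and request the next page by that id until a page comes back empty. A minimal sketch in plain Scala of that loop shape (the `Page` type and `fetchPage` function here are stand-ins for the elastic4s calls, not real API names):

```scala
// Sketch of the scroll loop: accumulate hits until a page comes back empty.
// `fetchPage` stands in for the second and subsequent scroll requests
// (e.g. searchScroll(id) in elastic4s); names are illustrative only.
object ScrollPattern {
  final case class Page(scrollId: String, hits: List[String])

  def scrollAll(firstPage: Page, fetchPage: String => Page): List[String] = {
    @annotation.tailrec
    def loop(page: Page, acc: List[String]): List[String] =
      if (page.hits.isEmpty) acc                         // empty page: scroll exhausted
      else loop(fetchPage(page.scrollId), acc ++ page.hits)
    loop(firstPage, Nil)
  }
}
```

In a real client you would also clear the scroll context when done, since open scrolls hold cluster resources until they time out.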
Update:
The scala version and ES version are mentioned above.
I am using the following example:
SBT:
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-core" % "7.0.2"
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "5.5.10"
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % "6.5.1"
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "5.6.0"
Code:
import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.http.Response
import com.sksamuel.elastic4s.http.search.SearchResponse
import com.sksamuel.elastic4s.HttpClient
import com.sksamuel.elastic4s.http.ElasticDsl._
val client = HttpClient(ElasticsearchClientUri("host", 9200))
val resp1 = client.execute {
  search("index")
    .matchQuery("key", "value")
    .scroll("1m")
    .limit(500)
}.await.result

val resp2 = client.execute {
  searchScroll(resp1.scrollId.get).keepAlive(1.minute)
}.await
I think I am not using the correct versions for elastic4s modules.
Issues:
import com.sksamuel.elastic4s.HttpClient: the HttpClient class is not recognized. I get an "HttpClient not found" error when I try to initialize the client variable.
Next, when I try to get the scrollId from resp1 (to pass into resp2), it is not recognized. How do I fetch the scrollId from resp1?
Basically, what is missing here?
Update 2:
I changed the versions of the dependencies below as per the examples on GitHub (samples):
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-http" % "6.3.3"
Code:
val client = ElasticClient(ElasticProperties("http://host:9200"))
Now I am getting the following error:
Error:
Symbol 'type <none>.term.BuildableTermsQuery' is missing from the classpath.
[error] This symbol is required by 'method com.sksamuel.elastic4s.http.search.SearchHandlers.BuildableTermsNoOp'.
[error] Make sure that type BuildableTermsQuery is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'SearchHandlers.class' was compiled against an incompatible version of <none>.term.
[error] val client = ElasticClient(ElasticProperties("host:9200"))
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
Upvotes: 2
Views: 1349
Reputation: 132
Personally, I would use Akka Streams for this type of workflow, as it makes parallel processing and workflow construction easier.
The reference documentation can be a bit dense, but the basic idea is that you start with a Source that has one output, push it through any number of Flows, and then collect the results in a Sink.
Elastic4s supports this (nearly) natively, so you don't have to handle the scrolls etc. directly.
Now, I don't know what you want to do with your records. But to create a Source for your data would be something like this:
import akka.actor.ActorRefFactory
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.streams.ReactiveElastic._ // provides client.publisher(...)

class MyIndexer(indexName: String) {
  def getIndexSource(client: ElasticClient)(implicit actorRefFactory: ActorRefFactory) =
    Source.fromPublisher(
      client.publisher(search(indexName) (your-query-here) sortByFieldAsc "originalSource.ctime" scroll "5m")
    )
}
Calling MyIndexer.getIndexSource will give you back a Source[SearchHit]. You can then convert each SearchHit to your domain object however you would normally handle a result from elastic4s (in my case, with Circe's generic.auto; just as with the non-streaming interface, you can use .to[Domainobject]).
You're probably wondering about that ActorRefFactory implicit; that is the Akka ActorSystem. If you're working in something like the Play framework, you can get this for free by using dependency injection to request an instance of ActorSystem in any injected class (i.e. class MyClass @Inject() (implicit sys: ActorSystem)). If you're using plain Scala, you can do this in your main function:
private implicit val actorSystem = ActorSystem("some-name-here")
private implicit val mat:Materializer = ActorMaterializer.create(actorSystem)
and use implicit parameters to thread those values through to where they are needed.
An example of how to use this to get a sequence of all results (probably not exactly what you need, given the description, but a good illustration) would look something like this:
import akka.stream.scaladsl.{Keep, Sink}
import com.sksamuel.elastic4s.circe._
import io.circe.generic.auto._
import scala.concurrent.ExecutionContext.Implicits.global

val source = indexer.getIndexSource(esclient)
val resultFuture = source
  .log("logger-name-here")
  .map(_.to[Yourdomainobject])
  .toMat(Sink.seq[Yourdomainobject])(Keep.right)
  .run()

resultFuture
  .map(resultSeq => { /* do stuff with the result seq */ })
  .recover {
    case err: Throwable => /* handle error */
  }
Now, if you want to do your processing efficiently, you'll want to implement it as GraphStages and bolt them into the stream here. I've been implementing a bunch of scanners that work over several hundred thousand objects, and each one is nothing more than a main function that sets up and runs a stream that does all of the actual processing.
I tend to design my logic as a flowchart, implement each box of the chart as a separate Akka GraphStage, then bolt them together and use built-in elements like Broadcast and Merge to get good parallel processing.
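If you just need per-record parallelism without building a full graph, the same fan-out/fan-in shape can be sketched outside Akka Streams with plain Futures. This is only an illustration of the idea (inside a stream you would reach for mapAsync or a Broadcast/Merge graph); `processInParallel` is a made-up helper, not a library API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelSketch {
  // Fan each record out to its own Future, then Future.traverse
  // collects the results back into a single Seq, preserving input order.
  def processInParallel[A, B](records: Seq[A])(step: A => B): Seq[B] =
    Await.result(Future.traverse(records)(a => Future(step(a))), 30.seconds)
}
```

The blocking Await keeps the sketch short; in a real stream-based pipeline you would stay asynchronous end to end and let backpressure limit concurrency instead.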
Hope that this is of some use!
Upvotes: 3