avidlearner

Reputation: 253

Read data from Redis to Flink

I have been trying to find a connector to read data from Redis into Flink. Flink's documentation describes a connector for writing to Redis, but I need to read data from Redis in my Flink job. In "Using Apache Flink for data streaming", Fabian mentioned that it is possible to read data from Redis. Which connector can be used for this purpose?

Upvotes: 1

Views: 5198

Answers (4)

David Dreyfus

Reputation: 11

One of the challenges in getting your Flink program to use Jedis to talk to Redis is getting the required libraries into the JAR file you submit to Flink. Without them, you will get call stacks indicating that certain classes are undefined. Here is a snippet of a Maven pom.xml I created to copy Jedis and its dependency, Apache Commons Pool 2 (commons-pool2), into my JAR.

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>2.9</version>
            <executions>
                <execution>
                    <id>unpack</id>
                    <!-- executed just before the package phase -->
                    <!-- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html -->
                    <phase>prepare-package</phase>
                    <goals>
                        <goal>unpack</goal>
                    </goals>
                    <configuration>
                        <artifactItems>
                            <artifactItem>
                                <groupId>org.apache.commons</groupId>
                                <artifactId>commons-pool2</artifactId>
                                <version>2.4.2</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>org/apache/commons/**</includes>
                            </artifactItem>
                            <artifactItem>
                                <groupId>redis.clients</groupId>
                                <artifactId>jedis</artifactId>
                                <version>2.9.0</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>redis/clients/**</includes>
                            </artifactItem>

                        </artifactItems>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
    </build>
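A missing library usually only shows up at runtime, as a `NoClassDefFoundError` or `ClassNotFoundException` deep in a call stack. One optional safeguard, sketched here under the assumption that you call it from your function's `open()` method, is to check up front that the classes you depend on can be loaded and to fail with a readable message otherwise. `ClasspathCheck` is a hypothetical helper written for this example; it relies only on the standard `Class.forName(String, boolean, ClassLoader)` call, with `false` so that static initializers are not run.

```java
// Hypothetical helper: verifies that classes the job depends on were bundled.
class ClasspathCheck {

    // Returns true if the named class can be loaded by the given loader,
    // without running its static initializers.
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Fails fast with a readable message instead of a NoClassDefFoundError later on.
    static void requireClasses(ClassLoader loader, String... classNames) {
        for (String name : classNames) {
            if (!isLoadable(name, loader)) {
                throw new IllegalStateException(
                    "Class " + name + " is not on the classpath; was its JAR bundled into the job JAR?");
            }
        }
    }
}
```

For example, calling `ClasspathCheck.requireClasses(getClass().getClassLoader(), "redis.clients.jedis.Jedis", "org.apache.commons.pool2.impl.GenericObjectPool")` inside `open()` would fail immediately if either JAR had been left out.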

Upvotes: 0

ariskk

Reputation: 166

We are running one in production that looks roughly like this:

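import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import redis.clients.jedis.Jedis

// Placeholder record type; replace with your own.
case class SomeDataType(payload: String)

class RedisSource extends RichSourceFunction[SomeDataType] {

  // Assumption: Jedis as the client; host, port, and the "some-queue" key are placeholders.
  @transient private var client: Jedis = _

  @volatile private var isRunning = true

  override def open(parameters: Configuration): Unit = {
    client = new Jedis("localhost", 6379) // init connection etc.
  }

  override def run(ctx: SourceContext[SomeDataType]): Unit = {
    while (isRunning) {
      // LPOP removes and returns the head of the list, or null if it is empty
      val data = client.lpop("some-queue")
      if (data == null) {
        Thread.sleep(100) // back off while the queue is empty
      } else {
        // emit under the checkpoint lock, as the SourceFunction contract asks
        ctx.getCheckpointLock.synchronized {
          ctx.collect(SomeDataType(data))
        }
      }
    }
  }

  // only flip the flag here; run() may still be using the client
  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = {
    if (client != null) client.close()
  }
}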

I think it really depends on what you need to fetch from Redis. The above could be used to fetch a message from a list/queue, transform it and push it downstream, and then delete it from the queue. Redis also supports Pub/Sub, so it would be possible to subscribe, grab the SourceContext and push messages downstream.
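The Pub/Sub variant can be sketched as a hand-off between two threads, since a Jedis `subscribe` call blocks the thread it runs on and delivers messages through a callback. This is a minimal stdlib-only sketch, not a Flink or Jedis API: `PubSubBridgeSource` is a hypothetical class whose `run`/`cancel` mirror `SourceFunction`, `emit` stands in for `SourceContext.collect`, `checkpointLock` stands in for `ctx.getCheckpointLock()`, and `onMessage` is where a subscriber callback (for example an override of `JedisPubSub.onMessage(String channel, String message)` running on its own thread, started in `open()`) would hand messages over.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch, not a Flink class: run()/cancel() mirror SourceFunction,
// and `emit` stands in for SourceContext.collect.
class PubSubBridgeSource {
    // Hand-off buffer between the subscriber thread and the run() thread.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private volatile boolean isRunning = true;

    // Called on the subscriber's thread, e.g. from a JedisPubSub.onMessage override.
    void onMessage(String channel, String message) {
        buffer.offer(message);
    }

    // Loops until cancel() is called, forwarding buffered messages downstream.
    void run(Consumer<String> emit, Object checkpointLock) {
        while (isRunning) {
            String message;
            try {
                // Bounded wait so a cancel() is noticed even when no messages arrive.
                message = buffer.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (message != null) {
                // Emit under the lock, as a Flink source does with getCheckpointLock().
                synchronized (checkpointLock) {
                    emit.accept(message);
                }
            }
        }
    }

    void cancel() {
        isRunning = false;
    }
}
```

Keep in mind that Redis Pub/Sub is fire-and-forget: messages published while no subscriber is connected (for example while the job is restarting) are not delivered later, so this pattern suits data where occasional loss is acceptable.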

Upvotes: 1

jainnidhi

Reputation: 31

Currently, a Flink Redis connector is not available, but one can be implemented by extending the RichSinkFunction/SinkFunction class.

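import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class RedisSink extends RichSinkFunction<String> {

  // Assumption: Jedis client; host, port, and the "flink-output" key are placeholders.
  private transient Jedis jedis;

  @Override
  public void open(Configuration parameters) throws Exception {
    // open redis connection
    jedis = new Jedis("localhost", 6379);
  }

  @Override
  public void invoke(String value) throws Exception {
    // sink data to redis: append each record to a list
    jedis.rpush("flink-output", value);
  }

  @Override
  public void close() throws Exception {
    if (jedis != null) {
      jedis.close();
    }
    super.close();
  }

}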

Upvotes: 1

David Anderson

Reputation: 43697

There's been a bit of discussion about having a streaming Redis source connector for Apache Flink (see FLINK-3033), but there isn't one available. It shouldn't be difficult to implement one, however.
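One design question such a source would have to settle is running several parallel instances against the same Redis list. The sketch below is stdlib-only and hypothetical: a `ConcurrentLinkedQueue` stands in for the Redis list, its `poll()` plays the role of `LPOP` (which atomically removes and returns the head of a list, or nil when it is empty), and `ListDrainingSource`/`drainWith` are illustrative names, not Flink or Redis APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for one parallel instance of a Redis list source.
class ListDrainingSource implements Runnable {
    private final Queue<String> redisList; // stands in for the shared Redis list
    private final Consumer<String> emit;   // stands in for SourceContext.collect
    private volatile boolean isRunning = true;

    ListDrainingSource(Queue<String> redisList, Consumer<String> emit) {
        this.redisList = redisList;
        this.emit = emit;
    }

    @Override
    public void run() {
        while (isRunning) {
            // poll() removes and returns the head atomically, like LPOP.
            String item = redisList.poll();
            if (item == null) {
                // A streaming source would back off and keep polling; the sketch
                // stops here so that it terminates.
                return;
            }
            emit.accept(item);
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs `parallelism` instances against the same list and returns everything emitted.
    static List<String> drainWith(int parallelism, Queue<String> redisList) {
        Queue<String> emitted = new ConcurrentLinkedQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Thread worker = new Thread(new ListDrainingSource(redisList, emitted::add));
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while draining", e);
            }
        }
        return new ArrayList<>(emitted);
    }
}
```

Because each pop removes the item it returns, no two instances emit the same element. The flip side is that Redis no longer holds an item once it has been popped, so if the job fails and restores from a checkpoint, items popped after that checkpoint are not replayed; a source needing stronger guarantees would have to track or re-queue items, which this sketch does not attempt.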

Upvotes: 0
