Reputation: 1268
I have written a jar that provides a Jedis connection pool, and I use it from a Groovy script in NiFi to do a Redis location search. It behaves strangely: sometimes it works and sometimes it does not.
Redis.java
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocketFactory;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class Redis {
    private static Object staticLock = new Object();
    private static JedisPool pool;
    private static String host;
    private static int port;
    private static int connectTimeout;
    private static int operationTimeout;
    private static String password;
    private static JedisPoolConfig config;

    public static void initializeSettings(String host, int port, String password, int connectTimeout, int operationTimeout) {
        Redis.host = host;
        Redis.port = port;
        Redis.password = password;
        Redis.connectTimeout = connectTimeout;
        Redis.operationTimeout = operationTimeout;
    }

    public static JedisPool getPoolInstance() {
        if (pool == null) { // avoid synchronization lock if initialization has already happened
            synchronized(staticLock) {
                if (pool == null) { // don't re-initialize if another thread beat us to it.
                    JedisPoolConfig poolConfig = getPoolConfig();
                    boolean useSsl = port == 6380;
                    int db = 0;
                    String clientName = "MyClientName"; // null means use default
                    SSLSocketFactory sslSocketFactory = null; // null means use default
                    SSLParameters sslParameters = null; // null means use default
                    HostnameVerifier hostnameVerifier = new SimpleHostNameVerifier(host);
                    pool = new JedisPool(poolConfig, host, port);
                    //(poolConfig, host, port, connectTimeout,operationTimeout,password, db,
                    // clientName, useSsl, sslSocketFactory, sslParameters, hostnameVerifier);
                }
            }
        }
        return pool;
    }

    public static JedisPoolConfig getPoolConfig() {
        if (config == null) {
            JedisPoolConfig poolConfig = new JedisPoolConfig();
            int maxConnections = 200;
            poolConfig.setMaxTotal(maxConnections);
            poolConfig.setMaxIdle(maxConnections);
            poolConfig.setBlockWhenExhausted(true);
            poolConfig.setMaxWaitMillis(operationTimeout);
            poolConfig.setMinIdle(50);
            Redis.config = poolConfig;
        }
        return config;
    }

    public static String getPoolCurrentUsage()
    {
        JedisPool jedisPool = getPoolInstance();
        JedisPoolConfig poolConfig = getPoolConfig();
        int active = jedisPool.getNumActive();
        int idle = jedisPool.getNumIdle();
        int total = active + idle;
        String log = String.format(
            "JedisPool: Active=%d, Idle=%d, Waiters=%d, total=%d, maxTotal=%d, minIdle=%d, maxIdle=%d",
            active,
            idle,
            jedisPool.getNumWaiters(),
            total,
            poolConfig.getMaxTotal(),
            poolConfig.getMinIdle(),
            poolConfig.getMaxIdle()
        );
        return log;
    }

    private static class SimpleHostNameVerifier implements HostnameVerifier {
        private String exactCN;
        private String wildCardCN;

        public SimpleHostNameVerifier(String cacheHostname)
        {
            exactCN = "CN=" + cacheHostname;
            wildCardCN = "CN=*" + cacheHostname.substring(cacheHostname.indexOf('.'));
        }

        public boolean verify(String s, SSLSession sslSession) {
            try {
                String cn = sslSession.getPeerPrincipal().getName();
                return cn.equalsIgnoreCase(wildCardCN) || cn.equalsIgnoreCase(exactCN);
            } catch (SSLPeerUnverifiedException ex) {
                return false;
            }
        }
    }
}
CustomFunction:
public class Functions {
    SecureRandom rand = new SecureRandom();
    private static final String UTF8 = "UTF-8";
    public static JedisPool jedisPool = null;

    public static String searchPlace(double latitude, double longitude) {
        try (Jedis jedis = jedisPool.getResource()) {
        }
        catch (Exception e) {
            log.error("exception", e);
        }
    }
}
Groovyscript:
import org.apache.nifi.processor.ProcessContext;
import com.customlib.functions.*;
def flowFile = session.get();
if (flowFile == null) {
return;
}
def flowFiles = [] as List<FlowFile>
def failflowFiles = [] as List<FlowFile>
def input=null;
def data=null;
static onStart(ProcessContext context){
Redis.initializeSettings("host", 6379, null,0,0);
Functions.jedisPool= Redis.getPoolInstance();
}
static onStop(ProcessContext context){
Functions.jedisPool.destroy();
}
try{
log.warn('is jedispool connected::::'+Functions.jedisPool.isClosed());
def inputStream = session.read(flowFile)
def writer = new StringWriter();
IOUtils.copy(inputStream, writer, "UTF-8");
data=writer.toString();
input = new JsonSlurper().parseText( data );
log.warn('place is::::'+Functions.getLocationByLatLong(input["data"]["lat"], input["data"]["longi"]));
.......
...........
}
catch(Exception e){
}
newFlowFile = session.write(newFlowFile, { outputStream ->
outputStream.write( data.getBytes(StandardCharsets.UTF_8) )
} as OutputStreamCallback)
failflowFiles<< newFlowFile;
}
session.transfer(flowFiles, REL_SUCCESS)
session.transfer(failflowFiles, REL_FAILURE)
session.remove(flowFile)
NiFi runs as a 3-node cluster. The function library is configured in the Groovy script processor's module directory. In the script above, the log statement "is jedispool connected::::" sometimes prints false and sometimes true.
Right after the jar is deployed for the first time it works every time, but later the behavior is unpredictable. I can't see what is wrong in the code. How does the Groovy script load the jar? How can I achieve the library-based search using a Groovy script?
Upvotes: 0
Views: 328
Reputation: 28634
Redis.pool is never reset to null after initialization. onStop calls Functions.jedisPool.destroy(), but the static field in Redis still points at the destroyed pool.
getPoolInstance() creates a new pool only when pool is null, so once the pool has been destroyed it keeps returning the same closed instance.
I don't see any reason to keep two variables that reference the same pool, one in Redis and one in Functions.
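A minimal sketch of the fix, under stated assumptions: StubPool is a hypothetical stand-in for JedisPool so the example runs without Jedis on the classpath, and PoolHolder and shutdown() are illustrative names that are not part of the code in the question. The point is that the holder clears its own reference when the pool is closed, so the next getPoolInstance() call builds a fresh pool. The field is also declared volatile, which double-checked locking needs in order to be safe under the Java memory model.

```java
// Hypothetical stand-in for redis.clients.jedis.JedisPool (assumption for this sketch).
final class StubPool implements AutoCloseable {
    private volatile boolean closed = false;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Illustrative holder: the only place that keeps a reference to the pool.
final class PoolHolder {
    private static final Object LOCK = new Object();
    // volatile so the double-checked locking below publishes the pool safely
    private static volatile StubPool pool;

    private PoolHolder() {
    }

    static StubPool getPoolInstance() {
        StubPool current = pool;
        // Rebuild when there is no pool yet or the existing one was closed.
        if (current == null || current.isClosed()) {
            synchronized (LOCK) {
                current = pool;
                if (current == null || current.isClosed()) {
                    current = new StubPool();
                    pool = current;
                }
            }
        }
        return current;
    }

    // Close the pool AND drop the reference, so a later start gets a new pool.
    static void shutdown() {
        synchronized (LOCK) {
            if (pool != null) {
                pool.close();
                pool = null;
            }
        }
    }
}
```

With this shape, callers would ask the holder for the pool on every use (for example PoolHolder.getPoolInstance() inside the search method) instead of caching it in a second static field, and the script's onStop would call PoolHolder.shutdown().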
Upvotes: 1