condit
condit

Reputation: 10962

Custom SchemaIndexProviders and OnlineAccessors

I'm trying to build a custom indexing solution for Neo4j 2.1.2. The IndexPopulator gets notifications for all the population events before the index becomes online. After getOnlineAccessor is called, however, the IndexAccessor doesn't get notified about changes to nodes and properties. I was expecting process to get called with those events. Am I missing something about how custom indexes should be configured?

The provider factory:

@Service.Implementation(KernelExtensionFactory.class)
public class IndexProviderFactory extends KernelExtensionFactory<Dependencies> {

  public static final String KEY = "index";

  public SchemaIndexProvider.Descriptor DESCRIPTOR = new SchemaIndexProvider.Descriptor(KEY, "1.0");

  private final Supplier<CustomIndexSchemaProvider> provider =
      new Supplier<CustomIndexSchemaProvider>() {
        @Override
        public CustomIndexSchemaProvider get() {
          return new CustomIndexSchemaProvider();
        }
      };

  public IndexProviderFactory() {
    super(KEY);
  }

  @Override
  public Lifecycle newKernelExtension(Dependencies dependencies) throws Throwable {
    return Suppliers.memoize(provider).get();
  }

}

And the actual test case with the bulk of the custom provider:

public class IndexTest {

  static Logger logger = Logger.getLogger(IndexTest.class.getName());
  static Label LABEL = DynamicLabel.label("foo");

  @Test
  public void testUpdate() throws Exception {
    GraphDatabaseService db = new TestGraphDatabaseFactory().newImpermanentDatabase();
    try (Transaction tx = db.beginTx()) {
      final IndexCreator indexCreator = db.schema().indexFor(LABEL).on("bar");
      indexCreator.create();
      tx.success();
    }
    try (Transaction tx = db.beginTx()) {
      db.schema().awaitIndexesOnline(5, TimeUnit.SECONDS);
      tx.success();
    }
    try (Transaction tx = db.beginTx()) {
      Node node = db.createNode(LABEL);
      node.setProperty("bar", 1);
      tx.success();
    }
    db.shutdown();
  }

  static interface Dependencies {
    Config getConfig();
  }

  static class CustomIndexSchemaProvider extends SchemaIndexProvider {

    private static final Logger logger = Logger
        .getLogger(CustomIndexSchemaProvider.class.getName());

    LoggingIndex index = new LoggingIndex();

    public CustomIndexSchemaProvider() {
      super(DESCRIPTOR, 2);
    }

    @Override
    public String getPopulationFailure(long indexId) throws IllegalStateException {
      return null;
    }

    @Override
    public InternalIndexState getInitialState(long indexId) {
      return InternalIndexState.POPULATING;
    }

    @Override
    public IndexAccessor getOnlineAccessor(long indexId, IndexConfiguration config)
        throws IOException {
      logger.info("getOnlineAccessor " + indexId);
      return index;
    }

    @Override
    public IndexPopulator getPopulator(long indexId, IndexDescriptor descriptor,
        IndexConfiguration config) {
      logger.info("getPopulator for " + indexId);
      return index;
    }

    public static class LoggingIndex extends IndexAccessor.Adapter implements IndexPopulator,
        IndexUpdater, IndexAccessor {

      @Override
      public void add(long nodeId, Object propertyValue) {
        logger.info("add " + propertyValue);
      }

      @Override
      public void process(NodePropertyUpdate update) throws IOException,
          IndexEntryConflictException {
        logger.info("process " + update);
      }

      @Override
      public void remove(Iterable<Long> nodeIds) throws IOException {}

      @Override
      public void markAsFailed(String failure) throws IOException {}

      @Override
      public void create() {}

      @Override
      public void close(boolean populationCompletedSuccessfully) {}

      @Override
      public void verifyDeferredConstraints(PropertyAccessor accessor) throws Exception {}

      @Override
      public IndexUpdater newPopulatingUpdater(PropertyAccessor accessor) throws IOException {
        return this;
      }
    }

  }

}

Upvotes: 1

Views: 63

Answers (1)

condit
condit

Reputation: 10962

The OnlineIndexProxy calls newUpdater (not newPopulatingUpdater) in the IndexAccessor.Adapter. Adding this to my LoggingIndex solves the issue:

@Override
public IndexUpdater newUpdater(IndexUpdateMode mode) {
  return this;
}

Upvotes: 1

Related Questions