
Add Lucene engine impl for pluggable data formats#21299

Draft
mgodwan wants to merge 1 commit into opensearch-project:main from mgodwan:lucene-engine

Conversation


@mgodwan mgodwan commented Apr 20, 2026

Description

Add Lucene engine impl for pluggable data formats

Related Issues

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

// ── Default factories ──

private static final LuceneFieldFactory TEXT_FACTORY = (doc, ft, value) -> {
    doc.add(new Field(ft.name(), value.toString(), TEXT_FIELD_TYPE));
};
mgodwan (Member Author):

All of this will be replaced in upcoming PRs with a full fledged support registry


github-actions Bot commented Apr 20, 2026

PR Reviewer Guide 🔍

(Review updated until commit 28075a4)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Lucene per-generation writer and document input implementation

Relevant files:

  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java
  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriterCodec.java
  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneDocumentInput.java
  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFieldFactoryRegistry.java
  • sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/index/LuceneWriterTests.java

Sub-PR theme: Lucene indexing execution engine, plugin registration, and search back-end

Relevant files:

  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java
  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneDataFormat.java
  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LucenePlugin.java
  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneCommitter.java
  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneCommitterFactory.java
  • sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchBackEnd.java
  • sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngineTests.java
  • sandbox/plugins/analytics-backend-lucene/src/test/java/org/opensearch/be/lucene/LuceneReaderManagerTests.java

Sub-PR theme: Composite engine integration and DataFormatAwareEngine javadoc additions

Relevant files:

  • sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
  • sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeParquetIndexIT.java
  • server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java

⚡ Recommended focus areas for review

Assertion Risk

The assertion assert writerGenerations.isEmpty() on line 268 will only fire when assertions are enabled (-ea). If a writer generation cannot be correlated after addIndexes (e.g., due to a codec issue or segment renaming), this silently produces an incomplete result in production. Consider replacing with an explicit runtime check or logging a warning.

assert writerGenerations.isEmpty() : "Could not get segments from all writers";
Resource Leak

MMapDirectory instances are created inside the loop that builds sourceDirectories. If addIndexes throws, the finally block closes them, but that finally block only covers the addIndexes call: if an exception occurs while the loop is still building sourceDirectories (e.g., on the second iteration), the directories opened earlier are never closed.

    sourceDirectories.add(new HardlinkCopyDirectoryWrapper(new MMapDirectory(dirPath)));
    writerGenerations.add(wfs.writerGeneration());
}

// Single batched addIndexes call for all source directories
if (sourceDirectories.isEmpty() == false) {
    try {
        sharedWriter.addIndexes(sourceDirectories.toArray(new Directory[0]));
        logger.debug("Incorporated {} Lucene segments into shared writer in a single addIndexes call", sourceDirectories.size());
    } finally {
        // Close all source directories
        for (Directory dir : sourceDirectories) {
            try {
                dir.close();
            } catch (IOException e) {
                logger.warn("Failed to close source directory after addIndexes", e);
            }
        }
    }
}
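One way to close the gap the reviewer describes is to wrap both the opening loop and the use step in a single try/finally over the shared list, so anything opened before a failure is still released. A minimal, dependency-free sketch of that pattern — FakeDirectory is a stand-in for Lucene's Directory, and openAndUse is a hypothetical helper, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;

public class CloseAllDemo {

    // Stand-in for a Lucene Directory; only the close() behavior matters here.
    static class FakeDirectory implements AutoCloseable {
        boolean closed = false;
        @Override
        public void close() {
            closed = true;
        }
    }

    /**
     * Opens up to n directories into dirs, then "uses" them. The single
     * try/finally spans BOTH the opening loop and the use step, so every
     * directory opened so far is closed even when a later open fails.
     */
    static void openAndUse(List<FakeDirectory> dirs, int n, int failAt) {
        try {
            for (int i = 0; i < n; i++) {
                if (i == failAt) {
                    throw new RuntimeException("open failed at " + i);
                }
                dirs.add(new FakeDirectory());
            }
            // sharedWriter.addIndexes(...) would run here in the real code
        } finally {
            for (FakeDirectory d : dirs) {
                d.close();
            }
        }
    }

    public static void main(String[] args) {
        List<FakeDirectory> dirs = new ArrayList<>();
        try {
            openAndUse(dirs, 5, 3); // fails while opening the 4th directory
        } catch (RuntimeException expected) {
            // ignore: we only care that nothing leaked
        }
        boolean allClosed = true;
        for (FakeDirectory d : dirs) {
            allClosed &= d.closed;
        }
        System.out.println("opened=" + dirs.size() + " allClosed=" + allClosed);
    }
}
```

Running main prints `opened=3 allClosed=true`: the three directories opened before the simulated failure are all closed, which is exactly the guarantee the current loop lacks.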
Duplicate Directory

LuceneWriter uses Files.createDirectory (not createDirectories) to create lucene_gen_<generation>. If two writers with the same generation are created concurrently or if a previous run left the directory behind, this will throw FileAlreadyExistsException and fail writer creation. There is no guard or cleanup for this scenario.

Files.createDirectory(tempDirectory);
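A defensive variant would remove any stale directory before recreating it, since Files.createDirectories alone would silently reuse leftover segment files. This is a sketch using only java.nio, not the PR's code; createFresh and the demo paths are hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class TempDirGuard {

    /**
     * Returns a fresh, empty directory at the given path, first removing
     * any leftover from a crashed or retried previous run.
     */
    static Path createFresh(Path tempDirectory) throws IOException {
        if (Files.exists(tempDirectory)) {
            try (Stream<Path> walk = Files.walk(tempDirectory)) {
                // Delete children before parents (deepest paths first).
                walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try {
                        Files.delete(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        }
        return Files.createDirectory(tempDirectory);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("guard-demo");
        Path gen = base.resolve("lucene_gen_1");

        // Simulate a leftover directory with a stale file from a prior run.
        Files.createDirectory(gen);
        Files.writeString(gen.resolve("stale.seg"), "old data");

        // Without the guard, Files.createDirectory would throw
        // FileAlreadyExistsException here.
        createFresh(gen);
        try (Stream<Path> children = Files.list(gen)) {
            System.out.println("fresh and empty: " + (children.count() == 0));
        }
    }
}
```

Whether cleanup or a hard failure is the right policy depends on whether a pre-existing `lucene_gen_<generation>` can ever hold data that must survive; the sketch assumes it cannot.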
Wrong Log Message

In the constructor's FileAlreadyExistsException catch block, the log message references store.shardPath().resolve("parquet") instead of the actual baseDirectory (which is store.shardPath().resolve(LuceneDataFormat.LUCENE_FORMAT_NAME)). This will log a misleading path.

logger.warn("Directory already exists: {}", store.shardPath().resolve("parquet"));
TODO Unresolved

getSafeCommitInfo() always throws UnsupportedOperationException with a TODO comment. This method may be called by the engine during recovery or peer recovery operations, which would cause unexpected failures in production.

public SafeCommitInfo getSafeCommitInfo() {
    throw new UnsupportedOperationException("TODO:: with index deleter");
}

github-actions Bot commented Apr 20, 2026

PR Code Suggestions ✨

Latest suggestions up to 28075a4
Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Replace assert with explicit runtime check

Using assert for this validation means it is silently skipped when assertions are
disabled (the default in production JVMs). If a writer generation is not found in
the NRT reader after addIndexes, it indicates a data loss or corruption scenario
that should always be detected. Replace the assert with an explicit check that
throws an IllegalStateException.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java [268]

-assert writerGenerations.isEmpty() : "Could not get segments from all writers";
+if (!writerGenerations.isEmpty()) {
+    throw new IllegalStateException("Could not get segments from all writers; missing generations: " + writerGenerations);
+}
Suggestion importance[1-10]: 7


Why: The assert statement is disabled in production JVMs by default, meaning a missing writer generation (indicating potential data loss) would go undetected. Replacing it with an explicit IllegalStateException ensures the error is always caught.

Medium
Guard sync against already-closed directory

Calling directory.listAll() after indexWriter.close() and directory.close() (which
happens in flush()) will throw an AlreadyClosedException. The sync() method should
be called before flush() closes the directory, or it should guard against the closed
state. Additionally, syncing all files (including write.lock and segments_*) is
unnecessary and potentially harmful; only the segment data files should be synced.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [180-183]

+@Override
 public void sync() throws IOException {
+    if (!indexWriter.isOpen()) {
+        throw new IllegalStateException("Cannot sync after writer has been closed by flush()");
+    }
     directory.sync(Arrays.asList(directory.listAll()));
     directory.syncMetaData();
 }
Suggestion importance[1-10]: 6


Why: The sync() method calls directory.listAll() which will throw AlreadyClosedException if called after flush() has closed the directory and indexWriter. Adding a guard check prevents this potential runtime error.

Low
Prevent directory resource leak on partial failure

If an exception occurs while opening one of the MMapDirectory or
HardlinkCopyDirectoryWrapper instances (e.g., for a later segment), the
already-opened directories in sourceDirectories will be leaked because the finally
block that closes them is only reached after the loop completes. The directory
should be opened inside a try-with-resources or the exception handling should ensure
partial lists are closed.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java [211]

-sourceDirectories.add(new HardlinkCopyDirectoryWrapper(new MMapDirectory(dirPath)));
+Directory srcDir = new HardlinkCopyDirectoryWrapper(new MMapDirectory(dirPath));
+try {
+    sourceDirectories.add(srcDir);
+} catch (Exception e) {
+    srcDir.close();
+    throw e;
+}
Suggestion importance[1-10]: 5


Why: If an exception occurs while opening one of the directories in the loop, previously opened directories in sourceDirectories would be leaked since the finally block only runs after the loop completes. However, the improved_code doesn't actually prevent the leak for already-added directories, making the fix incomplete.

Low
General
Avoid recreating provider object on every call

A new anonymous IndexStoreProvider object (with a new providers map) is created on
every call to getProvider(). This is wasteful and could cause subtle issues if
callers compare provider identity. The composite provider should be built once
(e.g., lazily or in the constructor) and reused.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java [332-351]

+private final IndexStoreProvider compositeProvider = new IndexStoreProvider() {
+    private final Map<DataFormat, IndexStoreProvider> providers;
+    {
+        Map<DataFormat, IndexStoreProvider> tempProviders = new HashMap<>();
+        tempProviders.put(primaryEngine.getDataFormat(), primaryEngine.getProvider());
+        tempProviders.putAll(
+            secondaryEngines.stream()
+                .map(eng -> new AbstractMap.SimpleEntry<>(eng.getDataFormat(), eng.getProvider()))
+                .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue))
+        );
+        providers = tempProviders;
+    }
+
+    @Override
+    public FormatStore getStore(DataFormat dataFormat) {
+        return providers.get(dataFormat).getStore(dataFormat);
+    }
+};
+
 @Override
 public IndexStoreProvider getProvider() {
-    return new IndexStoreProvider() {
-        private final Map<DataFormat, IndexStoreProvider> providers;
-        {
-            Map<DataFormat, IndexStoreProvider> tempProviders = new HashMap<>();
-            tempProviders.put(primaryEngine.getDataFormat(), primaryEngine.getProvider());
-            tempProviders.putAll(
-                secondaryEngines.stream()
-                    .map(eng -> new AbstractMap.SimpleEntry<>(eng.getDataFormat(), eng.getProvider()))
-                    .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue))
-            );
-            providers = tempProviders;
-        }
-
-        @Override
-        public FormatStore getStore(DataFormat dataFormat) {
-            return providers.get(dataFormat).getStore(dataFormat);
-        }
-    };
+    return compositeProvider;
 }
Suggestion importance[1-10]: 4


Why: Creating a new IndexStoreProvider instance with a new providers map on every getProvider() call is wasteful. Caching it as a field would be more efficient, though the functional impact is minor since the map contents would be the same each time.

Low

Previous suggestions

Suggestions up to commit 9dd0022
Category | Suggestion | Impact
Possible issue
Replace assertion with hard runtime check

Java assertions are disabled by default in production JVMs (unless -ea is passed).
If not all writer generations are found in the NRT reader after addIndexes, this
will silently pass in production, potentially causing data loss or incorrect catalog
snapshots. This should be a hard runtime check (e.g., throw an
IllegalStateException) rather than an assertion.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java [268]

-assert writerGenerations.isEmpty() : "Could not get segments from all writers";
+if (!writerGenerations.isEmpty()) {
+    throw new IllegalStateException("Could not get segments from all writers; missing generations: " + writerGenerations);
+}
Suggestion importance[1-10]: 7


Why: This is a valid and important concern: Java assertions are disabled by default in production, so a missing writer generation would silently pass, potentially causing incorrect catalog snapshots or data loss. Replacing the assertion with an IllegalStateException is a meaningful correctness improvement.

Medium
Guard against double-close after flush

After indexWriter.close() and directory.close() are called in flush(), the close()
method still attempts to call indexWriter.rollback() and directory.close() again.
While there are try/catch blocks, calling rollback() on an already-closed
IndexWriter may throw an AlreadyClosedException. A boolean flag (e.g., flushed)
should be set after flush to guard against double-close in close().

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [167-169]

 // Since flush is once only, we can close the write post this.
 indexWriter.close();
 directory.close();
+this.flushed = true;
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that close() may attempt to call indexWriter.rollback() and directory.close() after flush() has already closed them. The close() method has try/catch blocks that swallow exceptions, so this is handled gracefully, but adding a flushed flag would be cleaner and more explicit. The improved code only shows setting the flag but not using it in close(), making it incomplete.

Low
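As the note above points out, setting the flag is only half the fix: close() must also consult it. A self-contained sketch of the complete pattern — the class name, fields, and comments are stand-ins for the real IndexWriter/Directory calls, not the PR's code:

```java
public class FlushOnceWriter implements AutoCloseable {

    // Stand-ins for the underlying Lucene IndexWriter and Directory state.
    private boolean writerClosed = false;
    private boolean directoryClosed = false;
    private boolean flushed = false;

    /** Flush is one-shot: it releases the underlying resources itself. */
    public void flush() {
        if (flushed) {
            return; // idempotent: a second flush is a no-op
        }
        writerClosed = true;    // real code: indexWriter.close()
        directoryClosed = true; // real code: directory.close()
        flushed = true;
    }

    /**
     * Safe to call after flush(): the flag prevents rollback()/close()
     * from running against already-closed resources.
     */
    @Override
    public void close() {
        if (flushed) {
            return; // nothing left to release
        }
        writerClosed = true;    // real code: indexWriter.rollback()
        directoryClosed = true; // real code: directory.close()
    }

    public boolean isFlushed() {
        return flushed;
    }
}
```

With this guard, the flush-then-close sequence used by the engine cannot trip an AlreadyClosedException, and the intent ("flush closes; close only cleans up un-flushed writers") is explicit in one place.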
Prevent directory resource leaks during construction

The MMapDirectory instances created here are wrapped but never explicitly tracked
for closure in the failure path before addIndexes is called. If addIndexes throws,
the finally block closes them, but if an exception occurs while building
sourceDirectories (e.g., on a later iteration), previously created MMapDirectory
instances may leak. Consider using a try-with-resources or ensuring all directories
are closed in a single finally block that covers the entire construction loop.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java [211]

-sourceDirectories.add(new HardlinkCopyDirectoryWrapper(new MMapDirectory(dirPath)));
+Directory mmap = new MMapDirectory(dirPath);
+try {
+    sourceDirectories.add(new HardlinkCopyDirectoryWrapper(mmap));
+} catch (Exception e) {
+    mmap.close();
+    throw e;
+}
Suggestion importance[1-10]: 5


Why: The suggestion identifies a real but unlikely resource leak: if MMapDirectory construction succeeds but HardlinkCopyDirectoryWrapper throws, the MMapDirectory would leak. However, the finally block already handles closing all directories after addIndexes, and the scenario where HardlinkCopyDirectoryWrapper throws during construction is very rare. The improved code is valid but the risk is low.

Low
General
Handle pre-existing temp directory on writer creation

Using Files.createDirectory will throw FileAlreadyExistsException if a writer with
the same generation already exists (e.g., due to a retry or crash recovery). This
could cause the engine to fail unexpectedly. Consider using Files.createDirectories
or checking for existence first, and handle the case where the directory already
exists (e.g., clean it up or fail with a clear error).

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [96-97]

-logger.info("Creating directory for temp lucene writer: " + tempDirectory);
+logger.info("Creating directory for temp lucene writer: {}", tempDirectory);
+if (Files.exists(tempDirectory)) {
+    IOUtils.rm(tempDirectory);
+    logger.warn("Removed existing temp directory for writer generation {}: {}", writerGeneration, tempDirectory);
+}
 Files.createDirectory(tempDirectory);
Suggestion importance[1-10]: 5


Why: The suggestion correctly identifies that Files.createDirectory will throw if the directory already exists, which could happen during crash recovery or retries. The improved code adds cleanup logic and also fixes the string concatenation in the log statement to use parameterized logging. The fix is reasonable but the scenario may be handled at a higher level.

Low

@github-actions

✅ Gradle check result for 9dd0022: SUCCESS


codecov Bot commented Apr 20, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.30%. Comparing base (255c867) to head (9dd0022).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##               main   #21299    +/-   ##
==========================================
  Coverage     73.30%   73.30%            
+ Complexity    73756    73731    -25     
==========================================
  Files          5936     5936            
  Lines        335749   335753     +4     
  Branches      48396    48397     +1     
==========================================
+ Hits         246106   246109     +3     
+ Misses        70031    69926   -105     
- Partials      19612    19718   +106     

☔ View full report in Codecov by Sentry.

Signed-off-by: Mohit Godwani <mgodwan@amazon.com>
@github-actions

Persistent review updated to latest commit 28075a4

@github-actions

❌ Gradle check result for 28075a4: FAILURE

Please examine the workflow log, locate the failure(s), and copy-paste them below, then iterate to green. Is the failure a flaky test unrelated to your change?


@Bukhtawar Bukhtawar left a comment


Overall looks good, minor suggestions raised
