View Javadoc
1   package com.acumenvelocity.ath.solr.tm;
2   
3   import java.io.IOException;
4   import java.util.Iterator;
5   import java.util.NoSuchElementException;
6   import java.util.UUID;
7   
8   import org.apache.solr.client.solrj.SolrClient;
9   import org.apache.solr.client.solrj.SolrQuery;
10  import org.apache.solr.client.solrj.SolrServerException;
11  import org.apache.solr.client.solrj.response.QueryResponse;
12  import org.apache.solr.common.SolrDocument;
13  import org.apache.solr.common.SolrDocumentList;
14  import org.apache.solr.common.params.CursorMarkParams;
15  
16  import com.acumenvelocity.ath.common.Const;
17  import com.acumenvelocity.ath.common.Log;
18  
19  import net.sf.okapi.common.Event;
20  import net.sf.okapi.common.EventType;
21  import net.sf.okapi.common.MimeTypeMapper;
22  import net.sf.okapi.common.exceptions.OkapiException;
23  import net.sf.okapi.common.filters.AbstractFilter;
24  import net.sf.okapi.common.resource.Ending;
25  import net.sf.okapi.common.resource.Property;
26  import net.sf.okapi.common.resource.RawDocument;
27  import net.sf.okapi.common.resource.StartDocument;
28  import net.sf.okapi.common.resource.TextContainer;
29  import net.sf.okapi.common.resource.TextUnit;
30  
31  /**
32   * Streaming Solr translation memory filter designed for large-scale TM operations.
33   * Leverages Solr's deep paging capabilities through cursor marks to efficiently
34   * process millions of translation units without exhausting heap memory.
35   * 
36   * This filter transforms Solr query results into Okapi event streams suitable
37   * for integration with translation processing pipelines. Documents are retrieved
38   * in configurable page sizes and processed incrementally, making it ideal for
39   * production environments with substantial translation memory databases.
40   */
41  public class SolrTmFilter extends AbstractFilter {
42  
43    private static final int STREAM_PAGE_SIZE = 500;
44    private static final String CURSOR_SORT_FIELD = "id";
45    private static final String TM_QUERY_TEMPLATE = "tmId:\"{}\"";
46  
47    private final SolrClient solrClient;
48    private final String tmCollection;
49    private final UUID tmId;
50    private final SolrQuery baseQuery;
51  
52    private StreamingDocumentIterator documentStream;
53    private boolean startEventEmitted;
54    private boolean endEventEmitted;
55    private boolean isOperational;
56  
57    /**
58     * Constructs a streaming filter for a specific translation memory.
59     * 
60     * @param solrClient   Connection to the Solr instance
61     * @param tmCollection Target translation memory collection
62     * @param tmId         Translation memory identifier to filter by
63     * @throws IllegalArgumentException if any required parameter is null
64     */
65    public SolrTmFilter(SolrClient solrClient, String tmCollection, UUID tmId) {
66      if (solrClient == null || tmCollection == null || tmId == null) {
67        throw new IllegalArgumentException("Solr client, collection, and TM ID are mandatory");
68      }
69  
70      setMimeType(MimeTypeMapper.DEFAULT_MIME_TYPE);
71      setName("okf_solrtm");
72      setDisplayName("Solr TM Filter");
73  
74      this.solrClient = solrClient;
75      this.tmCollection = tmCollection;
76      this.tmId = tmId;
77  
78      String queryString = Log.format(TM_QUERY_TEMPLATE, tmId);
79      this.baseQuery = new SolrQuery(queryString);
80  
81      configureQueryForStreaming();
82      initializeState();
83    }
84  
85    /**
86     * Configures the query object for optimal cursor-based streaming.
87     */
88    private void configureQueryForStreaming() {
89      this.baseQuery.setRows(STREAM_PAGE_SIZE);
90      this.baseQuery.setSort(CURSOR_SORT_FIELD, SolrQuery.ORDER.asc);
91    }
92  
93    /**
94     * Resets state variables to their initial configuration.
95     */
96    private void initializeState() {
97      this.documentStream = null;
98      this.startEventEmitted = false;
99      this.endEventEmitted = false;
100     this.isOperational = false;
101   }
102 
103   /**
104    * Activates the filter and establishes the streaming connection to Solr.
105    * Validates connectivity before initializing the document iterator.
106    * 
107    * @param input Raw document wrapper providing filter context
108    * @throws OkapiException if Solr connectivity fails
109    */
110   @Override
111   public void open(RawDocument input) {
112     setSrcLoc(input.getSourceLocale());
113     setTrgLoc(input.getTargetLocale());
114     
115     initializeState();
116 
117     try {
118       validateSolrConnectivity();
119       this.documentStream = new StreamingDocumentIterator();
120       this.isOperational = true;
121     } catch (SolrServerException | IOException ex) {
122       throw new OkapiException("Cannot establish connection to TM collection: " + tmCollection, ex);
123     }
124   }
125 
126   /**
127    * Performs a health check on the Solr connection.
128    * 
129    * @throws SolrServerException if the server cannot be reached
130    * @throws IOException         if network communication fails
131    */
132   private void validateSolrConnectivity() throws SolrServerException, IOException {
133     solrClient.ping(tmCollection);
134   }
135 
136   /**
137    * Indicates whether more events are available in the processing stream.
138    * 
139    * @return true if additional events can be retrieved
140    */
141   @Override
142   public boolean hasNext() {
143     if (!isOperational) {
144       return false;
145     }
146 
147     if (!startEventEmitted) {
148       return true;
149     }
150 
151     if (documentStream != null && documentStream.hasNext()) {
152       return true;
153     }
154 
155     return !endEventEmitted;
156   }
157 
158   /**
159    * Retrieves the next event from the processing stream.
160    * Emits document boundary markers and text unit events in sequence.
161    * 
162    * @return The next available event
163    * @throws NoSuchElementException when the stream is depleted
164    */
165   @Override
166   public Event next() {
167     if (!hasNext()) {
168       throw new NoSuchElementException("No additional events in stream");
169     }
170 
171     if (!startEventEmitted) {
172       startEventEmitted = true;
173       return produceStartEvent();
174     }
175 
176     if (documentStream != null && documentStream.hasNext()) {
177       SolrDocument doc = documentStream.next();
178       return convertToTextUnitEvent(doc);
179     }
180 
181     if (!endEventEmitted) {
182       endEventEmitted = true;
183       return produceEndEvent();
184     }
185 
186     throw new NoSuchElementException("No additional events in stream");
187   }
188 
189   /**
190    * Generates the opening document boundary event.
191    * 
192    * @return Event signaling document stream initiation
193    */
194   private Event produceStartEvent() {
195     StartDocument sd = new StartDocument("tm-stream");
196     sd.setLocale(getSrcLoc());
197 
198     return new Event(EventType.START_DOCUMENT, sd);
199   }
200 
201   /**
202    * Generates the closing document boundary event.
203    * 
204    * @return Event signaling document stream completion
205    */
206   private Event produceEndEvent() {
207     Ending closer = new Ending("tm-stream");
208     return new Event(EventType.END_DOCUMENT, closer);
209   }
210 
211   /**
212    * Transforms a Solr document into an Okapi text unit event.
213    * Extracts source and target content along with metadata properties.
214    * 
215    * @param doc The Solr document to convert
216    * @return Event containing the translation unit
217    */
218   private Event convertToTextUnitEvent(SolrDocument doc) {
219     String unitId = extractUnitIdentifier(doc);
220     TextUnit unit = new TextUnit(unitId);
221 
222     String sourceContent = extractSourceText(doc);
223     String targetContent = extractTargetText(doc);
224 
225     if (sourceContent != null && !sourceContent.trim().isEmpty()) {
226       unit.setSource(new TextContainer(sourceContent));
227     }
228 
229     if (targetContent != null && !targetContent.trim().isEmpty()) {
230       unit.setTarget(getTrgLoc(), new TextContainer(targetContent));
231     }
232 
233     populateMetadataProperties(unit, doc);
234 
235     return new Event(EventType.TEXT_UNIT, unit);
236   }
237 
238   /**
239    * Extracts or synthesizes a unique identifier for the translation unit.
240    * 
241    * @param doc Source Solr document
242    * @return Unique identifier string
243    */
244   private String extractUnitIdentifier(SolrDocument doc) {
245     Object id = doc.getFieldValue("id");
246     return id != null ? id.toString() : "segment-" + System.nanoTime();
247   }
248 
249   /**
250    * Locates source language content by probing standard field names.
251    * Subclasses may override to customize field resolution.
252    * 
253    * @param doc Source Solr document
254    * @return Source text content or null
255    */
256   protected String extractSourceText(SolrDocument doc) {
257     String[] possibleFields = { "source", "src_text", "original", "content_src" };
258 
259     for (String field : possibleFields) {
260       Object value = doc.getFieldValue(field);
261       if (value != null) {
262         return value.toString();
263       }
264     }
265 
266     return null;
267   }
268 
269   /**
270    * Locates target language content by probing standard field names.
271    * Subclasses may override to customize field resolution.
272    * 
273    * @param doc Source Solr document
274    * @return Target text content or null
275    */
276   protected String extractTargetText(SolrDocument doc) {
277     String[] possibleFields = { "target", "tgt_text", "translation", "content_tgt" };
278 
279     for (String field : possibleFields) {
280       Object value = doc.getFieldValue(field);
281       if (value != null) {
282         return value.toString();
283       }
284     }
285 
286     return null;
287   }
288 
289   /**
290    * Enriches the text unit with metadata extracted from the Solr document.
291    * Subclasses can override to customize which properties are transferred.
292    * 
293    * @param unit Target text unit
294    * @param doc  Source Solr document
295    */
296   protected void populateMetadataProperties(TextUnit unit, SolrDocument doc) {
297     copyFieldToProperty(unit, doc, Const.ATH_PROP_USER_ID);
298     copyFieldToProperty(unit, doc, Const.ATH_PROP_TM_ID);
299     copyFieldToProperty(unit, doc, Const.ATH_PROP_SRC_LANG);
300     copyFieldToProperty(unit, doc, Const.ATH_PROP_TRG_LANG);
301     copyFieldToProperty(unit, doc, Const.ATH_PROP_SOURCE_WITH_CODES);
302     copyFieldToProperty(unit, doc, Const.ATH_PROP_TARGET_WITH_CODES);
303     copyFieldToProperty(unit, doc, Const.ATH_PROP_CREATED_AT);
304   }
305 
306   /**
307    * Copies a single field value from document to text unit properties.
308    * 
309    * @param unit      Target text unit
310    * @param doc       Source Solr document
311    * @param fieldName Field to copy
312    */
313   private void copyFieldToProperty(TextUnit unit, SolrDocument doc, String fieldName) {
314     Object value = doc.getFieldValue(fieldName);
315     if (value != null) {
316       unit.setProperty(new Property(fieldName, value.toString()));
317     }
318   }
319 
320   /**
321    * Terminates the filter and releases associated resources.
322    * The Solr client remains open as it's externally managed.
323    */
324   @Override
325   public void close() {
326     isOperational = false;
327     documentStream = null;
328   }
329 
330   /**
331    * Queries Solr for the total count of matching segments without retrieval.
332    * Useful for displaying progress indicators or estimating resource needs.
333    * 
334    * @return Total segment count matching the query
335    * @throws OkapiException if the count operation fails
336    */
337   public long estimateTotalSegments() throws OkapiException {
338     try {
339       SolrQuery countQuery = baseQuery.getCopy();
340       countQuery.setRows(0);
341       countQuery.remove(CursorMarkParams.CURSOR_MARK_PARAM);
342 
343       QueryResponse response = solrClient.query(tmCollection, countQuery);
344       return response.getResults().getNumFound();
345     } catch (SolrServerException | IOException ex) {
346       throw new OkapiException("Unable to estimate segment count", ex);
347     }
348   }
349 
350   /**
351    * Returns the collection name being queried.
352    * 
353    * @return Solr collection identifier
354    */
355   public String getTmCollection() {
356     return tmCollection;
357   }
358 
359   /**
360    * Returns the translation memory identifier.
361    * 
362    * @return TM ID being filtered
363    */
364   public UUID getTmId() {
365     return tmId;
366   }
367 
368   /**
369    * Provides read access to the query configuration.
370    * 
371    * @return Copy of the configured query
372    */
373   public SolrQuery getQuery() {
374     return baseQuery.getCopy();
375   }
376 
377   /**
378    * Indicates whether the filter is currently active.
379    * 
380    * @return true if filter is operational
381    */
382   public boolean isActive() {
383     return isOperational;
384   }
385 
386   /**
387    * Iterator implementation leveraging Solr's deep paging mechanism.
388    * Uses cursor marks to efficiently traverse large result sets without
389    * the performance penalties associated with traditional offset pagination.
390    */
391   private class StreamingDocumentIterator implements Iterator<SolrDocument> {
392 
393     private String cursorPosition;
394     private Iterator<SolrDocument> currentPage;
395     private boolean moreDataAvailable;
396 
397     /**
398      * Initializes the iterator at the first cursor position.
399      */
400     StreamingDocumentIterator() {
401       this.cursorPosition = CursorMarkParams.CURSOR_MARK_START;
402       this.currentPage = null;
403       this.moreDataAvailable = true;
404     }
405 
406     @Override
407     public boolean hasNext() {
408       if (!isOperational) {
409         return false;
410       }
411 
412       if (currentPage != null && currentPage.hasNext()) {
413         return true;
414       }
415 
416       if (!moreDataAvailable) {
417         return false;
418       }
419 
420       try {
421         retrieveNextPage();
422         return currentPage != null && currentPage.hasNext();
423       } catch (Exception ex) {
424         throw new OkapiException("Failed to retrieve next page from Solr", ex);
425       }
426     }
427 
428     @Override
429     public SolrDocument next() {
430       if (!hasNext()) {
431         throw new NoSuchElementException("Iterator exhausted");
432       }
433 
434       return currentPage.next();
435     }
436 
437     /**
438      * Fetches the subsequent page of documents using cursor pagination.
439      * Updates the cursor position for the next retrieval cycle.
440      * 
441      * @throws SolrServerException if the query fails
442      * @throws IOException         if network issues occur
443      */
444     private void retrieveNextPage() throws SolrServerException, IOException {
445       SolrQuery pageQuery = baseQuery.getCopy();
446       pageQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorPosition);
447 
448       QueryResponse response = solrClient.query(tmCollection, pageQuery);
449       SolrDocumentList documents = response.getResults();
450 
451       if (documents.isEmpty()) {
452         moreDataAvailable = false;
453         currentPage = null;
454         return;
455       }
456 
457       String nextCursor = response.getNextCursorMark();
458       if (cursorPosition.equals(nextCursor)) {
459         moreDataAvailable = false;
460         currentPage = documents.iterator();
461       } else {
462         cursorPosition = nextCursor;
463         currentPage = documents.iterator();
464         moreDataAvailable = true;
465       }
466     }
467   }
468 }