1 package com.acumenvelocity.ath.solr.tm;
2
3 import java.io.IOException;
4 import java.util.Iterator;
5 import java.util.NoSuchElementException;
6 import java.util.UUID;
7
8 import org.apache.solr.client.solrj.SolrClient;
9 import org.apache.solr.client.solrj.SolrQuery;
10 import org.apache.solr.client.solrj.SolrServerException;
11 import org.apache.solr.client.solrj.response.QueryResponse;
12 import org.apache.solr.common.SolrDocument;
13 import org.apache.solr.common.SolrDocumentList;
14 import org.apache.solr.common.params.CursorMarkParams;
15
16 import com.acumenvelocity.ath.common.Const;
17 import com.acumenvelocity.ath.common.Log;
18
19 import net.sf.okapi.common.Event;
20 import net.sf.okapi.common.EventType;
21 import net.sf.okapi.common.MimeTypeMapper;
22 import net.sf.okapi.common.exceptions.OkapiException;
23 import net.sf.okapi.common.filters.AbstractFilter;
24 import net.sf.okapi.common.resource.Ending;
25 import net.sf.okapi.common.resource.Property;
26 import net.sf.okapi.common.resource.RawDocument;
27 import net.sf.okapi.common.resource.StartDocument;
28 import net.sf.okapi.common.resource.TextContainer;
29 import net.sf.okapi.common.resource.TextUnit;
30
31
32
33
34
35
36
37
38
39
40
41 public class SolrTmFilter extends AbstractFilter {
42
43 private static final int STREAM_PAGE_SIZE = 500;
44 private static final String CURSOR_SORT_FIELD = "id";
45 private static final String TM_QUERY_TEMPLATE = "tmId:\"{}\"";
46
47 private final SolrClient solrClient;
48 private final String tmCollection;
49 private final UUID tmId;
50 private final SolrQuery baseQuery;
51
52 private StreamingDocumentIterator documentStream;
53 private boolean startEventEmitted;
54 private boolean endEventEmitted;
55 private boolean isOperational;
56
57
58
59
60
61
62
63
64
65 public SolrTmFilter(SolrClient solrClient, String tmCollection, UUID tmId) {
66 if (solrClient == null || tmCollection == null || tmId == null) {
67 throw new IllegalArgumentException("Solr client, collection, and TM ID are mandatory");
68 }
69
70 setMimeType(MimeTypeMapper.DEFAULT_MIME_TYPE);
71 setName("okf_solrtm");
72 setDisplayName("Solr TM Filter");
73
74 this.solrClient = solrClient;
75 this.tmCollection = tmCollection;
76 this.tmId = tmId;
77
78 String queryString = Log.format(TM_QUERY_TEMPLATE, tmId);
79 this.baseQuery = new SolrQuery(queryString);
80
81 configureQueryForStreaming();
82 initializeState();
83 }
84
85
86
87
88 private void configureQueryForStreaming() {
89 this.baseQuery.setRows(STREAM_PAGE_SIZE);
90 this.baseQuery.setSort(CURSOR_SORT_FIELD, SolrQuery.ORDER.asc);
91 }
92
93
94
95
96 private void initializeState() {
97 this.documentStream = null;
98 this.startEventEmitted = false;
99 this.endEventEmitted = false;
100 this.isOperational = false;
101 }
102
103
104
105
106
107
108
109
110 @Override
111 public void open(RawDocument input) {
112 setSrcLoc(input.getSourceLocale());
113 setTrgLoc(input.getTargetLocale());
114
115 initializeState();
116
117 try {
118 validateSolrConnectivity();
119 this.documentStream = new StreamingDocumentIterator();
120 this.isOperational = true;
121 } catch (SolrServerException | IOException ex) {
122 throw new OkapiException("Cannot establish connection to TM collection: " + tmCollection, ex);
123 }
124 }
125
126
127
128
129
130
131
132 private void validateSolrConnectivity() throws SolrServerException, IOException {
133 solrClient.ping(tmCollection);
134 }
135
136
137
138
139
140
141 @Override
142 public boolean hasNext() {
143 if (!isOperational) {
144 return false;
145 }
146
147 if (!startEventEmitted) {
148 return true;
149 }
150
151 if (documentStream != null && documentStream.hasNext()) {
152 return true;
153 }
154
155 return !endEventEmitted;
156 }
157
158
159
160
161
162
163
164
165 @Override
166 public Event next() {
167 if (!hasNext()) {
168 throw new NoSuchElementException("No additional events in stream");
169 }
170
171 if (!startEventEmitted) {
172 startEventEmitted = true;
173 return produceStartEvent();
174 }
175
176 if (documentStream != null && documentStream.hasNext()) {
177 SolrDocument doc = documentStream.next();
178 return convertToTextUnitEvent(doc);
179 }
180
181 if (!endEventEmitted) {
182 endEventEmitted = true;
183 return produceEndEvent();
184 }
185
186 throw new NoSuchElementException("No additional events in stream");
187 }
188
189
190
191
192
193
194 private Event produceStartEvent() {
195 StartDocument sd = new StartDocument("tm-stream");
196 sd.setLocale(getSrcLoc());
197
198 return new Event(EventType.START_DOCUMENT, sd);
199 }
200
201
202
203
204
205
206 private Event produceEndEvent() {
207 Ending closer = new Ending("tm-stream");
208 return new Event(EventType.END_DOCUMENT, closer);
209 }
210
211
212
213
214
215
216
217
218 private Event convertToTextUnitEvent(SolrDocument doc) {
219 String unitId = extractUnitIdentifier(doc);
220 TextUnit unit = new TextUnit(unitId);
221
222 String sourceContent = extractSourceText(doc);
223 String targetContent = extractTargetText(doc);
224
225 if (sourceContent != null && !sourceContent.trim().isEmpty()) {
226 unit.setSource(new TextContainer(sourceContent));
227 }
228
229 if (targetContent != null && !targetContent.trim().isEmpty()) {
230 unit.setTarget(getTrgLoc(), new TextContainer(targetContent));
231 }
232
233 populateMetadataProperties(unit, doc);
234
235 return new Event(EventType.TEXT_UNIT, unit);
236 }
237
238
239
240
241
242
243
244 private String extractUnitIdentifier(SolrDocument doc) {
245 Object id = doc.getFieldValue("id");
246 return id != null ? id.toString() : "segment-" + System.nanoTime();
247 }
248
249
250
251
252
253
254
255
256 protected String extractSourceText(SolrDocument doc) {
257 String[] possibleFields = { "source", "src_text", "original", "content_src" };
258
259 for (String field : possibleFields) {
260 Object value = doc.getFieldValue(field);
261 if (value != null) {
262 return value.toString();
263 }
264 }
265
266 return null;
267 }
268
269
270
271
272
273
274
275
276 protected String extractTargetText(SolrDocument doc) {
277 String[] possibleFields = { "target", "tgt_text", "translation", "content_tgt" };
278
279 for (String field : possibleFields) {
280 Object value = doc.getFieldValue(field);
281 if (value != null) {
282 return value.toString();
283 }
284 }
285
286 return null;
287 }
288
289
290
291
292
293
294
295
296 protected void populateMetadataProperties(TextUnit unit, SolrDocument doc) {
297 copyFieldToProperty(unit, doc, Const.ATH_PROP_USER_ID);
298 copyFieldToProperty(unit, doc, Const.ATH_PROP_TM_ID);
299 copyFieldToProperty(unit, doc, Const.ATH_PROP_SRC_LANG);
300 copyFieldToProperty(unit, doc, Const.ATH_PROP_TRG_LANG);
301 copyFieldToProperty(unit, doc, Const.ATH_PROP_SOURCE_WITH_CODES);
302 copyFieldToProperty(unit, doc, Const.ATH_PROP_TARGET_WITH_CODES);
303 copyFieldToProperty(unit, doc, Const.ATH_PROP_CREATED_AT);
304 }
305
306
307
308
309
310
311
312
313 private void copyFieldToProperty(TextUnit unit, SolrDocument doc, String fieldName) {
314 Object value = doc.getFieldValue(fieldName);
315 if (value != null) {
316 unit.setProperty(new Property(fieldName, value.toString()));
317 }
318 }
319
320
321
322
323
324 @Override
325 public void close() {
326 isOperational = false;
327 documentStream = null;
328 }
329
330
331
332
333
334
335
336
337 public long estimateTotalSegments() throws OkapiException {
338 try {
339 SolrQuery countQuery = baseQuery.getCopy();
340 countQuery.setRows(0);
341 countQuery.remove(CursorMarkParams.CURSOR_MARK_PARAM);
342
343 QueryResponse response = solrClient.query(tmCollection, countQuery);
344 return response.getResults().getNumFound();
345 } catch (SolrServerException | IOException ex) {
346 throw new OkapiException("Unable to estimate segment count", ex);
347 }
348 }
349
350
351
352
353
354
355 public String getTmCollection() {
356 return tmCollection;
357 }
358
359
360
361
362
363
364 public UUID getTmId() {
365 return tmId;
366 }
367
368
369
370
371
372
373 public SolrQuery getQuery() {
374 return baseQuery.getCopy();
375 }
376
377
378
379
380
381
382 public boolean isActive() {
383 return isOperational;
384 }
385
386
387
388
389
390
391 private class StreamingDocumentIterator implements Iterator<SolrDocument> {
392
393 private String cursorPosition;
394 private Iterator<SolrDocument> currentPage;
395 private boolean moreDataAvailable;
396
397
398
399
400 StreamingDocumentIterator() {
401 this.cursorPosition = CursorMarkParams.CURSOR_MARK_START;
402 this.currentPage = null;
403 this.moreDataAvailable = true;
404 }
405
406 @Override
407 public boolean hasNext() {
408 if (!isOperational) {
409 return false;
410 }
411
412 if (currentPage != null && currentPage.hasNext()) {
413 return true;
414 }
415
416 if (!moreDataAvailable) {
417 return false;
418 }
419
420 try {
421 retrieveNextPage();
422 return currentPage != null && currentPage.hasNext();
423 } catch (Exception ex) {
424 throw new OkapiException("Failed to retrieve next page from Solr", ex);
425 }
426 }
427
428 @Override
429 public SolrDocument next() {
430 if (!hasNext()) {
431 throw new NoSuchElementException("Iterator exhausted");
432 }
433
434 return currentPage.next();
435 }
436
437
438
439
440
441
442
443
444 private void retrieveNextPage() throws SolrServerException, IOException {
445 SolrQuery pageQuery = baseQuery.getCopy();
446 pageQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorPosition);
447
448 QueryResponse response = solrClient.query(tmCollection, pageQuery);
449 SolrDocumentList documents = response.getResults();
450
451 if (documents.isEmpty()) {
452 moreDataAvailable = false;
453 currentPage = null;
454 return;
455 }
456
457 String nextCursor = response.getNextCursorMark();
458 if (cursorPosition.equals(nextCursor)) {
459 moreDataAvailable = false;
460 currentPage = documents.iterator();
461 } else {
462 cursorPosition = nextCursor;
463 currentPage = documents.iterator();
464 moreDataAvailable = true;
465 }
466 }
467 }
468 }