22
22
import org .elasticsearch .core .IOUtils ;
23
23
import org .elasticsearch .core .TimeValue ;
24
24
import org .elasticsearch .env .Environment ;
25
+ import org .elasticsearch .ingest .IngestService ;
25
26
import org .elasticsearch .ingest .Processor ;
26
27
import org .elasticsearch .ingest .common .IngestCommonPlugin ;
27
28
import org .elasticsearch .ingest .useragent .IngestUserAgentPlugin ;
46
47
import java .util .Map ;
47
48
import java .util .Objects ;
48
49
import java .util .Optional ;
50
+ import java .util .Set ;
49
51
import java .util .concurrent .TimeUnit ;
50
52
import java .util .function .Consumer ;
51
53
import java .util .function .Supplier ;
52
54
55
+ import static co .elastic .logstash .filters .elasticintegration .ingest .SafeSubsetIngestPlugin .safeSubset ;
53
56
import static com .google .common .util .concurrent .AbstractScheduledService .Scheduler .newFixedRateSchedule ;
54
57
55
58
@ SuppressWarnings ("UnusedReturnValue" )
@@ -78,7 +81,41 @@ public static EventProcessorBuilder fromElasticsearch(final RestClient elasticse
78
81
}
79
82
80
83
public EventProcessorBuilder () {
81
- this .addProcessorsFromPlugin (IngestCommonPlugin ::new );
84
+ this .addProcessorsFromPlugin (IngestCommonPlugin ::new , Set .of (
85
+ org .elasticsearch .ingest .common .AppendProcessor .TYPE ,
86
+ org .elasticsearch .ingest .common .BytesProcessor .TYPE ,
87
+ org .elasticsearch .ingest .common .CommunityIdProcessor .TYPE ,
88
+ org .elasticsearch .ingest .common .ConvertProcessor .TYPE ,
89
+ org .elasticsearch .ingest .common .CsvProcessor .TYPE ,
90
+ org .elasticsearch .ingest .common .DateIndexNameProcessor .TYPE ,
91
+ org .elasticsearch .ingest .common .DateProcessor .TYPE ,
92
+ org .elasticsearch .ingest .common .DissectProcessor .TYPE ,
93
+ "dot_expander" , // note: upstream constant is package-private
94
+ org .elasticsearch .ingest .DropProcessor .TYPE , // note: not in ingest-common
95
+ org .elasticsearch .ingest .common .FailProcessor .TYPE ,
96
+ org .elasticsearch .ingest .common .FingerprintProcessor .TYPE ,
97
+ org .elasticsearch .ingest .common .ForEachProcessor .TYPE ,
98
+ org .elasticsearch .ingest .common .GrokProcessor .TYPE ,
99
+ org .elasticsearch .ingest .common .GsubProcessor .TYPE ,
100
+ org .elasticsearch .ingest .common .HtmlStripProcessor .TYPE ,
101
+ org .elasticsearch .ingest .common .JoinProcessor .TYPE ,
102
+ org .elasticsearch .ingest .common .JsonProcessor .TYPE ,
103
+ org .elasticsearch .ingest .common .KeyValueProcessor .TYPE ,
104
+ org .elasticsearch .ingest .common .LowercaseProcessor .TYPE ,
105
+ org .elasticsearch .ingest .common .NetworkDirectionProcessor .TYPE ,
106
+ // note: no `pipeline` processor, as we provide our own
107
+ org .elasticsearch .ingest .common .RegisteredDomainProcessor .TYPE ,
108
+ org .elasticsearch .ingest .common .RemoveProcessor .TYPE ,
109
+ org .elasticsearch .ingest .common .RenameProcessor .TYPE ,
110
+ org .elasticsearch .ingest .common .RerouteProcessor .TYPE ,
111
+ org .elasticsearch .ingest .common .ScriptProcessor .TYPE ,
112
+ org .elasticsearch .ingest .common .SetProcessor .TYPE ,
113
+ org .elasticsearch .ingest .common .SortProcessor .TYPE ,
114
+ org .elasticsearch .ingest .common .SplitProcessor .TYPE ,
115
+ org .elasticsearch .ingest .common .TrimProcessor .TYPE ,
116
+ org .elasticsearch .ingest .common .URLDecodeProcessor .TYPE ,
117
+ org .elasticsearch .ingest .common .UppercaseProcessor .TYPE ,
118
+ org .elasticsearch .ingest .common .UriPartsProcessor .TYPE ));
82
119
this .addProcessorsFromPlugin (IngestUserAgentPlugin ::new );
83
120
this .addProcessor (SetSecurityUserProcessor .TYPE , SetSecurityUserProcessor .Factory ::new );
84
121
}
@@ -133,6 +170,10 @@ public EventProcessorBuilder addProcessor(final String type, final Supplier<Proc
133
170
return this .addProcessorsFromPlugin (SingleProcessorIngestPlugin .of (type , processorFactorySupplier ));
134
171
}
135
172
173
+ public EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPlugin > pluginSupplier , Set <String > requiredProcessors ) {
174
+ return this .addProcessorsFromPlugin (safeSubset (pluginSupplier , requiredProcessors ));
175
+ }
176
+
136
177
public synchronized EventProcessorBuilder addProcessorsFromPlugin (Supplier <IngestPlugin > pluginSupplier ) {
137
178
this .ingestPlugins .add (pluginSupplier );
138
179
return this ;
@@ -158,16 +199,18 @@ synchronized EventProcessor build(final PluginContext pluginContext) {
158
199
final ScriptService scriptService = initScriptService (settings , threadPool );
159
200
resourcesToClose .add (scriptService );
160
201
202
+ final Environment env = new Environment (settings , null );
161
203
final Processor .Parameters processorParameters = new Processor .Parameters (
162
- new Environment ( settings , null ) ,
204
+ env ,
163
205
scriptService ,
164
206
null ,
165
207
threadPool .getThreadContext (),
166
208
threadPool ::relativeTimeInMillis ,
167
209
(delay , command ) -> threadPool .schedule (command , TimeValue .timeValueMillis (delay ), ThreadPool .Names .GENERIC ),
168
210
null ,
169
211
null ,
170
- threadPool .generic ()::execute
212
+ threadPool .generic ()::execute ,
213
+ IngestService .createGrokThreadWatchdog (env , threadPool )
171
214
);
172
215
173
216
IngestPipelineFactory ingestPipelineFactory = new IngestPipelineFactory (scriptService );
0 commit comments