36
36
import java .time .ZoneId ;
37
37
import java .time .ZonedDateTime ;
38
38
import java .time .format .DateTimeFormatter ;
39
+ import java .time .format .DateTimeParseException ;
39
40
import java .util .UUID ;
40
41
import java .util .concurrent .BlockingQueue ;
41
42
import java .util .function .Consumer ;
43
+ import java .util .regex .Pattern ;
42
44
43
45
/**
44
46
* Must be thread-safe
@@ -53,6 +55,10 @@ public class K8SConsumer implements Consumer<FileRecord> {
53
55
private static final DateTimeFormatter format = DateTimeFormatter .ofPattern ("yyyy-MM-dd'T'HH:mm:ss.SSSSSSxxx" );
54
56
private final ZoneId timezoneId ;
55
57
58
+ // Validators
59
+ private static final Pattern hostnamePattern = Pattern .compile ("^[a-zA-Z0-9.-]+$" ); // Not perfect but filters basically all mistakes
60
+ private static final Pattern appNamePattern = Pattern .compile ("^[\\ x21-\\ x7e]+$" ); // DEC 33 - DEC 126 as specified in RFC5424
61
+
56
62
K8SConsumer (
57
63
AppConfig appConfig ,
58
64
KubernetesCachingAPIClient cacheClient ,
@@ -131,7 +137,26 @@ public void accept(FileRecord record) {
131
137
)
132
138
);
133
139
}
134
- Instant instant = Instant .parse (log .getTimestamp ());
140
+ Instant instant ;
141
+ try {
142
+ instant = Instant .parse (log .getTimestamp ());
143
+ }
144
+ catch (DateTimeParseException e ) {
145
+ throw new RuntimeException (
146
+ String .format (
147
+ "[%s] Can't parse timestamp <%s> properly for event from pod <%s/%s> on container <%s> in file %s/%s at offset %s: " ,
148
+ uuid ,
149
+ log .getTimestamp (),
150
+ namespace ,
151
+ podname ,
152
+ containerId ,
153
+ record .getPath (),
154
+ record .getFilename (),
155
+ record .getStartOffset ()
156
+ ),
157
+ e
158
+ );
159
+ }
135
160
ZonedDateTime zdt = instant .atZone (timezoneId );
136
161
String timestamp = zdt .format (format );
137
162
@@ -152,34 +177,87 @@ public void accept(FileRecord record) {
152
177
JsonObject dockerMetadata = new JsonObject ();
153
178
dockerMetadata .addProperty ("container_id" , containerId );
154
179
155
- // Handle hostname and appname , use fallback values when labels are empty or if label not found
180
+ // Handle hostname and appName , use fallback values when labels are empty or if label not found
156
181
String hostname ;
157
- String appname ;
182
+ String appName ;
158
183
if (podMetadataContainer .getLabels () == null ) {
159
184
LOGGER .warn (
160
- "[{}] Can't resolve metadata and/or labels for container <{}>, using fallback values for hostname and appname " ,
185
+ "[{}] Can't resolve metadata and/or labels for container <{}>, using fallback values for hostname and appName " ,
161
186
uuid ,
162
187
containerId
163
188
);
164
189
hostname = appConfig .getKubernetes ().getLabels ().getHostname ().getFallback ();
165
- appname = appConfig .getKubernetes ().getLabels ().getAppname ().getFallback ();
190
+ appName = appConfig .getKubernetes ().getLabels ().getAppName ().getFallback ();
166
191
}
167
192
else {
168
193
hostname = podMetadataContainer .getLabels ().getOrDefault (
169
194
appConfig .getKubernetes ().getLabels ().getHostname ().getLabel (log .getStream ()),
170
195
appConfig .getKubernetes ().getLabels ().getHostname ().getFallback ()
171
196
);
172
- appname = podMetadataContainer .getLabels ().getOrDefault (
173
- appConfig .getKubernetes ().getLabels ().getAppname ().getLabel (log .getStream ()),
174
- appConfig .getKubernetes ().getLabels ().getAppname ().getFallback ()
197
+ appName = podMetadataContainer .getLabels ().getOrDefault (
198
+ appConfig .getKubernetes ().getLabels ().getAppName ().getLabel (log .getStream ()),
199
+ appConfig .getKubernetes ().getLabels ().getAppName ().getFallback ()
200
+ );
201
+ }
202
+ hostname = appConfig .getKubernetes ().getLabels ().getHostname ().getPrefix () + hostname ;
203
+ appName = appConfig .getKubernetes ().getLabels ().getAppName ().getPrefix () + appName ;
204
+
205
+ if (!hostnamePattern .matcher (hostname ).matches ()) {
206
+ throw new RuntimeException (
207
+ String .format (
208
+ "[%s] Detected hostname <[%s]> from pod <[%s]/[%s]> on container <%s> contains invalid characters, can't continue" ,
209
+ uuid ,
210
+ hostname ,
211
+ namespace ,
212
+ podname ,
213
+ containerId
214
+ )
215
+ );
216
+ }
217
+
218
+ if (hostname .length () >= 255 ) {
219
+ throw new RuntimeException (
220
+ String .format (
221
+ "[%s] Detected hostname <[%s]...> from pod <[%s]/[%s]> on container <%s> is too long, can't continue" ,
222
+ uuid ,
223
+ hostname .substring (0 ,30 ),
224
+ namespace ,
225
+ podname ,
226
+ containerId
227
+ )
228
+ );
229
+ }
230
+
231
+ if (!appNamePattern .matcher (appName ).matches ()) {
232
+ throw new RuntimeException (
233
+ String .format (
234
+ "[%s] Detected appName <[%s]> from pod <[%s]/[%s]> on container <%s> contains invalid characters, can't continue" ,
235
+ uuid ,
236
+ appName ,
237
+ namespace ,
238
+ podname ,
239
+ containerId
240
+ )
241
+ );
242
+ }
243
+ if (appName .length () > 48 ) {
244
+ throw new RuntimeException (
245
+ String .format (
246
+ "[%s] Detected appName <[%s]...> from pod <[%s]/[%s]> on container <%s> is too long, can't continue" ,
247
+ uuid ,
248
+ appName .substring (0 ,30 ),
249
+ namespace ,
250
+ podname ,
251
+ containerId
252
+ )
175
253
);
176
254
}
177
255
178
256
if (LOGGER .isDebugEnabled ()) {
179
257
LOGGER .debug (
180
258
"[{}] Resolved message to be {}@{} from {}/{} generated at {}" ,
181
259
uuid ,
182
- appname ,
260
+ appName ,
183
261
hostname ,
184
262
namespace ,
185
263
podname ,
@@ -205,8 +283,8 @@ public void accept(FileRecord record) {
205
283
SyslogMessage syslog = new SyslogMessage ()
206
284
.withTimestamp (timestamp , true )
207
285
.withSeverity (Severity .WARNING )
208
- .withHostname (appConfig . getKubernetes (). getLabels (). getHostname (). getPrefix () + hostname )
209
- .withAppName (appConfig . getKubernetes (). getLabels (). getAppname (). getPrefix () + appname )
286
+ .withHostname (hostname )
287
+ .withAppName (appName )
210
288
.withFacility (Facility .USER )
211
289
.withSDElement (SDMetadata )
212
290
.withMsg (new String (record .getRecord ()));
0 commit comments