forked from amazon-ion/ion-java
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathIonCursorBinary.java
3771 lines (3540 loc) · 162 KB
/
IonCursorBinary.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion.impl;
import com.amazon.ion.BufferConfiguration;
import com.amazon.ion.IonBufferConfiguration;
import com.amazon.ion.IonException;
import com.amazon.ion.IonCursor;
import com.amazon.ion.IonType;
import com.amazon.ion.IvmNotificationConsumer;
import com.amazon.ion.SystemSymbols;
import com.amazon.ion.impl.bin.FlexInt;
import com.amazon.ion.impl.bin.OpCodes;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import static com.amazon.ion.impl.IonTypeID.DELIMITED_END_ID;
import static com.amazon.ion.impl.IonTypeID.ONE_ANNOTATION_FLEX_SYM_LOWER_NIBBLE_1_1;
import static com.amazon.ion.impl.IonTypeID.ONE_ANNOTATION_SID_LOWER_NIBBLE_1_1;
import static com.amazon.ion.impl.IonTypeID.SYSTEM_MACRO_INVOCATION_ID;
import static com.amazon.ion.impl.IonTypeID.SYSTEM_SYMBOL_VALUE;
import static com.amazon.ion.impl.IonTypeID.TWO_ANNOTATION_FLEX_SYMS_LOWER_NIBBLE_1_1;
import static com.amazon.ion.impl.IonTypeID.TWO_ANNOTATION_SIDS_LOWER_NIBBLE_1_1;
import static com.amazon.ion.impl.IonTypeID.TYPE_IDS_1_1;
import static com.amazon.ion.impl.bin.Ion_1_1_Constants.FLEX_SYM_MAX_SYSTEM_SYMBOL;
import static com.amazon.ion.impl.bin.Ion_1_1_Constants.FLEX_SYM_SYSTEM_SYMBOL_OFFSET;
import static com.amazon.ion.util.IonStreamUtils.throwAsIonException;
/**
* An IonCursor over binary Ion data, capable of buffering or skipping Ion values at any depth. Records byte
* indices of the value currently buffered, enabling parsers direct access to the bytes in the value's representation.
*/
class IonCursorBinary implements IonCursor {
// Mask selecting the low seven bits (0x7F) of a byte.
private static final int LOWER_SEVEN_BITS_BITMASK = 0x7F;
// Mask selecting the highest bit (0x80) of a byte.
private static final int HIGHEST_BIT_BITMASK = 0x80;
// Number of value bits per VarUInt byte.
private static final int VALUE_BITS_PER_VARUINT_BYTE = 7;
// Note: because long is a signed type, Long.MAX_VALUE is represented in Long.SIZE - 1 bits. The expression below
// therefore evaluates to 63 / 7 = 9 bytes.
private static final int MAXIMUM_SUPPORTED_VAR_UINT_BYTES = (Long.SIZE - 1) / VALUE_BITS_PER_VARUINT_BYTE;
// Byte values that start and end an Ion version marker (IVM).
private static final int IVM_START_BYTE = 0xE0;
private static final int IVM_FINAL_BYTE = 0xEA;
private static final int IVM_REMAINING_LENGTH = 3; // Length of the IVM after the first byte.
// Mask applied to a byte read from the buffer to obtain its unsigned value as an int.
private static final int SINGLE_BYTE_MASK = 0xFF;
// Cached ordinal of IonType.LIST.
private static final int LIST_TYPE_ORDINAL = IonType.LIST.ordinal();
// IVM consumer that does nothing; this is the initial value of `ivmConsumer`.
private static final IvmNotificationConsumer NO_OP_IVM_NOTIFICATION_CONSUMER = (x, y) -> {};
// Initial capacity of the stacks used to hold container Markers and ArgumentGroupMarkers. Each additional level of
// nesting in the data requires a new entry. Depths greater than 8 are assumed to be rare.
private static final int CONTAINER_STACK_INITIAL_CAPACITY = 8;
// When set as an 'endIndex', indicates that the value is delimited.
private static final int DELIMITED_MARKER = -1;
/**
* The kind of location at which `checkpoint` points. The current kind is held in the `checkpointLocation` field,
* whose initial value is BEFORE_UNANNOTATED_TYPE_ID.
*/
private enum CheckpointLocation {
// NOTE(review): the per-constant descriptions below are inferred from the constant names only; confirm them
// against the code that assigns `checkpointLocation`.
// Presumably: the checkpoint is at the type ID of a value for which no annotations have been read.
BEFORE_UNANNOTATED_TYPE_ID,
// Presumably: the checkpoint is at the type ID of a value whose annotations have already been read.
BEFORE_ANNOTATED_TYPE_ID,
// Presumably: the checkpoint is just past the header of a scalar value.
AFTER_SCALAR_HEADER,
// Presumably: the checkpoint is just past the header of a container value.
AFTER_CONTAINER_HEADER
}
/**
* The state representing where the cursor left off after the previous operation. The current state is held in
* `RefillableState.state`.
*/
private enum State {
// Set by `fillAt` when the input could not yet supply all of the requested bytes.
FILL,
// NOTE(review): presumably the delimited-container counterpart of FILL; it is not assigned in the code visible
// here -- confirm against usages.
FILL_DELIMITED,
// NOTE(review): presumably an unfinished seek (skip) operation; it is not assigned in the code visible here --
// confirm against usages.
SEEK,
// NOTE(review): presumably the delimited-container counterpart of SEEK; it is not assigned in the code visible
// here -- confirm against usages.
SEEK_DELIMITED,
// No operation is pending. Set by `fillAt` on success; also the initial state of a refillable cursor.
READY,
// The state carried by TERMINATED_STATE, the dummy state of a terminated cursor.
TERMINATED
}
/**
 * Bookkeeping that is needed only while the cursor reads from a refillable data source in slow mode.
 */
private static class RefillableState {

    /**
     * Depth from which the buffer is known to contain every value in full: values at this depth and at any
     * greater depth are completely buffered, so slow mode may remain off until the cursor steps out of this depth.
     */
    int fillDepth = -1;

    /**
     * Current size of the internal buffer.
     */
    long capacity;

    /**
     * Running count of discarded bytes, i.e. bytes that were shifted out of the buffer or skipped directly from
     * the input stream.
     */
    long totalDiscardedBytes = 0;

    /**
     * Reader state while in slow mode (with slow mode off, the reader is implicitly always READY). Retaining this
     * state lets the reader complete a previous IonCursor API invocation that returned a NEEDS_DATA event.
     */
    State state;

    /**
     * Count of bytes that a fill or seek operation still has to consume from the input.
     */
    long bytesRequested = 0;

    /**
     * Upper bound on the size of the buffer. A value that would require more than this is treated as oversized.
     */
    final int maximumBufferSize;

    /**
     * How far the buffer contents were shifted left during the current operation in order to free up room. When
     * rewinding to an earlier location, indices saved at that location have to be shifted by the same amount.
     */
    long pendingShift = 0;

    /**
     * Where the data comes from, for refillable streams.
     */
    final InputStream inputStream;

    /**
     * Buffer index of the first "pinned" byte. Pinned bytes have to stay in the buffer until they are un-pinned.
     */
    long pinOffset = -1;

    /**
     * Depth the reader should seek to. Used when a container turns out to be oversized while one of its children
     * is being buffered.
     */
    int targetSeekDepth = -1;

    /**
     * Callback invoked when a single value would exceed `maximumBufferSize`.
     */
    BufferConfiguration.OversizedValueHandler oversizedValueHandler;

    /**
     * True while the current value is being skipped because it is oversized.
     */
    boolean isSkippingCurrentValue = false;

    /**
     * Count of bytes of an oversized value that were skipped by single-byte reads.
     */
    int individualBytesSkippedWithoutBuffering = 0;

    /**
     * Most recent byte that was read without being buffered (because the buffer would exceed its maximum size).
     * Remembering it makes it possible to un-read one byte even while an oversized value is being skipped, which
     * is required when the cursor probes for an end delimiter and does not find one.
     */
    int lastUnbufferedByte = -1;

    /**
     * When true, annotation sequences are skipped instead of being recorded for the user. This applies while
     * probing ahead for the end of a delimited container as the cursor stays logically positioned on the current
     * value. It is only relevant in 'slow' mode; in quick mode the whole container is assumed to be buffered, so
     * no probing takes place.
     */
    private boolean skipAnnotations = false;

    /**
     * @param inputStream the source of data; null for the terminated dummy state.
     * @param capacity the current size of the internal buffer.
     * @param maximumBufferSize the largest size the buffer may reach.
     * @param initialState the state in which the reader starts.
     */
    RefillableState(InputStream inputStream, int capacity, int maximumBufferSize, State initialState) {
        this.state = initialState;
        this.maximumBufferSize = maximumBufferSize;
        this.capacity = capacity;
        this.inputStream = inputStream;
    }
}
/**
 * Tracks the position of an argument group.
 */
private static class ArgumentGroupMarker {

    /**
     * Start index of the argument group's current page.
     */
    long pageStartIndex = -1;

    /**
     * End index of the argument group's current page. A value of -1 means the argument group is delimited and the
     * end of the page has not been located yet.
     */
    long pageEndIndex = -1;

    /**
     * The primitive type of the group's values when the group is tagless; null otherwise. A null value implies a
     * single page of values, which ends either at an end delimiter (delimited groups) or when the cursor's
     * `peekIndex` reaches `pageEndIndex`. A non-null value allows for several pages of tagless values: each time
     * the cursor reaches `pageEndIndex`, a FlexUInt must be read at that position to compute the end index of the
     * following page.
     */
    TaglessEncoding taglessEncoding = null;
}
/**
* Dummy state that indicates the cursor has been terminated and that additional API calls will have no effect.
* It has no input stream, and both its capacity and its maximum buffer size are -1.
*/
private static final RefillableState TERMINATED_STATE = new RefillableState(null, -1, -1, State.TERMINATED);
/**
* Stack to hold container info. Stepping into a container results in a push; stepping out results in a pop.
* The constructors fill the first CONTAINER_STACK_INITIAL_CAPACITY slots with Markers whose indices are -1.
*/
Marker[] containerStack = new Marker[CONTAINER_STACK_INITIAL_CAPACITY];
/**
* The index of the current container in `containerStack`. Initially -1.
*/
int containerIndex = -1;
/**
* The Marker representing the parent container of the current value. Initially null.
*/
Marker parent = null;
/**
* Stack of argument group markers. The constructors fill the first CONTAINER_STACK_INITIAL_CAPACITY slots with
* new ArgumentGroupMarkers.
*/
ArgumentGroupMarker[] argumentGroupStack = new ArgumentGroupMarker[CONTAINER_STACK_INITIAL_CAPACITY];
/**
* Index into `argumentGroupStack`; initially -1. NOTE(review): presumably the index of the current argument
* group, mirroring `containerIndex` -- confirm against usages.
*/
int argumentGroupIndex = -1;
/**
* The start offset into the user-provided byte array, or 0 if the user provided an InputStream.
*/
private final long startOffset;
/**
* The index of the next byte in the buffer that is available to be read. Always less than or equal to `limit`.
*/
private long offset;
/**
* The index at which the next byte received will be written. Always greater than or equal to `offset`.
*/
long limit;
/**
* A slice of the current buffer. May be used to create ByteBuffer views over value representation bytes for
* quicker parsing.
*/
ByteBuffer byteBuffer;
/**
* The handler that will be notified when data is processed. Null when the configuration's handler is the default
* configuration's handler (see `getDataHandler`), which allows progress reporting to be skipped with a null check.
*/
private final BufferConfiguration.DataHandler dataHandler;
/**
* Marker for the sequence of annotation symbol IDs on the current value. If there are no annotations on
* the current value, the startIndex will be negative.
*/
final Marker annotationSequenceMarker = new Marker(-1, -1);
/**
* Holds both inline text markers and symbol IDs. If representing a symbol ID, the symbol ID value will
* be contained in the endIndex field, and the startIndex field will be -1.
*/
final MarkerList annotationTokenMarkers = new MarkerList(8);
/**
* Indicates whether the current value is annotated.
*/
boolean hasAnnotations = false;
/**
* Marker representing the current value. Both indices are -1 until a value has been marked.
*/
final Marker valueMarker = new Marker(-1, -1);
/**
* The index of the first byte in the header of the value at which the reader is currently positioned.
*/
long valuePreHeaderIndex = 0;
/**
* Type ID for the current value. Initially null.
*/
IonTypeID valueTid = null;
/**
* Marker for the current inlineable field name. A startIndex of -1 or less means no field text is marked.
*/
final Marker fieldTextMarker = new Marker(-1, -1);
/**
* The consumer to be notified when Ion version markers are encountered. Defaults to a no-op consumer.
*/
private IvmNotificationConsumer ivmConsumer = NO_OP_IVM_NOTIFICATION_CONSUMER;
/**
* The event that occurred as a result of the last call to any of the cursor's IonCursor methods. Initially
* NEEDS_DATA.
*/
IonCursor.Event event = IonCursor.Event.NEEDS_DATA;
/**
* The buffer in which the cursor stores slices of the Ion stream. For byte-array-backed cursors this is the
* user-provided array itself.
*/
byte[] buffer;
/**
* The major version of the Ion encoding currently being read. Initially 1.
*/
private int majorVersion = 1;
/**
* The minor version of the Ion encoding currently being read. Initially 0.
*/
int minorVersion = 0;
/**
* The field SID of the current value, if any. Initially -1.
*/
int fieldSid = -1;
/**
* The index of the first byte in the buffer that has not yet been successfully processed. The checkpoint is
* only advanced when sufficient progress has been made, e.g. when a complete value header has been processed, or
* a complete value has been skipped.
*/
private long checkpoint;
/**
* The index of the next byte to be read from the buffer.
*/
private long peekIndex;
/**
* The set of type IDs to use for the Ion version currently active in the stream. Initially the table used before
* any IVM has been encountered.
*/
private IonTypeID[] typeIds = IonTypeID.TYPE_IDS_NO_IVM;
/**
* Holds information necessary for reading from refillable input. Null if the cursor is byte-backed.
*/
private RefillableState refillableState;
/**
* Describes the byte at the `checkpoint` index.
*/
private CheckpointLocation checkpointLocation = CheckpointLocation.BEFORE_UNANNOTATED_TYPE_ID;
/**
* Indicates whether the cursor is in slow mode. Slow mode must be used when the input source is refillable (i.e.
* a stream) and the cursor has not buffered the current value's bytes. When slow mode is disabled, the cursor can
* consume bytes directly from its buffer without range checks or refilling. When a value has been buffered (see:
* `fillValue()`), its entire representation (including child values if applicable) can be read with slow mode
* disabled, resulting in better performance.
*/
boolean isSlowMode;
/**
* Indicates whether the current value extends beyond the end of the buffer.
*/
boolean isValueIncomplete = false;
/**
* The total number of bytes that had been consumed from the stream as of the last time progress was reported to
* the data handler.
*/
private long lastReportedByteTotal = 0;
/**
* The ID of the current macro invocation. When `isSystemInvocation` is true, a positive value indicates a system
* macro address, while a negative value indicates a system symbol ID. When `isSystemInvocation` is false, a
* positive value indicates a user macro address, while a negative value indicates that the cursor's current token
* is not a macro invocation.
*/
private long macroInvocationId = -1;
/**
* True if the current token represents a system invocation (either a system macro invocation or a system symbol
* value). When true, `macroInvocationId` is used to retrieve the ID of the system token.
*/
private boolean isSystemInvocation = false;
/**
* The type of the current value, if tagless. Otherwise, null.
*/
TaglessEncoding taglessType = null;
/**
 * Resolves the data handler that a cursor should use for the given configuration.
 * @param configuration the buffer configuration that supplies the handler.
 * @return the configuration's DataHandler, or null if it is the same instance as the default configuration's
 *  (no-op) handler.
 */
private static BufferConfiguration.DataHandler getDataHandler(IonBufferConfiguration configuration) {
    BufferConfiguration.DataHandler configured = configuration.getDataHandler();
    // Representing "no handler" as null (rather than as a no-op handler) means callers can skip computing the
    // amount of data processed after a cheap null check, which improves performance.
    if (configured == IonBufferConfiguration.DEFAULT.getDataHandler()) {
        return null;
    }
    return configured;
}
/**
 * Creates a fixed (non-refillable) cursor that reads directly from the given byte array. Slow mode is disabled
 * and no refillable state is created.
 * @param configuration the configuration to use. Only its data handler is consulted; the buffer size and
 *                      oversized value settings are unused because the given array serves as the buffer.
 * @param buffer the byte array containing the bytes to read.
 * @param offset the offset into the byte array at which the first byte of Ion data begins.
 * @param length the number of bytes to be read from the byte array.
 */
IonCursorBinary(
    final IonBufferConfiguration configuration,
    byte[] buffer,
    int offset,
    int length
) {
    this.dataHandler = getDataHandler(configuration);

    // Pre-populate both stacks so that entries can be reused instead of allocated while reading.
    for (int depth = 0; depth < CONTAINER_STACK_INITIAL_CAPACITY; depth++) {
        containerStack[depth] = new Marker(-1, -1);
        argumentGroupStack[depth] = new ArgumentGroupMarker();
    }

    // Every position-tracking index starts at the first byte of Ion data.
    this.startOffset = offset;
    this.offset = offset;
    this.peekIndex = offset;
    this.checkpoint = offset;
    this.valuePreHeaderIndex = offset;
    this.limit = offset + length;

    this.buffer = buffer;
    this.byteBuffer = ByteBuffer.wrap(buffer, offset, length);

    // The data is fixed, so nothing ever needs to be refilled.
    this.refillableState = null;
    this.isSlowMode = false;
}
/**
 * Computes the base-2 logarithm of the given number, rounded up.
 * @param value a non-negative number.
 * @return the exponent of the smallest power of two that is greater than or equal to the given number; 0 for
 *  both 0 and 1.
 */
private static int logBase2(int value) {
    // Subtracting one first makes exact powers of two map to their own exponent rather than the next one.
    int reference = (value == 0) ? 0 : value - 1;
    return Integer.SIZE - Integer.numberOfLeadingZeros(reference);
}
/**
 * Rounds the given number up to a power of two.
 * @param value a non-negative number
 * @return the smallest power of two greater than or equal to the given number. Zero is returned unchanged, and
 *  Integer.MAX_VALUE is returned when the result would not fit in an int.
 */
static int nextPowerOfTwo(int value) {
    long highestBit = Integer.toUnsignedLong(Integer.highestOneBit(value));
    if (highestBit == value) {
        // Already a power of two (or zero); nothing to round.
        return value;
    }
    // Doubling the highest set bit may exceed the int range; in that case saturate at the largest int, which is
    // the same result a narrowing conversion from double to int would produce (JLS 5.1.3).
    long doubled = highestBit << 1;
    return (int) Math.min(doubled, (long) Integer.MAX_VALUE);
}
/**
 * Pre-built configurations for fixed-size streams. The entry at index i has both its initial and its maximum
 * buffer size set to max(8, 2^i), so a configuration that is large enough for a given size is found at
 * FIXED_SIZE_CONFIGURATIONS[logBase2(size)]. Entries exist only for sizes up to
 * IonBufferConfiguration.DEFAULT.getInitialBufferSize(). Capping the table this way keeps its footprint small and
 * avoids fixed-size configurations that would demand a very large initial buffer. For example, if the user
 * provides a ByteArrayInputStream backed by a 2 GB byte array, the cursor should not allocate a second 2 GB
 * buffer to copy into even though the data is fixed; unless a custom buffer configuration is supplied, the cursor
 * uses the standard configuration (which starts at 32K) and copies into it incrementally as needed.
 */
private static final IonBufferConfiguration[] FIXED_SIZE_CONFIGURATIONS;
static {
    int largestExponent = logBase2(IonBufferConfiguration.DEFAULT.getInitialBufferSize());
    IonBufferConfiguration[] configurations = new IonBufferConfiguration[largestExponent + 1];
    for (int exponent = 0; exponent < configurations.length; exponent++) {
        // 2^exponent, saturating at the largest int. Sizes below 8 are raised to 8: the smallest power of two
        // larger than the minimum buffer size allowed.
        int powerOfTwo = (int) Math.min(1L << exponent, (long) Integer.MAX_VALUE);
        int size = Math.max(8, powerOfTwo);
        configurations[exponent] = IonBufferConfiguration.Builder.from(IonBufferConfiguration.DEFAULT)
            .withInitialBufferSize(size)
            .withMaximumBufferSize(size)
            .build();
    }
    FIXED_SIZE_CONFIGURATIONS = configurations;
}
/**
 * Checks that the given configuration is usable by a refillable cursor.
 * @param configuration the configuration to validate.
 * @throws IllegalArgumentException if the configuration is null, if its initial buffer size is less than 1, or
 *  if its maximum buffer size is less than its initial buffer size.
 */
private static void validate(IonBufferConfiguration configuration) {
    if (configuration == null) {
        throw new IllegalArgumentException("Buffer configuration must not be null.");
    }
    int initialSize = configuration.getInitialBufferSize();
    if (initialSize < 1) {
        throw new IllegalArgumentException("Initial buffer size must be at least 1.");
    }
    if (initialSize > configuration.getMaximumBufferSize()) {
        throw new IllegalArgumentException("Maximum buffer size cannot be less than the initial buffer size.");
    }
}
/**
 * Selects a buffer configuration sized for the data remaining in the given ByteArrayInputStream.
 * @param inputStream the stream.
 * @param alreadyReadLen the number of bytes already read from the stream. The configuration returned leaves
 *                       room for these bytes as well.
 * @return a fixed-size IonBufferConfiguration when the total is smaller than the default initial buffer size;
 *  otherwise, the default configuration.
 */
private static IonBufferConfiguration getFixedSizeConfigurationFor(
    ByteArrayInputStream inputStream,
    int alreadyReadLen
) {
    // Note: ByteArrayInputStream.available() may report a negative number, because the stream's constructor does
    // not check that the provided offset and length lie within the provided byte array. Treating a negative
    // result as 0 avoids an error when looking up the fixed-size configuration.
    int remaining = inputStream.available();
    int fixedBufferSize = remaining < 0 ? 0 : remaining;
    fixedBufferSize += Math.max(0, alreadyReadLen);
    if (fixedBufferSize >= IonBufferConfiguration.DEFAULT.getInitialBufferSize()) {
        return IonBufferConfiguration.DEFAULT;
    }
    return FIXED_SIZE_CONFIGURATIONS[logBase2(fixedBufferSize)];
}
/**
* Constructs a refillable cursor from the given input stream.
* @param configuration the configuration to use.
* @param alreadyRead the byte array containing the bytes already read (often the IVM).
* @param alreadyReadOff the offset into `alreadyRead` at which the first byte that was already read exists.
* @param alreadyReadLen the number of bytes already read from `alreadyRead`.
*/
IonCursorBinary(
IonBufferConfiguration configuration,
InputStream inputStream,
byte[] alreadyRead,
int alreadyReadOff,
int alreadyReadLen
) {
if (configuration == IonBufferConfiguration.DEFAULT) {
dataHandler = null;
if (inputStream instanceof ByteArrayInputStream) {
// ByteArrayInputStreams are fixed-size streams. Clamp the reader's internal buffer size at the size of
// the stream to avoid wastefully allocating extra space that will never be needed. It is still
// preferable for the user to manually specify the buffer size if it's less than the default, as doing
// so allows this branch to be skipped.
configuration = getFixedSizeConfigurationFor((ByteArrayInputStream) inputStream, alreadyReadLen);
}
} else {
validate(configuration);
dataHandler = getDataHandler(configuration);
}
peekIndex = 0;
checkpoint = 0;
for (int i = 0; i < CONTAINER_STACK_INITIAL_CAPACITY; i++) {
containerStack[i] = new Marker(-1, -1);
}
for (int i = 0; i < CONTAINER_STACK_INITIAL_CAPACITY; i++) {
argumentGroupStack[i] = new ArgumentGroupMarker();
}
this.buffer = new byte[configuration.getInitialBufferSize()];
this.startOffset = 0;
this.offset = 0;
this.limit = 0;
if (alreadyReadLen > 0) {
System.arraycopy(alreadyRead, alreadyReadOff, buffer, 0, alreadyReadLen);
limit = alreadyReadLen;
}
byteBuffer = ByteBuffer.wrap(buffer, 0, configuration.getInitialBufferSize());
isSlowMode = true;
refillableState = new RefillableState(
inputStream,
configuration.getInitialBufferSize(),
configuration.getMaximumBufferSize(),
State.READY
);
registerOversizedValueHandler(configuration.getOversizedValueHandler());
}
/*
* This class contains methods with the prefix 'slow', which usually have
* counterparts with the prefix 'unchecked'. The 'unchecked' variants are
* faster because they are always called from contexts where the buffer is
* already known to hold all bytes that will be required or available to
* complete the call, and therefore no filling or bounds checking is required.
* Sometimes a 'slow' method does not have an 'unchecked' variant. This is
* typically because the 'unchecked' variant is simple enough to be written
* inline at the call site. Public API methods will delegate to either the
* 'slow' or 'unchecked' variant depending on whether 'isSlowMode' is true.
* In general, 'isSlowMode' may be disabled whenever the input source is a
* byte array (and therefore cannot grow and does not need to be filled),
* or whenever the current value's parent container has already been filled.
* Where a choice exists, 'slow' methods must call other 'slow' methods, and
* 'unchecked' methods must call other 'unchecked' methods.
*/
/* ---- Begin: internal buffer manipulation methods ---- */
/**
 * Measures the distance from the given index to `limit`.
 * @param index a byte index in the buffer.
 * @return the number of buffered bytes from the given index up to `limit`.
 */
private long availableAt(long index) {
    final long endOfData = limit;
    return endOfData - index;
}
/**
 * Makes sure the buffer has room for `numberOfBytes` bytes starting at `index`, growing the buffer if necessary.
 * Buffered bytes may be consolidated at the beginning of the buffer (or of a new, larger buffer), in which case
 * saved indices are shifted accordingly. As a side effect, the total number of bytes required, measured from the
 * first byte that must be retained, is stored in `refillableState.bytesRequested`.
 * @param numberOfBytes the number of bytes starting at `index` that need to be present.
 * @param index the index after which to fill.
 * @return true if the buffer has sufficient capacity; false if the request exceeds the space permitted by the
 *  maximum buffer size, in which case the current value is flagged as being skipped.
 * @throws IonException if the number of bytes required overflows a long.
 */
private boolean ensureCapacity(long numberOfBytes, long index) {
    // Bytes are retained from the pin when one is set, and from `offset` otherwise. Bytes between the pin and
    // `offset` are charged against the space limit.
    final boolean isPinned = refillableState.pinOffset > -1;
    int spaceLimit = refillableState.maximumBufferSize;
    final int retainFrom;
    if (isPinned) {
        spaceLimit -= (int) (offset - refillableState.pinOffset);
        retainFrom = (int) refillableState.pinOffset;
    } else {
        retainFrom = (int) offset;
    }

    long totalRequired = numberOfBytes + (index - retainFrom);
    if (totalRequired < 0) {
        throw new IonException("The number of bytes required cannot be represented in a Java long.");
    }
    refillableState.bytesRequested = totalRequired;

    if (totalRequired <= freeSpaceAt(retainFrom)) {
        // The bytes fit as-is: nothing to shift and nothing to grow.
        return true;
    }
    if (totalRequired > spaceLimit) {
        refillableState.isSkippingCurrentValue = true;
        return false;
    }

    long shortfall = totalRequired - refillableState.capacity;
    if (shortfall <= 0) {
        // The existing buffer is large enough once the retained bytes are moved to its beginning, which frees
        // space at the end for the rest of the requested bytes.
        moveBytesToStartOfBuffer(buffer, retainFrom);
        return true;
    }

    // Grow to at least double the current capacity, without exceeding the space limit.
    long doubledCapacity = refillableState.capacity * 2;
    int roundedRequirement = nextPowerOfTwo((int) (refillableState.capacity + shortfall));
    int newSize = (int) Math.min(Math.max(doubledCapacity, roundedRequirement), spaceLimit);
    byte[] grownBuffer = new byte[newSize];
    moveBytesToStartOfBuffer(grownBuffer, retainFrom);
    refillableState.capacity = newSize;
    buffer = grownBuffer;
    // Re-wrap the new array, carrying over the byte order of the previous view.
    ByteOrder previousOrder = byteBuffer.order();
    byteBuffer = ByteBuffer.wrap(buffer, (int) offset, (int) refillableState.capacity);
    byteBuffer.order(previousOrder);
    return true;
}
/**
 * Tries to make at least `numberOfBytes` bytes available in the buffer starting at `index`, reading from the
 * input when the buffer does not already hold them. Updates `refillableState.state` to READY on success and to
 * FILL otherwise.
 * @param index the index after which to fill.
 * @param numberOfBytes the number of bytes starting at `index` that need to be present.
 * @return false if not enough bytes were available in the stream to satisfy the request; otherwise, true. Note
 *  that true is also returned when the buffer could not be given enough capacity; recovering from that is the
 *  caller's responsibility.
 */
private boolean fillAt(long index, long numberOfBytes) {
    long shortfall = numberOfBytes - availableAt(index);
    if (shortfall > 0) {
        // When capacity is available, everything requested is read (not merely the shortfall), which reduces
        // I/O. When it is not, the failure is not due to missing data, so no shortfall is reported.
        shortfall = ensureCapacity(numberOfBytes, index)
            ? refill(refillableState.bytesRequested)
            : 0;
    }
    if (shortfall > 0) {
        refillableState.state = State.FILL;
        return false;
    }
    refillableState.bytesRequested = 0;
    refillableState.state = State.READY;
    return true;
}
/**
 * Copies the buffered bytes from `fromIndex` up to `limit` out of 'buffer' to the beginning of the destination
 * buffer, then shifts all saved indices left by `fromIndex` and resets `offset`, `limit` and any positive pin
 * offset to match the new layout.
 * @param destinationBuffer the destination buffer, which may be 'buffer' itself or a new buffer.
 * @param fromIndex the index in 'buffer' of the first byte to keep.
 */
private void moveBytesToStartOfBuffer(byte[] destinationBuffer, int fromIndex) {
    long retainedLength = availableAt(fromIndex);
    if (retainedLength > 0) {
        System.arraycopy(buffer, fromIndex, destinationBuffer, 0, (int) retainedLength);
    }
    if (fromIndex > 0) {
        shiftIndicesLeft(fromIndex);
    }
    offset = 0;
    // A positive pin offset moves to the front of the buffer; 0 and -1 (no pin) are left as they are.
    refillableState.pinOffset = Math.min(refillableState.pinOffset, 0);
    limit = retainedLength;
}
/**
 * Measures the distance from the given index to the end of the buffer's capacity.
 * @param index a byte index in the buffer.
 * @return the number of bytes between the given index and the buffer's current capacity.
 */
private long freeSpaceAt(long index) {
    final long currentCapacity = refillableState.capacity;
    return currentCapacity - index;
}
/**
 * Reads one byte without storing it in the buffer. This is used while skipping an oversized value when the byte
 * values still matter (e.g. inside the oversized value's header, which determines how many bytes to skip). If a
 * previously un-read byte is pending in `lastUnbufferedByte`, that byte is returned (and cleared) instead of
 * reading from the stream.
 * @return the next byte, or -1 if the stream is at its end.
 */
private int readByteWithoutBuffering() {
    int pendingByte = refillableState.lastUnbufferedByte;
    if (pendingByte > -1) {
        refillableState.lastUnbufferedByte = -1;
        return pendingByte;
    }
    int nextByte = -1;
    try {
        nextByte = refillableState.inputStream.read();
    } catch (EOFException ignored) {
        // Some InputStream implementations (e.g. GZIPInputStream) throw EOFException when asked for more bytes
        // than are currently available (e.g. if a header or trailer is incomplete). Treat this as end-of-stream.
    } catch (IOException e) {
        throwAsIonException(e);
    }
    if (nextByte >= 0) {
        // Only bytes actually taken from the stream count toward the skipped total.
        refillableState.individualBytesSkippedWithoutBuffering += 1;
    }
    return nextByte;
}
/**
 * Returns the next byte and advances `peekIndex` past it. Unless the current value is being skipped, the byte is
 * taken straight from the buffer without any filling; when the value is being skipped, the byte is read from the
 * stream without buffering instead.
 * @return the byte, or -1 if the end of the stream has been reached.
 */
private int slowPeekByte() {
    if (!refillableState.isSkippingCurrentValue) {
        int position = (int) (peekIndex++);
        return buffer[position] & SINGLE_BYTE_MASK;
    }
    return readByteWithoutBuffering();
}
/**
 * Reads the next byte from the stream, filling the buffer first so that the byte is buffered (unless the current
 * value is being skipped).
 * @return the byte, or -1 if the end of the stream has been reached.
 */
private int slowReadByte() {
    if (refillableState.isSkippingCurrentValue) {
        // Bytes of a value that is being skipped are never buffered.
        return readByteWithoutBuffering();
    }
    return fillAt(peekIndex, 1) ? slowPeekByte() : -1;
}
/**
 * Moves the end index of every active container left by the given amount. Needed after bytes have been shifted
 * to the start of the buffer to free up room at the end. End indices that are not positive are left untouched.
 * @param shiftAmount the amount to shift left.
 */
private void shiftContainerEnds(long shiftAmount) {
    int depth = containerIndex;
    while (depth >= 0) {
        Marker container = containerStack[depth];
        if (container.endIndex > 0) {
            container.endIndex -= shiftAmount;
        }
        depth--;
    }
}
/**
 * Moves every saved index left by the given amount. Needed whenever data moves within the underlying buffer,
 * whether because the buffer grew or because NOP padding was reclaimed to make room for a value that would
 * otherwise exceed the buffer's maximum size. The shift is also recorded in `refillableState.pendingShift` and
 * added to `refillableState.totalDiscardedBytes`.
 * @param shiftAmount the amount to shift left.
 */
private void shiftIndicesLeft(int shiftAmount) {
    // The peek index is clamped so that it never becomes negative.
    long shiftedPeekIndex = peekIndex - shiftAmount;
    peekIndex = shiftedPeekIndex < 0 ? 0 : shiftedPeekIndex;

    valuePreHeaderIndex -= shiftAmount;
    valueMarker.startIndex -= shiftAmount;
    valueMarker.endIndex -= shiftAmount;
    checkpoint -= shiftAmount;

    // Markers with a negative start index are not set and are left alone.
    boolean hasFieldText = fieldTextMarker.startIndex > -1;
    if (hasFieldText) {
        fieldTextMarker.startIndex -= shiftAmount;
        fieldTextMarker.endIndex -= shiftAmount;
    }
    boolean hasAnnotationSequence = annotationSequenceMarker.startIndex > -1;
    if (hasAnnotationSequence) {
        annotationSequenceMarker.startIndex -= shiftAmount;
        annotationSequenceMarker.endIndex -= shiftAmount;
    }

    // Note: provisional annotation token markers are shifted too, since a shift can happen after a marker has
    // been provisionally created but before it is committed.
    int tokenPosition = 0;
    while (tokenPosition < annotationTokenMarkers.provisionalSize()) {
        Marker token = annotationTokenMarkers.provisionalGet(tokenPosition);
        if (token.startIndex > -1) {
            token.startIndex -= shiftAmount;
            token.endIndex -= shiftAmount;
        }
        tokenPosition++;
    }

    shiftContainerEnds(shiftAmount);
    refillableState.pendingShift = shiftAmount;
    refillableState.totalDiscardedBytes += shiftAmount;
}
/**
 * Attempts to fill the buffer with up to the requested number of additional bytes, reading into the free space
 * that starts at `limit`. It is the caller's responsibility to ensure that there is space in the buffer.
 * The number of bytes considered available is measured from `refillableState.pinOffset` when that is greater
 * than -1, and from `offset` otherwise.
 * @param minimumNumberOfBytesRequired the minimum number of bytes requested to fill the current value.
 * @return the shortfall between the number of bytes that were filled and the minimum number requested. If less than
 *  1, then at least `minimumNumberOfBytesRequired` were filled.
 */
private long refill(long minimumNumberOfBytesRequired) {
    int bytesRead = -1;
    // Sometimes an InputStream implementation will return fewer than the number of bytes requested even
    // if the stream is not at EOF. If this happens and there is still a shortfall, keep requesting bytes
    // until either the shortfall is filled or EOF is reached.
    while (true) {
        try {
            bytesRead = refillableState.inputStream.read(buffer, (int) limit, (int) freeSpaceAt(limit));
        } catch (EOFException e) {
            // Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are
            // requested to read than are currently available (e.g. if a header or trailer is incomplete).
            // Treat this the same as a regular end-of-stream indication.
            bytesRead = -1;
        } catch (IOException e) {
            throwAsIonException(e);
        }
        if (bytesRead > 0) {
            limit += bytesRead;
        }
        final long measureFrom = refillableState.pinOffset > -1 ? refillableState.pinOffset : offset;
        final long stillMissing = minimumNumberOfBytesRequired - availableAt(measureFrom);
        // Stop once the request is satisfied or the stream reported that it has no more data.
        if (stillMissing <= 0 || bytesRead < 0) {
            return stillMissing;
        }
    }
}
/**
 * Seeks forward in the stream up to the requested number of bytes, from `offset`. Bytes that are already
 * buffered are passed over by advancing `offset`; any remainder is skipped directly in the underlying stream,
 * with the skipped count added to `refillableState.totalDiscardedBytes` and applied to the container end
 * indices. On success the state becomes `READY`; otherwise the state becomes `SEEK` and
 * `refillableState.bytesRequested` holds the number of bytes still to be skipped.
 * @param numberOfBytes the number of bytes to seek from `offset`.
 * @return true if not enough data was available in the stream; otherwise, false.
 */
private boolean slowSeek(long numberOfBytes) {
    long remainingToSkip = numberOfBytes - availableAt(offset);
    if (remainingToSkip > 0) {
        // Everything buffered is consumed; the rest must be skipped in the stream itself.
        offset = limit;
        long skippedThisPass = 0;
        do {
            try {
                skippedThisPass = refillableState.inputStream.skip(remainingToSkip);
            } catch (EOFException e) {
                // Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes
                // are requested to skip than are currently available (e.g. if a header or trailer is
                // incomplete).
                skippedThisPass = 0;
            } catch (IOException e) {
                throwAsIonException(e);
            }
            refillableState.totalDiscardedBytes += skippedThisPass;
            shiftContainerEnds(skippedThisPass);
            remainingToSkip -= skippedThisPass;
        } while (remainingToSkip > 0 && skippedThisPass > 0);
        if (remainingToSkip > 0) {
            // The stream ran dry before the seek completed; remember how much is still owed.
            refillableState.bytesRequested = remainingToSkip;
            refillableState.state = State.SEEK;
            return true;
        }
        // The value has been entirely skipped, so its endIndex is now the buffer's limit.
        valueMarker.endIndex = limit;
    } else {
        // The whole seek is satisfied by bytes that are already buffered.
        offset += numberOfBytes;
    }
    refillableState.bytesRequested = 0;
    refillableState.state = State.READY;
    return false;
}
/* ---- End: internal buffer manipulation methods ---- */
/* ---- Begin: version-dependent parsing methods ---- */
/* ---- Ion 1.0 ---- */
/**
 * Reads a 2+ byte VarUInt, given the first byte. NOTE: the VarUInt must fit in a `long`. This must only be
 * called when it is known that the buffer already contains all the bytes in the VarUInt.
 * @param currentByte the first byte of the VarUInt; only its lower seven bits contribute to the value.
 * @return the value.
 * @throws IonException if the VarUInt continues past `limit`, or if it is too large to fit in a `long`.
 */
private long uncheckedReadVarUInt_1_0(byte currentByte) {
    long result = currentByte & LOWER_SEVEN_BITS_BITMASK;
    do {
        if (peekIndex >= limit) {
            throw new IonException("Malformed data: declared length exceeds the number of bytes remaining in the stream.");
        }
        // Verify that another group of value bits fits before shifting. Checking only the sign of the final
        // result is not sufficient: bits shifted out past the sign bit are silently lost, so an over-long
        // VarUInt could otherwise wrap around to a non-negative (and wrong) value.
        if (result > (Long.MAX_VALUE >>> VALUE_BITS_PER_VARUINT_BYTE)) {
            throw new IonException("Found a VarUInt that was too large to fit in a `long`");
        }
        currentByte = buffer[(int) (peekIndex++)];
        result = (result << VALUE_BITS_PER_VARUINT_BYTE) | (currentByte & LOWER_SEVEN_BITS_BITMASK);
        // A byte with its highest bit set is negative as a signed `byte` and terminates the VarUInt.
    } while (currentByte >= 0);
    // The pre-shift check above keeps the accumulated value non-negative, so no final sign check is needed.
    return result;
}
/**
 * Reads a VarUInt one byte at a time via `slowReadByte()`, which ensures enough data is available. NOTE: the
 * VarUInt must fit in a `long`; at most `MAXIMUM_SUPPORTED_VAR_UINT_BYTES` bytes are consumed before the
 * value is rejected.
 * @return the value, or -1 if the end of the stream was reached before the VarUInt was complete.
 */
private long slowReadVarUInt_1_0() {
    long accumulated = 0;
    for (int bytesConsumed = 0; bytesConsumed < MAXIMUM_SUPPORTED_VAR_UINT_BYTES; bytesConsumed++) {
        final int nextByte = slowReadByte();
        if (nextByte < 0) {
            // Not enough data to finish the VarUInt.
            return -1;
        }
        accumulated = (accumulated << VALUE_BITS_PER_VARUINT_BYTE) | (nextByte & LOWER_SEVEN_BITS_BITMASK);
        // The highest bit marks the final byte of the VarUInt.
        if ((nextByte & HIGHEST_BIT_BITMASK) != 0) {
            return accumulated;
        }
    }
    throw new IonException("Found a VarUInt that was too large to fit in a `long`");
}
/**
 * Reads the header of an annotation wrapper. This must only be called when it is known that the buffer already
 * contains all the bytes in the header. Sets `valueMarker` (via `setMarker`) using the computed end index of the
 * wrapper. Sets `annotationSequenceMarker` with the start and end indices of the sequence of annotation SIDs.
 * After successful return, `peekIndex` will point at the type ID byte of the wrapped value.
 * @param valueTid the type ID of the annotation wrapper.
 * @return false whenever this method returns normally; malformed or truncated headers are reported by throwing
 *  an IonException instead.
 */
private boolean uncheckedReadAnnotationWrapperHeader_1_0(IonTypeID valueTid) {
    long wrapperEnd;
    if (!valueTid.variableLength) {
        // The wrapper length is carried by the type ID itself.
        wrapperEnd = valueTid.length;
    } else {
        if (peekIndex >= limit) {
            throw new IonException("Malformed data: declared length exceeds the number of bytes remaining in the stream.");
        }
        final byte lengthByte = buffer[(int) peekIndex++];
        // A negative byte has its highest bit set, meaning the VarUInt length is complete in a single byte.
        wrapperEnd = (lengthByte < 0)
            ? (lengthByte & LOWER_SEVEN_BITS_BITMASK)
            : uncheckedReadVarUInt_1_0(lengthByte);
        if (wrapperEnd == 0) {
            throw new IonException("Annotation wrapper must wrap a value.");
        }
    }
    // Convert the length into an absolute index, measured from the current peek position.
    wrapperEnd += peekIndex;
    setMarker(wrapperEnd, valueMarker);
    if (wrapperEnd < 0 || wrapperEnd > limit) {
        throw new IonException("Malformed data: declared length exceeds the number of bytes remaining in the stream.");
    }
    final byte annotationsLengthByte = buffer[(int) peekIndex++];
    final long annotationsLength = (annotationsLengthByte < 0)
        ? (annotationsLengthByte & LOWER_SEVEN_BITS_BITMASK)
        : uncheckedReadVarUInt_1_0(annotationsLengthByte);
    final long annotationsEnd = peekIndex + annotationsLength;
    annotationSequenceMarker.startIndex = peekIndex;
    annotationSequenceMarker.endIndex = annotationsEnd;
    // Position the cursor just past the annotation SIDs.
    peekIndex = annotationsEnd;
    if (peekIndex >= wrapperEnd) {
        throw new IonException("Annotation wrapper must wrap a value.");
    }
    if (peekIndex < 0) {
        throw new IonException("Malformed data: declared length exceeds the number of bytes remaining in the stream.");
    }
    return false;
}
/**
* Reads the header of an annotation wrapper, ensuring enough data is available in the buffer. Sets `valueMarker`
* with the start and end indices of the wrapped value. Sets `annotationSequenceMarker` with the start and end
* indices of the sequence of annotation SIDs. After successful return, `peekIndex` will point at the type ID byte
* of the wrapped value.
* @param valueTid the type ID of the annotation wrapper.
* @return true if there are not enough bytes in the stream to complete the value; otherwise, false.
*/
private boolean slowReadAnnotationWrapperHeader_1_0(IonTypeID valueTid) {
long valueLength;
if (valueTid.variableLength) {
// At this point the value must be at least 4 more bytes: 1 for the smallest-possible wrapper length, 1
// for the smallest-possible annotations length, one for the smallest-possible annotation, and 1 for the
// smallest-possible value representation.
if (!fillAt(peekIndex, 4)) {
return true;
}
valueLength = slowReadVarUInt_1_0();
if (valueLength < 0) {
return true;
}