 'use strict';
 
-const debounce = require('lodash/debounce');
 const imurmurhash = require('imurmurhash');
+const invariant = require('invariant');
 const jsonStableStringify = require('json-stable-stringify');
 const path = require('path');
 const request = require('request');
@@ -21,9 +21,6 @@ const toFixedHex = require('./toFixedHex');
 import type {Options as TransformOptions} from '../JSTransformer/worker/worker';
 import type {CachedResult} from './TransformCache';
 
-const SINGLE_REQUEST_MAX_KEYS = 100;
-const AGGREGATION_DELAY_MS = 100;
-
 type FetchResultURIs = (
   keys: Array<string>,
   callback: (error?: Error, results?: Map<string, string>) => void,
@@ -39,55 +36,124 @@ type FetchProps = {
 type FetchCallback = (error?: Error, resultURI?: ?CachedResult) => mixed;
 type FetchURICallback = (error?: Error, resultURI?: ?string) => mixed;
 
+type ProcessBatch<TItem, TResult> = (
+  batch: Array<TItem>,
+  callback: (error?: Error, orderedResults?: Array<TResult>) => mixed,
+) => mixed;
+type BatchProcessorOptions = {
+  maximumDelayMs: number,
+  maximumItems: number,
+  concurrency: number,
+};
+
+/**
+ * We batch keys together trying to make a smaller amount of queries. For that
+ * we wait a small moment before starting to fetch. We limit also the number of
+ * keys we try to fetch at once, so if we already have that many keys pending,
+ * we can start fetching right away.
+ */
+class BatchProcessor<TItem, TResult> {
+
+  _options: BatchProcessorOptions;
+  _processBatch: ProcessBatch<TItem, TResult>;
+  _queue: Array<{
+    item: TItem,
+    callback: (error?: Error, result?: TResult) => mixed,
+  }>;
+  _timeoutHandle: ?number;
+  _currentProcessCount: number;
+
+  constructor(
+    options: BatchProcessorOptions,
+    processBatch: ProcessBatch<TItem, TResult>,
+  ) {
+    this._options = options;
+    this._processBatch = processBatch;
+    this._queue = [];
+    this._timeoutHandle = null;
+    this._currentProcessCount = 0;
+    (this: any)._processQueue = this._processQueue.bind(this);
+  }
+
+  _processQueue() {
+    this._timeoutHandle = null;
+    while (
+      this._queue.length > 0 &&
+      this._currentProcessCount < this._options.concurrency
+    ) {
+      this._currentProcessCount++;
+      const jobs = this._queue.splice(0, this._options.maximumItems);
+      const items = jobs.map(job => job.item);
+      this._processBatch(items, (error, results) => {
+        invariant(
+          results == null || results.length === items.length,
+          'Not enough results returned.',
+        );
+        for (let i = 0; i < items.length; ++i) {
+          jobs[i].callback(error, results && results[i]);
+        }
+        this._currentProcessCount--;
+        this._processQueueOnceReady();
+      });
+    }
+  }
+
+  _processQueueOnceReady() {
+    if (this._queue.length >= this._options.maximumItems) {
+      clearTimeout(this._timeoutHandle);
+      process.nextTick(this._processQueue);
+      return;
+    }
+    if (this._timeoutHandle == null) {
+      this._timeoutHandle = setTimeout(
+        this._processQueue,
+        this._options.maximumDelayMs,
+      );
+    }
+  }
+
+  queue(
+    item: TItem,
+    callback: (error?: Error, result?: TResult) => mixed,
+  ) {
+    this._queue.push({item, callback});
+    this._processQueueOnceReady();
+  }
+
+}
+
+type URI = string;
+
 /**
  * We aggregate the requests to do a single request for many keys. It also
  * ensures we do a single request at a time to avoid pressuring the I/O.
  */
 class KeyURIFetcher {
 
   _fetchResultURIs: FetchResultURIs;
-  _pendingQueries: Array<{key: string, callback: FetchURICallback}>;
-  _isProcessing: boolean;
-  _processQueriesDebounced: () => void;
-  _processQueries: () => void;
+  _batchProcessor: BatchProcessor<string, ?URI>;
 
-  /**
-   * Fetch the pending keys right now, if any and if we're not already doing
-   * so in parallel. At the end of the fetch, we trigger a new batch fetching
-   * recursively.
-   */
-  _processQueries() {
-    const {_pendingQueries} = this;
-    if (_pendingQueries.length === 0 || this._isProcessing) {
-      return;
-    }
-    this._isProcessing = true;
-    const queries = _pendingQueries.splice(0, SINGLE_REQUEST_MAX_KEYS);
-    const keys = queries.map(query => query.key);
-    this._fetchResultURIs(keys, (error, results) => {
-      queries.forEach(query => {
-        query.callback(error, results && results.get(query.key));
-      });
-      this._isProcessing = false;
-      process.nextTick(this._processQueries);
+  _processKeys(
+    keys: Array<string>,
+    callback: (error?: Error, keyURIs: Array<?URI>) => mixed,
+  ) {
+    this._fetchResultURIs(keys, (error, URIsByKey) => {
+      const URIs = keys.map(key => URIsByKey && URIsByKey.get(key));
+      callback(error, URIs);
     });
   }
 
-  /**
-   * Enqueue the fetching of a particular key.
-   */
   fetch(key: string, callback: FetchURICallback) {
-    this._pendingQueries.push({key, callback});
-    this._processQueriesDebounced();
+    this._batchProcessor.queue(key, callback);
   }
 
   constructor(fetchResultURIs: FetchResultURIs) {
     this._fetchResultURIs = fetchResultURIs;
-    this._pendingQueries = [];
-    this._isProcessing = false;
-    this._processQueries = this._processQueries.bind(this);
-    this._processQueriesDebounced =
-      debounce(this._processQueries, AGGREGATION_DELAY_MS);
+    this._batchProcessor = new BatchProcessor({
+      maximumDelayMs: 10,
+      maximumItems: 500,
+      concurrency: 25,
+    }, this._processKeys.bind(this));
   }
 
 }
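
For readers skimming the diff, here is a minimal usage sketch of the new BatchProcessor. It is not part of the commit: fetchBatchFromServer is a hypothetical stand-in for a real batch endpoint, and the option values simply mirror the ones KeyURIFetcher passes above.

const processor = new BatchProcessor({
  maximumDelayMs: 10,   // wait up to 10ms for more keys to accumulate
  maximumItems: 500,    // flush on the next tick once 500 keys are pending
  concurrency: 25,      // allow up to 25 batches in flight at once
}, (keys, callback) => {
  // Hypothetical batch endpoint; it must call back with one result per key,
  // in the same order, or the invariant in _processQueue fires.
  fetchBatchFromServer(keys, (error, orderedResults) => {
    callback(error, orderedResults);
  });
});

// Each caller queues a single item and gets back only its own result.
processor.queue('some-cache-key', (error, uri) => {
  if (error) {
    console.error(error);
    return;
  }
  console.log('URI for some-cache-key:', uri);
});

Compared with the debounce-based queue it replaces, the explicit timer plus the maximumItems check lets a full queue flush on the next tick instead of always waiting out the aggregation delay, and the concurrency option allows several batches in flight rather than strictly one.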
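Similarly, a sketch of how KeyURIFetcher could be wired against a batch endpoint; requestURIsByKey is assumed here, not part of the commit, and per the FetchResultURIs type it must call back with a Map from key to URI.

const fetcher = new KeyURIFetcher((keys, callback) => {
  // Hypothetical transport; resolves to a Map<string, string> of
  // key -> result URI (keys with no cached result are simply absent).
  requestURIsByKey(keys, (error, uriMap) => callback(error, uriMap));
});

fetcher.fetch('transform-cache-key', (error, uri) => {
  // `uri` is undefined when the server returned no entry for this key.
});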