Skip to content

Commit e77f059

Browse files
sabrenner, jjxct, Yun-Kim
authored
feat(openai): support streamed responses for completions and chat completions (#4308)
Co-authored-by: Jonathan Chavez <153635462+jjxct@users.noreply.github.com> Co-authored-by: Yun Kim <35776586+Yun-Kim@users.noreply.github.com>
1 parent 95b5a41 commit e77f059

File tree

7 files changed

+530
-40
lines changed

7 files changed

+530
-40
lines changed

packages/datadog-instrumentations/src/openai.js

+148-10
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ const V4_PACKAGE_SHIMS = [
1515
file: 'resources/chat/completions.js',
1616
targetClass: 'Completions',
1717
baseResource: 'chat.completions',
18-
methods: ['create']
18+
methods: ['create'],
19+
streamedResponse: true
1920
},
2021
{
2122
file: 'resources/completions.js',
2223
targetClass: 'Completions',
2324
baseResource: 'completions',
24-
methods: ['create']
25+
methods: ['create'],
26+
streamedResponse: true
2527
},
2628
{
2729
file: 'resources/embeddings.js',
@@ -141,9 +143,130 @@ addHook({ name: 'openai', file: 'dist/api.js', versions: ['>=3.0.0 <4'] }, expor
141143
return exports
142144
})
143145

146+
function addStreamedChunk (content, chunk) {
147+
return content.choices.map((oldChoice, choiceIdx) => {
148+
const newChoice = oldChoice
149+
const chunkChoice = chunk.choices[choiceIdx]
150+
if (!oldChoice.finish_reason) {
151+
newChoice.finish_reason = chunkChoice.finish_reason
152+
}
153+
154+
// delta exists on chat completions
155+
const delta = chunkChoice.delta
156+
157+
if (delta) {
158+
const content = delta.content
159+
if (content) {
160+
if (newChoice.delta.content) { // we don't want to append to undefined
161+
newChoice.delta.content += content
162+
} else {
163+
newChoice.delta.content = content
164+
}
165+
}
166+
} else {
167+
const text = chunkChoice.text
168+
if (text) {
169+
if (newChoice.text) {
170+
newChoice.text += text
171+
} else {
172+
newChoice.text = text
173+
}
174+
}
175+
}
176+
177+
// tools only exist on chat completions
178+
const tools = delta && chunkChoice.delta.tool_calls
179+
180+
if (tools) {
181+
newChoice.delta.tool_calls = tools.map((newTool, toolIdx) => {
182+
const oldTool = oldChoice.delta.tool_calls[toolIdx]
183+
184+
if (oldTool) {
185+
oldTool.function.arguments += newTool.function.arguments
186+
}
187+
188+
return oldTool
189+
})
190+
}
191+
192+
return newChoice
193+
})
194+
}
195+
196+
/**
 * Decode an array of raw SSE buffers into parsed JSON chunk objects.
 *
 * The concatenated stream looks like repeated `data: {...}\n\n` events
 * terminated by a `data: [DONE]` sentinel, which is discarded.
 *
 * @param {Buffer[]} [chunks] - raw buffers collected from the response stream
 * @returns {Object[]} one parsed object per `data:` event
 */
function buffersToJSON (chunks = []) {
  const raw = Buffer.concat(chunks).toString()

  // each SSE event starts with "data:" - lookahead keeps the prefix on each piece
  const events = raw.split(/(?=data:)/)

  // strip newlines and the leading "data: " marker from every event
  const payloads = events.map(event => event.replace(/\n/g, '').substring(6))

  // the final event is the [DONE] sentinel, not JSON - drop it before parsing
  return payloads.slice(0, -1).map(payload => JSON.parse(payload))
}
206+
207+
/**
 * For streamed responses, we need to accumulate all of the content in
 * the chunks, and let the combined content be the final response.
 * This way, spans look the same as when not streamed.
 *
 * @param {Object} response - raw HTTP response; its headers/url are published on finish
 * @param {Object} options - request options; its method is published on finish
 * @returns {Function} a shimmer wrapper for the stream's iterator factory
 */
function wrapStreamIterator (response, options) {
  let processChunksAsBuffers = false
  const chunks = []
  return function (itr) {
    return function () {
      const iterator = itr.apply(this, arguments)
      shimmer.wrap(iterator, 'next', next => function () {
        return next.apply(this, arguments)
          .then(res => {
            const { done, value: chunk } = res

            if (chunk) {
              chunks.push(chunk)
              if (chunk instanceof Buffer) {
                // this operation should be safe
                // if one chunk is a buffer (versus a plain object), the rest should be as well
                processChunksAsBuffers = true
              }
            }

            if (done) {
              let content = chunks.filter(chunk => chunk != null) // filter null or undefined values

              // guard on the filtered array, not `chunks` (a const array is
              // always truthy): reduce() with no initial value throws a
              // TypeError on an empty array
              if (content.length) {
                if (processChunksAsBuffers) {
                  content = buffersToJSON(content)
                }

                // fold every chunk into the first one so the accumulated
                // body looks like a non-streamed response
                content = content.reduce((content, chunk) => {
                  content.choices = addStreamedChunk(content, chunk)
                  return content
                })
              }

              finishCh.publish({
                headers: response.headers,
                body: content,
                path: response.url,
                method: options.method
              })
            }

            return res
          })
          .catch(err => {
            errorCh.publish({ err })

            throw err
          })
      })
      return iterator
    }
  }
}
266+
144267
for (const shim of V4_PACKAGE_SHIMS) {
145-
const { file, targetClass, baseResource, methods } = shim
146-
addHook({ name: 'openai', file, versions: shim.versions || ['>=4'] }, exports => {
268+
const { file, targetClass, baseResource, methods, versions, streamedResponse } = shim
269+
addHook({ name: 'openai', file, versions: versions || ['>=4'] }, exports => {
147270
const targetPrototype = exports[targetClass].prototype
148271

149272
for (const methodName of methods) {
@@ -152,6 +275,11 @@ for (const shim of V4_PACKAGE_SHIMS) {
152275
return methodFn.apply(this, arguments)
153276
}
154277

278+
// The OpenAI library lets you set `stream: true` on the options arg to any method
279+
// However, we only want to handle streamed responses in specific cases
280+
// chat.completions and completions
281+
const stream = streamedResponse && arguments[arguments.length - 1]?.stream
282+
155283
const client = this._client || this.client
156284

157285
startCh.publish({
@@ -170,12 +298,22 @@ for (const shim of V4_PACKAGE_SHIMS) {
170298
// the original response is wrapped in a promise, so we need to unwrap it
171299
.then(body => Promise.all([this.responsePromise, body]))
172300
.then(([{ response, options }, body]) => {
173-
finishCh.publish({
174-
headers: response.headers,
175-
body,
176-
path: response.url,
177-
method: options.method
178-
})
301+
if (stream) {
302+
if (body.iterator) {
303+
shimmer.wrap(body, 'iterator', wrapStreamIterator(response, options))
304+
} else {
305+
shimmer.wrap(
306+
body.response.body, Symbol.asyncIterator, wrapStreamIterator(response, options)
307+
)
308+
}
309+
} else {
310+
finishCh.publish({
311+
headers: response.headers,
312+
body,
313+
path: response.url,
314+
method: options.method
315+
})
316+
}
179317

180318
return body
181319
})

packages/datadog-plugin-openai/src/index.js

+12-5
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ class OpenApiPlugin extends TracingPlugin {
112112
}
113113
}
114114

115+
if (payload.stream) {
116+
tags['openai.request.stream'] = payload.stream
117+
}
118+
115119
switch (methodName) {
116120
case 'createFineTune':
117121
case 'fine_tuning.jobs.create':
@@ -298,7 +302,8 @@ function retrieveModelRequestExtraction (tags, payload) {
298302
}
299303

300304
function createChatCompletionRequestExtraction (tags, payload, store) {
301-
if (!defensiveArrayLength(payload.messages)) return
305+
const messages = payload.messages
306+
if (!defensiveArrayLength(messages)) return
302307

303308
store.messages = payload.messages
304309
for (let i = 0; i < payload.messages.length; i++) {
@@ -600,18 +605,20 @@ function commonCreateResponseExtraction (tags, body, store) {
600605
tags[`openai.response.choices.${choiceIdx}.text`] = truncateText(choice.text)
601606

602607
// createChatCompletion only
603-
if (choice.message) {
604-
const message = choice.message
608+
const message = choice.message || choice.delta // delta for streamed responses
609+
if (message) {
605610
tags[`openai.response.choices.${choiceIdx}.message.role`] = message.role
606611
tags[`openai.response.choices.${choiceIdx}.message.content`] = truncateText(message.content)
607612
tags[`openai.response.choices.${choiceIdx}.message.name`] = truncateText(message.name)
608613
if (message.tool_calls) {
609614
const toolCalls = message.tool_calls
610615
for (let toolIdx = 0; toolIdx < toolCalls.length; toolIdx++) {
611-
tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.name`] =
616+
tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.function.name`] =
612617
toolCalls[toolIdx].function.name
613-
tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.arguments`] =
618+
tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.function.arguments`] =
614619
toolCalls[toolIdx].function.arguments
620+
tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.id`] =
621+
toolCalls[toolIdx].id
615622
}
616623
}
617624
}

0 commit comments

Comments
 (0)