Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix amqplib flaky dsm tests #5445

Merged
merged 2 commits into from
Mar 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 67 additions & 53 deletions packages/datadog-plugin-amqplib/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

const agent = require('../../dd-trace/test/plugins/agent')
const { ERROR_MESSAGE, ERROR_STACK, ERROR_TYPE } = require('../../dd-trace/src/constants')
const id = require('../../dd-trace/src/id')
const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')

const { expectedSchema, rawExpectedSchema } = require('./naming')

describe('Plugin', () => {
let tracer
let connection
let channel
let queue

describe('amqplib', () => {
withVersions('amqplib', 'amqplib', version => {
beforeEach(() => {
process.env.DD_DATA_STREAMS_ENABLED = 'true'
tracer = require('../../dd-trace')
queue = `test-${id()}`
})

afterEach(() => {
Expand All @@ -40,26 +45,22 @@ describe('Plugin', () => {
})
})

afterEach(() => {
return agent.close({ ritmReset: false })
})

describe('without plugin', () => {
it('should run commands normally', done => {
channel.assertQueue('test', {}, () => { done() })
channel.assertQueue(queue, {}, () => { done() })
})
})

describe('when using a callback', () => {
before(() => {
return agent.load('amqplib')
})

after(() => {
return agent.close({ ritmReset: false })
})

describe('when sending commands', () => {
withPeerService(
() => tracer,
'amqplib',
() => channel.assertQueue('test', {}, () => {}),
() => channel.assertQueue(queue, {}, () => {}),
'localhost',
'out.host'
)
Expand All @@ -70,7 +71,7 @@ describe('Plugin', () => {
const span = traces[0][0]
expect(span).to.have.property('name', expectedSchema.controlPlane.opName)
expect(span).to.have.property('service', expectedSchema.controlPlane.serviceName)
expect(span).to.have.property('resource', 'queue.declare test')
expect(span).to.have.property('resource', `queue.declare ${queue}`)
expect(span).to.not.have.property('type')
expect(span.meta).to.have.property('span.kind', 'client')
expect(span.meta).to.have.property('out.host', 'localhost')
Expand All @@ -80,7 +81,7 @@ describe('Plugin', () => {
.then(done)
.catch(done)

channel.assertQueue('test', {}, () => {})
channel.assertQueue(queue, {}, () => {})
})

it('should do automatic instrumentation for queued commands', done => {
Expand All @@ -90,7 +91,7 @@ describe('Plugin', () => {

expect(span).to.have.property('name', expectedSchema.controlPlane.opName)
expect(span).to.have.property('service', expectedSchema.controlPlane.serviceName)
expect(span).to.have.property('resource', 'queue.delete test')
expect(span).to.have.property('resource', `queue.delete ${queue}`)
expect(span).to.not.have.property('type')
expect(span.meta).to.have.property('span.kind', 'client')
expect(span.meta).to.have.property('out.host', 'localhost')
Expand All @@ -100,8 +101,8 @@ describe('Plugin', () => {
.then(done)
.catch(done)

channel.assertQueue('test', {}, () => {})
channel.deleteQueue('test', () => {})
channel.assertQueue(queue, {}, () => {})
channel.deleteQueue(queue, () => {})
})

it('should handle errors', done => {
Expand All @@ -128,7 +129,7 @@ describe('Plugin', () => {
})

withNamingSchema(
() => channel.assertQueue('test', {}, () => {}),
() => channel.assertQueue(queue, {}, () => {}),
rawExpectedSchema.controlPlane
)
})
Expand All @@ -137,7 +138,7 @@ describe('Plugin', () => {
withPeerService(
() => tracer,
'amqplib',
() => channel.assertQueue('test', {}, () => {}),
() => channel.assertQueue(queue, {}, () => {}),
'localhost',
'out.host'
)
Expand Down Expand Up @@ -181,7 +182,7 @@ describe('Plugin', () => {
.catch(done)

try {
channel.sendToQueue('test', 'invalid')
channel.sendToQueue(queue, 'invalid')
} catch (e) {
error = e
}
Expand Down Expand Up @@ -293,7 +294,7 @@ describe('Plugin', () => {
})

it('should run the callback in the parent context', done => {
channel.assertQueue('test', {})
channel.assertQueue(queue, {})
.then(() => {
expect(tracer.scope().active()).to.be.null
done()
Expand All @@ -305,22 +306,32 @@ describe('Plugin', () => {
describe('when data streams monitoring is enabled', function () {
this.timeout(10000)

const expectedProducerHashWithTopic = '16804605750389532869'
const expectedProducerHashWithExchange = '2722596631431228032'

const expectedConsumerHash = '17529824252700998941'

before(() => {
tracer = require('../../dd-trace')
tracer.use('amqplib')
})
let expectedProducerHashWithTopic
let expectedProducerHashWithExchange
let expectedConsumerHash

before(async () => {
return agent.load('amqplib')
})

after(() => {
return agent.close({ ritmReset: false })
beforeEach(() => {
const producerHashWithTopic = computePathwayHash('test', 'tester', [
'direction:out',
'has_routing_key:true',
`topic:${queue}`,
'type:rabbitmq'
], ENTRY_PARENT_HASH)

expectedProducerHashWithTopic = producerHashWithTopic.readBigUInt64BE(0).toString()

expectedProducerHashWithExchange = computePathwayHash('test', 'tester', [
'direction:out',
'exchange:namedExchange',
'has_routing_key:true',
'type:rabbitmq'
], ENTRY_PARENT_HASH).readBigUInt64BE(0).toString()

expectedConsumerHash = computePathwayHash('test', 'tester', [
'direction:in',
`topic:${queue}`,
'type:rabbitmq'
], producerHashWithTopic).readBigUInt64BE(0).toString()
})

it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => {
Expand All @@ -338,13 +349,13 @@ describe('Plugin', () => {
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
'direction:out',
'has_routing_key:true',
'topic:testDSM',
`topic:${queue}`,
'type:rabbitmq'
])
expect(agent.dsmStatsExist(agent, expectedProducerHashWithTopic)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue('testDSM', {}, (err, ok) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
Expand Down Expand Up @@ -390,15 +401,16 @@ describe('Plugin', () => {
})
}
})
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal(
['direction:in', 'topic:testDSM', 'type:rabbitmq'])
expect(statsPointsReceived.length).to.equal(2)
expect(statsPointsReceived[1].EdgeTags).to.deep.equal(
['direction:in', `topic:${queue}`, 'type:rabbitmq'])
expect(agent.dsmStatsExist(agent, expectedConsumerHash)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue('testDSM', {}, (err, ok) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
channel.consume(ok.queue, () => {}, {}, (err, ok) => {
if (err) done(err)
})
Expand All @@ -416,17 +428,17 @@ describe('Plugin', () => {
})
}
})
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived.length).to.equal(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
'direction:out',
'has_routing_key:true',
'topic:testDSM',
`topic:${queue}`,
'type:rabbitmq'
])
expect(agent.dsmStatsExist(agent, expectedProducerHashWithTopic)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue('testDSM', {}, (err, ok) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
Expand All @@ -444,23 +456,24 @@ describe('Plugin', () => {
})
}
})
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal(
['direction:in', 'topic:testDSM', 'type:rabbitmq'])
expect(statsPointsReceived.length).to.equal(2)
expect(statsPointsReceived[1].EdgeTags).to.deep.equal(
['direction:in', `topic:${queue}`, 'type:rabbitmq'])
expect(agent.dsmStatsExist(agent, expectedConsumerHash)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue('testDSM', {}, (err, ok) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
channel.get(ok.queue, {}, (err, ok) => {
if (err) done(err)
})
})
})

it('Should set pathway hash tag on a span when producing', (done) => {
channel.assertQueue('testDSM', {}, (err, ok) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('dsm test'))
Expand All @@ -481,9 +494,10 @@ describe('Plugin', () => {
})

it('Should set pathway hash tag on a span when consuming', (done) => {
channel.assertQueue('testDSM', {}, (err, ok) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('dsm test'))
channel.consume(ok.queue, () => {}, {}, (err, ok) => {
if (err) return done(err)

Expand All @@ -506,7 +520,7 @@ describe('Plugin', () => {
})

describe('with configuration', () => {
after(() => {
afterEach(() => {
return agent.close({ ritmReset: false })
})

Expand All @@ -531,16 +545,16 @@ describe('Plugin', () => {
agent
.use(traces => {
expect(traces[0][0]).to.have.property('service', 'test-custom-service')
expect(traces[0][0]).to.have.property('resource', 'queue.declare test')
expect(traces[0][0]).to.have.property('resource', `queue.declare ${queue}`)
}, 2)
.then(done)
.catch(done)

channel.assertQueue('test', {}, () => {})
channel.assertQueue(queue, {}, () => {})
})

withNamingSchema(
() => channel.assertQueue('test', {}, () => {}),
() => channel.assertQueue(queue, {}, () => {}),
{
v0: {
opName: 'amqp.command',
Expand Down
Loading