parralell storage/remote

Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
pipiland2612 2025-08-06 20:11:03 +03:00 committed by Ayoub Mrini
parent de93387f0b
commit fe1bb53372
1 changed files with 11 additions and 0 deletions

View File

@ -68,6 +68,7 @@ func newHighestTimestampMetric() *maxTimestamp {
}
func TestBasicContentNegotiation(t *testing.T) {
t.Parallel()
queueConfig := config.DefaultQueueConfig
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
queueConfig.MaxShards = 1
@ -199,6 +200,7 @@ func TestBasicContentNegotiation(t *testing.T) {
}
func TestSampleDelivery(t *testing.T) {
t.Parallel()
// Let's create an even number of send batches, so we don't run into the
// batch timeout case.
n := 3
@ -405,6 +407,7 @@ func TestWALMetadataDelivery(t *testing.T) {
}
func TestSampleDeliveryTimeout(t *testing.T) {
t.Parallel()
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration
@ -433,6 +436,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
}
func TestSampleDeliveryOrder(t *testing.T) {
t.Parallel()
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
ts := 10
@ -466,6 +470,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
}
func TestShutdown(t *testing.T) {
t.Parallel()
deadline := 1 * time.Second
c := NewTestBlockedWriteClient()
@ -521,6 +526,7 @@ func TestSeriesReset(t *testing.T) {
}
func TestReshard(t *testing.T) {
t.Parallel()
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
size := 10 // Make bigger to find more races.
@ -559,6 +565,7 @@ func TestReshard(t *testing.T) {
}
func TestReshardRaceWithStop(t *testing.T) {
t.Parallel()
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
c := NewTestWriteClient(protoMsg)
@ -597,6 +604,7 @@ func TestReshardRaceWithStop(t *testing.T) {
}
func TestReshardPartialBatch(t *testing.T) {
t.Parallel()
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
samples, series := createTimeseries(1, 10)
@ -751,6 +759,7 @@ func TestShouldReshard(t *testing.T) {
// TestDisableReshardOnRetry asserts that resharding should be disabled when a
// recoverable error is returned from remote_write.
func TestDisableReshardOnRetry(t *testing.T) {
t.Parallel()
onStoredContext, onStoreCalled := context.WithCancel(context.Background())
defer onStoreCalled()
@ -1999,6 +2008,7 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) {
}
func TestDropOldTimeSeries(t *testing.T) {
t.Parallel()
// Test both v1 and v2 remote write protocols
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
@ -2034,6 +2044,7 @@ func TestIsSampleOld(t *testing.T) {
// Simulates scenario in which remote write endpoint is down and a subset of samples is dropped due to age limit while backoffing.
func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
t.Parallel()
maxSamplesPerSend := 10
sampleAgeLimit := time.Second