Hi i'm trying to write a RESTful service with Kafka. I'm writing my code to make it store the messages received from kafka by the consumer into an open search index every time i make a GET or a POST request, but i'm constantly receiving this error:
Result: {"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}
This is my consumer:
public class Main {
static Client client = new Client();
public static void main(String[] args) throws IOException, NoSuchAlgorithmException, KeyManagementException {
String servers = "localhost:9092";
String groupId = "id";
String topic = "mytopic";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
Logger logger = LoggerFactory.getLogger(Main.class.getName());
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
for (ConsumerRecord<String, String> record : records) {"Key:" + record.key() + ", Value:" + record.value());"Partition:" + record.partition() + ", Offset:" + record.offset());
String result ="https://localhost:9200/index3/_create/" + record.offset(), "admin", "admin", record);
System.out.println("Result: " + result);
Is there a way to avoid this??
My message is "Request":"valid", i tried to write it like a key-value json but i'm wrong
EDIT: i tried to set the Content type to "application/json" but it throws the same error!
This my client:
public String run(String url, String username, String password, ConsumerRecord<String, String> record) throws IOException, NoSuchAlgorithmException, KeyManagementException {
//TrustManager per accettare tutti i certificati
TrustManager[] trustAllCerts = new TrustManager[]{new Client()};
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustAllCerts, new SecureRandom());
OkHttpClient client = new OkHttpClient.Builder()
.connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS))
.sslSocketFactory(sslContext.getSocketFactory(), (X509TrustManager) trustAllCerts[0])
.hostnameVerifier((hostname, session) -> true)
String credentials = username + ":" + password;
String encode = Base64.getEncoder().encodeToString(credentials.getBytes());
String authHeader = "Basic " + encode;
RequestBody requestBody = new FormBody.Builder()
.add("Message:", "n°" + record.offset())
Request request = new Request.Builder()
.header("Authorization", authHeader)
.header("Content-Type", "application/json")
.post(requestBody) //Body della post
try (Response response = client.newCall(request).execute()) {
return response.body().string();
Upvotes: 0
Views: 294
I solved making my request body a JsonObject using GSon dependencies:
And that's the code:
JsonObject json = new JsonObject();
json.addProperty("Message n°", record.offset());
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), json.toString());
Request request = new Request.Builder()
.header("Authorization", authHeader)
Upvotes: 0