ConfluentSchemaRegistryClient.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.schema.registry.client;

import tools.jackson.databind.ObjectMapper;
import org.springframework.cloud.stream.schema.registry.SchemaNotFoundException;
import org.springframework.cloud.stream.schema.registry.SchemaReference;
import org.springframework.cloud.stream.schema.registry.SchemaRegistrationResponse;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestTemplate;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author Vinicius Carvalho
 * @author Marius Bogoevici
 * @author Jon Archer
 * @author Tengzhou Dong
 */
public class ConfluentSchemaRegistryClient implements SchemaRegistryClient {

	private static final List<String> ACCEPT_HEADERS = Arrays.asList(
			"application/vnd.schemaregistry.v1+json",
			"application/vnd.schemaregistry+json", "application/json");

	private RestTemplate template;

	private String endpoint = "http://localhost:8081";

	private ObjectMapper mapper;

	public ConfluentSchemaRegistryClient() {
		this(new RestTemplate());
	}

	public ConfluentSchemaRegistryClient(RestTemplate template) {
		this(template, new ObjectMapper());
	}

	public ConfluentSchemaRegistryClient(RestTemplate template, ObjectMapper mapper) {
		this.template = template;
		this.mapper = mapper;
	}

	public void setEndpoint(String endpoint) {
		this.endpoint = endpoint;
	}

	@Override
	public SchemaRegistrationResponse register(String subject, String format, String schema) {
		Assert.isTrue("avro".equals(format), "Only Avro is supported");
		HttpHeaders headers = new HttpHeaders();
		headers.put("Accept", ACCEPT_HEADERS);
		headers.add("Content-Type", "application/json");
		Integer version = null;
		Integer id = null;
		String payload = null;
		Map<String, String> maps = new HashMap<>();
		maps.put("schema", schema);
		payload = this.mapper.writeValueAsString(maps);
		try {
			HttpEntity<String> request = new HttpEntity<>(payload, headers);
			ResponseEntity<Map> response = this.template.exchange(
					this.endpoint + "/subjects/" + subject + "/versions", HttpMethod.POST, request, Map.class);
			id = (Integer) response.getBody().get("id");
		}
		catch (HttpStatusCodeException httpException) {
			throw new RuntimeException(String.format("Failed to register subject %s, server replied with status %d",
					subject, httpException.getStatusCode().value()), httpException);
		}

		try {
			ResponseEntity<List> response = this.template.getForEntity(
					this.endpoint + "/schemas/ids/" + id + "/versions", List.class);

			final List body = response.getBody();
			if (!CollectionUtils.isEmpty(body)) {
				// Assume only a single version is registered for this ID
				version = (Integer) ((Map<String, Object>) body.get(0)).get("version");
			}
		}
		catch (HttpStatusCodeException httpException) {
			throw new RuntimeException(String.format("Failed to register subject %s, server replied with status %d",
					subject, httpException.getStatusCode().value()), httpException);
		}

		SchemaRegistrationResponse schemaRegistrationResponse = new SchemaRegistrationResponse();
		schemaRegistrationResponse.setId(id);
		schemaRegistrationResponse.setSchemaReference(new SchemaReference(subject, version, "avro"));
		return schemaRegistrationResponse;
	}

	@Override
	public String fetch(SchemaReference schemaReference) {
		String path = String.format("/subjects/%s/versions/%d", schemaReference.getSubject(), schemaReference.getVersion());
		HttpHeaders headers = new HttpHeaders();
		headers.put("Accept", ACCEPT_HEADERS);
		headers.add("Content-Type", "application/vnd.schemaregistry.v1+json");
		HttpEntity<String> request = new HttpEntity<>("", headers);
		try {
			ResponseEntity<Map> response = this.template.exchange(this.endpoint + path,
					HttpMethod.GET, request, Map.class);
			return (String) response.getBody().get("schema");
		}
		catch (HttpStatusCodeException e) {
			if (e.getStatusCode() == HttpStatus.NOT_FOUND) {
				throw new SchemaNotFoundException(String.format("Could not find schema for reference: %s", schemaReference));
			}
			else {
				throw e;
			}
		}
	}

	@Override
	public String fetch(int id) {
		String path = String.format("/schemas/ids/%d", id);
		HttpHeaders headers = new HttpHeaders();
		headers.put("Accept", ACCEPT_HEADERS);
		headers.add("Content-Type", "application/vnd.schemaregistry.v1+json");
		HttpEntity<String> request = new HttpEntity<>("", headers);
		try {
			ResponseEntity<Map> response = this.template.exchange(this.endpoint + path,
					HttpMethod.GET, request, Map.class);
			return (String) response.getBody().get("schema");
		}
		catch (HttpStatusCodeException e) {
			if (e.getStatusCode() == HttpStatus.NOT_FOUND) {
				throw new SchemaNotFoundException(String.format("Could not find schema with id: %s", id));
			}
			else {
				throw e;
			}
		}
	}
}