ServerController.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.schema.registry.controllers;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.cloud.stream.schema.registry.config.SchemaServerProperties;
import org.springframework.cloud.stream.schema.registry.model.Schema;
import org.springframework.cloud.stream.schema.registry.repository.SchemaRepository;
import org.springframework.cloud.stream.schema.registry.support.InvalidSchemaException;
import org.springframework.cloud.stream.schema.registry.support.SchemaDeletionNotAllowedException;
import org.springframework.cloud.stream.schema.registry.support.SchemaNotFoundException;
import org.springframework.cloud.stream.schema.registry.support.SchemaValidator;
import org.springframework.cloud.stream.schema.registry.support.UnsupportedFormatException;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.util.UriComponentsBuilder;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
/**
* @author Vinicius Carvalho
* @author Ilayaperumal Gopinathan
* @author Jeff Maxwell
* @author Christian Tzolov
* @author Omer Celik
*/
@RestController
@RequestMapping(path = "${spring.cloud.stream.schema.server.path:}")
public class ServerController {
private final SchemaRepository repository;
private final Map<String, SchemaValidator> validators;
private final SchemaServerProperties schemaServerProperties;
private static final ReentrantLock lock = new ReentrantLock();
public ServerController(SchemaRepository repository, Map<String, SchemaValidator> validators,
SchemaServerProperties schemaServerProperties) {
Assert.notNull(repository, "cannot be null");
Assert.notEmpty(validators, "cannot be empty");
this.repository = repository;
this.validators = validators;
this.schemaServerProperties = schemaServerProperties;
}
@RequestMapping(method = RequestMethod.POST, path = "/", consumes = "application/json", produces = "application/json")
public ResponseEntity<Schema> register(@RequestBody Schema schema, UriComponentsBuilder builder) {
try {
lock.lock();
SchemaValidator validator = this.validators.get(schema.getFormat());
if (validator == null) {
throw new UnsupportedFormatException(String.format("Invalid format, supported types are: %s",
StringUtils.collectionToCommaDelimitedString(this.validators.keySet())));
}
validator.validate(schema.getDefinition());
Schema result;
List<Schema> registeredEntities =
this.repository.findBySubjectAndFormatOrderByVersion(schema.getSubject(), schema.getFormat());
if (registeredEntities.isEmpty()) {
schema.setVersion(1);
result = this.repository.save(schema);
}
else {
result = validator.match(registeredEntities, schema.getDefinition());
if (result == null) {
schema.setVersion(registeredEntities.get(registeredEntities.size() - 1).getVersion() + 1);
result = this.repository.save(schema);
}
}
HttpHeaders headers = new HttpHeaders();
headers.add(HttpHeaders.LOCATION, builder.path("/{subject}/{format}/v{version}")
.buildAndExpand(result.getSubject(), result.getFormat(), result.getVersion())
.toString());
ResponseEntity<Schema> response = new ResponseEntity<>(result, headers, HttpStatus.CREATED);
return response;
}
finally {
lock.unlock();
}
}
@RequestMapping(method = RequestMethod.GET, produces = "application/json", path = "/{subject}/{format}/v{version}")
public ResponseEntity<Schema> findOne(@PathVariable("subject") String subject,
@PathVariable("format") String format,
@PathVariable("version") Integer version) {
Schema schema = this.repository.findOneBySubjectAndFormatAndVersion(subject, format, version);
if (schema == null) {
throw new SchemaNotFoundException(
String.format("Could not find Schema by subject: %s, format: %s, version %s",
subject, format, version));
}
return new ResponseEntity<>(schema, HttpStatus.OK);
}
@RequestMapping(method = RequestMethod.GET, produces = "application/json", path = "/schemas/{id}")
public ResponseEntity<Schema> findOne(@PathVariable("id") Integer id) {
Optional<Schema> schema = this.repository.findById(id);
if (!schema.isPresent()) {
throw new SchemaNotFoundException(String.format("Could not find Schema by id: %s", id));
}
return new ResponseEntity<>(schema.get(), HttpStatus.OK);
}
/**
* Find by {@link Schema#getSubject() subject} and {@link Schema#getFormat() format}.
*
* @param subject the {@link Schema#getSubject() subject}, must not be
* {@literal null}.
* @param format the {@link Schema#getFormat() format}, must not be {@literal null}.
* @return An {@link HttpStatus#OK} response populated with the list of {@link Schema
* Schemas}, in ascending order by {@link Schema#getVersion() version}, that matched
* the supplied {@link Schema#getSubject() subject} and {@link Schema#getFormat()
* format}.
*
* @since 3.0.0
*/
@GetMapping(produces = APPLICATION_JSON_VALUE, path = "/{subject}/{format}")
@NonNull
public ResponseEntity<List<Schema>> findBySubjectAndFormat(@NonNull @PathVariable("subject") final String subject,
@NonNull @PathVariable("format") final String format) {
return findBySubjectAndFormatOrderByVersionAsc(subject, format);
}
@RequestMapping(value = "/{subject}/{format}/v{version}", method = RequestMethod.DELETE)
public void delete(@PathVariable("subject") String subject,
@PathVariable("format") String format,
@PathVariable("version") Integer version) {
if (this.schemaServerProperties.isAllowSchemaDeletion()) {
Schema schema = this.repository.findOneBySubjectAndFormatAndVersion(subject, format, version);
if (schema == null) {
throw new SchemaNotFoundException(
String.format("Could not find Schema by subject: %s, format: %s, version %s",
subject, format, version));
}
deleteSchema(schema);
}
else {
throw new SchemaDeletionNotAllowedException(String.format("Not permitted deletion of Schema by " +
"subject: %s, format: %s, version %s", subject, format, version));
}
}
@RequestMapping(value = "/schemas/{id}", method = RequestMethod.DELETE)
public void delete(@PathVariable("id") Integer id) {
if (this.schemaServerProperties.isAllowSchemaDeletion()) {
Optional<Schema> schema = this.repository.findById(id);
if (!schema.isPresent()) {
throw new SchemaNotFoundException(String.format("Could not find Schema by id: %s", id));
}
deleteSchema(schema.get());
}
else {
throw new SchemaDeletionNotAllowedException(String.format("Not permitted deletion of Schema by id: %s", id));
}
}
@RequestMapping(value = "/{subject}", method = RequestMethod.DELETE)
public void delete(@PathVariable("subject") String subject) {
if (this.schemaServerProperties.isAllowSchemaDeletion()) {
for (Schema schema : this.repository.findAll()) {
if (schema.getSubject().equals(subject)) {
deleteSchema(schema);
}
}
}
else {
throw new SchemaDeletionNotAllowedException(String.format("Not permitted deletion of Schema by " +
"subject: %s", subject));
}
}
@NonNull
public final ResponseEntity<List<Schema>> findBySubjectAndFormatOrderByVersionAsc(@NonNull final String subject,
@NonNull final String format) {
List<Schema> schemas = this.repository.findBySubjectAndFormatOrderByVersion(subject, format);
if (schemas.isEmpty()) {
throw new SchemaNotFoundException(
String.format("No schemas found for subject %s and format %s", subject, format));
}
return new ResponseEntity<>(schemas, HttpStatus.OK);
}
private void deleteSchema(Schema schema) {
if (schema == null) {
throw new SchemaNotFoundException("Could not find Schema");
}
this.repository.delete(schema);
}
@ExceptionHandler(UnsupportedFormatException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ResponseBody
public String onUnsupportedFormat(UnsupportedFormatException e) {
return errorMessage("Format not supported", e);
}
@ExceptionHandler(InvalidSchemaException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ResponseBody
public String onInvalidSchema(InvalidSchemaException e) {
return errorMessage("Invalid Schema", e);
}
@ExceptionHandler(SchemaNotFoundException.class)
@ResponseStatus(HttpStatus.NOT_FOUND)
@ResponseBody
public String schemaNotFound(SchemaNotFoundException ex) {
return errorMessage("Schema not found", ex);
}
@ExceptionHandler(SchemaDeletionNotAllowedException.class)
@ResponseStatus(HttpStatus.METHOD_NOT_ALLOWED)
@ResponseBody
public String schemaDeletionNotPermitted(SchemaDeletionNotAllowedException ex) {
return errorMessage("Schema deletion is not permitted", ex);
}
private String errorMessage(String prefix, Throwable e) {
return prefix + (StringUtils.hasText(e.getMessage()) ? ": " + e.getMessage() : "");
}
}