Cloud Stack Ninja

I need to create a "product", but I need to make some calls first:

  1. Get a "Schema" (if it is not already cached) that contains information about all the fields in the product.
  2. Find all image fields in the request, take their Base64 values, save the files to storage (in this case MinIO), and modify the request so that each image field holds a URL to the MinIO object instead.

After that, I make the request to create the product.
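
As a rough illustration of step 2 only, the sketch below replaces Base64 image values with URLs in a plain map. It is not the code from the question: `ImageFieldRewriteSketch`, `replaceImagesWithUrls`, the set of image field names (standing in for the schema), the `uploader` function (standing in for the MinIO upload) and the example URL are all hypothetical, and it uses plain Java collections instead of RxJava.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

public class ImageFieldRewriteSketch {

    /**
     * Returns a copy of extensions in which every field listed in imageFields
     * holds the URL returned by uploader instead of its Base64 payload.
     * uploader is a hypothetical stand-in for the storage call:
     * (field name, Base64 content) -> URL.
     */
    public static Map<String, Object> replaceImagesWithUrls(
            Map<String, Object> extensions,
            Set<String> imageFields,
            BiFunction<String, String, String> uploader) {
        Map<String, Object> result = new HashMap<>(extensions);
        for (String field : imageFields) {
            Object value = extensions.get(field);
            // Only string values are treated as Base64 payloads here.
            if (value instanceof String) {
                result.put(field, uploader.apply(field, (String) value));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> extensions = new HashMap<>();
        extensions.put("logo", "iVBORw0KGgo=");
        extensions.put("color", "red");

        Map<String, Object> rewritten = replaceImagesWithUrls(
                extensions,
                Collections.singleton("logo"),
                (field, base64) -> "https://minio.example/bucket/" + field);

        System.out.println(rewritten.get("logo"));
        System.out.println(rewritten.get("color"));
    }
}
```

Returning a new map instead of mutating the input keeps the original request untouched if the upload fails partway through.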

The problem is that sometimes the request is not changed (execution doesn't even enter the method that changes the Base64 values to URLs), which causes an error because the request is too big.

I'm new to RxJava, so I don't know what I'm doing wrong.

  public Single<ProductVersion> createProduct(ProductVersionCreateRequest request) {

    Map<String, Object> extensions = new HashMap<>(request.getExtensions());

    return cacheExtensionSchemaIfNecessary(request.getSchemaFullyQualifiedName()) //calls the schema
        .flatMap(schema -> imageExtensionService
            .saveAllImagesExtensionIfPresent(extensions, request.getName(), schema) //send images to minio
            .onErrorResumeNext(ProductsRxGrpcClient::createMinioError))
        .map(map -> {
          extensions.putAll(map);
          request.setExtensions(extensions); //modifies the request extensions
          return request.toProto(extensionSchemaCache.getOrThrow(request.getSchemaFullyQualifiedName())); //changes the request to protobuf model
        })
        .flatMap(rxProductService::createProduct) //calls the service to create the product
        .map(response -> ProductVersion.builder().id(response.getId()).versionId(response.getVersionId()).build())
        .onErrorResumeNext(ProductsRxGrpcClient::createProductError);
  }


  private Single<ExtensionSchema> cacheExtensionSchemaIfNecessary(String schemaFullyQualifiedName) {
    if (!extensionSchemaCache.containsKey(schemaFullyQualifiedName)) {
      log.debug("Caching schema");
      return rxSchemaClient.getLatestSchemaVersion(schemaFullyQualifiedName)
          .singleOrError()
          .doOnSuccess(extensionSchema -> extensionSchemaCache.put(
                schemaFullyQualifiedName, ExtensionsParser.mapSchemaToFileDescriptorSet(extensionSchema)));
    }
    return Single.just(ExtensionSchema.getDefaultInstance());
  }
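
One thing that may be worth checking in the helper above: when the schema is already cached, it returns `ExtensionSchema.getDefaultInstance()` and not the cached schema, so whatever is chained after it sees an empty schema on every call except the first. If `findImagesExtensionValue` relies on the schema to locate image fields (an assumption, since that method is not shown), it would find none and the upload step would be skipped, which would match the "sometimes" symptom. The sketch below reproduces only that pattern in plain Java; `Schema`, `SchemaLookupSketch`, `lookupReturningDefaultOnHit` and `lookupReturningCached` are hypothetical stand-ins, not the question's types. In the question the cache appears to hold a converted form of the schema, so an actual fix may look different.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SchemaLookupSketch {

    /** Hypothetical stand-in for a schema: just the names of its image fields. */
    public static final class Schema {
        public final List<String> imageFields;

        public Schema(List<String> imageFields) {
            this.imageFields = imageFields;
        }

        public static Schema defaultInstance() {
            return new Schema(Collections.emptyList());
        }
    }

    private final Map<String, Schema> cache = new HashMap<>();
    private final Function<String, Schema> remote; // stand-in for the schema client

    public SchemaLookupSketch(Function<String, Schema> remote) {
        this.remote = remote;
    }

    /** Mirrors the pattern in question: a cache hit yields an empty default schema. */
    public Schema lookupReturningDefaultOnHit(String name) {
        if (!cache.containsKey(name)) {
            Schema fetched = remote.apply(name);
            cache.put(name, fetched);
            return fetched;
        }
        return Schema.defaultInstance();
    }

    /** Alternative: a cache hit yields the cached schema itself. */
    public Schema lookupReturningCached(String name) {
        return cache.computeIfAbsent(name, remote);
    }

    public static void main(String[] args) {
        SchemaLookupSketch sketch =
                new SchemaLookupSketch(name -> new Schema(Arrays.asList("logo")));
        // First call fetches the schema; second call hits the cache.
        System.out.println(sketch.lookupReturningDefaultOnHit("product").imageFields);
        System.out.println(sketch.lookupReturningDefaultOnHit("product").imageFields);
        System.out.println(sketch.lookupReturningCached("product").imageFields);
    }
}
```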


  public Single<Map<String, String>> saveAllImagesExtensionIfPresent(
      Map<String, Object> extensions, String requestName,
      ExtensionSchema schema) {

    Map<String, String> mappedImageFields = findImagesExtensionValue(schema, extensions);

    if (mappedImageFields.isEmpty()) {
      log.debug("Returning empty map");
      return Single.just(mappedImageFields);
    }

    List<ImageData> imageDataList = new ArrayList<>();
    mappedImageFields.forEach((key, value) -> {
      ImageData dto = ImageData.builder()
          .name(key.concat(
              requestName != null ? requestName.toLowerCase().replace(" ", "") : ""))
          .base64Content(value)
          .maxSize(findMaxFileSizeConstraint(schema, key)).build();
      imageDataList.add(dto);
    });

    Map<String, String> returnedImages = new HashMap<>();
    return imageDao.storeFiles(imageDataList, TenantHolder.getTenant())
        .doOnSuccess(objectPath -> objectPath.forEach(imageData -> returnedImages.put(
            imageData.getObjectName().replace(requestName.toLowerCase().replace(" ", ""), ""),
            imageData.toUrnString())))
        .map(ignore -> returnedImages)
        .onErrorResumeNext(ImageExtensionService::createMinioError);
  }


  public Single<List<ObjectPath>> storeFiles(List<ImageData> imageDataList, Headers.Tenant tenant) {
    return Single.fromCallable(() -> {
      List<ObjectPath> objectPaths = Collections.synchronizedList(new ArrayList<>());

      imageDataList.parallelStream()
          .forEach(imageData -> {
            ObjectPath objectPath = ObjectPath.builder()
                .bucket(bucket)
                .tenant(tenant.getId())
                .objectName(imageData.getName())
                .build();
            // defaults to 1 MB
            long maxSize = imageData.getMaxSize() != null && imageData.getMaxSize() > 0 ? imageData.getMaxSize() : 1024L * 1024L;
            try (Base64InputStream base64InputStream =
                     new Base64InputStream(
                         new BufferedInputStream(new ByteArrayInputStream(imageData.getBase64Content().getBytes(Charsets.UTF_8))));
                 InputStream inputStream = new BoundedInputStream(base64InputStream, maxSize)) {
              String mimeType = URLConnection.guessContentTypeFromStream(base64InputStream);
              minioClientApi.putObject(objectPath, inputStream, mimeType);
              objectPaths.add(objectPath);
            } catch (Exception e) {
              throw new RuntimeException("Could not save image '" + imageData.getName() + "' to Minio", e);
            }
          });
      return objectPaths;
    }).subscribeOn(Schedulers.io());
  }
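
A separate detail in `storeFiles` that may be worth verifying: `URLConnection.guessContentTypeFromStream` returns `null` for streams that do not support mark/reset, and it is called here directly on the decoding stream. Whether `Base64InputStream` supports marks depends on the class in use (the imports are not shown), so this is something to check, not a confirmed bug. The sketch below shows the behaviour with the JDK's own `Base64` decoder standing in for it; `MimeSniffSketch` and its methods are hypothetical names.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class MimeSniffSketch {

    /** First 16 bytes of a PNG file (signature plus start of the IHDR chunk), Base64-encoded. */
    public static String pngHeaderBase64() {
        byte[] header = {
            (byte) 0x89, 'P', 'N', 'G', 0x0D, 0x0A, 0x1A, 0x0A,
            0, 0, 0, 0x0D, 'I', 'H', 'D', 'R'
        };
        return Base64.getEncoder().encodeToString(header);
    }

    /** Opens a decoding stream over the Base64 text. */
    public static InputStream decode(String base64) {
        byte[] text = base64.getBytes(StandardCharsets.UTF_8);
        return Base64.getDecoder().wrap(new ByteArrayInputStream(text));
    }

    /** Sniffs the content type directly on the decoding stream. */
    public static String sniffRaw(String base64) throws IOException {
        try (InputStream in = decode(base64)) {
            return URLConnection.guessContentTypeFromStream(in);
        }
    }

    /** Sniffs the content type after wrapping the decoded bytes in a BufferedInputStream. */
    public static String sniffBuffered(String base64) throws IOException {
        try (InputStream in = new BufferedInputStream(decode(base64))) {
            // The sniff marks and resets the stream, so 'in' could still be read from the start.
            return URLConnection.guessContentTypeFromStream(in);
        }
    }

    public static void main(String[] args) throws IOException {
        String base64 = pngHeaderBase64();
        System.out.println("markSupported: " + decode(base64).markSupported());
        System.out.println("raw: " + sniffRaw(base64));
        System.out.println("buffered: " + sniffBuffered(base64));
    }
}
```

If the same applies to the stream used in the question, wrapping the decoded stream in a `BufferedInputStream` and passing that one wrapper to both the sniffing call and the upload would be one way to get a non-null MIME type.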





Read more here: https://stackoverflow.com/questions/64396442/rxjava2-single-chained-not-being-called

Content Attribution

This content was originally published by Pedro Thales at Recent Questions - Stack Overflow, and is syndicated here via their RSS feed. You can read the original post over there.
