Trying to create an akka-http server consuming multi-part messages

I’m currently working on an akka-http server which should consume multi-part messages and send one of those parts back to the client. I’m struggling a bit with the framework itself, and especially with lambdas, as they’re pretty new to me. Here is my current code:

package de.steffenrumpf.akka.http.webserver;
 
import java.util.concurrent.CompletionStage;
 
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpEntity.Strict;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;
 
public class Server extends AllDirectives {

    private ActorMaterializer materializer;
    private Http http;
    private Flow<HttpRequest, HttpResponse, NotUsed> routeFlow;
    private CompletionStage<ServerBinding> binding;
    private ActorSystem system;

    public Server(ActorSystem system) {
        ActorMaterializer materializer = ActorMaterializer.create(system);
        http = Http.get(system);

        routeFlow = createRoute().flow(system, materializer);
        binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost("localhost", 8080), materializer);
    }

    public static void main(String[] args) throws Exception {
        // boot up server using the route as defined below
        ActorSystem system = ActorSystem.create("routes");

        // In order to access all directives we need an instance where the routes are defined.
        Server server = new Server(system);

        System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
        System.in.read(); // let it run until user presses return

        server.unbind();
    }

    public void unbind() {
        binding.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
                .thenAccept(unbound -> system.terminate()); // and shutdown when done
    }

    private Route createRoute() {

        return route(path("test", () -> post(() -> entity(Unmarshaller.entityToMultipartFormData(), formData -> {

            final CompletionStage<Strict> response = formData.getParts()
                    .filter(part -> part.getName()
                            .equals("text1"))
                    .mapAsync(1, requestPart -> {

                        return requestPart.getEntity()
                                .getDataBytes()
                                .map(bs -> HttpEntities.create(bs))
                                .runFold(HttpEntities.create("Empty Response"), (emptyResponse, obj) -> obj, materializer);
                    })
                    .runFold(HttpEntities.create("Empty Response 2"), (emptyResponse2, obj) -> obj, materializer);

            return onComplete(() -> response, extraction -> complete(extraction.get()));
        }))));
    }
}

Currently I’m getting a NullPointerException in the runFold method, but I don’t know why… I’ll update this post when I know how to solve it.
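
A likely suspect is the constructor: the line `ActorMaterializer materializer = ActorMaterializer.create(system);` declares a new local variable instead of assigning the `materializer` field, so the field that `createRoute()` passes to `runFold` would still be null. The `system` field is never assigned either, which would probably bite later in `unbind()`. The following is only a minimal sketch of that shadowing effect in plain Java, without akka; `ShadowingDemo`, `Resource`, `Shadowed` and `Assigned` are hypothetical names made up for the illustration.

```java
public class ShadowingDemo {

    // Hypothetical stand-in for ActorMaterializer; not an akka class.
    static final class Resource {
        final String name;

        Resource(String name) {
            this.name = name;
        }
    }

    // Mirrors the pattern in the Server constructor: a local variable
    // with the same name hides the field, so the field is never set.
    static final class Shadowed {
        private Resource materializer;

        Shadowed() {
            // Declares a NEW local variable; the field above stays null.
            Resource materializer = new Resource("local");
        }

        boolean fieldIsNull() {
            return materializer == null;
        }
    }

    // Assigning through "this" sets the field itself.
    static final class Assigned {
        private final Resource materializer;

        Assigned() {
            this.materializer = new Resource("field");
        }

        boolean fieldIsNull() {
            return materializer == null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Shadowed field is null: " + new Shadowed().fieldIsNull());
        System.out.println("Assigned field is null: " + new Assigned().fieldIsNull());
    }
}
```

If this is indeed the cause, assigning `this.materializer` and `this.system` in the `Server` constructor, instead of declaring a local variable, should be enough to check whether the exception disappears.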
