Trying to create an akka-http server consuming multi-part messages

I’m currently working on an akka-http server that should consume multi-part messages and send one of those parts back to the client. I’m still struggling a bit with the framework itself, and especially with lambdas, as they’re pretty new to me. Here is my current code:

package de.steffenrumpf.akka.http.webserver;

import java.util.concurrent.CompletionStage;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpEntity.Strict;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;

public class Server extends AllDirectives {

    private ActorMaterializer materializer;
    private Http http;
    private Flow<HttpRequest, HttpResponse, NotUsed> routeFlow;
    private CompletionStage<ServerBinding> binding;
    private ActorSystem system;

    public Server(ActorSystem system) {
        ActorMaterializer materializer = ActorMaterializer.create(system);
        http = Http.get(system);

        routeFlow = createRoute().flow(system, materializer);
        binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost("localhost", 8080), materializer);
    }

    public static void main(String[] args) throws Exception {
        // boot up server using the route as defined below
        ActorSystem system = ActorSystem.create("routes");

        // In order to access all directives we need an instance where the routes are defined.
        Server server = new Server(system);

        System.out.println("Server online at http://localhost:8080/\nPress RETURN to stop...");
        System.in.read(); // let it run until user presses return

        server.unbind();
    }

    public void unbind() {
        binding.thenCompose(ServerBinding::unbind) // trigger unbinding from the port
                .thenAccept(unbound -> system.terminate()); // and shutdown when done
    }

    private Route createRoute() {

        return route(path("test", () -> post(() -> entity(Unmarshaller.entityToMultipartFormData(), formData -> {

            final CompletionStage<Strict> response = formData.getParts()
                    .filter(part -> part.getName()
                            .equals("text1"))
                    .mapAsync(1, requestPart -> {

                        return requestPart.getEntity()
                                .getDataBytes()
                                .map(bs -> HttpEntities.create(bs))
                                .runFold(HttpEntities.create("Empty Response"), (emptyResponse, obj) -> obj, materializer);
                    })
                    .runFold(HttpEntities.create("Empty Response 2"), (emptyResponse2, obj) -> obj, materializer);

            return onComplete(() -> response, extraction -> complete(extraction.get()));
        }))));
    }
}

Currently I’m getting a NullPointerException in the runFold method, but I don’t know why. I’ll update this post when I know how to solve it.
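
One plausible cause, judging only from the listing above: the constructor declares a new local variable named materializer instead of assigning the field of the same name, so the field is still null when the lambdas in createRoute() pass it to runFold. The system field is never assigned either, so unbind() would probably hit the same problem in system.terminate(). The sketch below reproduces the pattern in plain Java without akka. The Materializer, BrokenServer and FixedServer classes are hypothetical stand-ins invented for illustration, not part of akka-http.

```java
public class ShadowingDemo {

    // Hypothetical stand-in for ActorMaterializer, for illustration only.
    static final class Materializer {
        String run(String input) {
            return "ran: " + input;
        }
    }

    // Mirrors the constructor in the listing: a local variable shadows the field.
    static final class BrokenServer {
        private Materializer materializer; // never assigned, stays null

        BrokenServer() {
            // Declares a NEW local variable; the field above is untouched.
            Materializer materializer = new Materializer();
        }

        String handle(String input) {
            // Reads the field, which is null, so this throws NullPointerException.
            return materializer.run(input);
        }
    }

    // Same class with the field assigned explicitly.
    static final class FixedServer {
        private final Materializer materializer;

        FixedServer() {
            this.materializer = new Materializer(); // assigns the field
        }

        String handle(String input) {
            return materializer.run(input);
        }
    }

    public static void main(String[] args) {
        try {
            new BrokenServer().handle("text1");
        } catch (NullPointerException e) {
            System.out.println("broken: NullPointerException");
        }
        System.out.println("fixed: " + new FixedServer().handle("text1"));
    }
}
```

If this is indeed the cause, the corresponding change in the Server constructor would be to write this.materializer = ActorMaterializer.create(system); and to add this.system = system; before createRoute() is called.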
