Siddarth Yadav
4 min read · Dec 6, 2020

Using DataBuffer with Spring WebFlux, Spring Events

WebFlux is the Spring Framework's extension for programming in accordance with the Reactive Manifesto.

At a low level, the Spring Framework provides DataBuffer as an abstraction on top of ByteBuffer, as per the Spring documentation. In this story, we will look at the DataBuffer abstraction and utilize its capabilities in reactive streaming.
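Since DataBuffer is essentially a friendlier wrapper over java.nio.ByteBuffer, it helps to recall the position bookkeeping that plain ByteBuffer requires (and that DataBuffer, with its independent read and write positions, abstracts away). A minimal stdlib-only sketch; ByteBufferDemo and roundTrip are illustrative names, not part of the project:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferDemo {

    // Write text into a ByteBuffer, then read it back out.
    public static String roundTrip(String text) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(text.getBytes(StandardCharsets.UTF_8)); // write advances position
        buf.flip();                                     // switch from write mode to read mode
        byte[] out = new byte[buf.remaining()];
        buf.get(out);                                   // read advances position again
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints hello
    }
}
```

The explicit flip() call is exactly the kind of bookkeeping DataBuffer spares you from, since it tracks read and write positions separately.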

The use case covered here is very basic:

  1. A large number of differently sized files will arrive at a certain location. A file watcher will react by reading each file (once the file write is completed).
  2. Each file will be read in chunks and transmitted over the network to a destination location.
  3. Bytes will be aggregated at the target location and recreated as Strings.
  4. High-level functionality from the WebFlux project will be used for this purpose.

The overall thought process here is to stream data reactively with a close watch on memory consumption, using a decoupled, event-driven design.

As a first step, we set up a watcher service that monitors the directory path where new files will be created, and publishes Spring events as each file write is completed.

Create a File Watcher Using the NIO.2 WatchService

watchService = FileSystems.getDefault().newWatchService();

Create a component ApplicationReadyListener

@Override
public void onApplicationEvent(final ApplicationReadyEvent readyEvent) {
    try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
        path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
        while (true) {
            WatchKey key = watchService.take(); // blocks until events are available
            for (WatchEvent<?> event : key.pollEvents()) {
                fileEventPublisher.publishFileEvent(path.resolve(event.context().toString()).toString());
            }
            key.reset(); // re-arm the key so further events are delivered
        }
    } catch (IOException e) {
        e.printStackTrace(); // don't swallow exceptions silently
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
}

The path variable here is a NIO Path to the directory you want to watch. We register for the ENTRY_CREATE event.

In short, fileEventPublisher publishes a FileEvent to the FileEventListener.

public class FileEvent extends ApplicationEvent {

    public FileEvent(String filePath) {
        super(filePath);
    }
}

@Component
public class FileEventPublisher {

    @Autowired
    ApplicationEventPublisher applicationEventPublisher;

    public void publishFileEvent(String filePath) {
        FileEvent fileEvent = new FileEvent(filePath);
        applicationEventPublisher.publishEvent(fileEvent);
    }
}

The listener class does the heavy lifting of converting each DataBuffer to a String and passing it on to subscribers.

@Component
public class FileEventListener implements ApplicationListener<FileEvent> {

    @Autowired
    FileEventProcessor eventProcessor;

    @Override
    public void onApplicationEvent(FileEvent fileEvent) {
        File file = new File(fileEvent.getSource().toString());
        Path p = Paths.get(file.getAbsolutePath());

        checkFileGrowth(file);
        DataBufferFactory dbf = new DefaultDataBufferFactory();
        Flux<DataBuffer> buffer = DataBufferUtils.read(p, dbf, BUFFER_SIZE);
        Flux<String> stringFlux = buffer.flatMap(dataBuffer -> {
            byte[] bytes = new byte[dataBuffer.readableByteCount()];
            dataBuffer.read(bytes);
            DataBufferUtils.release(dataBuffer); // release each buffer to avoid leaking pooled memory
            return Mono.just(new String(bytes, StandardCharsets.UTF_8));
        });
        // a single terminal subscription; subscribing separately and then
        // calling blockLast() would subscribe twice and read the file twice
        stringFlux
            .doOnNext(k -> eventProcessor.getSink().emitNext(k, Sinks.EmitFailureHandler.FAIL_FAST))
            .blockLast();
    }

    private void checkFileGrowth(File file) {
        // renameTo fails while another process still holds the file open,
        // so this loops until the write is complete
        while (!file.renameTo(file)) {
            try {
                System.out.println("waiting");
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

When a FileEvent is received, the file is read via an AsynchronousFileChannel in a non-blocking fashion. checkFileGrowth ensures the file is completely written before we start reading it into memory. The buffer size used for this purpose is equal to the limit of the LimitedDataBufferList (as set by application properties). A processing pipeline is set up using Flux&lt;DataBuffer&gt; and converted to a Flux&lt;String&gt;, which is pushed into a processor's sink (a FluxSink could also be used for the same purpose).
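For intuition about what DataBufferUtils.read does with the channel, here is a stdlib-only sketch of the same chunked read loop using AsynchronousFileChannel's Future-based API directly. ChunkedReader and readInChunks are illustrative names, not part of the project, and decoding per chunk assumes chunk boundaries don't split multi-byte UTF-8 characters:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;

public class ChunkedReader {

    // Read the file at `path` in fixed-size chunks, returning the concatenated text.
    public static String readInChunks(Path path, int bufferSize)
            throws IOException, InterruptedException, ExecutionException {
        StringBuilder sb = new StringBuilder();
        try (AsynchronousFileChannel ch =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(bufferSize);
            long position = 0;
            int read;
            // read() returns a Future<Integer>; get() yields the bytes read, or -1 at EOF
            while ((read = ch.read(buf, position).get()) > 0) {
                position += read;
                buf.flip();
                sb.append(StandardCharsets.UTF_8.decode(buf));
                buf.clear();
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("chunk", ".txt");
        Files.writeString(tmp, "reactive streaming with small buffers");
        System.out.println(readInChunks(tmp, 8));
        Files.delete(tmp);
    }
}
```

Blocking on the Future defeats the purpose in production code; DataBufferUtils instead wires the channel's completion handler into the Flux so each chunk is emitted as it arrives.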

Now we create a simple ReaderController with the GetMapping below. This will return an event stream of Strings.

@GetMapping(path = "/reader", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> reader() {
    return fileEventProcessor.getFlux();
}

The last piece of this intermediate stage in the processing pipeline is FileEventProcessor. It exposes a Sinks.Many with multi-subscriber support. Note that multicast().onBackpressureBuffer() buffers signals only until the first subscriber arrives; despite the field name below, it does not replay earlier items to late subscribers.

@Component
public class FileEventProcessor {

    // multicast().onBackpressureBuffer() buffers only until the first
    // subscriber; it does not replay history to late subscribers
    Sinks.Many<Object> replaySink = Sinks.many().multicast().onBackpressureBuffer();

    public Flux<Object> getFlux() {
        return replaySink.asFlux();
    }

    public Sinks.Many<Object> getSink() {
        return replaySink;
    }
}

This is just a convenience class that ensures all subscribers are:

  1. Backed by a backpressure buffer (note that multicast buffers items only until the first subscriber; it does not replay them to later subscribers)
  2. Recreated on cancel (in the latest edit I moved to using a Flux sink, so this is no longer required)
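If you do want true replay semantics, Reactor's Sinks API offers replay sinks that cache emitted signals and re-deliver them to late subscribers. A minimal sketch (ReplaySinkDemo is an illustrative name; requires reactor-core on the classpath):

```java
import reactor.core.publisher.Sinks;
import java.util.List;

public class ReplaySinkDemo {

    public static void main(String[] args) {
        // A replay sink with a bounded history of the last 2 signals.
        Sinks.Many<String> sink = Sinks.many().replay().limit(2);
        sink.tryEmitNext("a");
        sink.tryEmitNext("b");
        sink.tryEmitNext("c");
        sink.tryEmitComplete();

        // A subscriber arriving after emission still receives the cached history.
        List<String> seen = sink.asFlux().collectList().block();
        System.out.println(seen); // prints [b, c]
    }
}
```

Swapping Sinks.many().multicast().onBackpressureBuffer() for Sinks.many().replay().limit(n) in FileEventProcessor would make the name replaySink accurate, at the cost of holding the last n items in memory.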

Time to Run it!

Start the application

git clone https://github.com/sinfull1/demoFLux.git

cd demoFLux

git checkout master

mvn clean install

In the file RenameController, change the path variable to the directory you want to watch for new file creation.

Open http://localhost:8080/reader in the browser. This will subscribe to the watcher Flux.

Now copy one or more JSON files of different sizes into the watched path.

You will see that the data from the files is read reactively and is immediately available in the browser. You can use an EventSource and reduce the data to concrete JSON objects, stream video, etc., as per your use case. The same can be done for entity arrays from JSON files; I will cover that in the next post.

Just to go over what's happening, let's look at the test case below.

public class TestCase {

    @Test
    public void test() {
        WebTestClient client = WebTestClient.bindToController(new TestController()).build();
        WebTestClient.ResponseSpec resp = client.get().uri(uriBuilder -> uriBuilder
                .path("/test")
                .queryParam("fileName", "generated.json")
                .build()).exchange();
        Flux<DataBuffer> buffer = resp.returnResult(DataBuffer.class).getResponseBody();

        Flux<String> stringFlux = buffer.flatMap(dataBuffer -> {
            byte[] bytes = new byte[dataBuffer.readableByteCount()];
            dataBuffer.read(bytes);
            DataBufferUtils.release(dataBuffer);
            return Mono.just(new String(bytes, StandardCharsets.UTF_8));
        });

        stringFlux.blockLast();
    }

    @RestController
    public class TestController {

        String basePath = "watch-path//";

        @GetMapping(path = "/test")
        public Flux<DataBuffer> test(@RequestParam("fileName") String fileName) {
            Path p = Paths.get(new File(basePath + fileName).getAbsolutePath());
            DataBufferFactory dbf = new DefaultDataBufferFactory();
            return DataBufferUtils.read(p, dbf, 256 * 256);
        }
    }
}

This test case can be used to play around with the buffer size. It uses WebTestClient to achieve similar functionality without starting a full server.
