
Building a custom routing NiFi processor with Scala

In this post we will build a toy example NiFi processor which is nevertheless quite efficient and has powerful capabilities. The processor logic is straightforward: it reads incoming files line by line, applies a given function to transform each line into a key-value pair, groups the pairs by key, writes the values to output files, and transfers those files to the specified relationships based on the group key. As a starting point I used nifi-processor-bundle-scala; you can deploy it right away with sbt.

Relationships

We have three permanent relationships specified in the trait RouterRelationships:

RelUnmatched: receives output files for keys that have no related dynamic relationship configured;
RelFailure: receives the incoming flow file if any failure happens during processing;
RelIncompatible: receives a single file collecting all lines that could not be processed.
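The trait itself is not shown in this post; here is a minimal sketch of what it might look like using NiFi's Relationship.Builder (the relationship names and descriptions are illustrative assumptions, and the actual trait in the bundle may differ):

[code language="scala"]
import org.apache.nifi.processor.Relationship

// Minimal sketch; names and descriptions are assumptions.
trait RouterRelationships {
  val RelUnmatched: Relationship = new Relationship.Builder()
    .name("unmatched")
    .description("Output files for keys without a configured dynamic relationship")
    .build()

  val RelFailure: Relationship = new Relationship.Builder()
    .name("failure")
    .description("The incoming flow file in case of a processing failure")
    .build()

  val RelIncompatible: Relationship = new Relationship.Builder()
    .name("incompatible")
    .description("Lines that could not be processed")
    .build()
}
[/code]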
In addition to these, we can configure dynamic relationships via properties. The property name becomes the relationship name, and the value is a boolean that determines whether the relationship should be created or not.

Properties

For properties we have one required property that should be configured with the mapping function converting a string into a key-value pair. Here is a simple validator for this property:

[code language="scala"]
private val CODE_VALIDATOR = new Validator() {
  override def validate(subject: String, input: String, context: ValidationContext): ValidationResult = {
    val compiled = LineProcessor.compile(input)
    val text = compiled match {
      case Failure(ex) => ex.toString
      case _ => ""
    }
    new ValidationResult.Builder()
      .subject(subject)
      .input(input)
      .explanation(s"Compilation failed with exception: $text")
      .valid(compiled.isSuccess)
      .build()
  }
}
[/code]

Please note that this validation only covers compilation: we can't really ensure that the function will work correctly. It just checks that the code compiles and has the type String => (String, String). The other properties are dynamic and define relationships as described above. We have to implement the getSupportedDynamicPropertyDescriptor and onPropertyModified methods in our processor class; a sketch of these two methods follows at the end of this section.

onTrigger

This is where all the processing logic lives. Once a non-null flow file arrives, we try to get the compiled processing function from a cache, or actually compile it:

[code language="scala"]
val processorCode = context.getProperty(PROCESSOR).getValue
val transform = compile(processorCode).get
[/code]

Then we make preparations for RoutingSink (more on it below): create a Sink that takes care of creating output flow files, prepare the key-value and key classes for routing, and finally create an instance of RoutingSink:

[code language="scala"]
val groupBy = RoutingSink.create[KeyValue, KeyType, FlowFile](FlowFileSink, _.keyType)
[/code]

After this we are ready to read the input file line by line, transform the lines into key-value pairs, and send them to the sink:

[code language="scala"]
val br = new BufferedReader(new InputStreamReader(in))
var line: String = null
while ( {
  line = br.readLine()
  line != null
}) {
  // Lines that fail to transform become ErrorKeyValue and will be routed as incompatible.
  val kv = Try {
    val (k, v) = transform(line)
    GoodKeyValue(k, v)
  }.getOrElse(ErrorKeyValue(line))
  groupBy.add(kv)
}
in.close()
[/code]

Finally we close the sink, get the list of output flow files, and transfer each file to its corresponding relationship:

[code language="scala"]
val files = groupBy.close()
files.foreach {
  case (ErrorKeyType, file) => session.transfer(file, RelIncompatible)
  case (DataKeyType(name), file) =>
    val rel = dynamicRelationships.get.getOrElse(name, RelUnmatched)
    session.transfer(file, rel)
}
[/code]

RoutingSink

RoutingSink is a hand-made, Akka-actor-based implementation of a routing worker pool where each worker processes data for only one key. A worker actor creates an instance of its own Sink and adds all incoming data into it. The router actor takes care of housekeeping and communication; a simplified sketch also follows at the end of this section.

Build and deploy

To build the processor, run mvn compile test package. This will also execute the tests before building the nar file. After packaging you should find the nar file at nifi-akka-bundle/nifi-akka-nar/target/nifi-akka-nar-1.0.nar. To use the processor in NiFi, copy this file into the lib directory. Make sure your NiFi version matches the one the processor was built against (1.4.0 at the time of writing). Start NiFi and try to create something simple, as in the picture below.
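As promised above, here is a minimal sketch of the two dynamic-property hooks. It assumes the processor keeps its dynamic relationships in an AtomicReference[Map[String, Relationship]], consistent with the dynamicRelationships.get call in onTrigger; the actual implementation in the bundle may differ:

[code language="scala"]
import java.util.concurrent.atomic.AtomicReference
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.util.StandardValidators

// Assumed storage for relationships created from dynamic properties.
private val dynamicRelationships =
  new AtomicReference[Map[String, Relationship]](Map.empty)

// Any property not declared statically becomes a dynamic boolean property.
override protected def getSupportedDynamicPropertyDescriptor(name: String): PropertyDescriptor =
  new PropertyDescriptor.Builder()
    .name(name)
    .dynamic(true)
    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    .build()

// Create or drop the relationship named after the property when its value changes.
// A real processor would also refresh the set returned by getRelationships.
override def onPropertyModified(descriptor: PropertyDescriptor, oldValue: String, newValue: String): Unit =
  if (descriptor.isDynamic) {
    val name = descriptor.getName
    val create = java.lang.Boolean.parseBoolean(newValue)
    dynamicRelationships.updateAndGet { rels =>
      if (create) rels + (name -> new Relationship.Builder().name(name).build())
      else rels - name
    }
  }
[/code]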
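And here is a simplified, single-threaded sketch of the RoutingSink contract; the Sink trait shape is an assumption, and the real implementation backs each key with its own Akka worker actor instead of doing the work inline:

[code language="scala"]
import scala.collection.mutable

// Hypothetical Sink shape: creates, fills and finalizes one output per key.
trait Sink[V, K, Out] {
  def open(key: K): Unit           // e.g. create an output flow file for the key
  def add(key: K, value: V): Unit  // append a value to the key's output
  def close(key: K): Out           // finalize and return the output
}

// Groups incoming values by key and delegates the per-key work to the sink.
class RoutingSink[V, K, Out](sink: Sink[V, K, Out], keyOf: V => K) {
  private val openKeys = mutable.LinkedHashSet.empty[K]

  def add(value: V): Unit = {
    val key = keyOf(value)
    if (openKeys.add(key)) sink.open(key) // first value for a key opens its output
    sink.add(key, value)
  }

  def close(): Seq[(K, Out)] =
    openKeys.toSeq.map(key => key -> sink.close(key))
}

object RoutingSink {
  def create[V, K, Out](sink: Sink[V, K, Out], keyOf: V => K): RoutingSink[V, K, Out] =
    new RoutingSink(sink, keyOf)
}
[/code]

This is the contract that RoutingSink.create[KeyValue, KeyType, FlowFile](FlowFileSink, _.keyType) wires up in onTrigger, with the actor-based version doing the same work concurrently. Now, back to the example flow.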
Add one property for the dynamic relationship "black" and use the following code for decomposing the JSON:

[code language="scala"]
import pythian.nifi.processors.LineProcessor.GSON
import com.google.gson.JsonObject

(line: String) => {
  val jo = GSON.fromJson(line, classOf[JsonObject])
  val tpe = jo.get("type").getAsString
  val payload = jo.get("payload")
  val data = GSON.toJson(payload)
  (tpe, data)
}
[/code]

Add a few GetFile and PutFile processors and start the flow. Then use a file with the following content to test it:

[code language="javascript"]
{"type":"red","payload":{"id":27,"data":410}}
{"type":"black","payload":{"id":28,"data":621}}
{"type":"red","payload":{"id":29,"data":614}}
{"type":"green","payload":{"id":30,"data":545}}
{"type":"black","payload":{"id":31,"data":52}}
{"type":"red","payload":{"id":32,"data":932}}
{"type":"green","payload":{"id":33,"data":897}}
{"type":"red","payload":{"id":34,"data":943}}
{"type":"green","payload":{"id":35,"data":314}}
{BAD ROW !!! "type":"red","payload":{"id":36,"data":713}}
{"type":"black","payload":{"id":37,"data":444}}
{"type":"green","payload":{"id":38,"data":972}}
{"type":"black","payload":{"id":39,"data":988}}
{"type":"black","payload":{"id":40,"data":996}}
{"type":"red","payload":{"id":41,"data":741,"SOMETHING":42}}
{"type":"red","payload":{"id":42,"data":82}}
{"type":"black","payload":{"id":43,"data":616}}
{"type":"black","payload":{"id":44,"data":135}}
{"type":"black","payload":{"id":45,"data":549}}
[/code]

A moment after you put this file into the input directory, it should be processed: you will find two files, for the red and green keys, under the unmatched location, one file for black, and one incompatible file containing only the bad row:

[code language="javascript"]{BAD ROW !!! "type":"red","payload":{"id":36,"data":713}}[/code]

As further development, the processing could be moved from the main thread into an Akka router with a pool of workers, and failures (what if sink creation fails?) and actor restarts deserve more careful handling.
