Apache Spark is pretty good at detecting the schema of your data sources when processing data in batch mode. But when it comes to Spark Streaming or Structured Streaming, automatic schema inference usually fails. This limitation follows from the nature of stream processing: you have to know what you are consuming before you start receiving any data.

Infer schema for AVRO files streamed from HDFS

Let’s look at the following situation:

  • data is stored in files in HDFS
  • the files use the Avro format instead of the more widely used Parquet
  • you want to read these files as a stream

You are already familiar with this dataset and were able to load it for batch processing like this.

Dataset<Row> batchedDs = spark
    .read()
    .format("avro")
    // .format("com.databricks.spark.avro") // for older spark versions
    .option("inferSchema", true)
    .load(filesLocationUri);

Streaming from a filesystem works a little differently: you provide the path to a directory, and Spark monitors and processes all new files that appear in that directory.

To make everything work, you can use the following trick:

  • select a small file that matches your streaming subset
  • put this file into HDFS or another store available to Spark
  • load this file via the batch API with the option inferSchema = true
  • use the schema() method to get a schema object of type StructType
  • apply this schema object when constructing the streaming source

StructType schema = spark.read().format("avro")
    .option("inferSchema", true).load(exampleFileUri).schema();

Dataset<Row> streamedDs = spark
    .readStream()
    .format("avro")
    .schema(schema)
    .option("path", directoryUri)
    .load();
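
To quickly sanity-check the streamed dataset, you can write it to the console sink. Below is a minimal sketch (the console sink, append output mode, and the omitted exception handling are only meant for debugging):

import org.apache.spark.sql.streaming.StreamingQuery;

// Start the streaming query and print each incoming micro-batch to the console.
// In a real job you would write to a durable sink and configure a checkpoint location.
StreamingQuery query = streamedDs
    .writeStream()
    .format("console")
    .outputMode("append")
    .option("truncate", false)
    .start();

// Blocks until the query stops or fails; declares StreamingQueryException.
query.awaitTermination();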

This solution works for any data source whose schema can be inferred in batch mode.

Infer schema for a JSON topic in Kafka

When you deal with Kafka, you can save some messages from the topic to a newline-delimited JSON file and use that file to infer the schema.
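
One way to dump such a sample is to read the topic once in batch mode and write the raw string values out as text. Here is a minimal sketch, reusing the bootstrapServers, kafkaTopic, and file path from the snippets below (note that text() actually produces a directory of part files, which the JSON reader can consume just as well):

// Read a bounded slice of the topic and persist a small sample of the raw
// JSON values as newline-delimited text.
spark.read()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", kafkaTopic)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
    .selectExpr("CAST(value AS STRING) AS value")
    .limit(100)
    .coalesce(1)
    .write()
    .text("hdfs://name-node:9000/path/to/new/line/delimited.json");

With the sample file in place, a first attempt might be to infer the schema from it and pass that schema straight to the Kafka reader.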

StructType schema = spark.read().json("hdfs://name-node:9000/path/to/new/line/delimited.json").schema();

Dataset<Row> datasetFromKafka = spark
    .read()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("startingOffsets", "earliest")
    .option("subscribe", kafkaTopic)
    .schema(schema)
    .load();

Unfortunately, it does not work this way, and you will get the following error.

Exception in thread "main" org.apache.spark.sql.AnalysisException: kafka does not allow user-specified schemas.;
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:222)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
        at sjob.App.weatherHotelDs(App.java:65)
        at sjob.App.doProcessing(App.java:252)
        at sjob.App.main(App.java:280)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This is because the Kafka source already comes with its own fixed schema. Let’s see what a debug show() call outputs.

+--------------------+--------------------+--------------+---------+------+--------------------+-------------+
|                 key|               value|         topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+--------------+---------+------+--------------------+-------------+
|[39 67 33 6E 64 7...|[7B 22 6C 6E 67 2...|kfk-topic-name|        0|     0|1970-01-19 07:50:...|            0|
|[39 67 33 71 38 6...|[7B 22 6C 6E 67 2...|kfk-topic-name|        0|     1|1970-01-19 07:50:...|            0|
|[39 67 33 71 39 6...|[7B 22 6C 6E 67 2...|kfk-topic-name|        0|     2|1970-01-19 07:50:...|            0|
+--------------------+--------------------+--------------+---------+------+--------------------+-------------+

As you can guess, the printSchema() output shows us the exact schema, and it is the same for every Kafka source (when no schema registry is involved).

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

It turns out that Kafka records are wrapped with some metadata. The values you are interested in live in the “key” and “value” columns, and they are in binary format. So we have to cast “value” to a string first and then convert that string to JSON.

Converting to JSON with a schema using the from_json() function

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

// Infer schema from the sample file
StructType schema = spark.read().json("hdfs://name-node:9000/path/to/new/line/delimited.json").schema();

// Getting Kafka Dataset<Row>
// ...

datasetFromKafka
    // Select only the "value" column and cast it to String
    .select(col("value").cast(DataTypes.StringType).as("value"))
    // Apply from_json to the string value using the inferred schema
    .select(from_json(col("value"), schema).as("json"))
    // Print the schema for debugging purposes
    .printSchema();

Now if you call printSchema(), the output will look like this.

root
 |-- json: struct (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- geohash: string (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lng: double (nullable = true)
 |    |-- avg_tmpr: double (nullable = true)
 |    |-- hotel: struct (nullable = true)
 |    |    |-- Address: string (nullable = true)
 |    |    |-- City: string (nullable = true)
 |    |    |-- Country: string (nullable = true)
 |    |    |-- Name: string (nullable = true)

Note that our JSON object is nested under a column named “json”.

To flatten it, you should just chain another select(col("json.*")), and after that printSchema() will produce:

root
 |-- date: string (nullable = true)
 |-- geohash: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- hotel: struct (nullable = true)
 |    |-- Address: string (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Name: string (nullable = true)
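
Putting the pieces together, here is a minimal sketch of the whole transformation (datasetFromKafka and schema come from the snippets above; the flattened variable name is only illustrative):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

// Cast the binary Kafka "value" to a string, parse it with the inferred
// schema, and flatten the nested "json" struct into top-level columns.
Dataset<Row> flattened = datasetFromKafka
    .select(col("value").cast(DataTypes.StringType).as("value"))
    .select(from_json(col("value"), schema).as("json"))
    .select(col("json.*"));

flattened.printSchema();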

Beware of the get_json_object() function

Looking at this function’s name, you might think it is exactly what you need: it says it reads an object, and it does not require any schema. But that is not actually true. The function takes the input JSON as a string and can extract the part of it located at a specific JsonPath, but it returns that part as a string. Even if you read everything from the root (JsonPath = “$”), you will just get the same JSON string back.
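
For illustration, here is a small sketch that reuses datasetFromKafka and casts “value” to a string first (the $.geohash path and the column aliases are only illustrative):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.get_json_object;

// Both results come back as plain strings: extracting a single field yields its
// string representation, and extracting the root "$" returns the original JSON
// string unchanged.
datasetFromKafka
    .select(col("value").cast(DataTypes.StringType).as("value"))
    .select(
        get_json_object(col("value"), "$.geohash").as("geohash_str"),
        get_json_object(col("value"), "$").as("whole_json_str"))
    .printSchema();

// Expected output, roughly:
// root
//  |-- geohash_str: string (nullable = true)
//  |-- whole_json_str: string (nullable = true)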