在java中读取json列表

问题描述 投票:0回答:3

以前我用以下格式读取json数据:

JSON

{
    "CreationTime":"2018-01-12T12:32:31",
    "Id":"08f81fd7-21f1-48ba-a991-08d559b88cc5",
    "Operation":"AddedToGroup",
    "RecordType":14,
    "UserType":0,
    "Version":1,
    "Workload":"OneDrive",
    "ClientIP":"115.186.129.229",
    "UserId":"[email protected]",
    "EventSource":"SharePoint",
    "ItemType":"Web"
}

我从一个Kafka主题中读取这些JSON数据,对其进行一些流处理,然后把结果发送到另一个主题。在处理过程中,我创建了两个JSON对象:send 和 received。

使用此代码:

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");

// Turn every raw activity record into a trimmed-down summary record and
// forward it. A record that cannot be processed contributes nothing: the
// list returned for it simply stays empty.
source_o365_user_activity.flatMapValues(value -> {
    System.out.println("========> o365_user_activity_by_date Log:     " + value);
    ArrayList<String> keywords = new ArrayList<String>();
    try {
        JSONObject received = new JSONObject(value);

        // Pull everything out of the incoming record first; get(...) throws
        // when a key is absent, which lands in the catch block below.
        // NOTE(review): the original author labelled current_date as UTC —
        // confirm against getCurrentDate(), which is defined elsewhere.
        String currentDate = getCurrentDate().toString();
        Object activityTime = received.get("CreationTime");
        Object userId = received.get("UserId");
        Object operationType = received.get("Operation");
        Object appName = received.get("Workload");

        // NOTE(review): the original carried a reminder to move topic names
        // and keys into static final constants.
        JSONObject send = new JSONObject();
        send.put("current_date", currentDate);
        send.put("activity_time", activityTime);
        send.put("user_id", userId);
        send.put("operation_type", operationType);
        send.put("app_name", appName);

        keywords.add(send.toString());
    } catch (Exception e) {
        // TODO: replace this best-effort reporting with real error handling
        System.err.println("Unable to convert to json");
        e.printStackTrace();
    }
    return keywords;
}).to("o365_user_activity_by_date");

这很简单。现在我有一个带有列表的json数据。

JSON

{
  "CreationTime":"2017-12-27T07:47:46",
  "Id":"10ee505b-90a4-4ac1-b96f-a6dbca939694",
  "Operation":"Add member to role.",
  "OrganizationId":"2f88f444-62da-4aae-b8af-8331a6915801",
  "RecordType":8,
  "ResultStatus":"success",
  "UserKey":"[email protected]",
  "UserType":0,
  "Version":1,
  "Workload":"AzureActiveDirectory",
  "ObjectId":"[email protected]",
  "UserId":"[email protected]",
  "AzureActiveDirectoryEventType":1,
  "ExtendedProperties":[
     {
        "Name":"Role.ObjectID",
        "Value":"b0f54661-2d74-4c50-afa3-1ec803f12efe"
     },
     {
        "Name":"Role.DisplayName",
        "Value":"Billing Administrator"
     },
     {
        "Name":"Role.TemplateId",
        "Value":"b0f54661-2d74-4c50-afa3-1ec803f12efe"
     },
     {
        "Name":"Role.WellKnownObjectName",
        "Value":"BillingAdmins"
     }
  ],
  "Actor":[
     {
        "ID":"[email protected]",
        "Type":5
     },
     {
        "ID":"10030000A656FE5B",
        "Type":3
     },
     {
        "ID":"User_d03ca514-adfa-4585-a8bd-7182a9a086c7",
        "Type":2
     }
  ],
  "ActorContextId":"2f88f444-62da-4aae-b8af-8331a6915801",
  "InterSystemsId":"6d402a5b-c5de-4d9f-a805-9371c109e55f",
  "IntraSystemId":"a5568d01-f100-497a-b88b-c9731ff31248",
  "Target":[
     {
        "ID":"User_8f77c311-3ea0-4146-9f7d-db21bd052d3d",
        "Type":2
     },
     {
        "ID":"[email protected]",
        "Type":5
     },
     {
        "ID":"1003BFFDA67CCA03",
        "Type":3
     }
  ],
  "TargetContextId":"2f88f444-62da-4aae-b8af-8331a6915801"
}

如何在Stream处理中执行相同的操作?我希望能够针对某些键(包括列表数据键)读取JSON数据。

java java-8 apache-kafka-streams
3个回答
0
投票

为什么不将JSON转换为Object,然后使用Object中的字段进行过滤?


0
投票

你不能这样吗?

// JSONObject.get(...) is typed as Object, so the array has to be fetched with
// getJSONArray(...) before an element can be addressed by index.
send.put("target_0_id", received.getJSONArray("Target").getJSONObject(0).get("ID"));

0
投票

您可以使用gson库并将json转换为对象,使用getter和setter可以构建所需的输出JSON。您还可以解析输入JSON以获取JSONArray详细信息。以下是如何使用POJO执行此操作的代码。

Input类:

/**
 * Plain data holder for one audit record of the shape shown in the sample JSON.
 *
 * <p>Field names intentionally repeat the JSON keys character for character
 * (including the leading capital) so that a reflection-based mapper, such as the
 * Gson call in the accompanying example, can bind them without extra
 * configuration. Do not rename or reorder the fields.
 *
 * <p>NOTE(review): several keys that are numeric in the sample JSON
 * ({@code RecordType}, {@code UserType}, {@code Version},
 * {@code AzureActiveDirectoryEventType}) are declared as {@code String} here;
 * this relies on the mapper converting numbers to strings — confirm for the
 * mapper in use.
 */
public class Input {

    private String UserType;

    private String TargetContextId;

    private String RecordType;

    private String Operation;

    private String Workload;

    private String UserId;

    private String OrganizationId;

    private String InterSystemsId;

    private ExtendedProperties[] ExtendedProperties;

    private String ActorContextId;

    private String CreationTime;

    private String IntraSystemId;

    private Target[] Target;

    private Actor[] Actor;

    private String Id;

    private String Version;

    private String ResultStatus;

    private String ObjectId;

    private String AzureActiveDirectoryEventType;

    private String UserKey;

    public String getUserType() {
        return UserType;
    }

    public void setUserType(String UserType) {
        this.UserType = UserType;
    }

    public String getTargetContextId() {
        return TargetContextId;
    }

    public void setTargetContextId(String TargetContextId) {
        this.TargetContextId = TargetContextId;
    }

    public String getRecordType() {
        return RecordType;
    }

    public void setRecordType(String RecordType) {
        this.RecordType = RecordType;
    }

    public String getOperation() {
        return Operation;
    }

    public void setOperation(String Operation) {
        this.Operation = Operation;
    }

    public String getWorkload() {
        return Workload;
    }

    public void setWorkload(String Workload) {
        this.Workload = Workload;
    }

    public String getUserId() {
        return UserId;
    }

    public void setUserId(String UserId) {
        this.UserId = UserId;
    }

    public String getOrganizationId() {
        return OrganizationId;
    }

    public void setOrganizationId(String OrganizationId) {
        this.OrganizationId = OrganizationId;
    }

    public String getInterSystemsId() {
        return InterSystemsId;
    }

    public void setInterSystemsId(String InterSystemsId) {
        this.InterSystemsId = InterSystemsId;
    }

    /** Returns the stored array itself (no copy); may be {@code null} if never set. */
    public ExtendedProperties[] getExtendedProperties() {
        return ExtendedProperties;
    }

    public void setExtendedProperties(ExtendedProperties[] ExtendedProperties) {
        this.ExtendedProperties = ExtendedProperties;
    }

    public String getActorContextId() {
        return ActorContextId;
    }

    public void setActorContextId(String ActorContextId) {
        this.ActorContextId = ActorContextId;
    }

    public String getCreationTime() {
        return CreationTime;
    }

    public void setCreationTime(String CreationTime) {
        this.CreationTime = CreationTime;
    }

    public String getIntraSystemId() {
        return IntraSystemId;
    }

    public void setIntraSystemId(String IntraSystemId) {
        this.IntraSystemId = IntraSystemId;
    }

    /** Returns the stored array itself (no copy); may be {@code null} if never set. */
    public Target[] getTarget() {
        return Target;
    }

    public void setTarget(Target[] Target) {
        this.Target = Target;
    }

    /** Returns the stored array itself (no copy); may be {@code null} if never set. */
    public Actor[] getActor() {
        return Actor;
    }

    public void setActor(Actor[] Actor) {
        this.Actor = Actor;
    }

    public String getId() {
        return Id;
    }

    public void setId(String Id) {
        this.Id = Id;
    }

    public String getVersion() {
        return Version;
    }

    public void setVersion(String Version) {
        this.Version = Version;
    }

    public String getResultStatus() {
        return ResultStatus;
    }

    public void setResultStatus(String ResultStatus) {
        this.ResultStatus = ResultStatus;
    }

    public String getObjectId() {
        return ObjectId;
    }

    public void setObjectId(String ObjectId) {
        this.ObjectId = ObjectId;
    }

    public String getAzureActiveDirectoryEventType() {
        return AzureActiveDirectoryEventType;
    }

    public void setAzureActiveDirectoryEventType(String AzureActiveDirectoryEventType) {
        this.AzureActiveDirectoryEventType = AzureActiveDirectoryEventType;
    }

    public String getUserKey() {
        return UserKey;
    }

    public void setUserKey(String UserKey) {
        this.UserKey = UserKey;
    }

    /**
     * Renders every field as {@code name = value} inside {@code ClassPojo [...]}.
     *
     * <p>The three array fields are expanded element by element. Concatenating an
     * array directly would only print its type and identity hash, which is useless
     * for debugging. A {@code null} array still prints as {@code null}.
     *
     * @return a single-line, human-readable dump of this record
     */
    @Override
    public String toString() {
        // Fully qualified so this class needs no import changes.
        String extendedPropertiesText = java.util.Arrays.toString(ExtendedProperties);
        String targetText = java.util.Arrays.toString(Target);
        String actorText = java.util.Arrays.toString(Actor);

        return "ClassPojo [UserType = " + UserType
                + ", TargetContextId = " + TargetContextId
                + ", RecordType = " + RecordType
                + ", Operation = " + Operation
                + ", Workload = " + Workload
                + ", UserId = " + UserId
                + ", OrganizationId = " + OrganizationId
                + ", InterSystemsId = " + InterSystemsId
                + ", ExtendedProperties = " + extendedPropertiesText
                + ", ActorContextId = " + ActorContextId
                + ", CreationTime = " + CreationTime
                + ", IntraSystemId = " + IntraSystemId
                + ", Target = " + targetText
                + ", Actor = " + actorText
                + ", Id = " + Id
                + ", Version = " + Version
                + ", ResultStatus = " + ResultStatus
                + ", ObjectId = " + ObjectId
                + ", AzureActiveDirectoryEventType = " + AzureActiveDirectoryEventType
                + ", UserKey = " + UserKey
                + "]";
    }
}

Target类:

/**
 * One entry of the {@code Target} list in the sample JSON: an identifier plus a
 * type code, both held as strings.
 *
 * <p>Field names repeat the JSON keys exactly; keep their spelling and order.
 */
public class Target {

    private String Type;
    private String ID;

    /** @return the type code, or {@code null} if it was never set */
    public String getType() {
        return this.Type;
    }

    /** @param Type the type code to store */
    public void setType(String Type) {
        this.Type = Type;
    }

    /** @return the identifier, or {@code null} if it was never set */
    public String getID() {
        return this.ID;
    }

    /** @param ID the identifier to store */
    public void setID(String ID) {
        this.ID = ID;
    }

    /** @return both fields in the form {@code ClassPojo [Type = ..., ID = ...]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ClassPojo [Type = ");
        text.append(this.Type);
        text.append(", ID = ");
        text.append(this.ID);
        text.append("]");
        return text.toString();
    }
}

Actor类:

public class Actor {
private String Type;

private String ID;

public String getType() {
    return Type;
}

public void setType(String Type) {
    this.Type = Type;
}

public String getID() {
    return ID;
}

public void setID(String ID) {
    this.ID = ID;
}

@Override
public String toString() {
    return "ClassPojo [Type = " + Type + ", ID = " + ID + "]";
}}

ExtendedProperties类:

/**
 * One entry of the {@code ExtendedProperties} list in the sample JSON: a
 * name/value pair, both held as strings.
 *
 * <p>Field names repeat the JSON keys exactly; keep their spelling and order.
 */
public class ExtendedProperties {

    private String Name;
    private String Value;

    /** @return the property name, or {@code null} if it was never set */
    public String getName() {
        return this.Name;
    }

    /** @param Name the property name to store */
    public void setName(String Name) {
        this.Name = Name;
    }

    /** @return the property value, or {@code null} if it was never set */
    public String getValue() {
        return this.Value;
    }

    /** @param Value the property value to store */
    public void setValue(String Value) {
        this.Value = Value;
    }

    /** @return both fields in the form {@code ClassPojo [Name = ..., Value = ...]} */
    @Override
    public String toString() {
        // String.valueOf turns an unset field into the text "null".
        String nameText = String.valueOf(this.Name);
        String valueText = String.valueOf(this.Value);
        return "ClassPojo [Name = ".concat(nameText)
                .concat(", Value = ")
                .concat(valueText)
                .concat("]");
    }
}

主类:

/**
 * Demo entry point: binds the sample audit record to the {@code Input} POJO with
 * Gson, starts an output JSON object, and prints the Actor and
 * ExtendedProperties entries to standard output.
 */
public class Stack {

    public static void main(String[] args) {
        doIt();
    }

    /** Runs the whole demo against a hard-coded sample record. */
    private static void doIt() {
        String received = "{\"CreationTime\":\"2017-12-27T07:47:46\",\"Id\":\"10ee505b-90a4-4ac1-b96f-a6dbca939694\",\"Operation\":\"Add member to role.\",\"OrganizationId\":\"2f88f444-62da-4aae-b8af-8331a6915801\",\"RecordType\":8,\"ResultStatus\":\"success\",\"UserKey\":\"[email protected]\",\"UserType\":0,\"Version\":1,\"Workload\":\"AzureActiveDirectory\",\"ObjectId\":\"[email protected]\",\"UserId\":\"[email protected]\",\"AzureActiveDirectoryEventType\":1,\"ExtendedProperties\":[{\"Name\":\"Role.ObjectID\",\"Value\":\"b0f54661-2d74-4c50-afa3-1ec803f12efe\"},{\"Name\":\"Role.DisplayName\",\"Value\":\"Billing Administrator\"},{\"Name\":\"Role.TemplateId\",\"Value\":\"b0f54661-2d74-4c50-afa3-1ec803f12efe\"},{\"Name\":\"Role.WellKnownObjectName\",\"Value\":\"BillingAdmins\"}],\"Actor\":[{\"ID\":\"[email protected]\",\"Type\":5},{\"ID\":\"10030000A656FE5B\",\"Type\":3},{\"ID\":\"User_d03ca514-adfa-4585-a8bd-7182a9a086c7\",\"Type\":2}],\"ActorContextId\":\"2f88f444-62da-4aae-b8af-8331a6915801\",\"InterSystemsId\":\"6d402a5b-c5de-4d9f-a805-9371c109e55f\",\"IntraSystemId\":\"a5568d01-f100-497a-b88b-c9731ff31248\",\"Target\":[{\"ID\":\"User_8f77c311-3ea0-4146-9f7d-db21bd052d3d\",\"Type\":2},{\"ID\":\"[email protected]\",\"Type\":5},{\"ID\":\"1003BFFDA67CCA03\",\"Type\":3}],\"TargetContextId\":\"2f88f444-62da-4aae-b8af-8331a6915801\"}";
        JSONObject send = new JSONObject();
        Gson gson = new Gson();
        Input inputObject = gson.fromJson(received, Input.class);

        // Extend here to shape the outgoing JSON as required.
        send.put("userId", inputObject.getUserId());
        send.put("Workload", inputObject.getWorkload());

        // Walk the Actor list; replace the print with real handling.
        for (Actor actor : inputObject.getActor()) {
            System.out.println(actor.getID() + " : " + actor.getType());
        }

        // Walk the ExtendedProperties list; replace the print with real handling.
        for (ExtendedProperties property : inputObject.getExtendedProperties()) {
            System.out.println(property.getName() + " : " + property.getValue());
        }

        System.out.println("*************");
    }
}

不使用POJO的备用主类。这里使用org.json库来解析输入JSON。

/**
 * Demo entry point that needs no POJOs: parses the sample audit record with
 * org.json, starts an output JSON object, and prints the Actor and
 * ExtendedProperties entries to standard output.
 */
public class Test {

    public static void main(String[] args) {
        doIt();
    }

    /** Runs the whole demo against a hard-coded sample record. */
    private static void doIt() {
        String received = "{\"CreationTime\":\"2017-12-27T07:47:46\",\"Id\":\"10ee505b-90a4-4ac1-b96f-a6dbca939694\",\"Operation\":\"Add member to role.\",\"OrganizationId\":\"2f88f444-62da-4aae-b8af-8331a6915801\",\"RecordType\":8,\"ResultStatus\":\"success\",\"UserKey\":\"[email protected]\",\"UserType\":0,\"Version\":1,\"Workload\":\"AzureActiveDirectory\",\"ObjectId\":\"[email protected]\",\"UserId\":\"[email protected]\",\"AzureActiveDirectoryEventType\":1,\"ExtendedProperties\":[{\"Name\":\"Role.ObjectID\",\"Value\":\"b0f54661-2d74-4c50-afa3-1ec803f12efe\"},{\"Name\":\"Role.DisplayName\",\"Value\":\"Billing Administrator\"},{\"Name\":\"Role.TemplateId\",\"Value\":\"b0f54661-2d74-4c50-afa3-1ec803f12efe\"},{\"Name\":\"Role.WellKnownObjectName\",\"Value\":\"BillingAdmins\"}],\"Actor\":[{\"ID\":\"[email protected]\",\"Type\":5},{\"ID\":\"10030000A656FE5B\",\"Type\":3},{\"ID\":\"User_d03ca514-adfa-4585-a8bd-7182a9a086c7\",\"Type\":2}],\"ActorContextId\":\"2f88f444-62da-4aae-b8af-8331a6915801\",\"InterSystemsId\":\"6d402a5b-c5de-4d9f-a805-9371c109e55f\",\"IntraSystemId\":\"a5568d01-f100-497a-b88b-c9731ff31248\",\"Target\":[{\"ID\":\"User_8f77c311-3ea0-4146-9f7d-db21bd052d3d\",\"Type\":2},{\"ID\":\"[email protected]\",\"Type\":5},{\"ID\":\"1003BFFDA67CCA03\",\"Type\":3}],\"TargetContextId\":\"2f88f444-62da-4aae-b8af-8331a6915801\"}";
        JSONObject send = new JSONObject();

        JSONObject input = new JSONObject(received);

        // Extend here to shape the outgoing JSON as required.
        send.put("userId", input.getString("UserId"));
        send.put("Workload", input.getString("Workload"));

        // Walk the Actor list; replace the print with real handling.
        JSONArray actorArray = input.getJSONArray("Actor");
        for (int i = 0; i < actorArray.length(); i++) {
            JSONObject actor = actorArray.getJSONObject(i);
            String actorId = actor.getString("ID");
            int actorType = actor.getInt("Type");
            System.out.println(actorId + ":" + actorType);
        }

        // Walk the ExtendedProperties list; replace the print with real handling.
        JSONArray extendedProperties = input.getJSONArray("ExtendedProperties");
        for (int j = 0; j < extendedProperties.length(); j++) {
            JSONObject property = extendedProperties.getJSONObject(j);
            String propertyName = property.getString("Name");
            String propertyValue = property.getString("Value");
            System.out.println(propertyName + " : " + propertyValue);
        }

        System.out.println("*************");
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.