Azure功能到表存储

问题描述 投票:4回答:2

我有一个Azure功能,我希望它从EventHub获取消息(这是相当简单和有效),然后在运行时使用表绑定将该信息放入表存储。

这是我到目前为止所拥有的:

public static async Task Run(string eventHubMessage, TraceWriter log, Binder binder)
{
   var m = JsonConvert.DeserializeObject<Measurement>(eventHubMessage);
   var attributes = new Attribute[]
    {
        new StorageAccountAttribute("AzureWebJobsTest"),
        new TableAttribute(tableName, m.PartitionKey, m.RowKey)
    };

    using(var output = await binder.BindAsync<MyTableEntity>(attributes)) 
    {
        if(output == null)
           log.Info($"4. output is null");
        else
        {
            output.Minimum = m.Minimum;
            output.Maximum = m.Maximum;
            output.Average = m.Average;
            output.Timestamp = m.Timestamp;
            output.ETag = m.ETag;  

            output.WriteEntity(/* Need an operationContext*/)
        }
    }
}
public class MyTableEntity : TableEntity, IDisposable
{
    public double Average { get; set;}
    public double Minimum { get; set;}
    public double Maximum { get; set;}

    bool disposed = false;
    public void Dispose()
    { 
        Dispose(true);
        GC.SuppressFinalize(this);           
    }

   protected virtual void Dispose(bool disposing)
   {
      if (disposed)
         return; 

      if (disposing) 
      {
      }

      disposed = true;
   }
}

我的问题;

1)输出始终为空。

2)即使输出不为null,我也不知道我对OperationContext需要什么,或者调用ITableEntity.Write()甚至是正确的方法来使它写入表存储。

和JSON绑定:

{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "eventHubMessage",
      "direction": "in",
      "path": "measurements",
      "connection": "MeasurementsConnectionString"
    }
  ],
  "disabled": false
}
c# azure azure-functions azure-table-storage
2个回答
4
投票

要向Table添加新条目,您应该绑定到IAsyncCollector而不是实体本身,然后创建一个新实体并调用AddAsync。以下代码段对我有用:

var attributes = new Attribute[]
{
    new StorageAccountAttribute("..."),
    new TableAttribute("...")
};

var output = await binder.BindAsync<IAsyncCollector<MyTableEntity>>(attributes);     
await output.AddAsync(new MyTableEntity()
{
    PartitionKey = "...",
    RowKey = "...",
    Minimum = ...,
    ...
});

0
投票

如果您想使用DynamicTableEntity,因为在编译时您不知道消息中将包含哪些数据,并且您意识到表绑定不再适用于DynamicTableEntities,您可以使用以下方法:

private static async Task ProcessMessage(string message, DateTime enqueuedTime)
{
    var deviceData = JsonConvert.DeserializeObject<JObject>(message);

    var dynamicTableEntity = new DynamicTableEntity();
    dynamicTableEntity.RowKey = enqueuedTime.ToString("yyyy-MM-dd HH:mm:ss.fff");

    foreach (KeyValuePair<string, JToken> keyValuePair in deviceData)
    {
        if (keyValuePair.Key.Equals("MyPartitionKey"))
        {
            dynamicTableEntity.PartitionKey = keyValuePair.Value.ToString();
        }
        else if (keyValuePair.Key.Equals("Timestamp")) // if you are using a parameter "Timestamp" it has to be stored in a column named differently because the column "Timestamp" will automatically be filled when adding a line to table storage
        {
            dynamicTableEntity.Properties.Add("MyTimestamp", EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
        }
        else
        {
            dynamicTableEntity.Properties.Add(keyValuePair.Key, EntityProperty.CreateEntityPropertyFromObject(keyValuePair.Value));
        }
    }

    CloudStorageAccount storageAccount = CloudStorageAccount.Parse("myStorageConnectionString");
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
    CloudTable table = tableClient.GetTableReference("myTableName"); 
    table.CreateIfNotExists();

    var tableOperation = TableOperation.Insert(dynamicTableEntity);
    await table.ExecuteAsync(tableOperation);
}
© www.soinside.com 2019 - 2024. All rights reserved.