QuickFIX/J — 按顺序重新发送所有消息的策略(全天恢复)

问题描述 投票:0回答:1

我们的项目(server/acceptor 端)想要实现通信失败的恢复。这特别包括使用

MsgType.RESEND_REQUEST
BeginSeqNo == 1
响应
EndSeqNo == 0
类型的消息,这是重新发送所有消息的请求。所以我们要按照从1到最新的顺序重新发送所有的消息来响应它。

该功能最初设计用于在应用程序级别处理重发请求(通过在实现

MsgType.RESEND_REQUEST
接口的类的
fromAdmin()
方法中检测
quickfix.Application
类型的消息)并使用适当的可能重复标志和间隙填充再次发送所有消息.

在测试时,我们发现 QuickFIX/J 已经在会话级别具有内置恢复功能,这会干扰我们的自定义实现。恢复由

quickfix.Session
类实现,
RESEND_REQUEST
类型的消息由
next()
方法检测,并由
nextResendRequest()
方法进一步处理。
nextResendRequest()
尝试从
MessageStorage
检索和重新发送消息,或者如果存储无法提供消息,则用 Gap Fills 替换它们。

现在我们想要禁用内置恢复并让我们的应用程序完成整个工作,或者利用内置恢复功能来使用我们的自定义消息源(Kafka 主题)。

禁用的问题是我们看不到合法的方法,没有设置或自定义点。 QuickFIX/J 会话层似乎是为自己的自动重发处理而设计的,除了选择

MessageStorage
实现之外,不提供用于自定义的公共 API。

使用内置机制的问题在于,在某些时候它希望所有消息都作为内存中的

ArrayList
,这不适用于 100k-1M 卷,我们需要一个选项来流式传输数据。

我们看到以下继续实施的选项:

1。为流式传输定制 MessageStore

提供一个

MessageStore
实现,它依赖于存储在 Kafka 中的数据,并根据请求检索一系列消息。 此选项的问题是用于检索消息的 MessageStorage API 是同步的。它希望收到完整的消息列表以在一个块中重新发送 (
ArrayList
),我们无法流式传输消息。由于内存消耗和处理时间,这对于大容量来说是不可接受的。

我们想知道我们是否可以以某种方式调整

MessageStore
以使用类似访问者的来源来提供消息。

2。自定义会话实现

用跳过内部重新发送处理的自定义版本替换

Session
类。没有直接的方法可以做到这一点,因为:

  • Session
    不是为扩展而设计的:相关方法(
    next()
    nextResendRequest()
    )是私有的。
  • 以为定义了一个
    SessionFactory
    接口,它只有一个默认实现,似乎没有办法用其他外部替代品替换
    DefaultSessionFactory

通过反射进行一些黑客攻击是可能的,但这不是一个简单的方法。

3。异常绕过

在应用程序级别可用的方法中抛出一些异常,以取消会话级别的内置处理。如果仔细结合

RejectMessageOnUnhandledException
设置以容忍消息处理失败,这可能会起作用,但存在框架状态被破坏或其他副作用可能出现的风险。

您能否建议使用哪个选项更好?或者建议另一种策略来恢复来自 Kafka 的 100k-1M 消息的完整序列?

quickfix quickfixj
1个回答
0
投票

关于规格,你几乎在正确的位置,但我不得不承认,找到它有点棘手。从您链接到“FIX Application Layer”的页面 -> FIX Latest -> Infrastructure-> Specification.

https://www.fixtrading.org/online-specification/business-area-infrastructure/

或者更具体地说https://www.fixtrading.org/online-specification/business-area-infrastructure/#category-application-sequencing

那里的一些引述对我来说听起来正是您正在寻找的东西:

应用程序排序和恢复为无缝请求和重新发送所需的消息——并且只有所需的消息——提供了条件,同时保留了会话协议的标准行为。它还为接收方提供了将应用程序级消息的恢复推迟到缓慢时期或市场关闭之后的灵活性。

应用程序排序不是在正常的订单路由场景中使用的东西。它在大容量单向连接中具有更大的相关性,在这种连接中,接收方希望具有某种能力来控制在断开连接或数据丢失后重新发送的数据。

当 FIX 连接用于批量传送数据时,它应该只用于管理数据流......

然而,顾名思义,应用程序级排序需要在您的应用程序中实现,即 QFJ 中没有对此的内置支持。

© www.soinside.com 2019 - 2024. All rights reserved.