A look at OpenMessaging's MessagingAccessPoint
Published: 2019-06-19


This article takes a look at OpenMessaging's MessagingAccessPoint.

MessagingAccessPoint

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java

/**
 * An instance of {@code MessagingAccessPoint} may be obtained from {@link OMS}, which is capable of creating
 * {@code Producer}, {@code Consumer}, {@code ResourceManager}, and other facility entities.
 * <p>
 * For example:
 * <pre>
 * MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east:default_space");
 * messagingAccessPoint.startup();
 * Producer producer = messagingAccessPoint.createProducer();
 * producer.startup();
 * producer.send(producer.createBytesMessage("HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
 * </pre>
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface MessagingAccessPoint extends ServiceLifecycle {
    /**
     * Returns the target OMS specification version of the specified vendor implementation.
     *
     * @return the OMS version of implementation
     * @see OMS#specVersion
     */
    String implVersion();

    /**
     * Returns the attributes of this {@code MessagingAccessPoint} instance.
     * <p>
     * There are some standard attributes defined by OMS for {@code MessagingAccessPoint}:
     * <ul>
     * <li> {@link OMSBuiltinKeys#ACCESS_POINTS}, the specified access points.
     * <li> {@link OMSBuiltinKeys#DRIVER_IMPL}, the fully qualified class name of the specified MessagingAccessPoint's
     * implementation, the default value is {@literal io.openmessaging.<driver_type>.MessagingAccessPointImpl}.
     * <li> {@link OMSBuiltinKeys#REGION}, the region the resources reside in.
     * <li> {@link OMSBuiltinKeys#ACCOUNT_ID}, the ID of the specific account system that owns the resource.
     * </ul>
     *
     * @return the attributes
     */
    KeyValue attributes();

    /**
     * Creates a new {@code Producer} for the specified {@code MessagingAccessPoint}.
     *
     * @return the created {@code Producer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    Producer createProducer();

    /**
     * Creates a new {@code Producer} for the specified {@code MessagingAccessPoint}
     * with some preset attributes.
     *
     * @param attributes the preset attributes
     * @return the created {@code Producer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    Producer createProducer(KeyValue attributes);

    /**
     * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}.
     * The returned {@code PushConsumer} isn't attached to any queue,
     * uses {@link PushConsumer#attachQueue(String, MessageListener)} to attach queues.
     *
     * @return the created {@code PushConsumer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    PushConsumer createPushConsumer();

    /**
     * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint} with some preset attributes.
     *
     * @param attributes the preset attributes
     * @return the created {@code PushConsumer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    PushConsumer createPushConsumer(KeyValue attributes);

    /**
     * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}.
     *
     * @return the created {@code PullConsumer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    PullConsumer createPullConsumer();

    /**
     * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint} with some preset attributes.
     *
     * @param attributes the preset attributes
     * @return the created {@code PullConsumer}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    PullConsumer createPullConsumer(KeyValue attributes);

    /**
     * Creates a new {@code StreamingConsumer} for the specified {@code MessagingAccessPoint}.
     *
     * @return the created {@code Stream}
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    StreamingConsumer createStreamingConsumer();

    /**
     * Creates a new {@code StreamingConsumer} for the specified {@code MessagingAccessPoint} with some preset
     * attributes.
     *
     * @param attributes the preset attributes
     * @return the created consumer
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    StreamingConsumer createStreamingConsumer(KeyValue attributes);

    /**
     * Gets a lightweight {@code ResourceManager} instance from the specified {@code MessagingAccessPoint}.
     *
     * @return the resource manger
     * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
     * due to some internal error
     */
    ResourceManager resourceManager();
}
  • MessagingAccessPoint acts much like a factory for OMS: it gathers the methods that create the various entities, such as createProducer, createPushConsumer, createPullConsumer, and resourceManager.
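
To make the "one entry point that hands out the other entities" idea concrete, here is a minimal, self-contained sketch. It does not use the real io.openmessaging types: LifecycleSketch, ProducerSketch, and AccessPointSketch are hypothetical stand-ins, and the rule that the access point must be started before it creates a producer is an assumption of this sketch, modeled only on the call order in the Javadoc example above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ServiceLifecycle; not the real OMS type. */
interface LifecycleSketch {
    void startup();
    void shutdown();
}

/** Hypothetical stand-in for Producer. */
interface ProducerSketch extends LifecycleSketch {
    void send(String queue, String body);
}

/** One object that hands out the other entities, in the spirit of MessagingAccessPoint. */
final class AccessPointSketch implements LifecycleSketch {
    private boolean started;
    // Records what producers sent, so the sketch has observable behavior.
    final List<String> delivered = new ArrayList<>();

    @Override
    public void startup() { started = true; }

    @Override
    public void shutdown() { started = false; }

    ProducerSketch createProducer() {
        // Assumption of this sketch: the access point must be started first.
        if (!started) {
            throw new IllegalStateException("access point has not been started");
        }
        return new ProducerSketch() {
            private boolean producerStarted;

            @Override
            public void startup() { producerStarted = true; }

            @Override
            public void shutdown() { producerStarted = false; }

            @Override
            public void send(String queue, String body) {
                if (!producerStarted) {
                    throw new IllegalStateException("producer has not been started");
                }
                delivered.add(queue + ":" + body);
            }
        };
    }
}
```

Callers only ever touch the access point and the objects it returns, which is what makes the real interface read like a factory and a facade at the same time.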

OMS

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/OMS.java

/**
 * The oms class provides some static methods to create a {@code MessagingAccessPoint}
 * from the specified OMS driver url and some useful util methods.
 * <p>
 * The complete OMS driver URL syntax is:
 * <p>
 * {@literal oms:<driver_type>://[account_id@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/<region>:<namespace>}
 * <p>
 * The first part of the URL specifies which OMS implementation is to be used, rocketmq is a
 * optional driver type.
 * <p>
 * The brackets indicate that the extra access points are optional, and a correct OMS driver url
 * needs at least one access point, which consists of hostname and port, like localhost:8081.
 *
 * @version OMS 1.0.0
 * @see ResourceManager
 * @since OMS 1.0.0
 */
public final class OMS {
    /**
     * Returns a {@code MessagingAccessPoint} instance from the specified OMS driver url.
     *
     * @param url the specified OMS driver url
     * @return a {@code MessagingAccessPoint} instance
     * @throws OMSRuntimeException if the factory fails to create a {@code MessagingAccessPoint} due to some driver url
     * some syntax error or internal error.
     */
    public static MessagingAccessPoint getMessagingAccessPoint(String url) {
        return getMessagingAccessPoint(url, OMS.newKeyValue());
    }

    /**
     * Returns a {@code MessagingAccessPoint} instance from the specified OMS driver url
     * with some preset attributes, which will be passed to MessagingAccessPoint's implementation
     * class as a unique constructor parameter.
     *
     * There are some standard attributes defined by OMS for this method,
     * the same as {@link MessagingAccessPoint#attributes()}
     *
     * @param url the specified OMS driver url
     * @return a {@code MessagingAccessPoint} instance
     * @throws OMSRuntimeException if the factory fails to create a {@code MessagingAccessPoint} due to some driver url
     * some syntax error or internal error.
     */
    public static MessagingAccessPoint getMessagingAccessPoint(String url, KeyValue attributes) {
        return MessagingAccessPointAdapter.getMessagingAccessPoint(url, attributes);
    }

    /**
     * Returns a default and internal {@code KeyValue} implementation instance.
     *
     * @return a {@code KeyValue} instance
     */
    public static KeyValue newKeyValue() {
        return new DefaultKeyValue();
    }

    /**
     * The version format is X.Y.Z (Major.Minor.Patch), a pre-release version may be denoted by appending a hyphen and a
     * series of dot-separated identifiers immediately following the patch version, like X.Y.Z-alpha.
     *
     * <p>
     * OMS version follows semver scheme partially.
     *
     * @see <a href="http://semver.org">http://semver.org</a>
     */
    public static String specVersion = "UnKnown";

    static {
        InputStream stream = OMS.class.getClassLoader().getResourceAsStream("oms.spec.properties");
        try {
            if (stream != null) {
                Properties properties = new Properties();
                properties.load(stream);
                specVersion = String.valueOf(properties.get("version"));
            }
        } catch (IOException ignore) {
        }
    }

    private OMS() {
    }
}

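  • OMS can be seen as the main entry point: it provides the static factory methods for MessagingAccessPoint.
  • It parses the access address and related information from the URL, for example oms:rocketmq://alice@rocketmq.apache.org/us-east:default_space.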
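
As a rough illustration of what "parsing the URL" involves, the sketch below splits a driver URL into its parts. It is not the parser used by MessagingAccessPointAdapter: OmsUrlSketch and its field names are hypothetical, and the assumed shape oms:<driver_type>://[account_id@]host1[:port1][,host2[:port2]...]/<region>:<namespace> is inferred from the Javadoc and the example URL.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the OpenMessaging API. */
final class OmsUrlSketch {
    // Assumed shape: oms:<driver_type>://[account_id@]host[:port][,host[:port]...]/<region>:<namespace>
    private static final Pattern URL =
        Pattern.compile("^oms:(\\w+)://(?:([^@/]+)@)?([^/]+)/([^:/]+):(.+)$");

    final String driverType;
    final String accountId;          // null when the URL carries no account id
    final List<String> accessPoints; // one entry per comma-separated host[:port]
    final String region;
    final String namespace;

    private OmsUrlSketch(String driverType, String accountId, List<String> accessPoints,
                         String region, String namespace) {
        this.driverType = driverType;
        this.accountId = accountId;
        this.accessPoints = accessPoints;
        this.region = region;
        this.namespace = namespace;
    }

    static OmsUrlSketch parse(String url) {
        Matcher m = URL.matcher(url);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a valid OMS driver URL: " + url);
        }
        List<String> points = Collections.unmodifiableList(Arrays.asList(m.group(3).split(",")));
        return new OmsUrlSketch(m.group(1), m.group(2), points, m.group(4), m.group(5));
    }
}
```

For the example URL, parse yields the driver type rocketmq, the account id alice, a single access point rocketmq.apache.org, the region us-east, and the namespace default_space.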

ResourceManager

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/ResourceManager.java

/**
 * The {@code ResourceManager} is to provide a unified interface of resource management,
 * allowing developers to manage the namespace, queue and routing resources.
 * <p>
 * Create, set, get and delete are the four basic operations of {@code ResourceManager}.
 * <p>
 * {@code ResourceManager} also supports dynamic fetch and update of resource attributes.
 * <p>
 * {@link MessagingAccessPoint#resourceManager()} is the unique method to obtain a {@code ResourceManager}
 * instance. Changes made through this instance will immediately apply to the message-oriented middleware (MOM) behind
 * {@code MessagingAccessPoint}.
 *
 * @version OMS 1.0.0
 * @since OMS 1.0.0
 */
public interface ResourceManager {
    /**
     * Creates a {@code Namespace} resource with some preset attributes.
     * <p>
     * A namespace wraps the OMS resources in an abstract concept that makes it appear to the users
     * within the namespace that they have their own isolated instance of the global OMS resources.
     *
     * @param nsName the name of the new namespace
     * @param attributes the preset attributes
     */
    void createNamespace(String nsName, KeyValue attributes);

    /**
     * Sets the attributes of the specific namespace, the old attributes will be replaced
     * by the provided attributes, only the provided key will be updated.
     *
     * @param nsName the specific namespace
     * @param attributes the new attributes
     * @throws OMSResourceNotExistException if the specific namespace does not exist
     */
    void setNamespaceAttributes(String nsName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Gets the attributes of the specific namespace.
     *
     * @param nsName the specific namespace
     * @return the attributes of namespace
     * @throws OMSResourceNotExistException if the specific namespace does not exist
     */
    KeyValue getNamespaceAttributes(String nsName) throws OMSResourceNotExistException;

    /**
     * Deletes an existing namespace resource.
     *
     * @param nsName the namespace needs to be deleted
     * @throws OMSResourceNotExistException if the specified namespace does not exist
     */
    void deleteNamespace(String nsName) throws OMSResourceNotExistException;

    /**
     * Gets the namespace list in the current {@code MessagingAccessPoint}.
     *
     * @return the list of all namespaces
     */
    List<String> listNamespaces();

    /**
     * Creates a {@code Queue} resource in the configured namespace with some preset attributes.
     * <p>
     * The standard OMS {@code Queue} schema must start with the {@code Namespace} prefix:
     * <p>
     * {@literal <namespace_name>://<queue_name>}
     *
     * @param queueName the name of the new queue
     * @param attributes the preset attributes
     * @throws OMSResourceNotExistException if the configured namespace does not exist or specified queue name is
     * not available
     */
    void createQueue(String queueName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Sets the attributes of the specified queue, the old attributes will be replaced
     * by the provided attributes, only the provided key will be updated.
     *
     * @param queueName the queue name
     * @param attributes the new attributes
     * @throws OMSResourceNotExistException if the specified queue or namespace does not exist
     */
    void setQueueAttributes(String queueName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Gets the attributes of the specified queue.
     *
     * @param queueName the queue name
     * @return the attributes of namespace
     * @throws OMSResourceNotExistException if the specified queue or namespace does not exist
     */
    KeyValue getQueueAttributes(String queueName) throws OMSResourceNotExistException;

    /**
     * Deletes an existing queue resource.
     *
     * @param queueName the queue needs to be deleted
     * @throws OMSResourceNotExistException if the specified queue or namespace does not exist
     */
    void deleteQueue(String queueName) throws OMSResourceNotExistException;

    /**
     * Gets the queue list in the specific namespace.
     *
     * @param nsName the specific namespace
     * @return the list of all queues
     * @throws OMSResourceNotExistException if the specific namespace does not exist
     */
    List<String> listQueues(String nsName) throws OMSResourceNotExistException;

    /**
     * Creates a {@code Routing} resource in the configured namespace with some preset attributes.
     * <p>
     * The standard OMS {@code Routing} schema must start with the {@code Namespace} prefix:
     * <p>
     * {@literal <namespace_name>://<routing_name>}
     *
     * @param routingName the name of the new routing
     * @param attributes the preset attributes
     * @throws OMSResourceNotExistException if the configured namespace does not exist or specified routing name is not
     * available
     */
    void createRouting(String routingName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Sets the attributes of the specified routing, the old attributes will be replaced
     * by the provided attributes, only the provided key will be updated.
     *
     * @param routingName the routing name
     * @param attributes the new attributes
     * @throws OMSResourceNotExistException if the specified routing or namespace does not exist
     */
    void setRoutingAttributes(String routingName, KeyValue attributes) throws OMSResourceNotExistException;

    /**
     * Gets the attributes of the specified routing.
     *
     * @param routingName the routing name
     * @return the attributes of routing
     * @throws OMSResourceNotExistException if the specified routing or namespace does not exist
     */
    KeyValue getRoutingAttributes(String routingName) throws OMSResourceNotExistException;

    /**
     * Deletes an existing routing resource.
     *
     * @param routingName the routing needs to be deleted
     * @throws OMSResourceNotExistException if the specified routing or namespace does not exist
     */
    void deleteRouting(String routingName) throws OMSResourceNotExistException;

    /**
     * Gets the routing list in the specific namespace.
     *
     * @param nsName the specific namespace
     * @return the list of all routings
     * @throws OMSResourceNotExistException if the specific namespace does not exist
     */
    List<String> listRoutings(String nsName) throws OMSResourceNotExistException;

    /**
     * Gets the stream list behind the specified queue.
     *
     * @param queueName the queue name
     * @return the list of all streams
     * @throws OMSResourceNotExistException if the specified queue or namespace does not exist
     */
    List<String> listStreams(String queueName) throws OMSResourceNotExistException;

    /**
     * Updates some system headers of a message in the configured namespace.
     * <p>
     * Below system headers are allowed to be changed dynamically:
     * <ul>
     * <li>{@link Message.BuiltinKeys#START_TIME}</li>
     * <li>{@link Message.BuiltinKeys#STOP_TIME}</li>
     * <li>{@link Message.BuiltinKeys#TIMEOUT}</li>
     * <li>{@link Message.BuiltinKeys#PRIORITY}</li>
     * <li>{@link Message.BuiltinKeys#SCHEDULE_EXPRESSION}</li>
     * </ul>
     *
     * @param queueName the specific queue the message resides in
     * @param messageId the id of message
     * @param headers the new headers
     * @throws OMSResourceNotExistException if the specified queue, namespace or message does not exist
     */
    void updateMessage(String queueName, String messageId, KeyValue headers) throws OMSResourceNotExistException;
}
  • ResourceManager provides methods for operating on namespaces, queues, and routings.
  • The namespace operations are createNamespace, setNamespaceAttributes, deleteNamespace, listNamespaces, and getNamespaceAttributes.
  • The queue operations are createQueue, setQueueAttributes, getQueueAttributes, deleteQueue, and listQueues.
  • The routing operations are createRouting, setRoutingAttributes, getRoutingAttributes, deleteRouting, and listRoutings.
  • It also provides the listStreams and updateMessage methods.
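
The create/set/get/delete/list pattern, and in particular the "only the provided key will be updated" rule for the set methods, can be sketched with a small in-memory store. This is a toy model under stated assumptions, not an OMS implementation: InMemoryNamespacesSketch is a hypothetical name, a plain Map stands in for KeyValue, NoSuchElementException stands in for OMSResourceNotExistException, and overwriting on a repeated createNamespace is a choice of this sketch that the Javadoc does not specify.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

/** Hypothetical in-memory model of the namespace operations; not an OMS implementation. */
final class InMemoryNamespacesSketch {
    // Namespace name -> attributes; a plain Map stands in for KeyValue.
    private final Map<String, Map<String, String>> namespaces = new LinkedHashMap<>();

    void createNamespace(String nsName, Map<String, String> attributes) {
        // Sketch choice: creating an existing namespace simply overwrites it.
        namespaces.put(nsName, new HashMap<>(attributes));
    }

    void setNamespaceAttributes(String nsName, Map<String, String> attributes) {
        // Only the provided keys change; attributes not mentioned are kept.
        require(nsName).putAll(attributes);
    }

    Map<String, String> getNamespaceAttributes(String nsName) {
        // Return a copy so callers cannot mutate the stored attributes.
        return new HashMap<>(require(nsName));
    }

    void deleteNamespace(String nsName) {
        require(nsName);
        namespaces.remove(nsName);
    }

    List<String> listNamespaces() {
        return new ArrayList<>(namespaces.keySet());
    }

    // NoSuchElementException stands in for OMSResourceNotExistException.
    private Map<String, String> require(String nsName) {
        Map<String, String> attrs = namespaces.get(nsName);
        if (attrs == null) {
            throw new NoSuchElementException("Namespace does not exist: " + nsName);
        }
        return attrs;
    }
}
```

The queue and routing operations follow the same shape, with the additional requirement that their names carry the namespace prefix.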

Summary

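OpenMessaging's MessagingAccessPoint gathers the entry points for the related access operations and can be seen as a facade. OMS is the static factory for MessagingAccessPoint, while ResourceManager is similar to kafkaAdmin: it is used to manage namespaces, queues, routings, and so on.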


Reposted from: http://pzsvx.baihongyu.com/
