Fedora Message Aggregator/Router

These routes listen to the central ActiveMQ messaging queue, aggregate messages of a common PID and determine whether this is an update or delete operation. As long as the methodName header isn't purgeObject, the object FoXML is retrieved and then sent on to all of the downstream services. Because I connect to ActiveMQ over SSL, the <sslContextParameters/> node must be included.

<camel:sslContextParameters id="sslContextParameters">
  <camel:keyManagers keyPassword="truststore">
    <camel:keyStore
          resource="/path/to/truststore"
          password="..."/>
  </camel:keyManagers>
  <camel:trustManagers>
    <camel:keyStore
          resource="/path/to/truststore"
          password="..."/>
  </camel:trustManagers>
</camel:sslContextParameters>

One may also choose different parameters for aggregation, but a 10 second delay seems to work fine. Inter-context routes are connected with the seda component, which provides an asynchronous, in-memory queue between components.

<camelContext id="fedora-routes"
    xmlns="http://camel.apache.org/schema/spring"
    xmlns:foxml="info:fedora/fedora-system:def/foxml#">
  <route id="fedora-aggregator">
    <description>Fedora Message aggregator (10 sec inactivity timeout)</description>
    <from uri="activemq:queue:fedora.apim.update"/>
    <log message="Fedora Object: ${header.pid}, method: ${header.methodName}"/>
    <aggregate strategyRef="aggregatorStrategy" completionTimeout="10000">
      <correlationExpression>
        <simple>${header.pid}</simple>
      </correlationExpression>
      <to uri="seda:aggregated"/>
    </aggregate>
  </route>
 
  <route id="fedora-routing">
    <description>Fedora message routing</description>
    <from uri="seda:aggregated"/>
    <choice>
      <when>
        <simple>${header.methodName} == 'purgeObject'</simple>
        <to uri="seda:fedora.delete"/>
      </when>
      <otherwise>
        <setHeader headerName="Exchange.HTTP_METHOD">
          <constant>GET</constant>
        </setHeader>
        <setHeader headerName="Exchange.HTTP_PATH">
          <simple>/fedora/objects/${header.pid}/objectXML</simple>
        </setHeader>
        <log message="Getting foxml ${header.pid}"/>
        <to uri="http4://fedora-host:8080/?authUsername=...&amp;authPassword=..."/>
        <convertBodyTo type="org.w3c.dom.Document"/>
        <filter>
          <xpath>/foxml:digitalObject/foxml:datastream[@ID='MODS']</xpath>
          <to uri="seda:fedora.insert"/>
        </filter>
      </otherwise>
    </choice>
  </route>
 
  <route id="fedora-insert-multicaster">
    <description>Fedora Message insert multicaster</description>
    <from uri="seda:fedora.insert"/>
    <choice>
      <when>
        <xpath>/foxml:digitalObject/foxml:objectProperties/foxml:property[@NAME = 'info:fedora/fedora-system:def/model#state' and @VALUE = 'Active']</xpath>
        <multicast>
          <to uri="vm:solr.update"/>
          <to uri="vm:jena.update"/>
          <to uri="vm:oai.update"/>
        </multicast>
      </when>
      <otherwise>
        <to uri="seda:fedora.delete"/>
      </otherwise>
    </choice>
  </route>
 
  <route id="fedora-delete-multiplexer">
    <description>Fedora Message delete multiplexer</description>
    <from uri="seda:fedora.delete"/>
    <multicast>
      <to uri="vm:solr.delete"/>
      <to uri="vm:jena.delete"/>
      <to uri="vm:oai.delete"/>
    </multicast>
  </route>
</camelContext>
 
<bean id="aggregatorStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
 
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="brokerURL">
    <value>failover:(ssl://host1:port,ssl://host2:port)?randomize=true</value>
  </property>
  <property name="userName" value="..."/>
  <property name="password" value="..."/>
</bean>
fedora-routes.txt · Last modified: 2013/06/05 13:25 by acoburn@amherst.edu
 
Except where otherwise noted, content on this wiki is licensed under the following license: CC Attribution-Share Alike 4.0 International